先说需求,需求是很简单的,也就是假设有10w+的用户,每个用户都需要维护一个长链,那么就不可能单机,就需要分布式,而分布式的就需要确保精确推送,确保用户A的数据确实能被推送到用户A连接的机器那,所以一个主要思路就是用消息队列的routing key的逻辑去做
确保所有节点订阅了一个topic,并持有不同的key,再进行推送
压测
首先需要压测,我想看看在我自己的M2上能跑到多高的并发量
先编写代码,用1000个用户不停地推送消息作为模拟,使用了我自己编写的库,所以不是rabbitmq库的代码
package main
import (
"fmt"
"math/rand"
"github.com/Yeuoly/billboards/internal/db"
"github.com/Yeuoly/billboards/internal/static"
)
func main() {
static.InitConfig("conf/config.yaml")
// create 1000 consumers
for i := 0; i < 1000; i++ {
i := i
mq, err := db.GetTopicExchangeRabbitMQ("message", fmt.Sprintf("uid.%d", i))
if err != nil {
panic(err)
}
go func() {
c, err := mq.Consume()
if err != nil {
panic(err)
}
for d := range c {
fmt.Printf("uid-%d: %s\n", i, string(d.Body))
}
}()
}
// 1000 producer
producers := make([]*db.RabbitMQ, 0)
for i := 0; i < 1000; i++ {
mq, err := db.GetRabbitMQ("", "message", "topic", fmt.Sprintf("uid.%d", i), false)
if err != nil {
panic(err)
}
producers = append(producers, mq)
}
for {
random := rand.Int31n(1000)
producers[random].Publish([]byte(fmt.Sprintf("uid-%d", random)))
}
}
1000个消费者1000个生产者
实际打下来发现1000个用户在内存没有GC压力的时候速度可以稳定在35k/s,但是生产的速度还是明显快于消费的,队列一直在积压的状态(主要还是我没ack),也许多机一起消费会好一些,我主要还是单个机器在消费,即使是多线程
然后试试打到5000,顺带优化了一下代码,前面写的太粗糙了会抛错
package main
import (
"fmt"
"math/rand"
"github.com/Yeuoly/billboards/internal/db"
"github.com/Yeuoly/billboards/internal/static"
)
const (
COCURRENT = 5000
)
func main() {
static.InitConfig("conf/config.yaml")
consumers := make([]*db.RabbitMQ, 0)
for i := 0; i < COCURRENT; i++ {
i := i
mq, err := db.GetTopicExchangeRabbitMQ("message", fmt.Sprintf("uid.%d", i))
if err != nil {
panic(err)
}
consumers = append(consumers, mq)
}
for i, consumer := range consumers {
i := i
consumer := consumer
c, err := consumer.Consume()
if err != nil {
panic(err)
}
go func() {
for d := range c {
fmt.Printf("uid-%d: %s\n", i, string(d.Body))
}
}()
}
producers := make([]*db.RabbitMQ, 0)
for i := 0; i < COCURRENT; i++ {
mq, err := db.GetRabbitMQ("", "message", "topic", fmt.Sprintf("uid.%d", i), false)
if err != nil {
panic(err)
}
producers = append(producers, mq)
}
for {
random := rand.Int31n(COCURRENT)
producers[random].Publish([]byte(fmt.Sprintf("uid-%d", random)))
}
}
我们会发现deliver明显下降了很多,甚至到了后面直接不干活了,速度全部降到了0附近
我不好说这是被打挂了还是内存炸了,但可以确定的事碰到瓶颈了
为了确定问题在哪,我们需要先固定一下生产速度,最好的方法当然是先固定生成速度,通过调整消费者数量看消费速度的变化,现在生产者受到消费者影响太大了,那么我们分为两个进程,避免被锁在单核上了,顺便把ack加上,免得内存积压,至少目前看起来内存压力对rabbitmq的压力还是很大的
package main
import (
"flag"
"fmt"
"math/rand"
"github.com/Yeuoly/billboards/internal/db"
"github.com/Yeuoly/billboards/internal/static"
)
const (
COCURRENT = 1000
)
func consumer() {
consumers := make([]*db.RabbitMQ, 0)
for i := 0; i < COCURRENT; i++ {
i := i
mq, err := db.GetTopicExchangeRabbitMQ("message", fmt.Sprintf("uid.%d", i))
if err != nil {
panic(err)
}
consumers = append(consumers, mq)
}
for i, consumer := range consumers {
i := i
consumer := consumer
c, err := consumer.Consume()
if err != nil {
panic(err)
}
go func() {
for d := range c {
fmt.Printf("uid-%d: %s\n", i, string(d.Body))
}
}()
}
select {}
}
func producer() {
producers := make([]*db.RabbitMQ, 0)
for i := 0; i < COCURRENT; i++ {
mq, err := db.GetRabbitMQ("", "message", "topic", fmt.Sprintf("uid.%d", i), false)
if err != nil {
panic(err)
}
producers = append(producers, mq)
}
for {
random := rand.Int31n(COCURRENT)
producers[random].Publish([]byte(fmt.Sprintf("uid-%d", random)))
}
}
func main() {
static.InitConfig("conf/config.yaml")
typ := flag.String("type", "consumer", "consumer or producer")
flag.Parse()
if *typ == "consumer" {
consumer()
} else {
producer()
}
}
现在再来看会发现它就舒服多了,内存没有积压,消息推送速度可以飙到50k左右
但是感觉还没打到极限,把队列数上到10000
大概感觉就是生产速度明显下降了,并且主要是rabbitmq已经被打满了,CPU已经基本上跑满了
那是不是意味着如果CPU和RAM条件更好,RabbitMQ可以打到更高的并发?试一试,我这里上了一台144核1.2T的服务器
现实总是比较骨感
publish速度没变,但是consumer速度明显上升,那么估计就是发布速度太慢了,来吧,多来几个发布者看看,会发现消费者速度下降的特别厉害,估计是真打到rabbitmq的单机瓶颈了,大约在20k左右
考虑到业务量有10w左右的并发量,并且大多数用户长期都是处理闲置状态的,因此实际上不需要这么频繁的消息推送,够用了
标签:uid,err,记录,fmt,db,RabbitMQ,mq,推送,consumer From: https://www.cnblogs.com/yeuoly/p/17777970.html