首页 > 其他分享 >关于 RabbitMQ 做消息推送的一点记录

关于 RabbitMQ 做消息推送的一点记录

时间:2023-10-20 20:44:47浏览次数:31  
标签:uid err 记录 fmt db RabbitMQ mq 推送 consumer

先说需求,需求是很简单的,也就是假设有10w+的用户,每个用户都需要维护一个长链,那么就不可能单机,就需要分布式,而分布式的就需要确保精确推送,确保用户A的数据确实能被推送到用户A连接的机器那,所以一个主要思路就是用消息队列的routing key的逻辑去做
确保所有节点订阅了一个topic,并持有不同的key,再进行推送
压测
首先需要压测,我想看看在我自己的M2上能跑到多高的并发量
先编写代码,用1000个用户不停地推送消息作为模拟,使用了我自己编写的库,所以不是rabbitmq库的代码

package main

import (
    "fmt"
    "math/rand"

    "github.com/Yeuoly/billboards/internal/db"
    "github.com/Yeuoly/billboards/internal/static"
)

func main() {
    static.InitConfig("conf/config.yaml")

    // create 1000 consumers

    for i := 0; i < 1000; i++ {
        i := i
        mq, err := db.GetTopicExchangeRabbitMQ("message", fmt.Sprintf("uid.%d", i))
        if err != nil {
            panic(err)
        }
        go func() {
            c, err := mq.Consume()

            if err != nil {
                panic(err)
            }

            for d := range c {
                fmt.Printf("uid-%d: %s\n", i, string(d.Body))
            }
        }()
    }

    // 1000 producer
    producers := make([]*db.RabbitMQ, 0)
    for i := 0; i < 1000; i++ {
        mq, err := db.GetRabbitMQ("", "message", "topic", fmt.Sprintf("uid.%d", i), false)
        if err != nil {
            panic(err)
        }
        producers = append(producers, mq)
    }

    for {
        random := rand.Int31n(1000)
        producers[random].Publish([]byte(fmt.Sprintf("uid-%d", random)))
    }
}

1000个消费者1000个生产者
image

实际打下来发现1000个用户在内存没有GC压力的时候速度可以稳定在35k/s,但是生产的速度还是明显快于消费的,队列一直在积压的状态(主要还是我没ack),也许多机一起消费会好一些,我主要还是单个机器在消费,即使是多线程
然后试试打到5000,顺带优化了一下代码,前面写的太粗糙了会抛错

package main

import (
    "fmt"
    "math/rand"

    "github.com/Yeuoly/billboards/internal/db"
    "github.com/Yeuoly/billboards/internal/static"
)

const (
    COCURRENT = 5000
)

func main() {
    static.InitConfig("conf/config.yaml")

    consumers := make([]*db.RabbitMQ, 0)
    for i := 0; i < COCURRENT; i++ {
        i := i
        mq, err := db.GetTopicExchangeRabbitMQ("message", fmt.Sprintf("uid.%d", i))
        if err != nil {
            panic(err)
        }
        consumers = append(consumers, mq)
    }

    for i, consumer := range consumers {
        i := i
        consumer := consumer
        c, err := consumer.Consume()

        if err != nil {
            panic(err)
        }

        go func() {
            for d := range c {
                fmt.Printf("uid-%d: %s\n", i, string(d.Body))
            }
        }()
    }

    producers := make([]*db.RabbitMQ, 0)
    for i := 0; i < COCURRENT; i++ {
        mq, err := db.GetRabbitMQ("", "message", "topic", fmt.Sprintf("uid.%d", i), false)
        if err != nil {
            panic(err)
        }
        producers = append(producers, mq)
    }

    for {
        random := rand.Int31n(COCURRENT)
        producers[random].Publish([]byte(fmt.Sprintf("uid-%d", random)))
    }
}

我们会发现deliver明显下降了很多,甚至到了后面直接不干活了,速度全部降到了0附近
image

我不好说这是被打挂了还是内存炸了,但可以确定的事碰到瓶颈了
为了确定问题在哪,我们需要先固定一下生产速度,最好的方法当然是先固定生成速度,通过调整消费者数量看消费速度的变化,现在生产者受到消费者影响太大了,那么我们分为两个进程,避免被锁在单核上了,顺便把ack加上,免得内存积压,至少目前看起来内存压力对rabbitmq的压力还是很大的

package main

import (
    "flag"
    "fmt"
    "math/rand"

    "github.com/Yeuoly/billboards/internal/db"
    "github.com/Yeuoly/billboards/internal/static"
)

const (
    COCURRENT = 1000
)

func consumer() {
    consumers := make([]*db.RabbitMQ, 0)
    for i := 0; i < COCURRENT; i++ {
        i := i
        mq, err := db.GetTopicExchangeRabbitMQ("message", fmt.Sprintf("uid.%d", i))
        if err != nil {
            panic(err)
        }
        consumers = append(consumers, mq)
    }

    for i, consumer := range consumers {
        i := i
        consumer := consumer
        c, err := consumer.Consume()

        if err != nil {
            panic(err)
        }

        go func() {
            for d := range c {
                fmt.Printf("uid-%d: %s\n", i, string(d.Body))
            }
        }()
    }

    select {}
}

func producer() {
    producers := make([]*db.RabbitMQ, 0)
    for i := 0; i < COCURRENT; i++ {
        mq, err := db.GetRabbitMQ("", "message", "topic", fmt.Sprintf("uid.%d", i), false)
        if err != nil {
            panic(err)
        }
        producers = append(producers, mq)
    }

    for {
        random := rand.Int31n(COCURRENT)
        producers[random].Publish([]byte(fmt.Sprintf("uid-%d", random)))
    }
}

func main() {
    static.InitConfig("conf/config.yaml")
    typ := flag.String("type", "consumer", "consumer or producer")
    flag.Parse()

    if *typ == "consumer" {
        consumer()
    } else {
        producer()
    }
}

现在再来看会发现它就舒服多了,内存没有积压,消息推送速度可以飙到50k左右
image

但是感觉还没打到极限,把队列数上到10000
image

大概感觉就是生产速度明显下降了,并且主要是rabbitmq已经被打满了,CPU已经基本上跑满了
image

那是不是意味着如果CPU和RAM条件更好,RabbitMQ可以打到更高的并发?试一试,我这里上了一台144核1.2T的服务器
现实总是比较骨感
image

publish速度没变,但是consumer速度明显上升,那么估计就是发布速度太慢了,来吧,多来几个发布者看看,会发现消费者速度下降的特别厉害,估计是真打到rabbitmq的单机瓶颈了,大约在20k左右
image

考虑到业务量有10w左右的并发量,并且大多数用户长期都是处理闲置状态的,因此实际上不需要这么频繁的消息推送,够用了

标签:uid,err,记录,fmt,db,RabbitMQ,mq,推送,consumer
From: https://www.cnblogs.com/yeuoly/p/17777970.html

相关文章

  • 简单记录
    P1268树的重量先考虑二,三个,然后考虑四个,发现这种向外延展的思路.看树形背包发现动态开数组这个主要是针对N*W这种的开二维数组//如果你感觉这里的二维数组很难定义,可以先开一个一维的int数组,假设名字为pool;等到读入了N,W后再这样声明//但是,你甚至可以开ve......
  • 记录--谁还没个靠bug才能正常运行的程序
    这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助最近遇到一个问题,计算滚动距离,滚动比例达到某界定值时,显示mask,很常见吧^ _ ^这里讲的不是这个需求的实现,是其中遇到了一个比较有意思的bug,靠这个bug才达到了正确效果,以及这个bug是如何暴露的(很重要)。下面是演示......
  • flutter vscode iOS app debug 出错 记录1
    出现类似错误Error(Xcode):Targetdebug_unpack_iosfailed:Exception:Failedtocodesign/Users/cappuccino/Desktop/develop/code/app1/flutter_application_1/build/ios/Debug-iphonesimulator/Flutter.framework/Flutterwithidentity-.这个是由于代码所在文件夹被iC......
  • 它让你1小时精通RabbitMQ消息队列、且能扛高并发
    支持.NetCore(2.0及以上)与.NetFramework(4.5及以上)本文所述方案近期被江苏省某亿级数据量+高并发的政府"物联网"项目采用,获得圆满成功!!【目录】发送消息、获取消息、使用消息延时队列&死信队列展望RabbitMQ作为一款主流的消息队列工具早已广受欢迎。相比于其它......
  • ASP.net百度主动推送功能实现
    百度站长提供了curl、post、php、ruby的实现示例,并没有C#的官方示例。既然提供了post的方式,那么就可以用C#实现,下面是实现代码:ASP.net百度主动推送代码范例publicstaticstringPostUrl(string[]urls){try{stringformUrl="http://......
  • Rabbitmq消息队列调优
     RabbitMQ每增加一个连接,Erlang都会给这个连接分配三个Erlang进程,每个进程都会分配一定大小内存空间,所以随着连接数的增长,内存和Erlang进程数呈现有规律的增长,所以RabbitMQ连接数的无限增大会压垮mq服务,导致RabbitMQ服务崩溃。 客户端与RabbitMQ建立的是长连接,而不是建立短连......
  • 刷题记录——MISTAKES 慢慢更新
    刷题记录——MISTAKES慢慢更新截止到:20231020(有时会忘记改日期)。信友队——CSP-S2023复赛模拟赛T2忘了取模和二分了,直接爆longlong和TLE然后\(0\text{pts}\).CF1065CMakeItEqual桶桶桶桶桶!!!\(2e5\)你不用桶难道还要二分的吗?洛谷CSP-S2023模拟赛模\(9982443......
  • RabbitMQ 安装与配置
    1.安装Erlang下载地址:https://www.erlang.org/downloads下载文件为otp_win64_26.1.1.exe,点击安装,如下图:默认安装地址为C:\ProgramFiles\ErlangOTP,可自行修改安装地址(这里将安装地址改为D盘)安装完成后,设置环境变量,新建ERLANG_HOME修改环境变量path,增加:%ERLANG_H......
  • L1-011记录
    关于这道题目,今天其实有自己的一些思路 L1-011A-B分数20全屏浏览题目切换布局作者 陈越单位 浙江大学本题要求你计算A−B。不过麻烦的是,A和B都是字符串——即从字符串A中把字符串B所包含的字符全删掉,剩下的字符组成的就是字符串A−B。输入......
  • 【Django | 开发】中间件配置(记录响应耗时日志)
    ......