首页 > 其他分享 >RabbitMQ延时任务通过死信队列实现(golang)

RabbitMQ延时任务通过死信队列实现(golang)

时间:2024-07-01 17:20:52浏览次数:18  
标签:false err nil 队列 RabbitMQ golang 死信 amqp091 conn

  最近在一个项目中,需要实现在用户上传图片30分钟后,删除对应图片,以保证用户隐私。

  我们使用rabbitmq来实现。基于rabbitmq实现延时任务有两种方式,一种为队列ttl+死信exchange,另一种为安装插件(https://github.com/rabbitmq/rabbitmq-delayed-message-exchange)。

  其中安装插件,其大概原理就是:指定了延时的消息,会被先保存在 Mnesia (erlang编写的数据库管理系统)中,然后有一个定时器去查询最近需要被投递的消息,将其投递到目标队列中。消息延时时间,基本支持任意延迟时间(不能超过1个月)。目前该插件的当前设计并不真正适合包含大量延迟消息(例如数十万以上)的场景,另外该插件的一个可变性来源是依赖于 Erlang 计时器,在系统中使用了一定数量的长时间计时器之后,它们开始争用调度程序资源,并且时间漂移不断累积,rabbitmq内存占用明显增高。

 

  由于项目中使用的rabbitmq为云厂商服务,因此采用方式一实现。这种实现需要两个队列和两个消费者,队列一为我称为”中转队列“,队列二为”死信队列“。我们中转队列设置:x-message-ttl=30*60*1000 (队列中所有消息都30分钟存活时间,也就是任务延时30分钟执行)。死信队列不设置额外参数。消息先到达中转队列,中转队列的消息由一个中间消费者直接拒接消费,并将消息仍保存在中转队列不被删除,然后中间消费者又接到刚被拒绝的消息,继续拒绝消费,一直循环,直到达到消息存活时间(30分钟),这条消息将转发到死信队列,再由死信队列的消费者消费,执行正常业务逻辑(上文提到的删除用户上传图片)。

 

  生产者 producer/main.go

package main

import (
    "log"

    "github.com/rabbitmq/amqp091-go"
)

const (
    source =   "amqp://yourUsername:yourPassword@yourHost:5672/yourVhost"

    // 中间队列配置
    midExchange =   "mid.exchange"
    midRouteKey =   "mid.routeKey"
    midQueue    =   "mid.queue"

    // 死信队列配置
    deadExchange =   "dead.exchange"
    deadRouteKey =   "dead.routeKey"
    deadQueue    =   "dead.queue"

    // 延时任务时间 毫秒
    taskDelayMs = 30 * 60 * 1000
)

func newConn() *amqp091.Connection {
    conn, err := amqp091.Dial(source)
    if err != nil {
        log.Fatal(err)
    }
    return conn
}

func declareQueues(conn *amqp091.Connection) {
    declareMidQueue(conn)
    declareDeadQueue(conn)
}

func declareMidQueue(conn *amqp091.Connection) {
    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare(
        midExchange,
        amqp091.ExchangeDirect,
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }

    _, err = ch.QueueDeclare(
        midQueue,
        true,
        false,
        false,
        false,
        amqp091.Table{   // 申明消息过期,到死信exchange routingKey
            "x-message-ttl" :             taskDelayMs,
            "x-dead-letter-exchange" :    deadExchange,
            "x-dead-letter-routing-key" : deadRouteKey,
        },
    )
    if err != nil {
        log.Fatal(err)
    }

    err = ch.QueueBind(
        midQueue,
        midRouteKey,
        midExchange,
        false,
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }

}

func declareDeadQueue(conn *amqp091.Connection) {
    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare(
        deadExchange,
        amqp091.ExchangeDirect,
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }

    _, err = ch.QueueDeclare(
        deadQueue,
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }

    err = ch.QueueBind(
        deadQueue,
        deadRouteKey,
        deadExchange,
        false,
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }
}

func publish(conn *amqp091.Connection, msg []byte) {
    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    // 开启生产者确认,保证消息成功发送到交换机后回执,确保消息可靠
    err = ch.Confirm(false)
    confirms := ch.NotifyPublish(make(  chan amqp091.Confirmation, 1))

    err = ch.Publish(
        midExchange,
        midRouteKey,
        false,
        false,
        amqp091.Publishing{
            ContentType:    "text/plain" ,
            DeliveryMode: amqp091.Persistent,
            Body:         msg,
            //Expiration:   "1000", // 1000毫秒,如果队列和消息同时设置了ttl,则取较小的那个作为ttl。
        },
    )
    if err != nil {
        log.Fatal(err)
    }

    <-confirms
}

func main() {
    conn := newConn()
    defer conn.Close()

    // 申明队列只需要项目启动时申明一次,不需要每次发消息都申明
    declareQueues(conn)

    publish(conn, []byte(  "hello rabbitmq" ))
}

  中间队列消费者midconsumer/main.go

package main

import (
    "log"
    "time"

    "github.com/rabbitmq/amqp091-go"
)

const (
    source =      "amqp://yourUsername:yourPassword@yourHost:5672/yourVhost"

    // 中间队列配置
    midExchange =      "mid.exchange"
    midRouteKey =      "mid.routeKey"
    midQueue    =      "mid.queue"

    // 死信队列配置
    deadExchange =      "dead.exchange"
    deadRouteKey =      "dead.routeKey"
    deadQueue    =      "dead.queue"

    // 延时任务时间 毫秒
    taskDelayMs = 30 * 60 * 1000
)

func newConn() *amqp091.Connection {
    conn, err := amqp091.Dial(source)
    if err != nil {
        log.Fatal(err)
    }
    return conn
}

func consume(conn *amqp091.Connection) {
    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    msgs, err := ch.Consume(
        midQueue,      // queue
        "" ,            // consumer
        false,         // auto-ack
        false,         // exclusive
        false,         // no-local
        false,         // no-wait
        nil,           // args
    )
    if err != nil {
        log.Fatal(err)
    }

    for d :=      range msgs {
        go func (delivery *amqp091.Delivery) {
            // d.Reject(true) 会拒绝消费、保持在队列中,拿到刚拒绝的消息继续拒绝消费、保持在队列中,如此往复,
            // 直到到达延时时间,如果不执行time.Sleep,频繁调用rabbitmq reject和consume,导致cpu占用过高和rabbitmq负载高
            // 这里使用time.Sleep牺牲一定延时时间精度
            time.Sleep(time.Millisecond * 1000)
            d.Reject(true)
        }(&d)
    }
}

func main() {
    conn := newConn()
    defer conn.Close()

    var forever      chan struct {}
    consume(conn)
    <-forever
}

  死信消费者deadconsumer/main.go

package main

import (
    "log"

    "github.com/rabbitmq/amqp091-go"
)

const (
    source =     "amqp://yourUsername:yourPassword@yourHost:5672/yourVhost"

    // 中间队列配置
    midExchange =     "mid.exchange"
    midRouteKey =     "mid.routeKey"
    midQueue    =     "mid.queue"

    // 死信队列配置
    deadExchange =     "dead.exchange"
    deadRouteKey =     "dead.routeKey"
    deadQueue    =     "dead.queue"

    // 延时任务时间 毫秒
    taskDelayMs = 30 * 60 * 1000
)

func newConn() *amqp091.Connection {
    conn, err := amqp091.Dial(source)
    if err != nil {
        log.Fatal(err)
    }
    return conn
}

func consume(conn *amqp091.Connection) {
    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }

    defer ch.Close()

    msgs, err := ch.Consume(
        deadQueue,     // queue
        "" ,            // consumer
        false,         // auto-ack
        false,         // exclusive
        false,         // no-local
        false,         // no-wait
        nil,           // args
    )
    if err != nil {
        log.Fatal(err)
    }

    for d :=     range msgs {
        go func (delivery *amqp091.Delivery) {
            if err := handleMsg(d.Body); err == nil {
                d.Ack(false)     // 因为我们设置了autoAck false,这里要手动ack
            }
        }(&d)
    }

}

func handleMsg(msg []byte) error {
    log.Printf(    "Received a message: %s\n" , msg)
    return nil
}

func main() {
    conn := newConn()
    defer conn.Close()

    var forever     chan struct {}
    consume(conn)
    <-forever
}

  

 

标签:false,err,nil,队列,RabbitMQ,golang,死信,amqp091,conn
From: https://www.cnblogs.com/ALXPS/p/18278227

相关文章

  • rabbitmq 启动报错 unknown exchange type ‘x-delayed-message‘
    产生问题的原因rabbitmq中默认只有四中交换机类型:headers、direct、fanout、topic。所以我们需要自己安装一个x-delayed-message类型的交换机x-delayed-message的安装1、下载插件点击,下载rabbitmq_delayed_message_exchange-3.8.0.ez。2、将下载的包放到/RABBIT_HOME/plugin......
  • Centos7 安装Rabbitmq3.9.11
    安装erlang安装依赖包yum-yinstallgccglibc-develmakencurses-developenssl-develxmltoperlwgetgtk2-develbinutils-devel下载wgethttps://github.com/erlang/otp/releases/download/OTP-24.1.7/otp_src_24.1.7.tar.gz解压tar-zxvfotp_src_24.1.7.tar......
  • Golang:go-querystring将struct编码为URL查询参数的库
    Golang:go-querystring将struct编码为URL查询参数的库原创 吃个大西瓜 CodingBigTree 2024-05-0908:30 北京go-querystringisaGolibraryforencodingstructsintoURLqueryparameters.译文:go-querystring将struct编码为URL查询参数的Golang库文档ht......
  • RabbitMQ如何备份与恢复数据
    阅读目录一、场景二、元数据备份和还原1、操作2、导出数据 3、导入数据4、验证数据 三、消息数据备份和还原1、确定数据目录2、为避免数据的一致性,需先停掉服务3、备份数据目录 4、还原数据5、修改数据目录权限6、启动B服务器上rabbitmq服务7、验证消息数......
  • 消息队列选型之 Kafka vs RabbitMQ
    在面对众多的消息队列时,我们往往会陷入选择的困境:“消息队列那么多,该怎么选啊?Kafka和RabbitMQ比较好用,用哪个更好呢?”想必大家也曾有过类似的疑问。对此本文将在接下来的内容中以Kafka和RabbitMQ为例分享消息队列选型的一些经验。一、什么是消息队列消息队列即Messag......
  • webrtc 的datachannel在golang中的使用
    因为在发送端需要接收一些接收端的统计信息,而且具有不可丢失的需求,所以采取利用datachannel进行传输。datachannel是基于sctp协议的传输通道,sctp可提供按需可靠到达的服务,在datachannel中可以设置是否按序,是否可靠,最大重传次数,数据最大保存时间(当数据超过保存时间仍未发出时将被丢......
  • Golang文件操作
    文件是数据源(保存数据的地方)的一种,word文档,txt文件,excel文件...都是文件。文件最主要的作用就是保存数据,它既可以保存一张图片,也可以保持视频,声音...os.file封装了所有对文件的操作,且file是一个结构体: 打开和关闭文件1.打开文件,用于读取: 传入的是一个字符......
  • golang使用grpc
    (1)安装protoc,这是通用的,所有语言都需要​#下载网址:https://github.com/protocolbuffers/protobuf/releases/download/v3.9.0/protoc-3.9.0-win64.zip解压后将将protoc的bin目录添加到环境变量中 如果不会添加环境变量请百度运行protoc--version查看是否已经添加到环境......
  • [Golang并发]GMP模型
    什么是GoroutineGoroutine=Golang+Coroutine。Goroutine是golang实现的协程,是用户级线程。Goroutine的特点:相比线程,其启动的代价很小,以很小栈空间启动(2Kb左右)能够动态地伸缩栈的大小,最大可以支持到Gb级别工作在用户态,切换成很小与线程关系是n:m,即可以在n个系统线程上多......
  • [Golang基础]Goroutine
    协程(CoRoutine)是一种轻量级的用户态线程。简单来说,线程(thread)的调度是由操作系统负责,线程的睡眠、等待、唤醒的时机是由操作系统控制,开发者无法决定。使用协程,开发者可以自行控制程序切换的时机,可以在一个函数执行到一半的时候中断执行,让出CPU,在需要的时候再回到中断点继续执行。......