最近在一个项目中,需要实现在用户上传图片30分钟后,删除对应图片,以保证用户隐私。
我们使用rabbitmq来实现。基于rabbitmq实现延时任务有两种方式,一种为队列ttl+死信exchange,另一种为安装插件(https://github.com/rabbitmq/rabbitmq-delayed-message-exchange)。
其中安装插件,其大概原理就是:指定了延时的消息,会被先保存在 Mnesia (erlang编写的数据库管理系统)中,然后有一个定时器去查询最近需要被投递的消息,将其投递到目标队列中。消息延时时间,基本支持任意延迟时间(不能超过1个月)。目前该插件的当前设计并不真正适合包含大量延迟消息(例如数十万以上)的场景,另外该插件的一个可变性来源是依赖于 Erlang 计时器,在系统中使用了一定数量的长时间计时器之后,它们开始争用调度程序资源,并且时间漂移不断累积,rabbitmq内存占用明显增高。
由于项目中使用的rabbitmq为云厂商服务,因此采用方式一实现。这种实现需要两个队列和两个消费者,队列一为我称为”中转队列“,队列二为”死信队列“。我们中转队列设置:x-message-ttl=30*60*1000
(队列中所有消息都30分钟存活时间,也就是任务延时30分钟执行)。死信队列不设置额外参数。消息先到达中转队列,中转队列的消息由一个中间消费者直接拒接消费,并将消息仍保存在中转队列不被删除,然后中间消费者又接到刚被拒绝的消息,继续拒绝消费,一直循环,直到达到消息存活时间(30分钟),这条消息将转发到死信队列,再由死信队列的消费者消费,执行正常业务逻辑(上文提到的删除用户上传图片)。
生产者 producer/main.go
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) const ( source = "amqp://yourUsername:yourPassword@yourHost:5672/yourVhost" // 中间队列配置 midExchange = "mid.exchange" midRouteKey = "mid.routeKey" midQueue = "mid.queue" // 死信队列配置 deadExchange = "dead.exchange" deadRouteKey = "dead.routeKey" deadQueue = "dead.queue" // 延时任务时间 毫秒 taskDelayMs = 30 * 60 * 1000 ) func newConn() *amqp091.Connection { conn, err := amqp091.Dial(source) if err != nil { log.Fatal(err) } return conn } func declareQueues(conn *amqp091.Connection) { declareMidQueue(conn) declareDeadQueue(conn) } func declareMidQueue(conn *amqp091.Connection) { ch, err := conn.Channel() if err != nil { log.Fatal(err) } defer ch.Close() err = ch.ExchangeDeclare( midExchange, amqp091.ExchangeDirect, true, false, false, false, nil, ) if err != nil { log.Fatal(err) } _, err = ch.QueueDeclare( midQueue, true, false, false, false, amqp091.Table{ // 申明消息过期,到死信exchange routingKey "x-message-ttl" : taskDelayMs, "x-dead-letter-exchange" : deadExchange, "x-dead-letter-routing-key" : deadRouteKey, }, ) if err != nil { log.Fatal(err) } err = ch.QueueBind( midQueue, midRouteKey, midExchange, false, nil, ) if err != nil { log.Fatal(err) } } func declareDeadQueue(conn *amqp091.Connection) { ch, err := conn.Channel() if err != nil { log.Fatal(err) } defer ch.Close() err = ch.ExchangeDeclare( deadExchange, amqp091.ExchangeDirect, true, false, false, false, nil, ) if err != nil { log.Fatal(err) } _, err = ch.QueueDeclare( deadQueue, true, false, false, false, nil, ) if err != nil { log.Fatal(err) } err = ch.QueueBind( deadQueue, deadRouteKey, deadExchange, false, nil, ) if err != nil { log.Fatal(err) } } func publish(conn *amqp091.Connection, msg []byte) { ch, err := conn.Channel() if err != nil { log.Fatal(err) } defer ch.Close() // 开启生产者确认,保证消息成功发送到交换机后回执,确保消息可靠 err = ch.Confirm(false) confirms := ch.NotifyPublish(make( chan amqp091.Confirmation, 1)) err = ch.Publish( midExchange, midRouteKey, false, false, amqp091.Publishing{ ContentType: "text/plain" , DeliveryMode: amqp091.Persistent, Body: msg, //Expiration: "1000", // 1000毫秒,如果队列和消息同时设置了ttl,则取较小的那个作为ttl。 }, ) if err != nil { log.Fatal(err) } <-confirms } func main() { conn := newConn() defer conn.Close() // 申明队列只需要项目启动时申明一次,不需要每次发消息都申明 declareQueues(conn) publish(conn, []byte( "hello rabbitmq" )) }
中间队列消费者midconsumer/main.go
package main import ( "log" "time" "github.com/rabbitmq/amqp091-go" ) const ( source = "amqp://yourUsername:yourPassword@yourHost:5672/yourVhost" // 中间队列配置 midExchange = "mid.exchange" midRouteKey = "mid.routeKey" midQueue = "mid.queue" // 死信队列配置 deadExchange = "dead.exchange" deadRouteKey = "dead.routeKey" deadQueue = "dead.queue" // 延时任务时间 毫秒 taskDelayMs = 30 * 60 * 1000 ) func newConn() *amqp091.Connection { conn, err := amqp091.Dial(source) if err != nil { log.Fatal(err) } return conn } func consume(conn *amqp091.Connection) { ch, err := conn.Channel() if err != nil { log.Fatal(err) } defer ch.Close() msgs, err := ch.Consume( midQueue, // queue "" , // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { log.Fatal(err) } for d := range msgs { go func (delivery *amqp091.Delivery) { // d.Reject(true) 会拒绝消费、保持在队列中,拿到刚拒绝的消息继续拒绝消费、保持在队列中,如此往复, // 直到到达延时时间,如果不执行time.Sleep,频繁调用rabbitmq reject和consume,导致cpu占用过高和rabbitmq负载高 // 这里使用time.Sleep牺牲一定延时时间精度 time.Sleep(time.Millisecond * 1000) d.Reject(true) }(&d) } } func main() { conn := newConn() defer conn.Close() var forever chan struct {} consume(conn) <-forever }
死信消费者deadconsumer/main.go
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) const ( source = "amqp://yourUsername:yourPassword@yourHost:5672/yourVhost" // 中间队列配置 midExchange = "mid.exchange" midRouteKey = "mid.routeKey" midQueue = "mid.queue" // 死信队列配置 deadExchange = "dead.exchange" deadRouteKey = "dead.routeKey" deadQueue = "dead.queue" // 延时任务时间 毫秒 taskDelayMs = 30 * 60 * 1000 ) func newConn() *amqp091.Connection { conn, err := amqp091.Dial(source) if err != nil { log.Fatal(err) } return conn } func consume(conn *amqp091.Connection) { ch, err := conn.Channel() if err != nil { log.Fatal(err) } defer ch.Close() msgs, err := ch.Consume( deadQueue, // queue "" , // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { log.Fatal(err) } for d := range msgs { go func (delivery *amqp091.Delivery) { if err := handleMsg(d.Body); err == nil { d.Ack(false) // 因为我们设置了autoAck false,这里要手动ack } }(&d) } } func handleMsg(msg []byte) error { log.Printf( "Received a message: %s\n" , msg) return nil } func main() { conn := newConn() defer conn.Close() var forever chan struct {} consume(conn) <-forever }
标签:false,err,nil,队列,RabbitMQ,golang,死信,amqp091,conn From: https://www.cnblogs.com/ALXPS/p/18278227