生产者
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"time"
)
// main builds a RocketMQ producer, sends one delayed message to the
// "DelayTopic" topic, and prints the send result together with the local
// send time. Any failure aborts the program with a panic that carries the
// underlying error.
func main() {
	// WithRetry(2) is a producer option: it sets the retry count used when a
	// send fails. It is unrelated to consumer-side retry of failed consumption.
	newProducer, err := rocketmq.NewProducer(
		producer.WithNameServer([]string{"192.168.252.128:9876"}),
		producer.WithRetry(2),
	)
	if err != nil {
		panic("生成producer失败: " + err.Error())
	}
	// Register Shutdown only once creation has succeeded. Deferring it before
	// the error check would invoke Shutdown on a nil Producer when
	// NewProducer fails, replacing the real error with a nil-dereference panic.
	defer func() {
		if err := newProducer.Shutdown(); err != nil {
			panic("关闭producer失败: " + err.Error())
		}
	}()

	if err = newProducer.Start(); err != nil {
		panic("启动producer失败: " + err.Error())
	}

	message := primitive.NewMessage("DelayTopic", []byte("一条延时消息"))
	// WithDelayTimeLevel selects how long the message is delayed before it
	// becomes consumable. Reference delay-level table:
	//   1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
	// Levels are 1-based, so level 1 means 1s and level 4 means 30s.
	message.WithDelayTimeLevel(4)

	res, err := newProducer.SendSync(context.Background(), message)
	if err != nil {
		panic("消息发送失败" + err.Error())
	}

	// Print the send time so it can be compared against the time at which
	// the consumer reports receiving the message.
	nowStr := time.Now().Format("2006-01-02 15:04:05")
	fmt.Printf("%s: 消息: %s发送成功 \n", nowStr, res.String())
}
消费者
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"time"
)
// main builds a RocketMQ push consumer in group "test", subscribes it to the
// "DelayTopic" topic, and prints the body of every received message together
// with the local receive time. The process then stays alive for one hour so
// the consume callback has time to run.
func main() {
	newPushConsumer, err := rocketmq.NewPushConsumer(
		consumer.WithNameServer([]string{"192.168.252.128:9876"}),
		consumer.WithGroupName("test"),
	)
	// The creation error must be checked here; otherwise it is overwritten by
	// the result of Subscribe below and a nil consumer would be used.
	if err != nil {
		panic("生成consumer失败: " + err.Error())
	}
	// Register Shutdown only once creation has succeeded, so it is never
	// invoked on a nil PushConsumer.
	defer func() {
		if err := newPushConsumer.Shutdown(); err != nil {
			panic("关闭consumer失败: " + err.Error())
		}
	}()

	// An empty MessageSelector is passed, i.e. no tag/SQL filter expression
	// is supplied by this program.
	err = newPushConsumer.Subscribe("DelayTopic", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, msg := range msgs {
			// Print the receive time so it can be compared with the
			// producer's send time to observe the delay.
			nowStr := time.Now().Format("2006-01-02 15:04:05")
			fmt.Printf("%s 读取到一条消息,消息内容: %s \n", nowStr, string(msg.Body))
		}
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		// Without a subscription there is nothing to consume, so report the
		// cause and stop instead of starting a consumer that does nothing.
		fmt.Println("读取消息失败:", err)
		return
	}

	if err = newPushConsumer.Start(); err != nil {
		panic("启动consumer失败: " + err.Error())
	}

	// Keep the main goroutine from returning; once it returns the deferred
	// Shutdown runs and the program exits.
	time.Sleep(time.Hour)
}
标签:err,nil,producer,newProducer,Go,consumer,rocketmq,延迟
From: https://www.cnblogs.com/qcy-blog/p/18384053