首页 > 其他分享 >Go使用rocketmq实现延迟消息

Go使用rocketmq实现延迟消息

时间:2024-08-28 10:17:04浏览次数:14  
标签:err nil producer newProducer Go consumer rocketmq 延迟

生产者

package main

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
	"time"
)

func main() {
	// 消息消费失败重试两次
	newProducer, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.252.128:9876"}), producer.WithRetry(2))

	defer func(newProducer rocketmq.Producer) {
		err := newProducer.Shutdown()
		if err != nil {
			panic("关闭producer失败")
		}
	}(newProducer)
	if err != nil {
		panic("生成producer失败")
	}
	if err = newProducer.Start(); err != nil {
		panic("启动producer失败")
	}
	message := primitive.NewMessage("DelayTopic", []byte("一条延时消息"))
	// WithDelayTimeLevel 设置要消耗的消息延迟时间。参考延迟等级定义:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
	// 延迟等级从1开始,例如设置param level=1,则延迟时间为1s。
	// 这里使用的是延时30s发送
	message.WithDelayTimeLevel(4)
	res, err := newProducer.SendSync(context.Background(), message)
	if err != nil {
		panic("消息发送失败" + err.Error())
	}
	nowStr := time.Now().Format("2006-01-02 15:04:05")
	fmt.Printf("%s: 消息: %s发送成功 \n", nowStr, res.String())
}

消费者

package main

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"time"
)

func main() {
	newPushConsumer, err := rocketmq.NewPushConsumer(consumer.WithNameServer([]string{"192.168.252.128:9876"}), consumer.WithGroupName("test"),)
	defer func(newPushConsumer rocketmq.PushConsumer) {
		err := newPushConsumer.Shutdown()
		if err != nil {
			panic("关闭consumer失败")
		}
	}(newPushConsumer)

	err = newPushConsumer.Subscribe("DelayTopic", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, msg := range msgs {
			nowStr := time.Now().Format("2006-01-02 15:04:05")
			fmt.Printf("%s 读取到一条消息,消息内容: %s \n", nowStr, string(msg.Body))
		}
		return consumer.ConsumeSuccess, nil
	})

	if err != nil {
		fmt.Println("读取消息失败")
	}
	if err = newPushConsumer.Start(); err != nil {
		panic("启动consumer失败")
	}
	// 不能让主goroutine退出
	time.Sleep(time.Hour)
}

标签:err,nil,producer,newProducer,Go,consumer,rocketmq,延迟
From: https://www.cnblogs.com/qcy-blog/p/18384053

相关文章

  • Go实现大文件分片上传
    index.html<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><title>uploadfile</title></head><bodyid="app"><h1style="text-align:center"&......
  • 调整 MongoDB 以适应批量加载
    将几十亿条记录加载到MongoDB中,开始时加载速度还不错,但一段时间后就开始明显放缓。通过观察指标进行了一些研究,发现随着时间的推移,WiredTiger的检查点时间越来越长。检查点时间从最初的几秒到后面的几分钟。在检查点期间,性能基本上是直线下降: WiredTiger检查点从MongoDB......
  • Dijkstra's algorithm All In One
    Dijkstra'salgorithmAllInOne迪杰斯特拉算法DijkstraDijkstra'salgorithm(/ˈdaɪkstrəz/DYKE-strəz)isanalgorithmforfindingtheshortestpathsbetweennodesinaweightedgraph,whichmayrepresent,forexample,roadnetworks.Dijkstra算法是一种......
  • RocketMQ在基金大厂的分布式事务实践
    1行业背景基金公司核心业务主要分为:投研线业务,即投资管理和行业研究业务,体现基金公司核心竞争力市场线业务,即基金公司利用自身渠道和市场能力完成基金销售并做好客户服务随互联网技术发展,基金销售渠道更加多元化,线上成为基金销售重要渠道。相比传统基金客户,线上渠道具有客......
  • 从0到1部署django项目至阿里云服务器
    1.前言最近学院一个志愿服务项目要做个网站展示,并且要求部署上线。趁着学校报销,我租了个阿里云服务器爽一把hhh。这篇文章大概写下我从买服务器到部署上线的历程以及报错的解决,给大家分享的同时,我自己也相当于纪念一下做个笔记。2.部署历程 阿里云配着学生认证,有个一年的基......
  • 【喀什大学支持 | 工商管理与数据科学相结合的主题 | EI ,Scopus, CNKI,Google Scholar
    重要信息大会网站:https://ais.cn/u/uuuMFr【投稿参会】截稿时间:以官网信息为准大会时间:2024年10月25-27日大会地点:中国-重庆提交检索:EICompendex,Scopus,CNKI(知网检索快速稳定),GoogleScholar*现场可领取会议资料(如纪念品、参会证书等),【click】投稿优惠、优先审核!......