首页 > 其他分享 >Go使用rocketmq实现延迟消息

Go使用rocketmq实现延迟消息

时间:2024-08-28 10:17:04浏览次数:9  
标签:err nil producer newProducer Go consumer rocketmq 延迟

生产者

package main

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
	"time"
)

func main() {
	// 消息消费失败重试两次
	newProducer, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.252.128:9876"}), producer.WithRetry(2))

	defer func(newProducer rocketmq.Producer) {
		err := newProducer.Shutdown()
		if err != nil {
			panic("关闭producer失败")
		}
	}(newProducer)
	if err != nil {
		panic("生成producer失败")
	}
	if err = newProducer.Start(); err != nil {
		panic("启动producer失败")
	}
	message := primitive.NewMessage("DelayTopic", []byte("一条延时消息"))
	// WithDelayTimeLevel 设置要消耗的消息延迟时间。参考延迟等级定义:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
	// 延迟等级从1开始,例如设置param level=1,则延迟时间为1s。
	// 这里使用的是延时30s发送
	message.WithDelayTimeLevel(4)
	res, err := newProducer.SendSync(context.Background(), message)
	if err != nil {
		panic("消息发送失败" + err.Error())
	}
	nowStr := time.Now().Format("2006-01-02 15:04:05")
	fmt.Printf("%s: 消息: %s发送成功 \n", nowStr, res.String())
}

消费者

package main

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"time"
)

func main() {
	newPushConsumer, err := rocketmq.NewPushConsumer(consumer.WithNameServer([]string{"192.168.252.128:9876"}), consumer.WithGroupName("test"),)
	defer func(newPushConsumer rocketmq.PushConsumer) {
		err := newPushConsumer.Shutdown()
		if err != nil {
			panic("关闭consumer失败")
		}
	}(newPushConsumer)

	err = newPushConsumer.Subscribe("DelayTopic", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, msg := range msgs {
			nowStr := time.Now().Format("2006-01-02 15:04:05")
			fmt.Printf("%s 读取到一条消息,消息内容: %s \n", nowStr, string(msg.Body))
		}
		return consumer.ConsumeSuccess, nil
	})

	if err != nil {
		fmt.Println("读取消息失败")
	}
	if err = newPushConsumer.Start(); err != nil {
		panic("启动consumer失败")
	}
	// 不能让主goroutine退出
	time.Sleep(time.Hour)
}

标签:err,nil,producer,newProducer,Go,consumer,rocketmq,延迟
From: https://www.cnblogs.com/qcy-blog/p/18384053

相关文章

  • Go实现大文件分片上传
    index.html<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><title>uploadfile</title></head><bodyid="app"><h1style="text-align:center"&......
  • C. Turtle and Good Pairs
    https://codeforces.com/contest/2003/problem/C题意:。。。思路:如果要使满足条件的有序对最多,那么首先如果两个字符相等,那么无论如何排列,最终的贡献值都不会变。再看字符不相等的情况,假如有aabbcc,那么abcabc总是优于aabbcc,因为如果一个字符出现了多次,那么像aab,bcc这种就会没......
  • Apache RocketMQ 中文社区全新升级丨阿里云云原生 7 月产品月报
    云原生月度动态云原生是企业数字创新的最短路径。《阿里云云原生每月动态》,从趋势热点、产品新功能、服务客户、开源与开发者动态等方面,为企业提供数字化的路径与指南。趋势热点......
  • 调整 MongoDB 以适应批量加载
    将几十亿条记录加载到MongoDB中,开始时加载速度还不错,但一段时间后就开始明显放缓。通过观察指标进行了一些研究,发现随着时间的推移,WiredTiger的检查点时间越来越长。检查点时间从最初的几秒到后面的几分钟。在检查点期间,性能基本上是直线下降: WiredTiger检查点从MongoDB......
  • Dijkstra's algorithm All In One
    Dijkstra'salgorithmAllInOne迪杰斯特拉算法DijkstraDijkstra'salgorithm(/ˈdaɪkstrəz/DYKE-strəz)isanalgorithmforfindingtheshortestpathsbetweennodesinaweightedgraph,whichmayrepresent,forexample,roadnetworks.Dijkstra算法是一种......
  • RocketMQ在基金大厂的分布式事务实践
    1行业背景基金公司核心业务主要分为:投研线业务,即投资管理和行业研究业务,体现基金公司核心竞争力市场线业务,即基金公司利用自身渠道和市场能力完成基金销售并做好客户服务随互联网技术发展,基金销售渠道更加多元化,线上成为基金销售重要渠道。相比传统基金客户,线上渠道具有客......
  • 从0到1部署django项目至阿里云服务器
    1.前言最近学院一个志愿服务项目要做个网站展示,并且要求部署上线。趁着学校报销,我租了个阿里云服务器爽一把hhh。这篇文章大概写下我从买服务器到部署上线的历程以及报错的解决,给大家分享的同时,我自己也相当于纪念一下做个笔记。2.部署历程 阿里云配着学生认证,有个一年的基......
  • 【喀什大学支持 | 工商管理与数据科学相结合的主题 | EI ,Scopus, CNKI,Google Scholar
    重要信息大会网站:https://ais.cn/u/uuuMFr【投稿参会】截稿时间:以官网信息为准大会时间:2024年10月25-27日大会地点:中国-重庆提交检索:EICompendex,Scopus,CNKI(知网检索快速稳定),GoogleScholar*现场可领取会议资料(如纪念品、参会证书等),【click】投稿优惠、优先审核!......
  • Go基础语法知识整理
    Go基础语法知识go入门go历史(简单了解)go优势强大的编译能力、媲美C的执行速度、并发效率极高、异步语言快速写同步程序、严格的语法下载及配置(已配置,带过)开发工具,推荐goland,电脑没装,先用vscode变量声明格式var和:=、驼峰标识、值变量自动初始化赋对应零值,......
  • golang新特性:泛型
    泛型Go的泛型(或者或类型形参)目前可使用在3个地方泛型类型-类型定义中带类型形参的类型泛型receiver-泛型类型的receiver泛型函数-带类型形参的函数为了实现泛型,Go引入了一些新的概念:类型形参类型形参列表类型实参类型约束实例化-泛型类型不能直接使用,要......