首页 > 其他分享 >rocketmq-client-go

rocketmq-client-go

时间:2023-06-15 11:55:21浏览次数:32  
标签:producer err fmt client go consumer rocketmq

关注几个配置项:

topic

groupName

tag

按需配置即可。

关于producer和consumer的入口启动略去,客户端层面,关于producer和consumer可以按照自己业务特点,进行配置。

以下为simple样例。

生产者

 1 package producer
 2 
 3 import (
 4     "context"
 5     "fmt"
 6     "github.com/apache/rocketmq-client-go/v2"
 7     "github.com/apache/rocketmq-client-go/v2/primitive"
 8     "github.com/apache/rocketmq-client-go/v2/producer"
 9     "os"
10 )
11 
12 func ProTagSimple() {
13     p, _ := rocketmq.NewProducer(
14         producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
15         producer.WithRetry(2),
16         producer.WithGroupName("testGroup"),
17     )
18     err := p.Start()
19     if err != nil {
20         fmt.Printf("start producer error: %s", err.Error())
21         os.Exit(1)
22     }
23     tags := []string{"TagA", "TagB", "TagC"}
24     for i := 0; i < 3; i++ {
25         tag := tags[i%3]
26         msg := primitive.NewMessage("test",
27             []byte("Hello RocketMQ Go Client!"))
28         msg.WithTag(tag)
29 
30         res, err := p.SendSync(context.Background(), msg)
31         if err != nil {
32             fmt.Printf("send message error: %s\n", err)
33         } else {
34             fmt.Printf("send message success: result=%s\n", res.String())
35         }
36     }
37     err = p.Shutdown()
38     if err != nil {
39         fmt.Printf("shutdown producer error: %s", err.Error())
40     }
41 }

消费者

 1 package consumer
 2 
 3 import (
 4     "context"
 5     "fmt"
 6     "github.com/apache/rocketmq-client-go/v2"
 7     "github.com/apache/rocketmq-client-go/v2/consumer"
 8     "github.com/apache/rocketmq-client-go/v2/primitive"
 9     "os"
10     "time"
11 )
12 
13 func ConTagSimple() {
14     c, _ := rocketmq.NewPushConsumer(
15         consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
16         consumer.WithGroupName("testGroup"),
17     )
18     selector := consumer.MessageSelector{
19         Type:       consumer.TAG,
20         Expression: "TagA || TagC",
21     }
22     err := c.Subscribe("test", selector, func(ctx context.Context,
23         msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
24         fmt.Printf("subscribe callback: %v \n", msgs)
25         return consumer.ConsumeSuccess, nil
26     })
27     if err != nil {
28         fmt.Println(err.Error())
29     }
30     err = c.Start()
31     if err != nil {
32         fmt.Println(err.Error())
33         os.Exit(-1)
34     }
35     time.Sleep(time.Minute * 5)
36     err = c.Shutdown()
37     if err != nil {
38         fmt.Printf("shutdown Consumer error: %s", err.Error())
39     }
40 }

 

标签:producer,err,fmt,client,go,consumer,rocketmq
From: https://www.cnblogs.com/supermarx/p/17482499.html

相关文章

  • docker-compose构建kratos微服务项目运行失败,提示:runtime/cgo: pthread_create failed
    这个问题网上解决方案较少,我们这边问题定位是docker-compose.yaml配置问题在配置文件中新增配置如下:privileged:true设置容器的权限为root 最后解决......
  • How to Render Django Form Manually
    DealingwithuserinputisaverycommontaskinanyWebapplicationorWebsite.ThestandardwaytodoitisthroughHTMLforms,wheretheuserinputsomedata,submitittotheserver,andthentheserverdoessomethingwithit.Now,thechancesareth......
  • Mongodb - org.springframework.dao.DuplicateKeyException
    首先明确场景为mongodb,此异常在进行mongodb的插入操作时抛出,插入的主键已经存在。衍生场景,使用upsert时抛出,此处的包括了$set和$setOnInsert由于upsert非原子操作,如果在多线程环境下:线程A和线程B同时对数据库未存在的记录record1进行upsert,有可能会出现两个线程都判断为应该进行......
  • 深入探讨go语言开发的技术特点及其应用
    一、引言Go语言,又称Golang,是Google于2007年发布的一种静态类型、编译型语言。它融合了众多语言的优点,具有高效、安全、简洁等特点,迅速在开发领域崭露头角。本文将深入探讨go语言开发的技术特点及其应用,以期为相关领域的研究和实践提供参考。二、Go语言的技术特点简洁高效:Go语言的语......
  • 云原生时代崛起的编程语言Go远程调用gRPC实战
    @目录概述定义背景特点四种服务方法实战环境配置proto文件简单RPCToken认证服务器流式RPC客户端流式RPC双向流式RPC概述定义gRPC官网地址https://grpc.io/源码release最新版本v1.55.1gRPC官网文档地址https://grpc.io/docs/gRPC源码地址https://github.com/grpc/grp......
  • django 更改了modules.py 数据库模型,但是 python3 manage.py makemigrations 提示无
    现象:明明改了modules.py文件。删了appname/migrations/下所有内容。而且也删除了django模型变更记录表django_migrations中appname项目的记录 原因:删多了: appname/migrations/下所有内容。__init__.py不能删,需要重新创建一个,否则识别不了包了  ......
  • Going Deeper with Convolutions-Inception网络
    以下内容是结合原文和参考翻译的一些总结。摘要:谷歌在ImageNet大规模视觉识别挑战赛2014(ILSVRC14)上提出了一种新网络,就是Inception。这个架构的主要特点是提高了网络内部计算资源的利用率。通过精心的手工设计,我们在增加了网络深度和广度的同时保持了计算预算不变。为了优化质量,......
  • mongodb
      ......
  • mongo聚合字符串类型的数字进行排序
    设置collationCollationcollation=Collation.of(Locale.CHINESE).numericOrdering(true);设置聚合选项Aggregationaggregation=Aggregation.newAggregation(Aggregation.match(orOperator),).withOptions(AggregationO......
  • go语言编写算法
    1、冒泡排序//冒泡排序a:=[]uint8{9,20,10,23,7,22,88,102}fori:=0;i<len(a);i++{fork:=i+1;k<(len(a)-i);k++{ifa[i]>a[k]{a[i],a[k]=a[k],a[i]}}......