关注几个配置项:
topic
groupName
tag
按需配置即可。
producer 和 consumer 的入口启动代码在此略去;在客户端层面,producer 和 consumer 都可以按照自身业务特点进行配置。
以下为simple样例。
生产者
1 package producer 2 3 import ( 4 "context" 5 "fmt" 6 "github.com/apache/rocketmq-client-go/v2" 7 "github.com/apache/rocketmq-client-go/v2/primitive" 8 "github.com/apache/rocketmq-client-go/v2/producer" 9 "os" 10 ) 11 12 func ProTagSimple() { 13 p, _ := rocketmq.NewProducer( 14 producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})), 15 producer.WithRetry(2), 16 producer.WithGroupName("testGroup"), 17 ) 18 err := p.Start() 19 if err != nil { 20 fmt.Printf("start producer error: %s", err.Error()) 21 os.Exit(1) 22 } 23 tags := []string{"TagA", "TagB", "TagC"} 24 for i := 0; i < 3; i++ { 25 tag := tags[i%3] 26 msg := primitive.NewMessage("test", 27 []byte("Hello RocketMQ Go Client!")) 28 msg.WithTag(tag) 29 30 res, err := p.SendSync(context.Background(), msg) 31 if err != nil { 32 fmt.Printf("send message error: %s\n", err) 33 } else { 34 fmt.Printf("send message success: result=%s\n", res.String()) 35 } 36 } 37 err = p.Shutdown() 38 if err != nil { 39 fmt.Printf("shutdown producer error: %s", err.Error()) 40 } 41 }
消费者
1 package consumer 2 3 import ( 4 "context" 5 "fmt" 6 "github.com/apache/rocketmq-client-go/v2" 7 "github.com/apache/rocketmq-client-go/v2/consumer" 8 "github.com/apache/rocketmq-client-go/v2/primitive" 9 "os" 10 "time" 11 ) 12 13 func ConTagSimple() { 14 c, _ := rocketmq.NewPushConsumer( 15 consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})), 16 consumer.WithGroupName("testGroup"), 17 ) 18 selector := consumer.MessageSelector{ 19 Type: consumer.TAG, 20 Expression: "TagA || TagC", 21 } 22 err := c.Subscribe("test", selector, func(ctx context.Context, 23 msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { 24 fmt.Printf("subscribe callback: %v \n", msgs) 25 return consumer.ConsumeSuccess, nil 26 }) 27 if err != nil { 28 fmt.Println(err.Error()) 29 } 30 err = c.Start() 31 if err != nil { 32 fmt.Println(err.Error()) 33 os.Exit(-1) 34 } 35 time.Sleep(time.Minute * 5) 36 err = c.Shutdown() 37 if err != nil { 38 fmt.Printf("shutdown Consumer error: %s", err.Error()) 39 } 40 }
标签:producer,err,fmt,client,go,consumer,rocketmq From: https://www.cnblogs.com/supermarx/p/17482499.html