首页 > 其他分享 >Go 使用gRPC协议操作RocketMQ 5.3

Go 使用gRPC协议操作RocketMQ 5.3

时间:2024-08-23 09:38:26浏览次数:13  
标签:5.3 err gRPC fmt golang apache time Go rocketmq

docker-compose安装RocketMQ

docker-compose.yml

version: '3.8'
services:
  namesrv:
    image: apache/rocketmq:5.3.0
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    networks:
      - rocketmq
    command: sh mqnamesrv
  broker:
    image: apache/rocketmq:5.3.0
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
      - 10912:10912
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
    depends_on:
      - namesrv
    networks:
      - rocketmq
    command: sh mqbroker
  proxy:
    image: apache/rocketmq:5.3.0
    container_name: rmqproxy
    networks:
      - rocketmq
    depends_on:
      - broker
      - namesrv
    ports:
      - 8080:8080
      - 8081:8081
    restart: on-failure
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
    command: sh mqproxy
networks:
  rocketmq:
    driver: bridge

docker-compose up -d

进入rmqbroker,创建topic,group

docker exec -it rmqbroker /bin/bash

简单消息类型。更多消息类型见官网 https://rocketmq.apache.org/zh/docs/featureBehavior/01normalmessage/

topic

sh ./mqadmin updatetopic -n 192.168.252.128:9876 -c DefaultCluster -t demo_topic 

group

sh ./mqadmin updateSubGroup -n 192.168.252.128:9876 -c DefaultCluster -g demo_group

rpc协议端口为8081

go生产者producer

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	"github.com/apache/rocketmq-clients/golang/v5"
	"github.com/apache/rocketmq-clients/golang/v5/credentials"
)

const (
	ConsumerGroup = "demo_group"
	Topic         = "demo_topic"
	Endpoint      = "192.168.252.128:8081"
	AccessKey     = "xxxxxx"
	SecretKey     = "xxxxxx"
)

func main() {
	os.Setenv("mq.consoleAppender.enabled", "true")
	golang.ResetLogger()

	// In most case, you don't need to create many producers, singleton pattern is more recommended.
	producer, err := golang.NewProducer(&golang.Config{
		Endpoint: Endpoint,
		Credentials: &credentials.SessionCredentials{
			AccessKey:    AccessKey,
			AccessSecret: SecretKey,
		},
	},
		golang.WithTopics(Topic),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start producer
	err = producer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// graceful stop producer
	defer producer.GracefulStop()

	for i := 0; i < 5; i++ {
		// new a message
		msg := &golang.Message{
			Topic: Topic,
			Body:  []byte("我是消息内容 : " + strconv.Itoa(i+10)),
		}
		// set keys and tag
		msg.SetKeys("a", "b")
		msg.SetTag("ab")
		// send message in sync
		resp, err := producer.Send(context.TODO(), msg)
		if err != nil {
			log.Fatal(err)
		}
		for i := 0; i < len(resp); i++ {
			fmt.Printf("%#v\n", resp[i])
		}
		// wait a moment
		time.Sleep(time.Second * 1)
	}
}

消费者 consumer

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/apache/rocketmq-clients/golang/v5"
	"github.com/apache/rocketmq-clients/golang/v5/credentials"
)

const (
	ConsumerGroup = "demo_group"
	Topic         = "demo_topic"
	Endpoint      = "192.168.252.128:8081"
	AccessKey     = "xxxxxx"
	SecretKey     = "xxxxxx"
)

var (
	// maximum waiting time for receive func
	awaitDuration = time.Second * 5
	// maximum number of messages received at one time
	maxMessageNum int32 = 16
	// invisibleDuration should > 20s
	invisibleDuration = time.Second * 20
	// receive messages in a loop
)

func main() {
	// log to console
	os.Setenv("mq.consoleAppender.enabled", "true")
	golang.ResetLogger()
	// In most case, you don't need to create many consumers, singleton pattern is more recommended.
	simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{
		Endpoint:      Endpoint,
		ConsumerGroup: ConsumerGroup,
		Credentials: &credentials.SessionCredentials{
			AccessKey:    AccessKey,
			AccessSecret: SecretKey,
		},
	},
		golang.WithAwaitDuration(awaitDuration),
		golang.WithSubscriptionExpressions(map[string]*golang.FilterExpression{
			Topic: golang.SUB_ALL,
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start simpleConsumer
	err = simpleConsumer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// graceful stop simpleConsumer
	defer simpleConsumer.GracefulStop()

	go func() {
		for {
			fmt.Println("开始接受消息")
			mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
			if err != nil {
				fmt.Println(err)
			}
			fmt.Println(len(mvs))
			// ack message
			for _, mv := range mvs {
				//消息内容
				fmt.Println(string(mv.GetBody()))
				simpleConsumer.Ack(context.TODO(), mv)
				fmt.Println(mv)
			}
			fmt.Println("wait a moment")
			fmt.Println()
			time.Sleep(time.Second * 3)
		}
	}()
	// run for a while
	time.Sleep(time.Minute)
}

标签:5.3,err,gRPC,fmt,golang,apache,time,Go,rocketmq
From: https://www.cnblogs.com/qcy-blog/p/18375277

相关文章

  • 在Spring Boot项目中集成Geth(Go Ethereum)
    在SpringBoot项目中集成Geth(GoEthereum)客户端,通常是为了与以太坊区块链进行交互。以下是一些基本的步骤和考虑因素,帮助你在SpringBoot应用程序中集成Geth。安装Geth首先,你需要在你的机器上安装Geth。你可以从官方网站下载适合你操作系统的版本。启动Geth安装完成后......
  • Study Plan For Algorithms - Part8
    1.三数之和题目链接:https://leetcode.cn/problems/3sum/给定一个整数数组nums,判断是否存在三元组[nums[i],nums[j],nums[k]]满足i!=j、i!=k且j!=k,同时还满足nums[i]+nums[j]+nums[k]==0。返回所有和为0且不重复的三元组。classSolution:deft......
  • CF163E e-Government 题解
    题目传送门前置知识AC自动机|树状数组解法一次性将所有模式串加入AC自动机,然后处理加入和删除,考虑单次操作对答案的贡献。因为模式串\(T\)在文本串\(S\)中出现的次数之和等价于\(T\)在\(S\)的所有前缀中作为后缀出现的次数之和。这就很和\(fail\)树上跳到根节......
  • Go语言基础
    基础使用//hello.go//packagedeclarationpackagemain//importpackageimport"fmt"//functionfuncadd(a,bint)int{returna+b}//globalvariablevargint=100funcmain(){a,b:=1,2res:=add(a,b)fmt.Println("a=&......
  • Kubernetes: client-go 源码剖析(一)
    kubernetes:client-go 系列文章:Kubernetes:client-go源码剖析(一)Kubernetes:client-go源码剖析(二)0.前言在看 kube-scheduler 组件的过程中遇到了 kube-scheduler 对于 client-go 的调用,泛泛的理解调用过程总有种隔靴搔痒的感觉,于是调转头先把 client-go 理清楚......
  • BanG Dream! It's MyGO!!!!!
    BanGDream!It'sMyGO!!!!!题目描述在“BanGDream!It'sMyGO!!!”的世界里,各个乐团的演出和排练场地像星星一样被连接在一起,形成了一张美丽的网络图。每个乐团都有自己独特的演出场地和练习室,这些地点通过各种路径互相连接,组成了一张复杂的图谱。koala作为一名热爱音乐的乐......
  • candence allego 差分信号设置
    一、设置差分对1、Logic→AssignDifferential;2、依次点击要建立差分对的走线,并在DiffPairname处给差分对命名。二、差分规则Setup→Constraint→ConstraintManager,进入线束约束管理器,在线束约束管理器界面,左侧有一个WorksheetSelector,在WorksheetSelector里选择Phys......
  • 基于django+vue农产品在线管理系统【开题报告+程序+论文】计算机毕设
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着农业现代化的推进和互联网技术的飞速发展,农产品销售与管理模式正经历着深刻的变革。传统农产品市场面临着信息不对称、流通效率低下、......
  • 基于django+vue农产品销售与管理系统【开题报告+程序+论文】计算机毕设
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着农业生产的不断发展和消费者需求的日益多样化,农产品销售与管理面临着新的挑战。传统农产品销售模式往往存在信息不对称、流通环节多、......
  • Go语言基础--函数基本介绍及包
    Go语言(也称为Golang)的函数是执行特定任务的代码块。它们允许你重用代码,让程序更加模块化和易于维护。在Go中,函数可以接收参数(输入值)并返回结果(输出值)。下面详细解释Go语言中的函数:函数定义函数通过 func 关键字定义。基本结构如下:funcfunctionName(parameterList)......