首页 > 其他分享 >RocketMQ

RocketMQ

时间:2022-09-21 15:00:35浏览次数:41  
标签:opt store broker RocketMQ conf docker rocketmq

1、安装

1、Docker安装

1、单机部署

# 镜像拉取
docker pull foxiswho/rocketmq:server-4.3.2 
docker pull foxiswho/rocketmq:broker-4.3.2

# 挂载目录
mkdir -p /opt/docker/rocketmq/rmqserver01/logs
mkdir -p /opt/docker/rocketmq/rmqserver01/store
mkdir -p /opt/docker/rocketmq/rmqbroker01/logs
mkdir -p /opt/docker/rocketmq/rmqbroker01/store
mkdir -p /opt/docker/rocketmq/rmqbroker01/conf

Broker配置文件修改

vi /opt/docker/rocketmq/rmqbroker01/conf/broker.conf
# 命名服务器地址
namesrvAddr=172.17.88.204:9876
# 集群名称
brokerClusterName = DefaultCluster
# broker名称
brokerName = broker-a
# broker id
brokerId = 0
# 删除时间
deleteWhen = 04
fileReservedTime = 48
# 角色
brokerRole = ASYNC_MASTER
# 刷盘方式
flushDiskType = ASYNC_FLUSH
# 绑定IP
brokerIP1 = 172.17.88.204
# 绑定端口
listenPort=10911

启动

#启动nameserver容器 
docker run -d -p 9876:9876 --name rmqserver01 -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" -e "JAVA_OPTS=-Duser.home=/opt" -v /opt/docker/rocketmq/rmqserver01/logs:/opt/logs -v /opt/docker/rocketmq/rmqserver01/store:/opt/store foxiswho/rocketmq:server-4.3.2

# 启动broker,这里net是为了注册上nameserver的服务地址是可访问的
docker run -it -d -p 10911:10911 --name rmqbroker01 -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" -e "JAVA_OPTS=-Duser.home=/opt" -v /opt/docker/rocketmq/rmqbroker01/conf/broker.conf:/etc/rocketmq/broker.conf -v /opt/docker/rocketmq/rmqbroker01/logs:/opt/logs -v /opt/docker/rocketmq/rmqbroker01/store:/opt/store --privileged=true foxiswho/rocketmq:broker-4.3.2

管理工具安装

#创建并启动容器 
docker run -d -e "JAVA_OPTS=-Drocketmq.namesrv.addr=172.17.88.204:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8082:8080 -t styletang/rocketmq-console-ng:1.0.0

2、集群部署

2个nameserver、2个master、两个slave,使用同步方式同步数据

mkdir -p /opt/docker/rocketmq/name-server01/logs
mkdir -p /opt/docker/rocketmq/name-server01/store
mkdir -p /opt/docker/rocketmq/name-server02/logs
mkdir -p /opt/docker/rocketmq/name-server02/store

mkdir -p /opt/docker/rocketmq/broker01-master/logs
mkdir -p /opt/docker/rocketmq/broker01-master/store
mkdir -p /opt/docker/rocketmq/broker01-master/conf
mkdir -p /opt/docker/rocketmq/broker01-slave/logs
mkdir -p /opt/docker/rocketmq/broker01-slave/store
mkdir -p /opt/docker/rocketmq/broker01-slave/conf
mkdir -p /opt/docker/rocketmq/broker02-master/logs
mkdir -p /opt/docker/rocketmq/broker02-master/store
mkdir -p /opt/docker/rocketmq/broker02-master/conf
mkdir -p /opt/docker/rocketmq/broker02-slave/logs
mkdir -p /opt/docker/rocketmq/broker02-slave/store
mkdir -p /opt/docker/rocketmq/broker02-slave/conf

修改两个Broker配置

Master1

vim /opt/docker/rocketmq/broker01-master/conf/broker.conf

# 命名服务器地址
namesrvAddr=172.17.88.204:9876;172.17.88.204:9877
# 集群名称
brokerClusterName = DefaultCluster
# broker名称
brokerName = broker-a
# broker id
brokerId = 0
# 删除时间
deleteWhen = 04
fileReservedTime = 48
# 同步方式
brokerRole = SYNC_MASTER
# 刷盘方式
flushDiskType = SYNC_FLUSH
# 绑定IP
brokerIP1 = 112.74.39.44
# 绑定端口
listenPort = 10911

Slave1

vim /opt/docker/rocketmq/broker01-slave/conf/broker.conf

# 命名服务器地址
namesrvAddr=172.17.88.204:9876;172.17.88.204:9877
# 集群名称
brokerClusterName = DefaultCluster
# broker名称
brokerName = broker-a
# broker id
brokerId = 1
# 删除时间
deleteWhen = 04
fileReservedTime = 48
# 角色
brokerRole = SLAVE
# 刷盘方式
flushDiskType = SYNC_FLUSH
# 绑定IP
brokerIP1 = 112.74.39.44
# 绑定端口
listenPort = 10912

Master2

vim /opt/docker/rocketmq/broker02-master/conf/broker.conf

# 命名服务器地址
namesrvAddr=172.17.88.204:9876;172.17.88.204:9877
# 集群名称
brokerClusterName = DefaultCluster
# broker名称
brokerName = broker-b
# broker id
brokerId = 0
# 删除时间
deleteWhen = 04
fileReservedTime = 48
# 同步方式
brokerRole = SYNC_MASTER
# 刷盘方式
flushDiskType = SYNC_FLUSH
# 绑定IP
brokerIP1 = 112.74.39.44
# 绑定端口
listenPort = 10913

Slave2

vim /opt/docker/rocketmq/broker02-slave/conf/broker.conf

# 命名服务器地址
namesrvAddr=172.17.88.204:9876;172.17.88.204:9877
# 集群名称
brokerClusterName = DefaultCluster
# broker名称
brokerName = broker-b
# broker id
brokerId = 1
# 删除时间
deleteWhen = 04
fileReservedTime = 48
# 角色
brokerRole = SLAVE
# 刷盘方式
flushDiskType = SYNC_FLUSH
# 绑定IP
brokerIP1 = 112.74.39.44
# 绑定端口
listenPort = 10914

启动容器

#启动nameserver容器 
docker run -d -p 9876:9876 --name name-server01 -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" -e "JAVA_OPTS=-Duser.home=/opt" -v /opt/docker/rocketmq/name-server01/logs:/opt/logs -v /opt/docker/rocketmq/name-server01/store:/opt/store foxiswho/rocketmq:server-4.3.2
docker run -d -p 9877:9876 --name name-server02 -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" -e "JAVA_OPTS=-Duser.home=/opt" -v /opt/docker/rocketmq/name-server02/logs:/opt/logs -v /opt/docker/rocketmq/name-server02/store:/opt/store foxiswho/rocketmq:server-4.3.2

# 启动broker,双主双从

docker run -it -d -p 10911:10911 --name broker-master01 -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" -e "JAVA_OPTS=-Duser.home=/opt" -v /opt/docker/rocketmq/broker01-master/conf/broker.conf:/etc/rocketmq/broker.conf -v /opt/docker/rocketmq/broker01-master/logs:/opt/logs -v /opt/docker/rocketmq/broker01-master/store:/opt/store --privileged=true foxiswho/rocketmq:broker-4.3.2

docker run -it -d -p 10912:10912 --name broker-slave01 -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" -e "JAVA_OPTS=-Duser.home=/opt" -v /opt/docker/rocketmq/broker01-slave/conf/broker.conf:/etc/rocketmq/broker.conf -v /opt/docker/rocketmq/broker01-slave/logs:/opt/logs -v /opt/docker/rocketmq/broker01-slave/store:/opt/store --privileged=true foxiswho/rocketmq:broker-4.3.2

docker run -it -d -p 10913:10913 --name broker-master02 -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" -e "JAVA_OPTS=-Duser.home=/opt" -v /opt/docker/rocketmq/broker02-master/conf/broker.conf:/etc/rocketmq/broker.conf -v /opt/docker/rocketmq/broker02-master/logs:/opt/logs -v /opt/docker/rocketmq/broker02-master/store:/opt/store --privileged=true foxiswho/rocketmq:broker-4.3.2

docker run -it -d -p 10914:10914 --name broker-slave02 -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" -e "JAVA_OPTS=-Duser.home=/opt" -v /opt/docker/rocketmq/broker02-slave/conf/broker.conf:/etc/rocketmq/broker.conf -v /opt/docker/rocketmq/broker02-slave/logs:/opt/logs -v /opt/docker/rocketmq/broker02-slave/store:/opt/store --privileged=true foxiswho/rocketmq:broker-4.3.2

# 启动dashbord
docker run -d -p 8082:8080 --name rocketma-console -e "JAVA_OPTS=-Drocketmq.namesrv.addr=172.17.88.204:9876;172.17.88.204:9877 -Dcom.rocketmq.sendMessageWithVIPChannel=false" styletang/rocketmq-console-ng:1.0.0

2、快速开始

1、测试收发

# 快速向TopTest发送1000条消息
export NAMESRV_ADDR=localhost:9876
./tools.sh org.apache.rocketmq.example.quickstart.Producer

# 消费消息
export NAMESRV_ADDR=localhost:9876
./tools.sh org.apache.rocketmq.example.quickstart.Consumer

一些问题

RocketMQ的结构

基础架构分为Namespace和Broker

Broker下有多个Topic,每个Topic下有多个队列

生产者和发送者收发消息的时候可以指定Tag和队列

RocketMQ的存储结构

消息的相关信息存储到commitlog文件中,为了加速查询的速度,会在ConsumerQueue和Index目录下存放消息的offset;

consumerqueue存放的是消费组的索引,Index存放时间等其它条件的索引;

消息消费时,先根据消费组和Topic去consumerQueue中找到对应的offset,然后去commitlog中读取。

RocketMQ的负载均衡

集群模式默认就是负载均衡的,当有多个消费者(同一个消费组)时,则每个consumer平均分配consumer queue;

还有一种时环状分流的算法,即在broker基础上再进行平均分配;

消费者数量要小于Queue的数量;

RocketMQ中的消息重试

顺序消息:

顺序消息,单一消费者,消费失败时,会进行重试导致阻塞,需要监控并处理失败的情况。

顺序消息将无限重试,是否是顺序消息取决于消费者端的设置。

无序消息:

集群模式重试,广播模式失败不再重试;

重试时间不断递增,最多16次;

消费时返回null,抛出异常,返回reconsumer都将导致重试;

重试次数在消费者端进行设置(对整个消费者组生效),传入Properties,超过16次,每次等待2小时后重试。

RocketMQ的Tag

消费组消费时,如果对应消息不符合消费者的tag,则会跳过该消息,并且将offset移动

意味着后面消费组改了tag过滤规则,也无法读取到前面符合规则的消息了,因为当前的offset已经是最新的了。

RocketMQ的offset

RocketMQ是以消费组的方式进行消费的,首先消息通过生产者发送到对应的Broker下的队列中;

消费者注册上来指定消费者组,Broker存储指定消费组对应Topic下的每个队列的偏移量,存储位置如下

vim ${user.home}/store/config/consumerOffset.json

格式如下:

{
  "offsetTable":{
    "zanpocc@base_consumer_group":{0:0,1:0,2:5,3:0,4:0,5:0,6:0,7:0,8:0,9:0,10:0,11:0,12:0,13:0,14:0,15:0},
    "%RETRY%order_consumer_group@order_consumer_group":{0:0},
    "test_topic@base_consumer_group":{0:8,1:7,2:6,3:4},
    "zanpocc@order_consumer_group":{0:0,1:0,2:5,3:0,4:3,5:0,6:4,7:0,8:3,9:0,10:4,11:0,12:3,13:3,14:2,15:2},
    "%RETRY%base_consumer_group@base_consumer_group":{0:0}
  }
}

// 即
"offsetTable":{
  "topic名称@消费组名称":{队列ID:偏移}
}

RocketMQ中的死信队列

当消息重试次数超过限制后,消息进入死信队列中;

死信队列对应的是一个消费者组。消费者组的所有重试失败消息都在这个队列,与Topic无关;

RocketMQ的事物消息

半消息会同步发送,发送成功后会运行本地事物,本地事物执行成功后MQ回调检查本地事物,回调检查成功后提交消息。

幂等性

使用业务主键等唯一标识,判断消息是否重复消费

线程安全

RocketMQ的生产者和消费者客户端对象是线程安全的

标签:opt,store,broker,RocketMQ,conf,docker,rocketmq
From: https://www.cnblogs.com/zanpocc/p/16715592.html

相关文章

  • 阿里云基于全新 RocketMQ 5.0 内核的落地实践
    简介: 本篇文章的核心就消息架构以及产品能力的云原生化,介绍了阿里云是如何基于全新的RocketMQ5.0内核做出自己的判断和演进,以及如何适配越来越多的企业客户在技术和能......
  • 【RocketMQ 课程笔记】7.RocketMQ高可用方案
    RocketMQ高可用消息生产消费流程​ Broker即MQ服务器;​ NameServer可理解为注册中心。Broker主挂了的情况Broker主从都挂了的情况Broker双主挂了的情......
  • 【RocketMQ 课程笔记】11.RocketMQ消息发送之普通消息
    RocketMQ消息发送之普通消息架构拓扑NameServer:192.168.31.103Master:192.168.31.105Slave:192.168.31.111执行流程Master与Slave启动向NameServer注册生产者Prod......
  • rocketMQ客户端和nameService、broker之间的信息交互
    客户端(包含生产者和消费者)定时任务里updateTopicRouteInfoFromNameServer方法,定时向nameService获取topic(当前客户端所包含的所有消费者者消费的和生产者要发送的)的信......
  • RocketMQ实战与原理解析-杨开元.pdf
    这是一本学习RocketMQ实战与实现原理的非常好的资料,内容言简意赅,非常适合初学者和对RocketMQ有一定使用经验的人,能够快速从全局层面掌握RocketMQ设计思想与核心实现。点击......
  • 读完 RocketMQ 源码,我学会了如何优雅的创建线程
    RocketMQ是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时、高可靠的消息发布与订阅服务。这篇文章,笔者整理了RocketMQ源码中创建线程的几点技巧,希望大......
  • RocketMq使用过程中问题场景和解决方案
    MQ使用过程中可能出现的问题以及解决方案一、MQ如何避免消息堆积的问题:1)产生背景:producer发送消息的速率远大于consumer消费消息的速率,从而导致消息堆积在mq服务端中;2)......
  • 十问 RocketMQ:十年再出发,到底有何不同?
    背景作为一种实时数据的处理平台,消息系统的发展跟业务架构的变迁一直息息相关,那么我们可以透过业务架构的变化来看消息系统的发展历程和未来趋势。经过十多年的发展,Rocket......
  • RocketMQ:RocketMQ常见面试题整理
    RocketMQ常见面试题整理MQ优缺点:优点:异步;解耦;削峰。RocketMQ默认端口号:9876。RocketMQ三大功能:缺点:系统可用性降低;系统复杂性提高;存在消息(数据)一致性问题。消息可靠......
  • RocketMQ
    一、RocketMQ整体架构设计整体的架构设计主要分为四大部分,分别是:Producer(生产者)、Consumer(消费者)、Broker(服务器)、NameServer(注册中心)。 NameServer Na......