服务端安装及配置
docker安装
docker pull rocketmqinc/rocketmq:4.4.0
指定版本号是为了后面确定配置文件的路径
启动namesrv
docker run -d -p 9876:9876 --name rocketmq-nameservice -e MAX_POSSIBLE_HEAP=100000000 rocketmqinc/rocketmq:4.4.0 sh mqnamesrv
运行成功执行mqnamesrv脚本
启动broker
broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 如果是本地程序调用云主机 mq,这个需要设置成 云主机 IP
# 如果Docker环境需要设置成宿主机IP
brokerIP1 = 42.192.20.119
# 开启acl权限控制
aclEnable=true
plain_acl.yml
globalWhiteRemoteAddresses:
accounts:
- accessKey: admin
secretKey: szz123
whiteRemoteAddress:
admin: true
docker run -d -p 10911:10911 -p 10909:10909 -v /root/test_rocketmq/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -v /root/test_rocketmq/plain_acl.yml:/opt/rocketmq-4.4.0/conf/plain_acl.yml --name rocketmq-broker --link rocketmq-nameservice:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
--link可以用来链接2个容器,使得源容器(被链接的容器)和接收容器(主动去链接的容器)之间可以互相通信,并且接收容器可以获取源容器的一些数据,如源容器的环境变量。
启动broker遇到的问题
java.lang.Class cannot be cast to org.apache.rocketmq.acl.AccessValidator
登录用户名accessKey,长度必须大于6个字符。登录密码secretKey,长度也必须大于6个字符。
客户端报错
CODE: 1 DESC: org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to calculate a request signature. error=[10015:signature-failed] unable to calculate a request signature.
error=Algorithm HmacSHA1 not available, org.apache.rocketmq.acl.common.AclSigner.signAndBase64Encode(AclSigner.java:84)
缺少sunjce_provider.jar
docker exec -it -u root rocketmq-broker /bin/bash # 必须以root权限进入
find / -name sunjce_provider.jar
cp xxx/sunjce_provider.jar /opt/rocketmq-4.4.0/lib/ # 如果没有以root权限进入,这个就没有权限操作
exit
docker restart rocketmq-broker
又遇到了其他问题
CODE: 1 DESC: java.lang.NullPointerException, org.apache.rocketmq.acl.plain.PlainPermissionLoader.checkPerm(PlainPermissionLoader.java:131)
可能是因为rocketmq4.4.0版本对acl功能的支持还不完善 具体issue
安装更新的版本
docker run -d -p 9876:9876 --name rocketmq-nameservice -e MAX_POSSIBLE_HEAP=100000000 apache/rocketmq:4.9.2 sh mqnamesrv
docker run -d -p 10911:10911 -p 10909:10909 -v /root/test_rocketmq/broker.conf:/home/rocketmq/rocketmq-4.9.2/conf/broker.conf -v /root/test_rocketmq/plain_acl.yml:/home/rocketmq/rocketmq-4.9.2/conf/plain_acl.yml --name rocketmq-broker --link rocketmq-nameservice:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" apache/rocketmq:4.9.2 sh mqbroker -c /home/rocketmq/rocketmq-4.9.2/conf/broker.conf
注意,这个镜像的rocketmq目录和上面的不一致,运行过程也没有上面出现的那些问题。
新版本broker启动失败
/home/rocketmq/rocketmq-4.9.2/bin/runbroker.sh: line 156: 28 Killed $JAVA ${JAVA_OPT} $@
原因:内存设置太大
docker run -d -p 10911:10911 -p 10909:10909 -e "JAVA_OPT_EXT=-Xmx128m -Xms128m" -v /root/test_rocketmq/broker.conf:/home/rocketmq/rocketmq-4.9.2/conf/broker.conf -v /root/test_rocketmq/plain_acl.yml:/home/rocketmq/rocketmq-4.9.2/conf/plain_acl.yml --name rocketmq-broker --link rocketmq-nameservice:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" apache/rocketmq:4.9.2 sh mqbroker -c /home/rocketmq/rocketmq-4.9.2/conf/broker.conf
注意是 JAVA_OPT_EXT
安装控制台
docker pull styletang/rocketmq-console-ng
docker run -d -p 9999:8080 -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=ip:9876 -Drocketmq.config.isVIPChannel=false -Drocketmq.config.accessKey=my_admin -Drocketmq.config.secretKey=szz123456" --name rocketmq-console styletang/rocketmq-console-ng
通过 http://ip:9999 访问,注意,此镜像不支持acl权限控制,可能使用的rocketmq版本太低,所以使用其他镜像
docker pull apacherocketmq/rocketmq-dashboard
docker run -d -p 9999:8080 -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=ip:9876 -Drocketmq.config.isVIPChannel=false -Drocketmq.config.accessKey=my_admin -Drocketmq.config.secretKey=szz123456" --name rocketmq-console apacherocketmq/rocketmq-dashboard
也是通过 http://ip:9999 访问
客户端访问
使用java客户端发送及消费消息
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.9.4</version>
</dependency>
点击查看代码
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class TestRocketMq1 {
public static void main(String[] args) throws Exception {
testSyncSendMessage();
// testAsyncSendMessage()
// testSyncSendDelayMessage();
// testConcurrentlyConsumerMessage();
TimeUnit.SECONDS.sleep(3);
testOrderlyConsumerMessage();
}
/**
* 并发消费(非顺序)
*
* @throws Exception
*/
private static void testConcurrentlyConsumerMessage() throws Exception {
DefaultMQPushConsumer consumer = createConsumer();
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String content = new String(msg.getBody(), StandardCharsets.UTF_8);
System.out.println("content:" + content + "," + msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
/**
* 顺序消费 先发送的消息先消费
*
* @throws Exception
*/
private static void testOrderlyConsumerMessage() throws Exception {
DefaultMQPushConsumer consumer = createConsumer();
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
String content = new String(msg.getBody(), StandardCharsets.UTF_8);
System.out.println("content:" + content + "," + msg);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
/**
* 发送同步消息
*
* @throws Exception
*/
private static void testSyncSendMessage() throws Exception {
MQProducer producer = createProducer();
producer.start();
//第一个参数是topic,这是主题
//第二个参数是tags,这是可选参数,用于消费端过滤消息
//第三个参数是keys,这也是可选参数,如果有多个,用空格隔开。RocketMQ可以根据这些key快速检索到消息,相当于
//消息的索引,可以设置为消息的唯一编号(主 键)。
Message msg = new Message("test_topic", "TagA", "6666", "RocketMQ Test message".getBytes());
//SendResult是发送结果的封装,包括消息状态,消息id,选择的队列等等,只要不抛异常,就代表发送成功
SendResult sendResult = producer.send(msg);
System.out.println("sync sendResult: " + sendResult);
producer.shutdown();
}
/**
* 发送同步延迟消息
*
* @throws Exception
*/
private static void testSyncSendDelayMessage() throws Exception {
MQProducer producer = createProducer();
producer.start();
//第一个参数是topic,这是主题
//第二个参数是tags,这是可选参数,用于消费端过滤消息
//第三个参数是keys,这也是可选参数,如果有多个,用空格隔开。RocketMQ可以根据这些key快速检索到消息,相当于
//消息的索引,可以设置为消息的唯一编号(主 键)。
Message msg = new Message("test_topic", "TagA", "6666", "RocketMQ Test message".getBytes());
//delayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
//设置消息延迟级别等于0时,则该消息为非延迟消息。
//设置消息延迟级别大于等于1并且小于等于18时,消息延迟特定时间,如:设置消息延迟级别等于1,则延迟1s;设置消息延迟级别等于2,则延迟5s,以此类推。
//设置消息延迟级别大于18时,则该消息延迟级别为18,如:设置消息延迟级别等于20,则延迟2h。
//设置了此延迟级别就是延迟消息了
msg.setDelayTimeLevel(2);
//SendResult是发送结果的封装,包括消息状态,消息id,选择的队列等等,只要不抛异常,就代表发送成功
SendResult sendResult = producer.send(msg);
System.out.println("sync sendResult: " + sendResult);
producer.shutdown();
}
/**
* 发送异步消息
*
* @throws Exception
*/
private static void testAsyncSendMessage() throws Exception {
MQProducer producer = createProducer();
producer.start();
//第一个参数是topic,这是主题
//第二个参数是tags,这是可选参数,用于消费端过滤消息
//第三个参数是keys,这也是可选参数,如果有多个,用空格隔开。RocketMQ可以根据这些key快速检索到消息,相当于
//消息的索引,可以设置为消息的唯一编号(主 键)。
Message msg = new Message("test_topic", "TagA", "6666", "RocketMQ Async message".getBytes());
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("async sendResult: " + sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("async sendResult: " + e);
}
});
//必须等回调结果之后才能shutdown
TimeUnit.SECONDS.sleep(5);
producer.shutdown();
}
private static MQProducer createProducer() {
DefaultMQProducer producer = new DefaultMQProducer("test_producer_group", createRpcHook());
//生产者需用通过NameServer获取所有broker的路由信息,多个用分号隔开,这个跟Redis哨兵一样
producer.setNamesrvAddr("42.192.20.119:9876");
return producer;
}
private static DefaultMQPushConsumer createConsumer() throws Exception {
//消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(createRpcHook());
consumer.setConsumerGroup("test_consumer_group");
//消费者从NameServer拿到topic的queue所在的Broker地址,多个用分号隔开
consumer.setNamesrvAddr("42.192.20.119:9876");
//设置Consumer第一次启动是从队列头部开始消费
//如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//subscribe订阅的第一个参数就是topic,第二个参数为生产者发送时候的tags,*代表匹配所有消息,
//想要接收具体消息时用||隔开,如"TagA||TagB||TagD"
consumer.subscribe("test_topic", "TagA");
//Consumer可以用两种模式启动,广播(Broadcast)和集群(Cluster),广播模式下,一条消息会发送给所有Consumer,
//集群模式下消息只会发送给一个Consumer
consumer.setMessageModel(MessageModel.CLUSTERING);
//批量消费,每次拉取10条
consumer.setConsumeMessageBatchMaxSize(10);
return consumer;
}
private static RPCHook createRpcHook() {
//账号密码
String accessKey = "my_admin";
String secretKey = "szz123456";
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
}
注意,consumer被关闭之后,就不能接收到消息了
顺序消息
-
需要保证顺序的消息要发送到同一个message queue中。其次,一个message queue只能被一个消费者消费,这点是由消息队列的分配机制来保证的。最后,一个消费者内部对一个mq的消费要保证是有序的。我们要做到生产者 - message queue - 消费者之间是一对一对一的关系。
-
生产者发送消息的时候,到达Broker应该是有序的。所以对于生产者,不能使用多线程异步发送,而是顺序发送。
写入Broker的时候,应该是顺序写入的。也就是相同主题的消息应该集中写入,选择同一个Message Queue,而不是分散写入。要达到这个效果很简单,只需要我们在发送的时候传入相同的hashKey,就会选择同一个队列。
-
消费者消费的时候只能有一个线程,否则由于消费的速率不同,有可能出现记录到数据库的时候无序。 在Spring Boot中,consumeMode设置为ORDERLY,在Java API中,传入MessageListenerOrderly的实现类即可。
当然顺序消费会带来一些问题:
- 遇到消息失败的消息,无法跳过,当前队列消费暂停
- 降低了消息处理的性能
整合Spring
- 主要是RocketMQAutoConfiguration配置类,它会创建DefaultMQProducer,RocketMQTemplate等Bean
- ListenerContainerConfiguration配置类来查询出所有包含@RocketMQMessageListener注解的类,就是我们定义的消息消费者
- 将查询到的么一个类封装为DefaultRocketMQListenerContainer类,它会创建DefaultMQPushConsumer,用来实际处理接收到的消息
MQ选型分析
参考
RocketMQ扫盲贴及Java API使用精讲
深度好文!RocketMQ高级进阶知识精讲!
RocketMQ-Docker安装