首页 > 其他分享 >RocketMQ简单入门

RocketMQ简单入门

时间:2023-10-02 14:22:22浏览次数:48  
标签:入门 producer import RocketMQ 简单 apache org consumer rocketmq

服务端安装及配置

docker安装

docker pull rocketmqinc/rocketmq:4.4.0

指定版本号是为了后面确定配置文件的路径

启动namesrv

docker run -d -p 9876:9876 --name rocketmq-nameservice -e MAX_POSSIBLE_HEAP=100000000 rocketmqinc/rocketmq:4.4.0 sh mqnamesrv

运行成功执行mqnamesrv脚本

启动broker

broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 如果是本地程序调用云主机 mq,这个需要设置成 云主机 IP
# 如果Docker环境需要设置成宿主机IP
brokerIP1 = 42.192.20.119
# 开启acl权限控制
aclEnable=true

plain_acl.yml

globalWhiteRemoteAddresses:
accounts:
- accessKey: admin
  secretKey: szz123
  whiteRemoteAddress:
  admin: true
docker run -d -p 10911:10911 -p 10909:10909 -v  /root/test_rocketmq/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -v /root/test_rocketmq/plain_acl.yml:/opt/rocketmq-4.4.0/conf/plain_acl.yml --name rocketmq-broker --link rocketmq-nameservice:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

--link可以用来链接2个容器,使得源容器(被链接的容器)和接收容器(主动去链接的容器)之间可以互相通信,并且接收容器可以获取源容器的一些数据,如源容器的环境变量。

启动broker遇到的问题

java.lang.Class cannot be cast to org.apache.rocketmq.acl.AccessValidator

登录用户名accessKey,长度必须大于6个字符。登录密码secretKey,长度也必须大于6个字符。

客户端报错

CODE: 1  DESC: org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to calculate a request signature. error=[10015:signature-failed] unable to calculate a request signature. 
error=Algorithm HmacSHA1 not available, org.apache.rocketmq.acl.common.AclSigner.signAndBase64Encode(AclSigner.java:84)

缺少sunjce_provider.jar

docker exec -it -u root rocketmq-broker /bin/bash # 必须以root权限进入
find / -name sunjce_provider.jar
cp xxx/sunjce_provider.jar /opt/rocketmq-4.4.0/lib/ # 如果没有以root权限进入,这个就没有权限操作
exit
docker restart rocketmq-broker

又遇到了其他问题

CODE: 1  DESC: java.lang.NullPointerException, org.apache.rocketmq.acl.plain.PlainPermissionLoader.checkPerm(PlainPermissionLoader.java:131)

可能是因为rocketmq4.4.0版本对acl功能的支持还不完善 具体issue

安装更新的版本

docker run -d -p 9876:9876 --name rocketmq-nameservice -e MAX_POSSIBLE_HEAP=100000000 apache/rocketmq:4.9.2 sh mqnamesrv
docker run -d -p 10911:10911 -p 10909:10909 -v  /root/test_rocketmq/broker.conf:/home/rocketmq/rocketmq-4.9.2/conf/broker.conf -v /root/test_rocketmq/plain_acl.yml:/home/rocketmq/rocketmq-4.9.2/conf/plain_acl.yml --name rocketmq-broker --link rocketmq-nameservice:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" apache/rocketmq:4.9.2 sh mqbroker -c /home/rocketmq/rocketmq-4.9.2/conf/broker.conf

注意,这个镜像的rocketmq目录和上面的不一致,运行过程也没有上面出现的那些问题。

新版本broker启动失败

/home/rocketmq/rocketmq-4.9.2/bin/runbroker.sh: line 156:    28 Killed                  $JAVA ${JAVA_OPT} $@

原因:内存设置太大

docker run -d -p 10911:10911 -p 10909:10909  -e "JAVA_OPT_EXT=-Xmx128m -Xms128m" -v  /root/test_rocketmq/broker.conf:/home/rocketmq/rocketmq-4.9.2/conf/broker.conf -v /root/test_rocketmq/plain_acl.yml:/home/rocketmq/rocketmq-4.9.2/conf/plain_acl.yml --name rocketmq-broker --link rocketmq-nameservice:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" apache/rocketmq:4.9.2 sh mqbroker -c /home/rocketmq/rocketmq-4.9.2/conf/broker.conf

注意是 JAVA_OPT_EXT

安装控制台

docker pull styletang/rocketmq-console-ng
docker run -d -p 9999:8080 -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=ip:9876 -Drocketmq.config.isVIPChannel=false -Drocketmq.config.accessKey=my_admin -Drocketmq.config.secretKey=szz123456" --name rocketmq-console styletang/rocketmq-console-ng

通过 http://ip:9999 访问,注意,此镜像不支持acl权限控制,可能使用的rocketmq版本太低,所以使用其他镜像

docker pull apacherocketmq/rocketmq-dashboard
docker run -d -p 9999:8080 -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=ip:9876 -Drocketmq.config.isVIPChannel=false -Drocketmq.config.accessKey=my_admin -Drocketmq.config.secretKey=szz123456" --name rocketmq-console apacherocketmq/rocketmq-dashboard

也是通过 http://ip:9999 访问

客户端访问

使用java客户端发送及消费消息

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.9.4</version>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-acl</artifactId>
  <version>4.9.4</version>
</dependency>
点击查看代码
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class TestRocketMq1 {
    public static void main(String[] args) throws Exception {
        testSyncSendMessage();
//        testAsyncSendMessage()
//        testSyncSendDelayMessage();
//        testConcurrentlyConsumerMessage();
        TimeUnit.SECONDS.sleep(3);
        testOrderlyConsumerMessage();
    }

    /**
     * 并发消费(非顺序)
     *
     * @throws Exception
     */
    private static void testConcurrentlyConsumerMessage() throws Exception {
        DefaultMQPushConsumer consumer = createConsumer();
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String content = new String(msg.getBody(), StandardCharsets.UTF_8);
                    System.out.println("content:" + content + "," + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

    /**
     * 顺序消费 先发送的消息先消费
     *
     * @throws Exception
     */
    private static void testOrderlyConsumerMessage() throws Exception {
        DefaultMQPushConsumer consumer = createConsumer();
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    String content = new String(msg.getBody(), StandardCharsets.UTF_8);
                    System.out.println("content:" + content + "," + msg);
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }

    /**
     * 发送同步消息
     *
     * @throws Exception
     */
    private static void testSyncSendMessage() throws Exception {
        MQProducer producer = createProducer();
        producer.start();
        //第一个参数是topic,这是主题
        //第二个参数是tags,这是可选参数,用于消费端过滤消息
        //第三个参数是keys,这也是可选参数,如果有多个,用空格隔开。RocketMQ可以根据这些key快速检索到消息,相当于
        //消息的索引,可以设置为消息的唯一编号(主  键)。
        Message msg = new Message("test_topic", "TagA", "6666", "RocketMQ Test message".getBytes());
        //SendResult是发送结果的封装,包括消息状态,消息id,选择的队列等等,只要不抛异常,就代表发送成功
        SendResult sendResult = producer.send(msg);
        System.out.println("sync sendResult: " + sendResult);
        producer.shutdown();
    }

    /**
     * 发送同步延迟消息
     *
     * @throws Exception
     */
    private static void testSyncSendDelayMessage() throws Exception {
        MQProducer producer = createProducer();
        producer.start();
        //第一个参数是topic,这是主题
        //第二个参数是tags,这是可选参数,用于消费端过滤消息
        //第三个参数是keys,这也是可选参数,如果有多个,用空格隔开。RocketMQ可以根据这些key快速检索到消息,相当于
        //消息的索引,可以设置为消息的唯一编号(主  键)。
        Message msg = new Message("test_topic", "TagA", "6666", "RocketMQ Test message".getBytes());
        //delayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        //设置消息延迟级别等于0时,则该消息为非延迟消息。
        //设置消息延迟级别大于等于1并且小于等于18时,消息延迟特定时间,如:设置消息延迟级别等于1,则延迟1s;设置消息延迟级别等于2,则延迟5s,以此类推。
        //设置消息延迟级别大于18时,则该消息延迟级别为18,如:设置消息延迟级别等于20,则延迟2h。
        //设置了此延迟级别就是延迟消息了
        msg.setDelayTimeLevel(2);
        //SendResult是发送结果的封装,包括消息状态,消息id,选择的队列等等,只要不抛异常,就代表发送成功
        SendResult sendResult = producer.send(msg);
        System.out.println("sync sendResult: " + sendResult);
        producer.shutdown();
    }

    /**
     * 发送异步消息
     *
     * @throws Exception
     */
    private static void testAsyncSendMessage() throws Exception {
        MQProducer producer = createProducer();
        producer.start();
        //第一个参数是topic,这是主题
        //第二个参数是tags,这是可选参数,用于消费端过滤消息
        //第三个参数是keys,这也是可选参数,如果有多个,用空格隔开。RocketMQ可以根据这些key快速检索到消息,相当于
        //消息的索引,可以设置为消息的唯一编号(主  键)。
        Message msg = new Message("test_topic", "TagA", "6666", "RocketMQ Async message".getBytes());
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("async sendResult: " + sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("async sendResult: " + e);
            }
        });
        //必须等回调结果之后才能shutdown
        TimeUnit.SECONDS.sleep(5);
        producer.shutdown();
    }

    private static MQProducer createProducer() {

        DefaultMQProducer producer = new DefaultMQProducer("test_producer_group", createRpcHook());
        //生产者需用通过NameServer获取所有broker的路由信息,多个用分号隔开,这个跟Redis哨兵一样
        producer.setNamesrvAddr("42.192.20.119:9876");
        return producer;
    }

    private static DefaultMQPushConsumer createConsumer() throws Exception {
        //消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(createRpcHook());
        consumer.setConsumerGroup("test_consumer_group");
        //消费者从NameServer拿到topic的queue所在的Broker地址,多个用分号隔开
        consumer.setNamesrvAddr("42.192.20.119:9876");
        //设置Consumer第一次启动是从队列头部开始消费
        //如果非第一次启动,那么按照上次消费的位置继续消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //subscribe订阅的第一个参数就是topic,第二个参数为生产者发送时候的tags,*代表匹配所有消息,
        //想要接收具体消息时用||隔开,如"TagA||TagB||TagD"
        consumer.subscribe("test_topic", "TagA");
        //Consumer可以用两种模式启动,广播(Broadcast)和集群(Cluster),广播模式下,一条消息会发送给所有Consumer,
        //集群模式下消息只会发送给一个Consumer
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //批量消费,每次拉取10条
        consumer.setConsumeMessageBatchMaxSize(10);
        return consumer;
    }

    private static RPCHook createRpcHook() {
        //账号密码
        String accessKey = "my_admin";
        String secretKey = "szz123456";
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }
}

注意,consumer被关闭之后,就不能接收到消息了

顺序消息

  1. 需要保证顺序的消息要发送到同一个message queue中。其次,一个message queue只能被一个消费者消费,这点是由消息队列的分配机制来保证的。最后,一个消费者内部对一个mq的消费要保证是有序的。我们要做到生产者 - message queue - 消费者之间是一对一对一的关系。

  2. 生产者发送消息的时候,到达Broker应该是有序的。所以对于生产者,不能使用多线程异步发送,而是顺序发送。
    写入Broker的时候,应该是顺序写入的。也就是相同主题的消息应该集中写入,选择同一个Message Queue,而不是分散写入。要达到这个效果很简单,只需要我们在发送的时候传入相同的hashKey,就会选择同一个队列。
    image

  3. 消费者消费的时候只能有一个线程,否则由于消费的速率不同,有可能出现记录到数据库的时候无序。 在Spring Boot中,consumeMode设置为ORDERLY,在Java API中,传入MessageListenerOrderly的实现类即可。

当然顺序消费会带来一些问题:

  1. 遇到消息失败的消息,无法跳过,当前队列消费暂停
  2. 降低了消息处理的性能

整合Spring

  • 主要是RocketMQAutoConfiguration配置类,它会创建DefaultMQProducer,RocketMQTemplate等Bean
  • ListenerContainerConfiguration配置类来查询出所有包含@RocketMQMessageListener注解的类,就是我们定义的消息消费者
  • 将查询到的么一个类封装为DefaultRocketMQListenerContainer类,它会创建DefaultMQPushConsumer,用来实际处理接收到的消息

MQ选型分析

image

参考

RocketMQ扫盲贴及Java API使用精讲
深度好文!RocketMQ高级进阶知识精讲!
RocketMQ-Docker安装

标签:入门,producer,import,RocketMQ,简单,apache,org,consumer,rocketmq
From: https://www.cnblogs.com/strongmore/p/17147872.html

相关文章

  • Kafka简单入门
    使用docker安装dockerpullbitnami/kafkadockerrun-d-p9092:9092--namekafka-server\-eALLOW_PLAINTEXT_LISTENER=yes\-eKAFKA_CFG_ZOOKEEPER_CONNECT=ip:2181\-eKAFKA_BROKER_ID=1\-eKAFKA_CFG_LISTENERS=PLAINTEXT://:9092\-eKAFKA_CFG_ADVERTISE......
  • 26、Flink 的SQL之概览与入门示例
    文章目录Flink系列文章一、SQL1、数据类型2、保留关键字二、SQL入门1、FlinkSQL环境准备1)、安装Flink及提交任务方式2)、SQL客户端使用介绍3)、简单示例2、Source表介绍及示例3、连续查询介绍及示例4、Sink表介绍及示例本文简单的介绍了SQL和SQL的入门,并以三个简单的示例进行介......
  • Mybatis入门
    Mybatis入门前言在前面我们学习MySQL数据库时,都是利用图形化客户端工具(如:idea、datagrip),来操作数据库的。在客户端工具中,编写增删改查的SQL语句,发给MySQL数据库管理系统,由数据库管理系统执行SQL语......
  • 01. Kubernetes基础入门
    目录1、前言2、Kubernetes介绍2.1、什么是Kubernetes2.2、主要功能2.3、与Docker的关系2.4、Kubernetes集群架构体系3、Kubernetes组件3.1、核心组件3.2、附加组件4、Kubernetes对象4.1、对象管理4.2、命名空间4.3、标签1、前言Docker容器技术将应用及其依赖打包到镜像中,从而很好......
  • 实验1 C语言输入输出和简单程序编写
    1.实验任务1task1_1.c源代码1#include<stdio.h>2intmain()3{4printf("o\n");5printf("<H>\n");6printf("II\n");7printf("o\n");8printf("<H>\n"......
  • 实验1c语言的简单输入输出和简单程序编写
    实验1#include<stdio.h>#include<stdlib.h>intmain(){printf("0\n");printf("<H>\n");printf("II\n");system("pause");return0;}实验2#include<stdio.h>#include<stdlib.......
  • 掌握这些技巧,让Excel批量数据清洗变得简单高效!
    什么是数据清洗数据清洗是指在数据处理过程中对原始数据进行筛选、转换和修正,以确保数据的准确性、一致性和完整性的过程。它是数据预处理的一部分,旨在处理和纠正可能存在的错误、缺失值、异常值和不一致性等数据质量问题。为什么要数据清洗Excel在数据采集场景中非常常用。作......
  • 实验1C语言输入输出和简单程序编写
    1.实验1实验1.1源代码 1//打印一个字符小人23#include<stdio.h>4intmain()5{6printf("0\n");7printf("<H>\n");8printf("II\n");9printf("0\n");10printf("<H>......
  • aws awswrangler 集成minio 简单试用
    awsawswrangler现在已经改名为aws-sdk-pandas,但是对于python使用的时候安装已经是使用awswrangler名称以下是一个简单的集成minio的测试,核心是配置环境变量,这个也比较符合aws对于相关资源的集成玩法环境准备docker-compose文件 version:'3'services......
  • 【C语言入门】第一天
    [例题1]输入两个正整数a和b,输出a+b的值。其中a,b<10000.#include<stdio.h>intmain(){inta,b;scanf("%d%d",&a,&b);printf("%d",(a+b));return0;}[例题2]先输入一个t(t<100),然后输入t组数据。对于每组数据,输入两个整数a和b,输出a+b值......