首页 > 其他分享 >RocketMq学习记录

RocketMq学习记录

时间:2022-12-22 12:23:19浏览次数:54  
标签:RocketMQ 发送 producer 记录 Broker 学习 消息 Topic RocketMq

RocketMQ的部署模型

 

 

 在RocketMq中有四个部分组成,分别是Producer,Consumer,Broker,以及NameServer。

生产者 Producer

发布消息的角色。Producer通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递

消费者 Consumer

消费消息的角色。消费者支持pull和push两种模式对消息进行消费。其中push又支持两种push方式:一种是比较原始Pull Consumer,它不提供相关的订阅方法;另一种是Lite Pull Consumer,它提供了Subscribe和Assign两种方式

名字服务器 NameServer

NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。平时主要用于维护心跳检测,和提供Topic-Broker之间的关系数据。

代理服务器 Broker

Broker主要负责消息的存储、投递和查询功能NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费

RocketMQ工作流程

1. 启动NameServer

启动NameServer。NameServer启动后监听端口,等待Broker、Producer、Consumer连接,相当于一个路由控制中心。

2. 启动 Broker

启动 Broker。与所有 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic跟Broker 的映射关系。

3. 创建 Topic

创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建Topic。

4. 生产者发送消息

生产者发送消息。启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic存在于哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker建立长连接从而向 Broker发消息。

5. 消费者接受消息

消费者接受消息。跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,然后开始消费消息。

基本概念

消息

一个消息必须有一个主题(Topic),消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务 Key 并在 Broker 上查找此消息以便在开发期间查找问题。

TOPIC

消息主题,通过 Topic 对不同的业务消息进行分类。

Tag

消息标签,用来进一步区分某个 Topic 下的消息分类,消息从生产者发出即带上的属性。

Keys

每个消息可以在业务层面的设置唯一标识码 keys 字段,方便将来定位消息丢失问题,应用可以通过 topic、key 来查询这条消息内容

队列

一个Topic可能会有多个队列,并且可能分配在不同的Borker上。一般来说一条消息,如果没有重复发送,则只会存在在 Topic 的其中一个队列中,消息在队列中按照FIFO先进先出的原则存储

消息发送

普通消息发送

RocketMQ可用于以三种方式发送消息:同步、异步和单向传输。前两种消息类型是可靠的。

同步发送

消息发送方发送一条消息后,会在收到服务端同步响应之后发送下一条消息

DefaultMQProducer producer = new DefaultMQProducer("rocketmq_producer_group_1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
try {
	Message msg = new Message("TopicTest" ,"TagA" ,("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
        //使用send进行同步发送
	SendResult sendResult = producer.send(msg);
} catch (Exception e) {
	e.printStackTrace();
}
producer.shutdown();

异步发送

异步方式是指发送出消息之后,不等服务器返回响应,接着发送下一条消息。发送方通过回调接口接受服务端的响应

DefaultMQProducer producer = new DefaultMQProducer("rocketmq_producer_group_1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
try {
	Message msg = new Message("TopicTest" ,"TagA" ,("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
	   //异步发送,还是调用send方法,但是需要传入sendCallback回调
	   producer.send(msg, new SendCallback() {
		@Override
		public void onSuccess(SendResult sendResult) {
		}
		@Override
		public void onException(Throwable e) {
		}
	  });
} catch (Exception e) {
	e.printStackTrace();
}
producer.shutdown();

单向模式

只管发送,不等待服务器是否响应且没有回调函数,一句话,只管发。

DefaultMQProducer producer = new DefaultMQProducer("rocketmq_producer_group_1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
try {
	Message msg = new Message("TopicTest" ,"TagA" ,("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
	//使用sendOneway方法进行单向发送
	producer.sendOneway(msg);
} catch (Exception e) {
	e.printStackTrace();
}
producer.shutdown();

  tips:可以看出同步、异步、单向三种发送方式,区别在于调用的发送方法不同。

顺序消息发送

顺序发送只指按照FIFO先进先出的方式发送消息和消费消息。RocketMQ消息的顺序分为两部分,生产顺序和消费顺序,只有同时满足才能实现顺序消息。

对于同一个Topic,RocketMQ生产者在发布消息时,可能会将消息发送到不同的队列中,有可能是这个borker,也有可能是其他的borker。而消费者又是从不同的队列中消费,所以普通消费无法实现顺序消息发送。在RocketMQ中可以对某一个消息进行分区,同一个分区中的消息会被分配到同一个队列中,并按照顺序消费。RokcetMQ只会保证小范围的顺序,即队列中的顺序,无法保证大范围的顺序,即可能先访问的是这个队列,然后开始访问其他的队列。

DefaultMQProducer producer = new DefaultMQProducer("rocketmq_producer_group_1");
            producer.setNamesrvAddr("localhost:9876");
            producer.start();

            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                Message msg =
                    new Message("order_message", tags[i % tags.length], "KEY" + i,
                        ("order_message_" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                //使用该方法进行顺序发送
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);
            }
            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }

  调用SendResult send(Message msg, MessageQueueSelector selector, Object arg)方法,MessageQueueSelector 是队列选择器,arg 是一个 Java Object 对象,

其中 mqs 是可以发送的队列,select方法返回一个队列,意思发送到该队列中,使用计算方法计算出同一个队列,则保证了该队列中的是顺序消息。

值得注意的是如果其中一个borker掉线,RocketMQ 提供两种模式:保证严格顺序而不是可用性或者保证可用性而不是严格顺序。

延迟消息发送

延迟消息是指生产者发送消息到RocketMQ后,不希望立马投递该消息,而是延迟一定时间后才投递到消费者进行消费。

Apache RocketMQ 一共支持18个等级的延迟投递,具体如下:

投递等级(delay level) 延迟时间 投递等级(delay level) 延迟时间
1 1s 10 6min
2 5s 11 7min
3 10s 12 8min
4 30s 13 9min
5 1min 14 10min
6 2min 15 20min
7 3min 16 30min
8 4min 17 1h
9 5min 18 2h
DefaultMQProducer producer = new DefaultMQProducer("ScheduledMessageProducer");
producer.setNamesrvAddr("localhost:9876");

producer.start();

Message message = new Message("Topic_Test",
		("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
//设置延迟等级
message.setDelayTimeLevel(3);
SendResult send = producer.send(message);
System.out.println(send);
producer.shutdown();

批量消息发送

RocketMQ可以将一些消息凝聚成一批消息后进行发送。

 DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
 producer.start();
 String topic = "BatchTest";
 List<Message> messages = new ArrayList<>();
 messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
 messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
 messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));

 producer.send(messages);

  将消息打包成List传入即可,批量消息的大小不能超过1MB,否则会进行拆分,且同一批的消息的Topic必须一致。

事物消息发送

RocketMQ的特色。

 

 

 

事务消息发送步骤如下:

  1. 生产者将半事务消息发送至 RocketMQ Broker
  2. RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
  • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
  • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  1. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

  2. :::note 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚

  事务消息回查步骤如下: 7. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 8. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

消费者概念

RocketMQ 有两种消费模式,分别是:

  • 集群消费模式:当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。
  • 广播消费模式:当使用广播消费模式时,RocketMQ 会将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。
//集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

负载均衡

集群模式下,同一个消费组(组名一样)内的消费者会分担收到的全量消息。默认是的是平均分配,RocketMQ 提供了多种集群模式下的分配策略。

//设置负载均衡算法 
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());

消息过滤

消息过滤是指消息生产者向Topic中发送消息时,设置消息属性对消息进行分类,消费者订阅Topic时,根据消息属性设置过滤条件对消息进行过滤,只有符合过滤条件的消息才会被投递到消费端进行消费。

消息重试

若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列,消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息

consumer.setMaxReconsumeTimes(10);

死信队列

当一条消息初次消费失败,RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息,存储死信消息的特殊队列称为死信队列。

标签:RocketMQ,发送,producer,记录,Broker,学习,消息,Topic,RocketMq
From: https://www.cnblogs.com/swayer/p/16998001.html

相关文章

  • win电脑使用记录查询
    下载LastActivityViewv1.36程序打开后可查看从系统安装开始到现在为止,所有的使用记录,包含到打开某个文件夹。https://www.nirsoft.net/utils/computer_activity_view.htm......
  • Java学习笔记6
    1.类和对象1.1类和对象​ 客观存在的事物皆为对象,所以我们也常常说万物皆对象。类:类的理解类是对现实生活中一类具有共同属性和行为的事物的抽象类是对象的数据......
  • 【学习记录】用电池问题及其解答
    用电池问题指的是这样的一类问题:你有一台游戏机,这台游戏机需要同时装上$k$节电池才能正常工作。你现在有$n$节电池,每节电池的电量是$a_i$,代表它在使用$a_i$分钟之后会没......
  • 字符串 记录2
    http://ybt.ssoier.cn:8088/problem_show.php?pid=1471字典树板子 http://ybt.ssoier.cn:8088/problem_show.php?pid=1472"异或",都每个数,在树上每次贪心地走相反地路......
  • 今天学习一下坐标系这个东西,看上去不简单呀!在此记录一下!不断更新中。。。
    参心坐标系、地心坐标系空间直角坐标系、大地坐标系导航坐标系、地球坐标系、载体坐标系左手坐标系、右手坐标系天球坐标系、地球坐标系、地理坐标系、投影坐标系协议地心......
  • Java学习笔记5
    1.方法概述概念​ 方法(method)是程序中最小的执行单元。注意:方法必须先创建才可以使用,该过程称为方法的定义。方法创建后并不是直接可以运行的,进行方法的调用才会执......
  • nginx学习记录【一】在windows上的安装nginx的教程
    1、下载地址http://nginx.org/en/download.html2、选择windows版本如下图:   3、解压并运行解压到指定目录,如下图 打开cmd,然后cd到那个目录,如下图: ......
  • nginx学习记录【二】nginx跟.net core结合,实现一个域名访问多个.net core应用
    1、实现转发打开conf下的nginx.conf文件,如下图: 2、添加.netcore网站的转发按下面的进行修改,修改完后,就把localhost的80转发到了https://localhost:5004的.netcore......
  • 【博学谷学习记录】超强总结,用心分享 | 动态规划经典例题总结三
    题目给你两个单词word1和word2,请返回将word1转换成word2所使用的最少操作数。你可以对一个单词进行如下三种操作:插入一个字符删除一个字符替换一个字符......
  • scala快速学习笔记
    val是不可变,var是可变对象trait相当于interface(多继承)和abstract(变量、方法实现)的结合体3.模式匹配:更灵活的Switchcase(1匹配值的时候,值类型可以不一样。2可......