首页 > 其他分享 >RocketMQ发送消息之同步异步单向

RocketMQ发送消息之同步异步单向

时间:2023-09-23 17:44:40浏览次数:42  
标签:异步 producer 单向 发送 消息 new consumer public RocketMQ

官网教程:https://rocketmq.apache.org/zh/docs/quickStart/01quickstart
基于双主双从异步方式开启的前提下,在maven项目中引入下列依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.1</version>
</dependency>

消息发送者步骤分析

  • 创建消息生产者producer,并制定生产者组名
  • 指定Nameserver地址
  • 启动producer
  • 创建消息对象,指定主题Topic、Tag和消息体
  • 发送消息
  • 关闭生产者producer

消息消费者步骤分析

  • 创建消费者Consumer,制定消费者组名
  • 指定Nameserver地址
  • 订阅主题Topic和Tag
  • 设置回调函数,处理消息
  • 启动消费者consumer

消息发送

发送同步消息

根据rockerMq提供的源码教程中文档所示,Producer端发送同步消息,这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

/**
 * 发送同步消息
 */
public class SyncProducer {
	public static void main(String[] args) throws Exception {
		// 实例化消息生产者Producer,指定一个生产者组
		DefaultMQProducer producer = new DefaultMQProducer("group1");
		// 设置NameServer的地址
		producer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
		// 启动Producer实例
		producer.start();
		for (int i = 0; i < 100; i++) {
			// 创建消息,并指定Topic(主题),Tag(消息Tag)和消息体(消息内容)
			Message msg = new Message("base",
					"Tag1",
					("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
			);
			// 发送消息到一个Broker
			SendResult sendResult = producer.send(msg);
			TimeUnit.SECONDS.sleep(1);
			// 通过sendResult返回消息是否成功送达
			System.out.printf("%s%n", sendResult);
		}
		// 如果不再发送消息,关闭Producer实例。
		producer.shutdown();
    }
}

SendResult即发送消息后的回调如下:

SendResult [sendStatus=SEND_OK, msgId=7F00000130A418B4AAC27429D3CA0063, offsetMsgId=C0A84E8100002A9F0000000000002DE2, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=3], queueOffset=11]

由于搭建的是双主双从架构,所以消息都发送到了主节点上,消息的消费是由从节点消费的。

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
发送消息后的回调接口:

public interface SendCallback {
	// 消息成功发送的回调方法
    void onSuccess(final SendResult sendResult);
	// 消息发送失败的回调方法
    void onException(final Throwable e);
}

实现:

/**
 * 发送异步消息
 */
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // 1.创建消费者
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 2.设置nameserver
        producer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 创建消息,并指定Topic(主题),Tag(消息Tag)和消息体(消息内容)
            Message msg = new Message("base",
                    "Tag2",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // 发送消息到一个Broker,采用异步发送
            // SendCallback接收异步返回结果的回调
            producer.send(msg, new SendCallback() {
                // 发送成功的回调函数
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("发送结果:"+sendResult);
                }
                // 发送失败的回调函数
                @Override
                public void onException(Throwable e) {
                    System.out.println("发送异常:"+e.getMessage());
                }
            });
            // 线程休眠一秒
            TimeUnit.SECONDS.sleep(1);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

发送单向消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

/**
发送单向消息
*/
public class OnewayProducer {
	public static void main(String[] args) throws Exception{
    	// 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("group1");
    	// 设置NameServer的地址
		producer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
    	// 启动Producer实例
        producer.start();
    	for (int i = 0; i < 100; i++) {
        	// 创建消息,并指定Topic,Tag和消息体
        	Message msg = new Message("base",
                "Tag3",
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        	);
        	// 发送单向消息,没有任何返回结果
        	producer.sendOneway(msg);

    	}
    	// 如果不再发送消息,关闭Producer实例。
    	producer.shutdown();
    }
}

消费消息

负载均衡模式

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
image
从图上可以看出,所谓负载均衡就是多个消费者在消费同一个topic下的消息时,他们之间是轮询的方式消费的。

在消费者代码中,通过consumer.setMessageModel(MessageModel.CLUSTERING);既可设置负载均衡
完整代码如下:

/**
 * 消息的接收者
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 1.创建消费者Consumer,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
        // 3.订阅主题Topic和Tag
        consumer.subscribe("base","Tag3");
        //负载均衡模式消费
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            // 接收消息内容的方法
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("接收到的消息是:"+new String(msg.getBody()));
                }
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.启动消费者consumer
        consumer.start();
    }
}

结果:
image

image

可以发现,确实以及实现了负载均衡了,而且在rockermq中,默认的消费模式就是负载均衡

广播模式

image
广播模式中,生产者发送的消息,多个消费者可以同时消费
在消费者代码中设置: consumer.setMessageModel(MessageModel.BROADCASTING);
完整代码:

/**
 * 消息的接收者
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 1.创建消费者Consumer,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
        // 3.订阅主题Topic和Tag
        consumer.subscribe("base","Tag1");//消费Tag1中的消息,通过 || xxx 可以消费多个消息,通过*可以消费base下的所有消息
        //采用广播发送,也就是多个消费者可以消费同一个组中的base下的Tag1中的所有消息
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            // 接收消息内容的方法
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("接收到的消息是:"+new String(msg.getBody()));
                }
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.启动消费者consumer
        consumer.start();
    }
}

结果:
image

image

标签:异步,producer,单向,发送,消息,new,consumer,public,RocketMQ
From: https://www.cnblogs.com/zgf123/p/17724786.html

相关文章

  • WPF异步命令以及SqlSugar异步增删改查
    1、异步///<summary>///查询全部///</summary>///<returns></returns>publicasyncTask<List<IgniteTubeInfo>>QueryListAsync(){returnawaitdb.Queryable<IgniteTubeInfo>().ToListAsync();}///<summa......
  • RocketMQ源码(六):RocketMQ消费者启动流程
    RocketMQ通过Consumer消费消息,可并发和顺序的处理消息,这里以并发消费普通消息为例,分析消息下佛诶的整体流程。Consumer的示例代码如下:1importcom.snails.rmq.common.RMQConstant;2importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;3importorg.a......
  • 当一个接口需要调用多个其他服务的接口时,可以使用异步编程来实现并发调用,以提高效率
    usingSystem;usingSystem.Collections.Generic;usingSystem.Threading.Tasks;publicclassOrderController{publicasyncTask<OrderInfo>GetOrderInfo(intorderId){//并发调用多个接口Task<UserInfo>getUserInfoTask=GetUserInfoAsync(orderId);Task......
  • 从 5s 到 0.5s!CompletableFuture 异步任务优化技巧,确实优雅!
    一个接口可能需要调用N个其他服务的接口,这在项目开发中还是挺常见的。举个例子:用户请求获取订单信息,可能需要调用用户信息、商品详情、物流信息、商品推荐等接口,最后再汇总数据统一返回。如果是串行(按顺序依次执行每个任务)执行的话,接口的响应速度会非常慢。考虑到这些接口之间......
  • Python异步编程高并发执行爬虫采集,用回调函数解析响应
    一、问题:当发送API请求,读写数据库任务较重时,程序运行效率急剧下降。异步技术是Python编程中对提升性能非常重要的一项技术。在实际应用,经常面临对外发送网络请求,调用外部接口,或者不断更新数据库或文件等操作。这这些操作,通常90%以上时间是在等待,如通过REST,gRPC向服务器发送请......
  • RocketMQ
    https://blog.csdn.net/weixin_44981707/article/details/124138939https://zhuanlan.zhihu.com/p/528956421?utm_id=0......
  • 基于channel的异步事件总线
    生成者/消费者概念编程模型通道是生成者/使用者概念编程模型的实现。在此编程模型中,生成者异步生成数据,使用者异步使用该数据。换句话说,此模型将数据从一方移交给另一方。尝试将通道视为任何其他常见的泛型集合类型,例如List。主要区别在于,此集合管理同步,并通过工厂创建选项......
  • Asyncio 协程异步笔记
    协程&asyncio&异步1.协程(coroutine)协程不是计算机提供,而是程序员人为创造。协程(coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块互相切换运行。例如:deffunc1():print(1)...print(2)deffu......
  • 【RocketMQ】消息的消费总结
    消费者从Broker拉取到消息之后,会将消息提交到线程池中进行消费,RocketMQ消息消费是批量进行的,如果一批消息的个数小于预先设置的批量消费大小,直接构建消费请求ConsumeRequest将消费请求提交到线程池处理,否则需要分批构建进行提交。消息消费在消息被提交到线程池后进行处理时,会调......
  • KingbaseES V8R3集群运维案例之---流复制异步同步及全同步模式配置
    案例说明:通过案例描述KingbaseESV8R3集群异步、同步及全同步强一致性配置,本案例为一主二备的架构。适用版本:KingbaseESV8R3集群架构:集群复制配置参数说明:1)sync_flag[kingbase@node101bin]$cat../etc/HAmodule.conf|grep-isync_#1->synchronouscluster,0->async......