首页 > 其他分享 >rocketmq记录

rocketmq记录

时间:2023-10-19 19:45:49浏览次数:28  
标签:producer 记录 org apache import consumer rocketmq

Rocketmq  生产者、消费者


maven引用
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
<!--一个好用的工具包,可以不引入-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.3.0</version>
</dependency>

生产者实现

1、NamesrvAddr参数在多个节点时,用英文分号分隔,例: 192.168.9.58:9876;192.168.9.59:9876

import cn.hutool.core.util.RandomUtil;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(“ProducerGroupName”);
producer.setNamesrvAddr(“127.0.0.1:9876”);
//发送超时时间,默认3000 单位ms
producer.setSendMsgTimeout(5000);
producer.start();

try {
Message msg = new Message("TestTopic",// topic
"177", // tag 可以为空,用以简单的筛选。
RandomUtil.randomString(8), // key 可以为空,可用以查询。
("test" + RandomUtil.randomString(8)).getBytes()); // body ,我常将对象转json再获取byte[] 进行传输。
SendResult send = producer.send(msg);
if (send.getSendStatus().equals(SendStatus.SEND_OK)) {
//发送成功处理
}else {
//发送失败处理
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}

多线程加批量生产者模拟实现

1、批量发送时,topic必须为同一个,否则发送会报异常。

2、批量发送相较于单条发送速度提升很大。

import cn.hutool.core.util.RandomUtil;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(“ProducerGroupName”);
producer.setNamesrvAddr(“127.0.0.1:9876”);
//发送超时时间,默认3000 单位ms
producer.setSendMsgTimeout(5000);
producer.start();

int threadCount = 20;
int forCount = 100000;
CountDownLatch latch = new CountDownLatch(threadCount);
long start = System.currentTimeMillis();
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
List<Message> list = new ArrayList<>();
for (int j = 0; j < forCount; j++) {
try {
Message msg = new Message("TestTopic",// topic
"177", // tag
RandomUtil.randomString(8), // key
("test" + RandomUtil.randomString(8)).getBytes()); // body
list.add(msg);
if (list.size() >= 100) {
SendResult send = producer.send(list);
if (send.getSendStatus().equals(SendStatus.SEND_OK)) {
//发送成功处理
list.clear();
}else {
//发送失败处理
}
}
} catch (Exception e) {
//发送失败处理
e.printStackTrace();
}
}
if (list.size() > 0) {
SendResult send = producer.send(list);
if (!send.getSendStatus().equals(SendStatus.SEND_OK)) {
System.out.println(send);
}
list.clear();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}).start();
}
latch.await();
long hs = System.currentTimeMillis() - start;
System.out.println(hs);

long speed = (threadCount * forCount) / (hs >= 0 ? 1 : hs / 1000);
System.out.println("速度" + speed);
//正式环境不要发完就就shutdown,要在应用退出时再shutdown。
producer.shutdown();
}
}

push消费者
import cn.hutool.core.lang.Console;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class PushConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“PushConsumerGroupName”);
consumer.setNamesrvAddr(“127.0.0.1:9876”);
//一个GroupName第一次消费时的位置
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);
//要消费的topic,可使用tag进行简单过滤
consumer.subscribe(“TestTopic”, “*”);
//一次最大消费的条数
consumer.setConsumeMessageBatchMaxSize(100);
//消费模式,广播或者集群,默认集群。
consumer.setMessageModel(MessageModel.CLUSTERING);
//在同一jvm中 需要启动两个同一GroupName的情况需要这个参数不一样。
consumer.setInstanceName(“InstanceName”);
//配置消息监听
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
//业务处理
msgs.forEach(msg -> {
Console.log(msg);
});
} catch (Exception e) {
System.err.println(“接收异常” + e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println(“Consumer Started.”);
}
}

 

pull消费者
从4.6之后,提供了DefaultLitePullConsumer 大大简化了pull的操作。以下为新实现,4.6之前的版本不支持。

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Console;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class PullConsumer {
private static boolean runFlag = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(“PullConsumerGroupName”);
consumer.setNamesrvAddr(“127.0.0.1:9876”);
//要消费的topic,可使用tag进行简单过滤
consumer.subscribe(“TestTopic”, “*”);
//一次最大消费的条数
consumer.setPullBatchSize(100);
//无消息时,最大阻塞时间。默认5000 单位ms
consumer.setPollTimeoutMillis(5000);
consumer.start();
while (runFlag){
try {
//拉取消息,无消息时会阻塞
List msgs = consumer.poll();
if (CollUtil.isEmpty(msgs)){
continue;
}
//业务处理
msgs.forEach(msg-> Console.log(new String(msg.getBody())));
//同步消费位置。不执行该方法,应用重启会存在重复消费。
consumer.commitSync();
}catch (Exception e){
e.printStackTrace();
}
}
consumer.shutdown();
}
}

标签:producer,记录,org,apache,import,consumer,rocketmq
From: https://www.cnblogs.com/wyy1/p/17775457.html

相关文章

  • Rocketmq使用
    Rocketmq 生产者、消费者maven引用<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.0</version></dependency><!--一个好用的工具包,可以不......
  • 记录--如何判断两个数组的内容是否相等
    这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助题目给定两个数组,判断两数组内容是否相等。不使用排序不考虑元素位置例:[1,2,3]和[1,3,2]//true[1,2,3]和[1,2,4]//false思考几秒:有了......
  • SpringBoot3.0 + RocketMq 构建企业级数据中台[内附资料]
    点击下载:SpringBoot3.0+RocketMq构建企业级数据中台[内附资料]  提取码:3cnfSpringBoot3.0是SpringBoot框架的最新版本,它提供了愈加简单、快速和高效的方式来构建企业级应用程序。RocketMq是一款高性能的音讯中间件,能够完成散布式音讯传送和处置。将SpringBoot3.0和Rocket......
  • 博客成立啦!亲爱的小博,以后将会和你共同记录下我的成长路上的点点滴滴!加油!
    大学中的第一篇博客来啦~初秋的落叶承载了新一轮的悲欢离合,校园的晚霞昭示着再一次的轻装上阵,或许像那句话所说“我们出发,皆因我们相信,也因我们可以”。我们相信前路坎坷但意义非凡,勇敢向前不惧阻挠。我们可以迎接挑战并全力以赴,不负韶华再赴前章。夏末不能仅仅停留在遗憾的书......
  • jemeter使用jp@gc - PerfMon Metrics Collector性能监控startAgent2.2.1版本崩溃记录
    jemeter进行性能测试时,一开启startAgent就退出,以下是正常情况:原因:JDK版本与startAgent版本不对应解决方式:之前使用的是jdk1.8.0_321,更换为jdk1.8.0_141后就正常了 ......
  • Git的状态记录
    Gitdiff-files下A:添加文件C:将文件复制到新文件中D:删除文件M:修改文件的内容或模式R:重命名文件T:文件类型的更改(常规文件、符号链接或子模块)U:文件未合并(必须先完成合并,然后才能提交)X:“未知”更改类型(很可能是错误,请报告)参考:Git-git-diff-filesDocumentation(......
  • (华为欧拉操作系统)openEuler 22.03 LTS SP2 安装使用记录
    本来是准备在虚拟机中安装rockylinux,,结果安装失败,你可以从第4步开始看。1.到 https://www.virtualbox.org/ 下载VirtualBox-7.0.12-159484-Win.exe  并安装 2.到 https://rockylinux.org/zh_CN/download/下载   Rocky-9.2-x86_64-dvd.iso 由于这个iso有8.8G,正......
  • 记录最近学习到的一些windows常用命令
    1、ping命令可以用来测试网络是否联通,使用步骤如下:1.1、在电脑上面同时按住win+R,输入cmd,回车 1.2、在窗口里面输入一行格式为“ping+空格+IP地址(或者网站地址)”的命令,如“pingwww.baidu.com” 如上图可见,本台计算机可以与百度通信2、cd命令cd命令可以更改命令提......
  • 【问题记录】自定义注解处理程序 AbstractProcessor,就是不生效,执行没效果
    1  前言最近在看注解处理程序,也想打包的时候,生成一点自己的东西,写了一个 AbstractProcessor,奶奶的花了两个早上,一直想不明白为什么不生效:唉,仅记录哈。......
  • linux系统安装ftp记录
    使用的是yum工具安装ftp服务器1.首先安装ftp-0.17-89.el9.x86_64,这个不是ftp服务器程序,只是ftp访问工具:yuminstall-yftp2.安装vsftpd,ftp服务程序:yuminstall-y vsftpd启动服务:       systemctlstartvsftpd.service随系统启动:       ......