首页 > 其他分享 >RocketMQ使用实例

RocketMQ使用实例

时间:2023-05-30 15:25:41浏览次数:39  
标签:producer org 实例 rocketmq 使用 apache import consumer RocketMQ

下面是一个使用Java实现的RocketMQ示例代码,用于发送和消费消息:

首先,您需要下载并安装RocketMQ,并启动NameServer和Broker。

接下来,您可以使用以下示例代码来发送和消费消息:

Producer.java文件:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) {
        try {
            // 创建一个DefaultMQProducer实例
            DefaultMQProducer producer = new DefaultMQProducer("producer_group");
            
            // 设置NameServer地址
            producer.setNamesrvAddr("localhost:9876");
            
            // 启动Producer实例
            producer.start();
            
            // 创建消息对象
            Message message = new Message("topic_name", "tag", "Hello, RocketMQ!".getBytes());
            
            // 发送消息
            producer.send(message);
            
            // 关闭Producer实例
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Consumer.java文件:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) {
        try {
            // 创建一个DefaultMQPushConsumer实例
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
            
            // 设置NameServer地址
            consumer.setNamesrvAddr("localhost:9876");
            
            // 设置消费开始位置
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            
            // 订阅主题和标签
            consumer.subscribe("topic_name", "tag");
            
            // 注册消息监听器
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                for (MessageExt msg : msgs) {
                    System.out.println("Received message: " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            
            // 启动Consumer实例
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在上述示例代码中,Producer类用于发送消息,而Consumer类用于接收和处理消息。

在Producer类中,我们创建一个DefaultMQProducer实例,并设置NameServer的地址。然后,我们创建一个消息对象,指定主题、标签和消息内容。最后,通过调用send方法发送消息。

在Consumer类中,我们创建一个DefaultMQPushConsumer实例,并设置NameServer的地址。然后,我们设置消费开始位置和订阅特定的主题和标签。注册一个消息监听器,用于处理接收到的消息。最后,调用start方法启动Consumer实例。

请确保您已正确配置RocketMQ服务器和相关依赖项,并根据需要更改服务器地址、主题和标签。运行Producer和Consumer代码后,您将看到Producer发送的消息在Consumer端被接收和打印出来。



当然!这里是另一个使用Java实现的RocketMQ示例代码,用于发送事务消息:

首先,您需要下载并安装RocketMQ,并启动NameServer和Broker。

接下来,您可以使用以下示例代码来发送和处理事务消息:

TransactionProducer.java文件:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionProducer {
    public static void main(String[] args) {
        try {
            // 创建一个TransactionMQProducer实例
            TransactionMQProducer producer = new TransactionMQProducer("producer_group");
            
            // 设置NameServer地址
            producer.setNamesrvAddr("localhost:9876");
            
            // 设置事务监听器
            producer.setTransactionListener(new TransactionListener() {
                @Override
                public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                    // 执行本地事务,根据业务逻辑返回LocalTransactionState
                    // 如果事务执行成功,返回COMMIT_MESSAGE
                    // 如果事务执行失败,返回ROLLBACK_MESSAGE
                    // 如果事务状态不确定,返回UNKNOW
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                
                @Override
                public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                    // 检查本地事务状态,根据消息内容返回LocalTransactionState
                    // 如果事务已提交,返回COMMIT_MESSAGE
                    // 如果事务已回滚,返回ROLLBACK_MESSAGE
                    // 如果事务状态不确定,返回UNKNOW
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
            });
            
            // 启动Producer实例
            producer.start();
            
            // 创建消息对象
            Message message = new Message("topic_name", "tag", "Hello, RocketMQ!".getBytes());
            
            // 发送事务消息
            TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);
            
            System.out.println("Transaction Send Result: " + sendResult.getLocalTransactionState());
            
            // 关闭Producer实例
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在上述示例代码中,我们创建了一个TransactionMQProducer实例,并设置了NameServer的地址。然后,我们设置了事务监听器,其中包含executeLocalTransaction方法和checkLocalTransaction方法。在executeLocalTransaction方法中,您可以执行本地事务操作,并根据事务结果返回适当的LocalTransactionState。在checkLocalTransaction方法中,您可以检查本地事务状态,并返回相应的LocalTransactionState。

在主程序中,我们创建了一个消息对象,并通过调用sendMessageInTransaction方法发送事务消息。然后,我们打印事务发送结果的LocalTransactionState。

请确保您已正确配置RocketMQ服务器和相关依赖项,并根据需要更改服务器地址、主题和标签。运行TransactionProducer代码后,您将看到事务消息发送的结果。根据本地事务执行的结果和检查的结果,LocalTransactionState将被设置为相应的状态(

COMMIT_MESSAGE、ROLLBACK_MESSAGE或UNKNOW)。



当然!这里是另一个使用Java实现的RocketMQ示例代码,用于顺序消息的发送和消费:

首先,您需要下载并安装RocketMQ,并启动NameServer和Broker。

接下来,您可以使用以下示例代码来发送和消费顺序消息:

顺序消息生产者 (OrderedProducer.java):

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueueSelector;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

public class OrderedProducer {
    public static void main(String[] args) {
        try {
            // 创建一个DefaultMQProducer实例
            DefaultMQProducer producer = new DefaultMQProducer("producer_group");
            
            // 设置NameServer地址
            producer.setNamesrvAddr("localhost:9876");
            
            // 启动Producer实例
            producer.start();
            
            // 发送顺序消息
            for (int i = 0; i < 10; i++) {
                Message message = new Message("topic_name", "tag", ("Hello, RocketMQ! " + i).getBytes());
                SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer orderId = (Integer) arg;
                        int index = orderId % mqs.size();
                        return mqs.get(index);
                    }
                }, i);
                System.out.println("Send Result: " + sendResult);
            }
            
            // 关闭Producer实例
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

顺序消息消费者 (OrderedConsumer.java):

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class OrderedConsumer {
    public static void main(String[] args) {
        try {
            // 创建一个DefaultMQPushConsumer实例
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
            
            // 设置NameServer地址
            consumer.setNamesrvAddr("localhost:9876");
            
            // 设置消费模式为集群模式
            consumer.setMessageModel(MessageModel.CLUSTERING);
            
            // 订阅主题和标签
            consumer.subscribe("topic_name", "tag");
            
            // 注册消息监听器
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                for (MessageExt msg : msgs) {
                    System.out.println("Received message: " + new String(msg.getBody()) + ", QueueId: " + msg.getQueueId());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            
            // 启动Consumer实例
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在上述示例代码中,OrderedProducer类用于发送顺序消息,而OrderedConsumer类用于接收和处理顺序消息。

在OrderedProducer类中,我们创建了一个DefaultMQProducer实例,并设置了Name

Server的地址。然后,我们使用循环发送了10条顺序消息。在发送消息时,我们使用MessageQueueSelector来选择要发送的消息队列,并通过参数指定了消息的顺序。每条消息都会返回一个SendResult对象。

在OrderedConsumer类中,我们创建了一个DefaultMQPushConsumer实例,并设置了NameServer的地址。我们还设置了消费模式为集群模式,通过subscribe方法订阅特定的主题和标签。注册了一个消息监听器,用于处理接收到的消息。收到的消息将被打印出来,并附带消息所在的队列ID。

请确保您已正确配置RocketMQ服务器和相关依赖项,并根据需要更改服务器地址、主题和标签。运行OrderedProducer和OrderedConsumer代码后,您将看到顺序消息发送和消费的结果。每条消息都将按照发送时指定的顺序被消费。

标签:producer,org,实例,rocketmq,使用,apache,import,consumer,RocketMQ
From: https://www.cnblogs.com/lukairui/p/17443326.html

相关文章

  • 详解RocketMQ 顺序消费机制
    摘要:顺序消息是指对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。本文分享自华为云社区《RocketMQ顺序消费机制》,作者:勇哥java实战分享。顺序消息是指对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则......
  • Hazelcast使用实例
    当涉及到演示如何使用Hazelcast时,以下是一些更多的示例代码,涵盖了Hazelcast的不同功能和用例。示例1:使用Hazelcast分布式Mapimportcom.hazelcast.core.Hazelcast;importcom.hazelcast.core.HazelcastInstance;importcom.hazelcast.core.IMap;publicclassHazelcastMapDe......
  • spring security使用实例
    下面是一个使用Java和SpringSecurity的详细示例代码。该示例演示了如何设置身份验证和授权规则,并保护特定的URL路径。请注意,这只是一个基本示例,您可以根据自己的需求进行修改和扩展。首先,确保您已经安装了Java开发环境(JDK)和Maven构建工具。接下来,我们将创建一个Maven项目,并在项......
  • gradle使用实例
    以下是一个详细的Gradle示例代码,用于构建和管理Java项目:build.gradle文件:plugins{id'java'}group'com.example'version'1.0-SNAPSHOT'sourceCompatibility=1.8repositories{mavenCentral()}dependencies{implementation'......
  • memcache使用实例
    以下是一个详细的Java示例代码,用于使用Memcached进行缓存操作:首先,您需要在Java项目中添加对spymemcached库的依赖项。您可以使用Maven或Gradle等构建工具添加以下依赖项:Maven依赖项(将以下代码添加到pom.xml文件中):<dependencies><dependency><groupId>net.spy</gr......
  • 钉钉日志推送实例
    背景:jeecgboot集成钉钉小程序,进行日志填报,同时推送到钉钉日志系统给相关人员。主要方便日志问题的讨论,回复等。效果: 接口:通过数据ID查找封装Marckdown方式进行推送;publicStringdoLogsDataDingtalk(StringtableId,StringtoUsers){    if(!thirdAppConfig.......
  • store文件夹 vue_vue-cli2使用store存储全局变量
    1.引入store安装引入vuex,在main.js里面:importstorefrom'./store'//store引入newVue({el:'#app',router,store,//store引入components:{App},template:''})在store文件夹下创建index.js入口文件,添加下面内容:importVuefrom'vue';im......
  • 基于ZigBee3.0技术的数传电台功能使用详解
    一、ZigBee3.0数传电台功能简介1、4G DTU数传电台LINK灯详解基于zigbee3.0通信技术的4G DTU数传电台LINK灯用于指示模块当前网络状态,设备入网成功后LINK灯常亮,当设备没有网络时LINK灯熄灭;在协调器模式下,该引脚指示zigbee模块是否正常建立网络,协调器和路由器在配网模式下1Hz闪......
  • springboot使用jdbc连接mysql(不用配置文件)
     1、连接mysql的工具类:packagecom.jzproject.common.mysql;importcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.JSONArray;importorg.springframework.jdbc.core.JdbcTemplate;importjava.sql.*;importjava.util.ArrayList;importjava.util.HashMap;......
  • 使用gifski制作高质量的gif动态图
    如何在Linux安装gifski:https://snapcraft.io/install/gifski/debian EnablesnapdOnDebian9(Stretch)andnewer,snapcanbeinstalleddirectlyfromthecommandline:sudoaptupdatesudoaptinstallsnapdsudosnapinstallcoreInstallGifski......