This article shows how to work with RocketMQ from Java. Software versions used: Java 1.8.0_341, RocketMQ 5.1.3, rocketmq-client-java 5.0.5.
1. Add the dependency
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.5</version>
</dependency>
2. Using RocketMQ from Java
2.1. Create topics
# Normal messages
bin/mqadmin updateTopic -n 10.49.196.33:9876 -t NORMAL_TOPIC -c DefaultCluster -a +message.type=NORMAL
# Scheduled/delayed messages
bin/mqadmin updateTopic -n 10.49.196.33:9876 -t DELAY_TOPIC -c DefaultCluster -a +message.type=DELAY
# FIFO (ordered) messages
bin/mqadmin updateTopic -n 10.49.196.33:9876 -t FIFO_TOPIC -c DefaultCluster -a +message.type=FIFO
# Transactional messages
bin/mqadmin updateTopic -n 10.49.196.33:9876 -t TRANSACTION_TOPIC -c DefaultCluster -a +message.type=TRANSACTION
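The four commands differ only in the topic name and the message.type attribute. As a minimal sketch, the command lines could be generated from a single map so that topic names and types stay in sync. TopicCommands and updateTopic are hypothetical names used for illustration, not part of RocketMQ; the name server address and cluster name are the ones used in this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds the mqadmin command line for one topic.
final class TopicCommands {

    static String updateTopic(String nameServer, String cluster, String topic, String messageType) {
        return String.format(
                "bin/mqadmin updateTopic -n %s -t %s -c %s -a +message.type=%s",
                nameServer, topic, cluster, messageType);
    }

    public static void main(String[] args) {
        // Topic name -> message type, in the order used in this article.
        Map<String, String> topics = new LinkedHashMap<>();
        topics.put("NORMAL_TOPIC", "NORMAL");
        topics.put("DELAY_TOPIC", "DELAY");
        topics.put("FIFO_TOPIC", "FIFO");
        topics.put("TRANSACTION_TOPIC", "TRANSACTION");

        topics.forEach((topic, type) ->
                System.out.println(updateTopic("10.49.196.33:9876", "DefaultCluster", topic, type)));
    }
}
```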
2.2. Producer
2.2.1. Normal messages
A. Synchronous send
@Test
public void normal() throws ClientException, IOException {
    ClientServiceProvider provider = ClientServiceProvider.loadService();
    //SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider("RocketMQ", "12345678");
    // "endpoints" is the constant defined in the complete code in section 2.4.
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            //.setCredentialProvider(sessionCredentialsProvider)
            .build();
    String topic = "NORMAL_TOPIC";
    Producer producer = provider.newProducerBuilder()
            .setTopics(topic)
            .setClientConfiguration(clientConfiguration)
            .build();
    Message message = provider.newMessageBuilder()
            .setTopic(topic)
            .setKeys("messageKey", "messageKey2")
            .setTag("messageTag")
            // Encode explicitly as UTF-8; the consumers in 2.3 decode with UTF-8.
            .setBody("normalMessage".getBytes(StandardCharsets.UTF_8))
            .build();
    // Blocks until the send succeeds or fails.
    SendReceipt sendReceipt = producer.send(message);
    log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
    producer.close();
}
B. Asynchronous send
@Test
public void normalAsync() throws ClientException, InterruptedException {
    ClientServiceProvider provider = ClientServiceProvider.loadService();
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();
    String topic = "NORMAL_TOPIC";
    Producer producer = provider.newProducerBuilder()
            .setTopics(topic)
            .setClientConfiguration(clientConfiguration)
            .build();
    Message message = provider.newMessageBuilder()
            .setTopic(topic)
            .setKeys("messageKey", "messageKey2")
            .setTag("messageTag")
            .setBody("normalMessage".getBytes(StandardCharsets.UTF_8))
            .build();
    CompletableFuture<SendReceipt> sendReceiptCompletableFuture = producer.sendAsync(message);
    sendReceiptCompletableFuture.whenComplete(new BiConsumer<SendReceipt, Throwable>() {
        @Override
        public void accept(SendReceipt sendReceipt, Throwable throwable) {
            // On failure sendReceipt is null, so check the throwable first.
            if (throwable != null) {
                log.error("Failed to send message", throwable);
                return;
            }
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        }
    });
    // Keep the test thread alive so the callback has a chance to run.
    Thread.sleep(Long.MAX_VALUE);
}
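A whenComplete callback receives either a result or a Throwable, so the callback should look at the Throwable before touching the receipt. The sketch below shows that pattern with plain JDK futures. AsyncCallbackSketch and describe are hypothetical names, and a String stands in for the message ID that a real SendReceipt would carry.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the send callback: a String plays the role of the message ID.
final class AsyncCallbackSketch {

    // Builds the log line for one outcome; the throwable is checked first
    // because the result is null whenever the future failed.
    static String describe(String messageId, Throwable throwable) {
        if (throwable != null) {
            return "Failed to send message: " + throwable.getMessage();
        }
        return "Send message successfully, messageId=" + messageId;
    }

    public static void main(String[] args) {
        CompletableFuture<String> succeeded = CompletableFuture.completedFuture("id-1");

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));

        // Both futures are already complete, so the callbacks run immediately.
        succeeded.whenComplete((id, t) -> System.out.println(describe(id, t)));
        failed.whenComplete((id, t) -> System.out.println(describe(id, t)));
    }
}
```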
2.2.2. Scheduled/delayed messages
@Test
public void delay() throws ClientException, IOException {
    ClientServiceProvider provider = ClientServiceProvider.loadService();
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();
    String topic = "DELAY_TOPIC";
    Producer producer = provider.newProducerBuilder()
            .setTopics(topic)
            .setClientConfiguration(clientConfiguration)
            .build();
    // Deliver one minute from now (epoch milliseconds).
    long deliverTimeStamp = System.currentTimeMillis() + 60 * 1000L;
    Message message = provider.newMessageBuilder()
            .setTopic(topic)
            .setKeys("messageKey", "messageKey2")
            .setTag("messageTag")
            .setBody(("delayMessage-" + LocalDateTime.now()).getBytes(StandardCharsets.UTF_8))
            .setDeliveryTimestamp(deliverTimeStamp)
            .build();
    SendReceipt sendReceipt = producer.send(message);
    log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
    producer.close();
}
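The delivery timestamp passed to setDeliveryTimestamp is an absolute time in epoch milliseconds, so both "after a delay" and "at a fixed wall-clock time" reduce to computing a long. A minimal sketch using java.time follows; DeliveryTimestamps, after and at are hypothetical helper names, not part of the RocketMQ client.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helpers that compute epoch-millisecond delivery timestamps.
final class DeliveryTimestamps {

    // Delayed delivery: a reference instant plus a delay.
    static long after(Instant now, Duration delay) {
        return now.plus(delay).toEpochMilli();
    }

    // Scheduled delivery: a local date-time interpreted in the given time zone.
    static long at(LocalDateTime when, ZoneId zone) {
        return when.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long inOneMinute = after(Instant.now(), Duration.ofMinutes(1));
        long tomorrowNine = at(
                LocalDateTime.now().plusDays(1).withHour(9).withMinute(0).withSecond(0).withNano(0),
                ZoneId.systemDefault());
        System.out.println(inOneMinute + " " + tomorrowNine);
    }
}
```

Either value could then be passed to setDeliveryTimestamp in place of the inline arithmetic.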
2.2.3. FIFO (ordered) messages
@Test
public void fifo() throws ClientException, IOException {
    ClientServiceProvider provider = ClientServiceProvider.loadService();
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();
    String topic = "FIFO_TOPIC";
    Producer producer = provider.newProducerBuilder()
            .setTopics(topic)
            .setClientConfiguration(clientConfiguration)
            .build();
    Message message = provider.newMessageBuilder()
            .setTopic(topic)
            .setKeys("messageKey", "messageKey2")
            .setTag("messageTag")
            .setBody("fifoMessage".getBytes(StandardCharsets.UTF_8))
            // Messages in the same message group are delivered in order.
            .setMessageGroup("fifoGroup")
            .build();
    SendReceipt sendReceipt = producer.send(message);
    log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
    producer.close();
}
2.2.4. Transactional messages
@Test
public void transaction() throws ClientException, IOException {
    ClientServiceProvider provider = ClientServiceProvider.loadService();
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();
    String topic = "TRANSACTION_TOPIC";
    Producer producer = provider.newProducerBuilder()
            .setTopics(topic)
            .setClientConfiguration(clientConfiguration)
            .setTransactionChecker(new TransactionChecker() {
                @Override
                public TransactionResolution check(MessageView messageView) {
                    TransactionResolution result = TransactionResolution.COMMIT;
                    //TODO: check whether the business logic completed; if it failed, set result = TransactionResolution.ROLLBACK
                    return result;
                }
            })
            .build();
    Message message = provider.newMessageBuilder()
            .setTopic(topic)
            .setKeys("messageKey", "messageKey2")
            .setTag("messageTag")
            .setBody("transactionMessage".getBytes(StandardCharsets.UTF_8))
            .build();
    Transaction transaction = producer.beginTransaction();
    try {
        SendReceipt sendReceipt = producer.send(message, transaction);
        log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        //TODO: business logic
        transaction.commit();
    } catch (Exception e) {
        // Log the cause instead of swallowing it, then roll back.
        log.error("Transaction failed, rolling back", e);
        transaction.rollback();
    }
    producer.close();
}
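The checker above always returns COMMIT and leaves the actual check as a TODO. One way to fill it in, sketched here under assumptions, is to record the outcome of the local business operation under a business key and have the checker look that key up. LocalTransactionLog, Status and the local Resolution enum are hypothetical; Resolution only mirrors the COMMIT, ROLLBACK and UNKNOWN values of TransactionResolution so that the sketch runs without the RocketMQ client on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory record of local transaction outcomes, keyed by a business key.
final class LocalTransactionLog {

    // Stand-in mirroring TransactionResolution (COMMIT, ROLLBACK, UNKNOWN).
    enum Resolution { COMMIT, ROLLBACK, UNKNOWN }

    enum Status { SUCCEEDED, FAILED }

    private final Map<String, Status> statusByKey = new ConcurrentHashMap<>();

    // Called by the business code once the local operation has finished.
    void record(String businessKey, Status status) {
        statusByKey.put(businessKey, status);
    }

    // Called from the transaction checker: no record yet means the outcome is still unknown.
    Resolution resolve(String businessKey) {
        Status status = statusByKey.get(businessKey);
        if (status == null) {
            return Resolution.UNKNOWN;
        }
        return status == Status.SUCCEEDED ? Resolution.COMMIT : Resolution.ROLLBACK;
    }

    public static void main(String[] args) {
        LocalTransactionLog txLog = new LocalTransactionLog();
        System.out.println(txLog.resolve("order-1")); // no record yet
        txLog.record("order-1", Status.SUCCEEDED);
        System.out.println(txLog.resolve("order-1"));
    }
}
```

In a real checker the business key would probably come from the message itself, for example one of the keys set with setKeys, and the record would live in a database rather than in memory so that it survives a restart.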
2.3. Consumer
2.3.1. PushConsumer
@Test
public void pushConsumer() throws ClientException, InterruptedException {
    ClientServiceProvider provider = ClientServiceProvider.loadService();
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .build();
    // "*" with the TAG filter type subscribes to every tag.
    FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
    Map<String, FilterExpression> subscriptionExpressions = new HashMap<>();
    subscriptionExpressions.put("NORMAL_TOPIC", filterExpression);
    subscriptionExpressions.put("DELAY_TOPIC", filterExpression);
    subscriptionExpressions.put("FIFO_TOPIC", filterExpression);
    subscriptionExpressions.put("TRANSACTION_TOPIC", filterExpression);
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setConsumerGroup(group)
            .setSubscriptionExpressions(subscriptionExpressions)
            .setMessageListener(messageView -> {
                log.info("Received message: messageId={}, body={}", messageView.getMessageId(), StandardCharsets.UTF_8.decode(messageView.getBody()));
                return ConsumeResult.SUCCESS;
            }).build();
    log.info("Start receiving messages...");
    // Keep the test thread alive while the listener consumes messages.
    Thread.sleep(Long.MAX_VALUE);
}
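The listener decodes the message body, a ByteBuffer, with StandardCharsets.UTF_8.decode. Decoding advances the buffer's position, so a helper that works on a duplicate can be convenient when the body is read more than once. This is a minimal JDK-only sketch; BodyDecoding and bodyAsString are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that turns a message body into a String.
final class BodyDecoding {

    static String bodyAsString(ByteBuffer body) {
        // duplicate() shares the content but has its own position,
        // so the caller's buffer can still be read afterwards.
        return StandardCharsets.UTF_8.decode(body.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer body = ByteBuffer
                .wrap("normalMessage".getBytes(StandardCharsets.UTF_8))
                .asReadOnlyBuffer();
        System.out.println(bodyAsString(body));
        System.out.println(bodyAsString(body)); // the second read sees the same content
    }
}
```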
2.3.2. SimpleConsumer
A. Synchronous subscription
@Test
public void simpleConsumer() throws ClientException {
    ClientServiceProvider provider = ClientServiceProvider.loadService();
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .build();
    FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
    Map<String, FilterExpression> subscriptionExpressions = new HashMap<>();
    subscriptionExpressions.put("NORMAL_TOPIC", filterExpression);
    subscriptionExpressions.put("DELAY_TOPIC", filterExpression);
    subscriptionExpressions.put("FIFO_TOPIC", filterExpression);
    subscriptionExpressions.put("TRANSACTION_TOPIC", filterExpression);
    SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setConsumerGroup(group)
            .setSubscriptionExpressions(subscriptionExpressions)
            // Maximum time a receive call waits for messages (long polling).
            .setAwaitDuration(Duration.ofSeconds(30))
            .build();
    log.info("Start receiving messages...");
    while (true) {
        // Receive up to 10 messages; the second argument is the invisible duration,
        // during which a received but unacknowledged message is hidden from other consumers.
        List<MessageView> messageViews = simpleConsumer.receive(10, Duration.ofSeconds(30));
        for (MessageView messageView : messageViews) {
            log.info("Received message: messageId={}, body={}", messageView.getMessageId(), StandardCharsets.UTF_8.decode(messageView.getBody()));
            simpleConsumer.ack(messageView);
        }
    }
}
B. Asynchronous subscription
@Test
public void simpleConsumerAsync() throws ClientException {
    ClientServiceProvider provider = ClientServiceProvider.loadService();
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .build();
    FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
    Map<String, FilterExpression> subscriptionExpressions = new HashMap<>();
    subscriptionExpressions.put("NORMAL_TOPIC", filterExpression);
    subscriptionExpressions.put("DELAY_TOPIC", filterExpression);
    subscriptionExpressions.put("FIFO_TOPIC", filterExpression);
    subscriptionExpressions.put("TRANSACTION_TOPIC", filterExpression);
    SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setConsumerGroup(group)
            .setSubscriptionExpressions(subscriptionExpressions)
            .setAwaitDuration(Duration.ofSeconds(30))
            .build();
    log.info("Start receiving messages...");
    // Note: each iteration issues a new receiveAsync call without waiting for the previous one to complete.
    while (true) {
        CompletableFuture<List<MessageView>> future = simpleConsumer.receiveAsync(10, Duration.ofSeconds(30));
        future.whenCompleteAsync((messageViews, throwable) -> {
            if (throwable != null) {
                log.error("Failed to receive message", throwable);
                return;
            }
            for (MessageView messageView : messageViews) {
                log.info("Received message: messageId={}, body={}", messageView.getMessageId(), StandardCharsets.UTF_8.decode(messageView.getBody()));
                CompletableFuture<Void> completableFuture = simpleConsumer.ackAsync(messageView);
                completableFuture.whenCompleteAsync(new BiConsumer<Void, Throwable>() {
                    @Override
                    public void accept(Void unused, Throwable throwable) {
                        if (null != throwable) {
                            log.error("Message is failed to be acknowledged, messageId={}", messageView.getMessageId(), throwable);
                            return;
                        }
                        log.info("Message is acknowledged successfully, messageId={}", messageView.getMessageId());
                    }
                });
            }
        });
    }
}
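The asynchronous subscription fails with the error "DEADLINE_EXCEEDED: deadline exceeded after 32.999993800s". This may be a RocketMQ bug. The reported 33 s appears to correspond to the 30 s await duration plus the client's request timeout. Note also that the while (true) loop issues a new receiveAsync call on every iteration without waiting for the previous one, so pending requests pile up; this may contribute to the error, although that has not been verified here.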
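One variation that might be worth trying, offered as an untested assumption rather than a confirmed fix, is to keep only one receive request in flight by waiting for each future before issuing the next. The sketch below shows the control flow with plain JDK types. SequentialReceiveSketch and drain are hypothetical names, a Supplier stands in for the call to simpleConsumer.receiveAsync, and Strings stand in for MessageView objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch: at most one asynchronous receive is outstanding at any time.
final class SequentialReceiveSketch {

    // Runs the given number of receive rounds, waiting for each round to finish
    // before starting the next one, and returns everything that was received.
    static List<String> drain(Supplier<CompletableFuture<List<String>>> receiveAsync, int rounds) {
        List<String> handled = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            List<String> batch = receiveAsync.get()
                    // A failed round is treated as an empty batch instead of aborting the loop.
                    .exceptionally(t -> Collections.<String>emptyList())
                    // Block until this round is done; only then is the next request sent.
                    .join();
            handled.addAll(batch);
        }
        return handled;
    }

    public static void main(String[] args) {
        // Stand-in for simpleConsumer.receiveAsync(...): completes immediately with two messages.
        Supplier<CompletableFuture<List<String>>> fakeReceive =
                () -> CompletableFuture.completedFuture(Arrays.asList("m1", "m2"));
        System.out.println(drain(fakeReceive, 2));
    }
}
```

With the real client the loop would run indefinitely and acknowledge each message after handling it; the bounded round count is only there to keep the sketch finite.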
2.4. Complete code
2.4.1. Producer
package com.abc.demo.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.apis.producer.*;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

@Slf4j
public class ProducerCase {
    private static final String endpoints = "10.49.196.33:8081";

    @Test
    public void normal() throws ClientException, IOException {
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        //SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider("RocketMQ", "12345678");
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                //.setCredentialProvider(sessionCredentialsProvider)
                .build();
        String topic = "NORMAL_TOPIC";
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(clientConfiguration)
                .build();
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setKeys("messageKey", "messageKey2")
                .setTag("messageTag")
                .setBody("normalMessage".getBytes(StandardCharsets.UTF_8))
                .build();
        SendReceipt sendReceipt = producer.send(message);
        log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        producer.close();
    }

    @Test
    public void normalAsync() throws ClientException, InterruptedException {
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();
        String topic = "NORMAL_TOPIC";
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(clientConfiguration)
                .build();
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setKeys("messageKey", "messageKey2")
                .setTag("messageTag")
                .setBody("normalMessage".getBytes(StandardCharsets.UTF_8))
                .build();
        CompletableFuture<SendReceipt> sendReceiptCompletableFuture = producer.sendAsync(message);
        sendReceiptCompletableFuture.whenComplete(new BiConsumer<SendReceipt, Throwable>() {
            @Override
            public void accept(SendReceipt sendReceipt, Throwable throwable) {
                // On failure sendReceipt is null, so check the throwable first.
                if (throwable != null) {
                    log.error("Failed to send message", throwable);
                    return;
                }
                log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
            }
        });
        // Keep the test thread alive so the callback has a chance to run.
        Thread.sleep(Long.MAX_VALUE);
    }

    @Test
    public void delay() throws ClientException, IOException {
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();
        String topic = "DELAY_TOPIC";
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(clientConfiguration)
                .build();
        // Deliver one minute from now (epoch milliseconds).
        long deliverTimeStamp = System.currentTimeMillis() + 60 * 1000L;
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setKeys("messageKey", "messageKey2")
                .setTag("messageTag")
                .setBody(("delayMessage-" + LocalDateTime.now()).getBytes(StandardCharsets.UTF_8))
                .setDeliveryTimestamp(deliverTimeStamp)
                .build();
        SendReceipt sendReceipt = producer.send(message);
        log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        producer.close();
    }

    @Test
    public void fifo() throws ClientException, IOException {
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();
        String topic = "FIFO_TOPIC";
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(clientConfiguration)
                .build();
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setKeys("messageKey", "messageKey2")
                .setTag("messageTag")
                .setBody("fifoMessage".getBytes(StandardCharsets.UTF_8))
                .setMessageGroup("fifoGroup")
                .build();
        SendReceipt sendReceipt = producer.send(message);
        log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        producer.close();
    }

    @Test
    public void transaction() throws ClientException, IOException {
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();
        String topic = "TRANSACTION_TOPIC";
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(clientConfiguration)
                .setTransactionChecker(new TransactionChecker() {
                    @Override
                    public TransactionResolution check(MessageView messageView) {
                        TransactionResolution result = TransactionResolution.COMMIT;
                        //TODO: check whether the business logic completed; if it failed, set result = TransactionResolution.ROLLBACK
                        return result;
                    }
                })
                .build();
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setKeys("messageKey", "messageKey2")
                .setTag("messageTag")
                .setBody("transactionMessage".getBytes(StandardCharsets.UTF_8))
                .build();
        Transaction transaction = producer.beginTransaction();
        try {
            SendReceipt sendReceipt = producer.send(message, transaction);
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
            //TODO: business logic
            transaction.commit();
        } catch (Exception e) {
            // Log the cause instead of swallowing it, then roll back.
            log.error("Transaction failed, rolling back", e);
            transaction.rollback();
        }
        producer.close();
    }
}
2.4.2. Consumer
package com.abc.demo.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.*;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

@Slf4j
public class ConsumerCase {
    private static final String endpoints = "10.49.196.33:8081";
    private static final String group = "myGroup";

    @Test
    public void pushConsumer() throws ClientException, InterruptedException {
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
        Map<String, FilterExpression> subscriptionExpressions = new HashMap<>();
        subscriptionExpressions.put("NORMAL_TOPIC", filterExpression);
        subscriptionExpressions.put("DELAY_TOPIC", filterExpression);
        subscriptionExpressions.put("FIFO_TOPIC", filterExpression);
        subscriptionExpressions.put("TRANSACTION_TOPIC", filterExpression);
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(group)
                .setSubscriptionExpressions(subscriptionExpressions)
                .setMessageListener(messageView -> {
                    log.info("Received message: messageId={}, body={}", messageView.getMessageId(), StandardCharsets.UTF_8.decode(messageView.getBody()));
                    return ConsumeResult.SUCCESS;
                }).build();
        log.info("Start receiving messages...");
        // Keep the test thread alive while the listener consumes messages.
        Thread.sleep(Long.MAX_VALUE);
    }

    @Test
    public void simpleConsumer() throws ClientException {
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
        Map<String, FilterExpression> subscriptionExpressions = new HashMap<>();
        subscriptionExpressions.put("NORMAL_TOPIC", filterExpression);
        subscriptionExpressions.put("DELAY_TOPIC", filterExpression);
        subscriptionExpressions.put("FIFO_TOPIC", filterExpression);
        subscriptionExpressions.put("TRANSACTION_TOPIC", filterExpression);
        SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(group)
                .setSubscriptionExpressions(subscriptionExpressions)
                .setAwaitDuration(Duration.ofSeconds(30))
                .build();
        log.info("Start receiving messages...");
        while (true) {
            // Receive up to 10 messages; the second argument is the invisible duration.
            List<MessageView> messageViews = simpleConsumer.receive(10, Duration.ofSeconds(30));
            for (MessageView messageView : messageViews) {
                log.info("Received message: messageId={}, body={}", messageView.getMessageId(), StandardCharsets.UTF_8.decode(messageView.getBody()));
                simpleConsumer.ack(messageView);
            }
        }
    }

    /**
     * This style fails with: DEADLINE_EXCEEDED: deadline exceeded after 32.999993800s; possibly a RocketMQ bug.
     */
    @Test
    public void simpleConsumerAsync() throws ClientException {
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
        Map<String, FilterExpression> subscriptionExpressions = new HashMap<>();
        subscriptionExpressions.put("NORMAL_TOPIC", filterExpression);
        subscriptionExpressions.put("DELAY_TOPIC", filterExpression);
        subscriptionExpressions.put("FIFO_TOPIC", filterExpression);
        subscriptionExpressions.put("TRANSACTION_TOPIC", filterExpression);
        SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(group)
                .setSubscriptionExpressions(subscriptionExpressions)
                .setAwaitDuration(Duration.ofSeconds(30))
                .build();
        log.info("Start receiving messages...");
        // Note: each iteration issues a new receiveAsync call without waiting for the previous one to complete.
        while (true) {
            CompletableFuture<List<MessageView>> future = simpleConsumer.receiveAsync(10, Duration.ofSeconds(30));
            future.whenCompleteAsync((messageViews, throwable) -> {
                if (throwable != null) {
                    log.error("Failed to receive message", throwable);
                    return;
                }
                for (MessageView messageView : messageViews) {
                    log.info("Received message: messageId={}, body={}", messageView.getMessageId(), StandardCharsets.UTF_8.decode(messageView.getBody()));
                    CompletableFuture<Void> completableFuture = simpleConsumer.ackAsync(messageView);
                    completableFuture.whenCompleteAsync(new BiConsumer<Void, Throwable>() {
                        @Override
                        public void accept(Void unused, Throwable throwable) {
                            if (null != throwable) {
                                log.error("Message is failed to be acknowledged, messageId={}", messageView.getMessageId(), throwable);
                                return;
                            }
                            log.info("Message is acknowledged successfully, messageId={}", messageView.getMessageId());
                        }
                    });
                }
            });
        }
    }
}
From: https://www.cnblogs.com/wuyongyin/p/17624789.html