在Spring Boot中使用RocketMQ进行同步和异步消息传输的关键是使用RocketMQTemplate
类。下面是两个例子,分别演示了如何实现同步和异步消息传输。
首先,确保你已经添加了RocketMQ的依赖到你的pom.xml
中,如下所示:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
然后在application.yml
中配置RocketMQ的相关属性:
rocketmq:
name-server: 127.0.0.1:9876 # 替换为你的RocketMQ NameServer地址
producer:
group: my-producer-group
同步消息传输例子
在这个例子中,我们将创建一个服务来发送同步消息。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class SyncProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult send(String topic, String message) {
// 同步发送消息
return rocketMQTemplate.syncSend(topic, message);
}
}
异步消息传输例子
在这个例子中,我们将创建一个服务来发送异步消息。
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class AsyncProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void send(String topic, String message) {
// 异步发送消息
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 消息发送成功的回调
System.out.printf("Async send message success. MsgId: %s%n", sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
// 消息发送失败的回调
System.out.printf("Async send message failed. Exception: %s%n", e);
}
});
}
}
批量消息传输例子
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.stream.Collectors;
@Service
public class BatchProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult sendBatch(String topic, List<String> messages) {
// 创建批量消息
List<org.springframework.messaging.Message> messageList = messages.stream()
.map(msg -> MessageBuilder.withPayload(msg).build())
.collect(Collectors.toList());
// 批量发送消息
return rocketMQTemplate.syncSend(topic, messageList);
}
}
事务消息传输例子
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
@RocketMQTransactionListener(txProducerGroup = "my-tx-producer-group")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
try {
// 这里写本地事务的业务逻辑
System.out.println("执行本地事务");
// 如果本地事务执行成功,返回COMMIT
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
// 如果本地事务执行失败,返回ROLLBACK
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务状态
System.out.println("检查本地事务状态");
// 根据本地事务状态返回相应的状态
// RocketMQLocalTransactionState.COMMIT
// RocketMQLocalTransactionState.ROLLBACK
// RocketMQLocalTransactionState.UNKNOWN
return RocketMQLocalTransactionState.COMMIT;
}
}
然后创建一个服务来发送事务消息:
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class TransactionProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMessageInTransaction(String topic, String message) {
// 发送事务消息
rocketMQTemplate.sendMessageInTransaction("my-tx-producer-group", topic,
MessageBuilder.withPayload(message).build(), null);
}
}
在主类Application.java
中,你可以创建一个命令行运行器来测试这两个服务。
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class RocketMQDemoApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(RocketMQDemoApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(RocketMQDemoApplication.class, args);
SyncProducerService syncProducerService = context.getBean(SyncProducerService.class);
AsyncProducerService asyncProducerService = context.getBean(AsyncProducerService.class);
// 发送同步消息
SendResult sendResult = syncProducerService.send("test-topic-sync", "Hello, RocketMQ Sync!");
System.out.println("Sync message sent. MsgId: " + sendResult.getMsgId());
// 发送异步消息
asyncProducerService.send("test-topic-async", "Hello, RocketMQ Async!");
}
}
在这个例子中,我们创建了两个服务,分别用于同步和异步发送消息。在同步发送的例子中,syncSend
方法会等待消息发送成功或失败,并返回一个SendResult
对象。在异步发送的例子中,asyncSend
方法会立即返回,并提供一个SendCallback
回调来处理消息发送成功或失败的情况。
请注意,为了运行这些例子,你需要确保RocketMQ服务器正在运行,并且application.yml
中配置的name-server
地址是正确的。此外,你可能需要根据实际情况调整Topic和消费者组名。