一、生产者
1、pom.xml
<!-- Spring Boot web starter. No <version> is given here; presumably it is managed by a Spring Boot parent/BOM not shown in this snippet. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- RocketMQ Spring Boot starter, pinned to 2.0.3. -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
2、application.properties
# HTTP port of the producer application.
server.port=2049
# RocketMQ name server address and producer group for the Spring starter.
# NOTE: the standalone Producer class shown below sets its own name server address
# and group name in code, so it does not read these two keys.
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
3、Producer
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
 * Standalone demo producer for the {@code springboot-mq2023} topic.
 *
 * <p>Every message is routed to the same message queue (index
 * {@link #TARGET_QUEUE_INDEX}) through a {@link MessageQueueSelector}, so all
 * messages of this run land in one queue in the order they were sent.
 */
public class Producer {
    /** Index (within the topic's queue list) of the queue every message is sent to. */
    private static final int TARGET_QUEUE_INDEX = 0;

    /**
     * Starts a producer, synchronously sends one message per index from 200 to 250
     * inclusive (51 messages), printing each send result, and then shuts the producer down.
     *
     * @param args unused
     * @throws MQClientException            if the client fails to start or to send
     * @throws UnsupportedEncodingException if the charset used for the message body is unsupported
     * @throws RemotingException            if a remoting error occurs while sending
     * @throws InterruptedException         if the sending thread is interrupted
     * @throws MQBrokerException            if the broker reports an error
     */
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException,
            RemotingException, InterruptedException, MQBrokerException {
        // 1. Create the DefaultMQProducer with its producer group name
        DefaultMQProducer producer = new DefaultMQProducer("demo-producer");
        // 2. Set the name server address
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 3. Start the producer
        producer.start();
        try {
            // The selector is stateless, so one instance is shared by all sends.
            // It receives every queue of the topic, the message being sent, and the
            // extra argument handed to send(...), which is used here as the queue index.
            MessageQueueSelector selector = new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> queues, Message msg, Object arg) {
                    int queueIndex = (int) arg;
                    return queues.get(queueIndex);
                }
            };

            // Send one message for each index in 200..250 (inclusive)
            for (int index = 200; index <= 250; index++) {
                // Build the message: topic, tag, key, body
                Message message = new Message("springboot-mq2023", "TAG_A", "KEYS_!",
                        ("send msg to 【2050】HELLO!" + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // send(message, selector, arg): arg is passed to the selector as the queue index
                SendResult result = producer.send(message, selector, TARGET_QUEUE_INDEX);
                System.out.println("发送第:" + index + " 条信息成功:" + result);
            }
        } finally {
            // Always release the producer, even when a send fails part-way through
            producer.shutdown();
        }
    }
}
二、消费者
1、pom.xml
<!-- Spring Boot web starter. No <version> is given here; presumably it is managed by a Spring Boot parent/BOM not shown in this snippet. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- RocketMQ Spring Boot starter, pinned to 2.0.3; provides the @RocketMQMessageListener / RocketMQListener types used by the Consumer class. -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
2、application.properties
# HTTP port of the consumer application.
server.port=2050
# RocketMQ name server address for the Spring starter.
rocketmq.name-server=127.0.0.1:9876
# Producer group name. NOTE: the Consumer class shown below only receives messages;
# its consumer group is declared on @RocketMQMessageListener, not by this key.
rocketmq.producer.group=my-group
3、Consumer
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * Message listener for the {@code springboot-mq2023} topic, registered under the
 * consumer group {@code springboot-mq-consumer-1}. Each received payload is logged
 * at INFO level.
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "springboot-mq2023", consumerGroup = "springboot-mq-consumer-1")
public class Consumer implements RocketMQListener<String> {

    /**
     * Logs the received message body.
     *
     * @param message the message payload delivered as a {@code String}
     */
    @Override
    public void onMessage(String message) {
        // Parameterized logging: the final text is only built when INFO is enabled
        log.info("Receive message from【2049】:{}", message);
    }
}
4、Consumberrocket2050Application
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the consumer application.
 */
@SpringBootApplication
public class Consumberrocket2050Application {

    /**
     * Boots the Spring application using this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(Consumberrocket2050Application.class);
        application.run(args);
    }
}
标签:producer,org,boot,Consumer20230331,RocketMQ,import,apache,SB,rocketmq From: https://www.cnblogs.com/smallfa/p/17277145.html