首页 > 其他分享 >SB-RocketMQ-Provider-Consumer20230331

SB-RocketMQ-Provider-Consumer20230331

时间:2023-03-31 18:24:41浏览次数:42  
标签:producer org boot Consumer20230331 RocketMQ import apache SB rocketmq

 一、生产者

1、pom.xml
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
  </dependency>

2、application.properties

  server.port=2049
  rocketmq.name-server=127.0.0.1:9876
  rocketmq.producer.group=my-group

3、Producer
  import org.apache.rocketmq.client.exception.MQBrokerException;
  import org.apache.rocketmq.client.exception.MQClientException;
  import org.apache.rocketmq.client.producer.DefaultMQProducer;
  import org.apache.rocketmq.client.producer.MessageQueueSelector;
  import org.apache.rocketmq.client.producer.SendResult;
  import org.apache.rocketmq.common.message.Message;
  import org.apache.rocketmq.common.message.MessageQueue;
  import org.apache.rocketmq.remoting.common.RemotingHelper;
  import org.apache.rocketmq.remoting.exception.RemotingException;
  import java.io.UnsupportedEncodingException;
  import java.util.List;

  public class Producer {
     public static void main(String[] args) throws MQClientException, UnsupportedEncodingException,

         RemotingException, InterruptedException, MQBrokerException {
      // 1、创建 DefaultMQProducer
      DefaultMQProducer producer = new DefaultMQProducer("demo-producer");

      // 2、设置 name server
      producer.setNamesrvAddr("127.0.0.1:9876");

         // 3、开启 producer
         producer.start();

       // 连续发送 5 条信息
      for (int index = 200; index <= 250; index++) {
       // 创建消息
      Message message = new Message("springboot-mq2023", "TAG_A", "KEYS_!",

           ("send msg to 【2050】HELLO!" + index).getBytes(RemotingHelper.DEFAULT_CHARSET));

      // 指定 MessageQueue,顺序发送消息
      // 第一个参数:消息体
      // 第二个参数:选中指定的消息队列对象(会将所有的消息队列传进来,需要自己选择)
      // 第三个参数:选择对应的队列下标
      SendResult result = producer.send(message, new MessageQueueSelector() {
        // 第一个参数:所有的消息队列对象
        // 第二个参数:消息体
        // 第三个参数:传入的消息队列下标
        @Override
        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
          // 获取队列下标
          int index = (int) o;
          return list.get(index);
        }
      }, 0);
      System.out.println("发送第:" + index + " 条信息成功:" + result);
    }
    // 关闭 producer
    producer.shutdown();
    }
  }

 

 

 

 

 二、消费者

1、pom.xml
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
  </dependency>

2、application.properties

  server.port=2050
  rocketmq.name-server=127.0.0.1:9876
  rocketmq.producer.group=my-group

3、Consumer
  import lombok.extern.slf4j.Slf4j;
  import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  import org.apache.rocketmq.spring.core.RocketMQListener;
  import org.springframework.stereotype.Component;
  /**
   * Listener bean subscribed to the {@code springboot-mq2023} topic under the
   * consumer group {@code springboot-mq-consumer-1}; it logs every received payload.
   */
  @Slf4j
  @Component
  @RocketMQMessageListener(topic = "springboot-mq2023", consumerGroup = "springboot-mq-consumer-1")
  public class Consumer implements RocketMQListener<String> {

    /**
     * Logs the received message body at INFO level.
     *
     * @param s the message payload as a string
     */
    @Override
    public void onMessage(String s) {
      // Parameterized logging: the text is only assembled when INFO is enabled.
      log.info("Receive message from【2049】:{}", s);
    }
  }

4、Consumberrocket2050Application
  import org.springframework.boot.SpringApplication;
  import org.springframework.boot.autoconfigure.SpringBootApplication;

  /**
   * Spring Boot entry point for the consumer application.
   */
  @SpringBootApplication
  public class Consumberrocket2050Application {

    /**
     * Boots the Spring application context using this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
      final Class<?> primarySource = Consumberrocket2050Application.class;
      SpringApplication.run(primarySource, args);
    }
  }

 

 

 

 

标签:producer,org,boot,Consumer20230331,RocketMQ,import,apache,SB,rocketmq
From: https://www.cnblogs.com/smallfa/p/17277145.html

相关文章