首页 > 其他分享 >七、获取消息的方式

七、获取消息的方式

时间:2023-07-09 20:22:07浏览次数:27  
标签:消费 方式 pullConsumer 获取 消息 namesrv consumer public

RocketMQ获取消息的方式有两种:

  1. PULL(消费者主动去Broker拉取):拉取消息需要编写代码去Broker获取。通过DefaultMQPullConsumer,关联namesrv后,通过topic获取到关联的所有Message Queue。遍历所有的Message Queue,批量获取消息。并消费。直到处理完所有的Message Queue。用户需要自己保存消费进度,也就是Message Queue下一次的OffSet。

  2. PUSH(服务端推送):当消息到达时,服务端主动推送消息给消费者。RocketMQ的实现方式还是使用PULL,但是封装了遍历Message Queue的过程,并注册MessageListener,取到消息后唤醒MessageListener消费消息。

优缺点比较

  • PUSH实时性高,但是增加了服务端负载,而且可能会造成消费者消息堆积,消费者消费能力不同,如果服务端推送消息过快,消费较慢就会造成消息堆积。消费消息逻辑简单,只需添加MessageListener,用户不用自己维护消费进度。

  • PULL是由消费者自动从服务端拉取,较灵活,但是需要自己编写代码拉取消息,而且拉取消息的时间间隔不好控制,间隔太短,空请求太多,间隔太长,消息不能处理。还需要用户维护消费进度。

 

上个例子的消费者OldVersionConsumer 就是PUSH方式:

@Slf4j
@Component
public class OldVersionConsumer implements InitializingBean, DisposableBean {
    private DefaultMQPushConsumer consumer;

    @Value("${rocketmq.namesrv}")
    private String namesrv;

    String topic = "MyTopic";

    @Override
    public void destroy() throws Exception {
        if (consumer != null) {
            consumer.shutdown();
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        consumer = new DefaultMQPushConsumer("my-consumer");

        consumer.setNamesrvAddr(namesrv);
       // 设置从之前的消费位置开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        consumer.subscribe(topic, "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                log.info("消费消息:{}", list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });


        consumer.start();

    }
}

新增OldVersionPullConsumer pull方式的消费者:

@Slf4j
@Component
public class OldVersionPullConsumer implements InitializingBean, DisposableBean {

    private DefaultMQPullConsumer pullConsumer;

    @Value("${rocketmq.namesrv}")
    private String namesrv;

    String topic = "MyTopic";

    Map<MessageQueue, Long> OFFSET_MAP = new HashMap<>();

    @Override
    public void destroy() throws Exception {
        if (pullConsumer != null) {
            pullConsumer.shutdown();
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        pullConsumer = new DefaultMQPullConsumer("my-pull-consumer");

        pullConsumer.setNamesrvAddr(namesrv);

        pullConsumer.start();

        handleMassage();
    }


    @Scheduled(cron = "0/30 * * * * ?")
    private void handleMassage() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        Set<MessageQueue> messageQueues = pullConsumer.fetchSubscribeMessageQueues(topic);
        log.info("message queue:{}", messageQueues);
        for (MessageQueue messageQueue : messageQueues) {
            PullResult pullResult = pullConsumer.pull(messageQueue, (String) null,getOffSet(messageQueue) , 32);
            // 保存消费进度
            OFFSET_MAP.put(messageQueue, pullResult.getNextBeginOffset());
            List<MessageExt> list = null;
            switch (pullResult.getPullStatus()) {
                case FOUND:
                    list = pullResult.getMsgFoundList();
                    break;
                case  NO_NEW_MSG:
                    break;
                case NO_MATCHED_MSG:
                    break;
                case   OFFSET_ILLEGAL:
                    break;
                default:
                    break;
            }

            if (!CollectionUtils.isEmpty(list)) {
                log.info("消费消息:{}", list);
            }
        }
    }

    private Long getOffSet(MessageQueue queue) {
        Long aLong = OFFSET_MAP.get(queue);
        if (Objects.isNull(aLong)) {
            return 0L;
        } else {
            return aLong;
        }
    }
}

OFFSET_MAP 保存了每个Message Queue的消费进度。

标签:消费,方式,pullConsumer,获取,消息,namesrv,consumer,public
From: https://www.cnblogs.com/shigongp/p/17539242.html

相关文章

  • 所有源码关注公众号获取
    一、所有源码获取方法:1.关注公众号->商业项目->杂货铺->cv视觉源码;2.根据关键字搜索项目二、项目合作、学生毕设:1.专业的团队,985毕业,大厂工作;2.专业的服务,支持答疑;        公众号:                    个人号: ......
  • python获取小红书web_session,以及解决x-s签名验证(2023-07-09)
    一、web_session请求接口:https://edith.xiaohongshu.com/api/sns/web/v1/login/activate请求类型:post提交数据:{}这儿是两个字符{},笔者最初提交None,总得不到结果,chromeF12才发现需要这两个字符。二、签名验证x-s 该请求需要x-s签名验证,签名代码如下:a1="186d30820a4......
  • [笔记]组成原理_指令系统_指令的寻址方式(题)
    指令系统中采用不同寻址方式的目的是()A.提供扩展操作码的可能,并降低指令译码难度。B.可缩短指令字长扩大寻址空间,提高编程的灵活性.C.实现程序控制.D.三者都正确.采用不同寻址方式提高了指令译码的复杂度,所以A错。实现程序控制是通过转移指令而非寻址方式进行的,与寻址方式无......
  • C# httpclient获取cookies实现模拟web登录
    目前在公司做一款平台化的产品,我主要负责PC端上的开发,在产品推荐过程中为了节省开发时间很多功能模块没来得及做原生,用CEF嵌入了很多带功能web页面,与客户端进行交互从而实现功能。在二期开发中,产品需求说明书中需要把登录功能放在客户端来做,这当中涉及到一个问题客户端做登入登出......
  • 五、普通消息
    生产者能发送的消息类型有:Normal:普通消息,消息本身无特殊语义,消息之间也没有任何关联。FIFO:顺序消息,ApacheRocketMQ通过消息分组MessageGroup标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送时的顺序。Delay:定时/延时消息,通过指定延时时间控制消息生......
  • excle的几种写出方式
    importpandaslis1=['a1','b1','c1','d1','e1']lis2=['a2','b2','c2','d2','e2']lis3=['a3','b3','c3','d3......
  • 滑动验证码-获取滑动长度
    滑动验证码-获取滑动长度思路:为了更快精准的找出滑动的距离,因此可以给图片做适当的处理灰度化。所谓灰度,就是图像没有色彩。举个例子,原本的彩色图片是有RGB三种颜色组成的(也就是每个像素点由三个值),现在给图像灰度化,每个像素点只有一个值(0-255)表示颜色的深度为了方......
  • springcloud - 通过消息总线bus进行刷新
    修改3344服务pom文件 <!--添加消息总线RabbitMQ支持--> <dependency>   <groupId>org.springframework.cloud</groupId>   <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency>yaml文件 #rabbitmq相关配置 spring: ......
  • 2023-07-08:RabbitMQ如何做到消息不丢失?
    2023-07-08:RabbitMQ如何做到消息不丢失?答案2023-07-08:1.持久化发送消息时设置delivery_mode属性为2,使消息被持久化保存到磁盘,即使RabbitMQ服务器宕机也能保证消息不丢失。同时,创建队列时设置durable属性为True,以确保队列也被持久化保存。2.确认机制消费者通过basic.ack命令向......
  • ubuntu 通过软链接的方式修改 Docker 镜像默认存储位置以防止空间占满
    和之前的修改conda存储位置一样,我们同样可以通过软链接的方式,修改存储位置。前文:https://www.cnblogs.com/odesey/p/17218519.htmlhttps://www.cnblogs.com/odesey/p/17512848.html默认情况下Docker容器的存放位置在/var/lib/docker目录下面,可以通过下面命令查看具体......