首页 > 其他分享 >rabbitmq listener注解@RabbitListener里的queues是个数组,你用了吗?

rabbitmq listener注解@RabbitListener里的queues是个数组,你用了吗?

时间:2023-12-26 17:04:41浏览次数:40  
标签:levySignFlow RabbitListener queues rabbitmq httpInvokeLevySignApi api 服务商 levySi

靠谱的程序员具有注重实效的偏执,对于重复多行的代码,总会想办法消除重复。

我们zhongtai-channel里在调用服务商接口发起签约前,使用了mq进行异步处理。即:zhongtai-channel签约RPCAPI接收到上游的请求后,先同步持久化保存签约请求流水,然后将签约数据放入rabbitmq消息队列,等待程序里的消费者方法消费消息队列中的消息,调用服务商对接Service发起HTTP签约请求。

签约RPCAPI,参数校验,持久化数据,写入消息队列


消息队列


监听消息出队,调用服务商对接Service发起HTTP签约请求

系统运行过程中遇到一个问题,程序在消费消息时,有些三方服务商接口的响应时效比较长,进而阻塞消息队列,导致整体签约能力下降。鉴于自有服务商的签约量比重大,考虑自有服务商系统是我们自研的,接口响应时效相对可控,因此,为了提高系统生产能力,将自有服务商的签约与三方服务商的签约一分为二,分别放入两个不同的消息队列。

代码:

package com.emax.channel.provider.modules.mq.sign;

/**
 * @Description 服务商签约MQ异步实现
 * @Author Panda
 * @Date 2023/10/11
 **/
@Slf4j
@Component
@Configuration
public class LevySignApiInvokerMqBroker {

    ...省略一堆@Bean定义

    public void sendLevyGotoSignQueue(LevySignFlow levySignFlow, LevySignDTO levySignDTO) {
        log.info("httpInvokeLevySignApi 调用服务商api签约 mq异步实现 消息入队,{}_{}_{} levySignFlow={}", ...);
        Object[] obj = {levySignFlow, levySignDTO};

        // 获取服务商协议类型
        final LevyChannelConfig levyConfigInfo = levyChannelConfigService.getLevyConfigCache(Long.parseLong(levySignFlow.getTaxSourceId()));
        final String apiSuit = levyConfigInfo.getApiSuit();

        if (StrUtil.equals(apiSuit, LevyChannelRouteEnum.OWN.getCode())) {
            /**
             * 消费者参见{@link #onMessageByOwn(Object[])}
             **/
            rabbitTemplate.convertAndSend(exchangeName, internalQueueName, obj);

        } else {
            /**
             * 消费者参见{@link #onMessage(Object[])}
             **/
            rabbitTemplate.convertAndSend(exchangeName, externalQueueName, obj);
        }
    }


    @RabbitHandler
    @RabbitListener(queues = "#{levySignApiQueueExternal.name}",concurrency = "5", containerFactory = "signPreFetchLimitContainerFactory")
    public void onMessage(Object[] objects) throws Exception {
        LevySignFlow levySignFlow = (LevySignFlow) objects[0];
        LevySignDTO levySignDTO = (LevySignDTO) objects[1];
        log.info("httpInvokeLevySignApi 调用服务商api签约 mq异步实现 消息出队,{}_{}_{} levySignDTO={}", ...);
        long nowTime = System.currentTimeMillis();

        // 执行逻辑
        try {
            levyGotoSignAndSendSignQueryQueue(levySignFlow, levySignDTO);
        } catch (Exception e){
            log.error("httpInvokeLevySignApi 调用服务商api签约异常,levySignFlow:{}",JSON.toJSONString(levySignFlow),e);
        }
        log.info("httpInvokeLevySignApi 耗时={} 调用服务商api签约,签约流水号:{} 服务商名称:{}", System.currentTimeMillis() - nowTime,...);
    }

    @RabbitHandler
    @RabbitListener(queues = "#{levySignApiQueueInternal.name}",concurrency = "5", containerFactory = "signPreFetchLimitContainerFactory")
    public void onMessageByOwn(Object[] objects) throws Exception {
        LevySignFlow levySignFlow = (LevySignFlow) objects[0];
        LevySignDTO levySignDTO = (LevySignDTO) objects[1];
        log.info("httpInvokeLevySignApi_owm 调用服务商api签约 mq异步实现 消息出队,{}_{}_{} levySignDTO={}", ...);
        long nowTime = System.currentTimeMillis();

        // 执行逻辑
        try {
            levyGotoSignAndSendSignQueryQueue(levySignFlow, levySignDTO);
        }catch (Exception e){
            log.error("httpInvokeLevySignApi_own 调用服务商api签约异常,levySignFlow:{}",JSON.toJSONString(levySignFlow),e);
        }
        log.info("httpInvokeLevySignApi_owm 耗时={} 调用服务商api签约,签约流水号:{} 服务商名称:{}", System.currentTimeMillis() - nowTime,...);

    }

    private void levyGotoSignAndSendSignQueryQueue(LevySignFlow levySignFlow, LevySignDTO levySignDTO) {
        TaxSignStatusEnum taxSignStatusEnum = levyGotoSignService.syncGotoSign(levySignFlow, levySignDTO);
        if (TaxSignStatusEnum.isFinalState(taxSignStatusEnum)) {
            LevySignFlow levySignFlowFinal = levySignFlowManager.getById(levySignFlow.getId());
            levySignResultNotifyService.async(levySignFlowFinal);
        }else {
            levySignQueryApiInvokerMqBroker.sendLevySignQueryQueue(levySignFlow,5);
        }
    }
}

 

注意其中的两个onMessage方法,看到冗余代码了吗?靠谱的程序员觉得别扭!

 

先介绍一下两个技术点

  1. @RabbitListener注解的queues是一个数组。 - - - - 当两个或多个的其他属性如concurrency都相同时,此技术点满足。
  2. 一个onMessage方法可以同时标注两个@RabbitListener。 - - - - - - 当两个或多个的属性各不相同时, 使用此技术点。

 

So,代码重构就easy了。使用上面的技术点1。将两个onMessage合二为一。(BTW,此时已经没必要有levyGotoSignAndSendSignQueryQueue方法了)

@RabbitHandler
    @RabbitListener(queues = "#{levySignApiQueueInternal.name}, #{levySignApiQueueExternal.name}", concurrency = "5", containerFactory = "signPreFetchLimitContainerFactory")
    public void onMessage(Object[] objects) throws Exception {
        LevySignFlow levySignFlow = (LevySignFlow) objects[0];
        LevySignDTO levySignDTO = (LevySignDTO) objects[1];
        log.info("httpInvokeLevySignApi 调用服务商api签约 mq异步实现 消息出队,{}_{}_{} levySignDTO={}", ...);
        long nowTime = System.currentTimeMillis();

        // 执行逻辑
        try {
            levyGotoSignAndSendSignQueryQueue(levySignFlow, levySignDTO);
        } catch (Exception e){
            log.error("httpInvokeLevySignApi 调用服务商api签约异常,levySignFlow:{}",JSON.toJSONString(levySignFlow),e);
        }
        log.info("httpInvokeLevySignApi 耗时={} 调用服务商api签约,签约流水号:{} 服务商名称:{}", System.currentTimeMillis() - nowTime,...);
    }

 

 

这时,善于思考的同学可能会提问了。 你消费者方法写成一个了,那方法里怎么知道是自有服务商签约还是三方服务商签约呢?我日后可能会添加不同的性能控制策略。

easy,只要思想不滑坡,办法总比困难多。

  • 办法1:生产者代码里写入消息时,增加自有服务商签约或三方服务商签约的标识。
  • 办法2:消费者代码根据levyId查询LevyConfigCache,即可知道是自有服务商还是三方服务商。
  • 办法3:你琢磨。

 

标签:levySignFlow,RabbitListener,queues,rabbitmq,httpInvokeLevySignApi,api,服务商,levySi
From: https://blog.51cto.com/u_15708799/8985237

相关文章

  • SpringBoot集成多个RabbitMq(多个MQ链接)
    ##2023年12月16日20:25:36 项目中使用RabbitMQ作为应用间信息互通,本次梳理下关于MQ的使用。1、引入依赖<!--引入依赖,使用v2.5.6版本--><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot......
  • rabbitMq怎么查看队列日志消息-Tracing日志
    Trace是Rabbitmq用于记录每一次发送的消息,方便使用Rabbitmq的开发者调试、排错。1、启动Tracing插件在RabbitMQ中默认是关闭的,需手动开启。此处rabbitMQ是使用docker部署的##进入rabbitMq中dockerexec-itrabbitmq1bash##启动日志插件rabbitmq-pluginsenablerabbitmq_tr......
  • RabbitMQ vs Kafka:正面交锋!
    介绍作为一名有着大量微服务系统处理经验的软件架构师,我经常遇到一个不断重复的问题:“我应该使用RabbitMQ还是Kafka?”出于某种原因,许多开发人员认为这些技术是可以互换的。虽然在某些情况下确实如此,但RabbitMQ还是Kafka之间存在根本上的差异。因此,不同的场景需要不同的,选择......
  • Rabbitmq消息大量堆积,我慌了!
    背景记得有次公司搞促销活动,流量增加,但是系统一直很平稳(我们开发的系统真牛),大家很开心的去聚餐,谈笑风声,气氛融洽,突然电话响起....运维:小李,你们系统使用的rabbitmq的消息大量堆积,导致服务器cpu飙升,赶紧回来看看,服务器要顶不住了小李:好的系统架构描述我们使用rabbitmq主要是为了系统......
  • RabbitMQ面试题【理论知识】
    常用交换机DirectExchange直连交换机消费方式为一对一,即每个消息只会被消费一次,当有多个消费者时,消费方式为轮询。TopicExchange主题交换机,可以绑定一个路由,路由可以是固定的也可以是通配符,当发送消息的路由同时满足时,都可以收到消息,多个消费者时,消费方式为轮询。FanoutExchange扇......
  • rabbitmq添加延时通道时报错
    rabbitmq添加延时通道时报错'x-delayed-type'mustbeanexistingexchangetype解决方案:我实际用的是x-delayed-type:topic ......
  • RabbitMQ vs Kafka:正面交锋!
    介绍作为一名有着大量微服务系统处理经验的软件架构师,我经常遇到一个不断重复的问题:“我应该使用RabbitMQ还是Kafka?”出于某种原因,许多开发人员认为这些技术是可以互换的。虽然在某些情况下确实如此,但RabbitMQ还是Kafka之间存在根本上的差异。因此,不同的场景需要不同的,......
  • rabbitmq listener注解@RabbitListener里的queues是个数组,你用了吗?
    靠谱的程序员具有注重实效的偏执,对于重复多行的代码,总会想办法消除重复。我们zhongtai-channel里在调用服务商接口发起签约前,使用了mq进行异步处理。即:zhongtai-channel签约RPCAPI接收到上游的请求后,先同步持久化保存签约请求流水,然后将签约数据放入rabbitmq消息队列,等待程序里......
  • RabbitMQの延迟消息
    目录1、什么是延时队列2、延时队列使用场景3、RabbitMQ中的TTL4、如果使用RabbitMQ来实现延时队列4.1、安装插件4.2、代码实现4.2.1、新建maven工程,pom.xml文件4.2.2、属性配置文件application.yml4.2.3、定义常量4.2.3、配置RabbitMQ4.2.4、定义重试时间枚举类型4.2.5、定义消息......
  • 消息中间件的选择:RabbitMQ是一个明智的选择
    ......