首页 > 其他分享 >rabbitmq listener注解@RabbitListener里的queues是个数组,你用了吗?

rabbitmq listener注解@RabbitListener里的queues是个数组,你用了吗?

时间:2023-12-21 09:14:22浏览次数:28  
标签:levySignFlow RabbitListener queues rabbitmq httpInvokeLevySignApi api 服务商 levySi

靠谱的程序员具有注重实效的偏执,对于重复多行的代码,总会想办法消除重复。

我们zhongtai-channel里在调用服务商接口发起签约前,使用了mq进行异步处理。即:zhongtai-channel签约RPCAPI接收到上游的请求后,先同步持久化保存签约请求流水,然后将签约数据放入rabbitmq消息队列,等待程序里的消费者方法消费消息队列中的消息,调用服务商对接Service发起HTTP签约请求。

签约RPCAPI,参数校验,持久化数据,写入消息队列消息队列监听消息出队,调用服务商对接Service发起HTTP签约请求

系统运行过程中遇到一个问题,程序在消费消息时,有些三方服务商接口的响应时效比较长,进而阻塞消息队列,导致整体签约能力下降。鉴于自有服务商的签约量比重大,考虑自有服务商系统是我们自研的,接口响应时效相对可控,因此,为了提高系统生产能力,将自有服务商的签约与三方服务商的签约一分为二,分别放入两个不同的消息队列。

代码:

package com.emax.channel.provider.modules.mq.sign;

/**
 * @Description 服务商签约MQ异步实现
 * @Author Panda
 * @Date 2023/10/11
 **/
@Slf4j
@Component
@Configuration
public class LevySignApiInvokerMqBroker {

    ...省略一堆@Bean定义

    public void sendLevyGotoSignQueue(LevySignFlow levySignFlow, LevySignDTO levySignDTO) {
        log.info("httpInvokeLevySignApi 调用服务商api签约 mq异步实现 消息入队,{}_{}_{} levySignFlow={}", ...);
        Object[] obj = {levySignFlow, levySignDTO};

        // 获取服务商协议类型
        final LevyChannelConfig levyConfigInfo = levyChannelConfigService.getLevyConfigCache(Long.parseLong(levySignFlow.getTaxSourceId()));
        final String apiSuit = levyConfigInfo.getApiSuit();

        if (StrUtil.equals(apiSuit, LevyChannelRouteEnum.OWN.getCode())) {
            /**
             * 消费者参见{@link #onMessageByOwn(Object[])}
             **/
            rabbitTemplate.convertAndSend(exchangeName, internalQueueName, obj);

        } else {
            /**
             * 消费者参见{@link #onMessage(Object[])}
             **/
            rabbitTemplate.convertAndSend(exchangeName, externalQueueName, obj);
        }
    }


    @RabbitHandler
    @RabbitListener(queues = "#{levySignApiQueueExternal.name}",concurrency = "5", containerFactory = "signPreFetchLimitContainerFactory")
    public void onMessage(Object[] objects) throws Exception {
        LevySignFlow levySignFlow = (LevySignFlow) objects[0];
        LevySignDTO levySignDTO = (LevySignDTO) objects[1];
        log.info("httpInvokeLevySignApi 调用服务商api签约 mq异步实现 消息出队,{}_{}_{} levySignDTO={}", ...);
        long nowTime = System.currentTimeMillis();

        // 执行逻辑
        try {
            levyGotoSignAndSendSignQueryQueue(levySignFlow, levySignDTO);
        } catch (Exception e){
            log.error("httpInvokeLevySignApi 调用服务商api签约异常,levySignFlow:{}",JSON.toJSONString(levySignFlow),e);
        }
        log.info("httpInvokeLevySignApi 耗时={} 调用服务商api签约,签约流水号:{} 服务商名称:{}", System.currentTimeMillis() - nowTime,...);
    }

    @RabbitHandler
    @RabbitListener(queues = "#{levySignApiQueueInternal.name}",concurrency = "5", containerFactory = "signPreFetchLimitContainerFactory")
    public void onMessageByOwn(Object[] objects) throws Exception {
        LevySignFlow levySignFlow = (LevySignFlow) objects[0];
        LevySignDTO levySignDTO = (LevySignDTO) objects[1];
        log.info("httpInvokeLevySignApi_owm 调用服务商api签约 mq异步实现 消息出队,{}_{}_{} levySignDTO={}", ...);
        long nowTime = System.currentTimeMillis();

        // 执行逻辑
        try {
            levyGotoSignAndSendSignQueryQueue(levySignFlow, levySignDTO);
        }catch (Exception e){
            log.error("httpInvokeLevySignApi_own 调用服务商api签约异常,levySignFlow:{}",JSON.toJSONString(levySignFlow),e);
        }
        log.info("httpInvokeLevySignApi_owm 耗时={} 调用服务商api签约,签约流水号:{} 服务商名称:{}", System.currentTimeMillis() - nowTime,...);

    }

    private void levyGotoSignAndSendSignQueryQueue(LevySignFlow levySignFlow, LevySignDTO levySignDTO) {
        TaxSignStatusEnum taxSignStatusEnum = levyGotoSignService.syncGotoSign(levySignFlow, levySignDTO);
        if (TaxSignStatusEnum.isFinalState(taxSignStatusEnum)) {
            LevySignFlow levySignFlowFinal = levySignFlowManager.getById(levySignFlow.getId());
            levySignResultNotifyService.async(levySignFlowFinal);
        }else {
            levySignQueryApiInvokerMqBroker.sendLevySignQueryQueue(levySignFlow,5);
        }
    }
}

 

注意其中的两个onMessage方法,看到冗余代码了吗?靠谱的程序员觉得别扭!

 

先介绍一下两个技术点

  1. @RabbitListener注解的queues是一个数组。 - - - - 当两个或多个的其他属性如concurrency都相同时,此技术点满足。
  2. 一个onMessage方法可以同时标注两个@RabbitListener。 - - - - - - 当两个或多个的属性各不相同时, 使用此技术点。

 

So,代码重构就easy了。使用上面的技术点1。将两个onMessage合二为一。(BTW,此时已经没必要有levyGotoSignAndSendSignQueryQueue方法了)

    @RabbitHandler
    @RabbitListener(queues = "#{levySignApiQueueInternal.name}, #{levySignApiQueueExternal.name}", concurrency = "5", containerFactory = "signPreFetchLimitContainerFactory")
    public void onMessage(Object[] objects) throws Exception {
        LevySignFlow levySignFlow = (LevySignFlow) objects[0];
        LevySignDTO levySignDTO = (LevySignDTO) objects[1];
        log.info("httpInvokeLevySignApi 调用服务商api签约 mq异步实现 消息出队,{}_{}_{} levySignDTO={}", ...);
        long nowTime = System.currentTimeMillis();

        // 执行逻辑
        try {
            levyGotoSignAndSendSignQueryQueue(levySignFlow, levySignDTO);
        } catch (Exception e){
            log.error("httpInvokeLevySignApi 调用服务商api签约异常,levySignFlow:{}",JSON.toJSONString(levySignFlow),e);
        }
        log.info("httpInvokeLevySignApi 耗时={} 调用服务商api签约,签约流水号:{} 服务商名称:{}", System.currentTimeMillis() - nowTime,...);
    }

 

 

这时,善于思考的同学可能会提问了。 你消费者方法写成一个了,那方法里怎么知道是自有服务商签约还是三方服务商签约呢?我日后可能会添加不同的性能控制策略。

easy,只要思想不滑坡,办法总比困难多。

  • 办法1:生产者代码里写入消息时,增加自有服务商签约或三方服务商签约的标识。
  • 办法2:消费者代码根据levyId查询LevyConfigCache,即可知道是自有服务商还是三方服务商。
  • 办法3:你琢磨。

 

标签:levySignFlow,RabbitListener,queues,rabbitmq,httpInvokeLevySignApi,api,服务商,levySi
From: https://www.cnblogs.com/buguge/p/17917306.html

相关文章

  • RabbitMQの延迟消息
    目录1、什么是延时队列2、延时队列使用场景3、RabbitMQ中的TTL4、如果使用RabbitMQ来实现延时队列4.1、安装插件4.2、代码实现4.2.1、新建maven工程,pom.xml文件4.2.2、属性配置文件application.yml4.2.3、定义常量4.2.3、配置RabbitMQ4.2.4、定义重试时间枚举类型4.2.5、定义消息......
  • 消息中间件的选择:RabbitMQ是一个明智的选择
    ......
  • RabbitMQ 安装(Centos7)
    1.下载rabbitmq和erlang1.1erlanghttps://github.com/rabbitmq/erlang-rpm/releases1.2rabbitMQhttps://github.com/rabbitmq/rabbitmq-server/releases/download/v3.12.10/rabbitmq-server-3.12.10-1.el8.noarch.rpm2.安装上传工具(非必要)yuminstalllrzsz.x86_64......
  • window下rabbitmq安装教程
    RabbitMq简介1.1消息队列中间件简介消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题实现高性能,高可用,可伸缩和最终一致性[架构]使用较多的消息队列有ActiveMQ(安全),RabbitMQ,ZeroMQ,Kafka(大数据),MetaMQ,RocketMQ以下介绍消息队列在实际应用中常用的使......
  • 创建rabbitmq用户时报错
    报错内容如下:Error:unabletoperformanoperationonnode'rabbit@bogon'.Pleaseseediagnosticsinformationandsuggestionsbelow.Mostcommonreasonsforthisare:Targetnodeisunreachable(e.g.duetohostnameresolution,TCPconnectionorfirewal......
  • RabbitMQ
    RabbitMQ入门1、什么是MQ消息队列(MessageQueue),是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。2、MQ的作用1、流量消峰当有一家商店,最多可以访问100人访问,这时人流量特别大......
  • SpringBoot集成多个RabbitMq(多个MQ链接)
    ##2023年12月16日20:25:36 项目中使用RabbitMQ作为应用间信息互通,本次梳理下关于MQ的使用。1、引入依赖<!--引入依赖,使用v2.5.6版本--><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-b......
  • 记录rabbitMQ的广播队列的错误使用导致未能正确广播的问题
    背景说明:有3个服务S1、S2、S3现在服务S1需要发布消息到广播交换机E,并建立了两个普通队列Q1,Q2,将其绑定到广播交换机E上服务S2和服务S3同时监听队列Q1,Q2本意是,服务S1通过广播交换机E把消息同时推送给服务S2和S3后面测试时,同事发现,服务S2和服务S3都只接收到了部分消息,而不是全......
  • RabbitMQ介绍
    一、RabbitMQ介绍1.1现存问题服务调用:两个服务调用时,我们可以通过传统的HTTP方式,让服务A直接去调用服务B的接口,但是这种方式是同步的方式,虽然可以采用SpringBoot提供的@Async注解实现异步调用,但是这种方式无法确保请求一定回访问到服务B的接口。那如何保证服务A的请求信息......
  • RabbitMQ
    简介作用流量消峰:相当于等待队列。应用解耦:当子系统出现故障,该系统的要处理的信息被缓存在消息队列中,待修复完成后即可恢复。异步处理。四大核心概念生产者:产生数据发送消息的程序。交换机:一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。队列:队列是Rabbit......