靠谱的程序员具有注重实效的偏执,对于重复多行的代码,总会想办法消除重复。
我们zhongtai-channel里在调用服务商接口发起签约前,使用了mq进行异步处理。即:zhongtai-channel签约RPCAPI接收到上游的请求后,先同步持久化保存签约请求流水,然后将签约数据放入rabbitmq消息队列,等待程序里的消费者方法消费消息队列中的消息,调用服务商对接Service发起HTTP签约请求。
签约RPCAPI,参数校验,持久化数据,写入消息队列 | → | 消息队列 | → | 监听消息出队,调用服务商对接Service发起HTTP签约请求 |
---|
系统运行过程中遇到一个问题,程序在消费消息时,有些三方服务商接口的响应时效比较长,进而阻塞消息队列,导致整体签约能力下降。鉴于自有服务商的签约量比重大,考虑自有服务商系统是我们自研的,接口响应时效相对可控,因此,为了提高系统生产能力,将自有服务商的签约与三方服务商的签约一分为二,分别放入两个不同的消息队列。
代码:
package com.emax.channel.provider.modules.mq.sign; /** * @Description 服务商签约MQ异步实现 * @Author Panda * @Date 2023/10/11 **/ @Slf4j @Component @Configuration public class LevySignApiInvokerMqBroker { ...省略一堆@Bean定义 public void sendLevyGotoSignQueue(LevySignFlow levySignFlow, LevySignDTO levySignDTO) { log.info("httpInvokeLevySignApi 调用服务商api签约 mq异步实现 消息入队,{}_{}_{} levySignFlow={}", ...); Object[] obj = {levySignFlow, levySignDTO}; // 获取服务商协议类型 final LevyChannelConfig levyConfigInfo = levyChannelConfigService.getLevyConfigCache(Long.parseLong(levySignFlow.getTaxSourceId())); final String apiSuit = levyConfigInfo.getApiSuit(); if (StrUtil.equals(apiSuit, LevyChannelRouteEnum.OWN.getCode())) { /** * 消费者参见{@link #onMessageByOwn(Object[])} **/ rabbitTemplate.convertAndSend(exchangeName, internalQueueName, obj); } else { /** * 消费者参见{@link #onMessage(Object[])} **/ rabbitTemplate.convertAndSend(exchangeName, externalQueueName, obj); } } @RabbitHandler @RabbitListener(queues = "#{levySignApiQueueExternal.name}",concurrency = "5", containerFactory = "signPreFetchLimitContainerFactory") public void onMessage(Object[] objects) throws Exception { LevySignFlow levySignFlow = (LevySignFlow) objects[0]; LevySignDTO levySignDTO = (LevySignDTO) objects[1]; log.info("httpInvokeLevySignApi 调用服务商api签约 mq异步实现 消息出队,{}_{}_{} levySignDTO={}", ...); long nowTime = System.currentTimeMillis(); // 执行逻辑 try { levyGotoSignAndSendSignQueryQueue(levySignFlow, levySignDTO); } catch (Exception e){ log.error("httpInvokeLevySignApi 调用服务商api签约异常,levySignFlow:{}",JSON.toJSONString(levySignFlow),e); } log.info("httpInvokeLevySignApi 耗时={} 调用服务商api签约,签约流水号:{} 服务商名称:{}", System.currentTimeMillis() - nowTime,...); } @RabbitHandler @RabbitListener(queues = "#{levySignApiQueueInternal.name}",concurrency = "5", containerFactory = "signPreFetchLimitContainerFactory") public void onMessageByOwn(Object[] objects) throws Exception { LevySignFlow levySignFlow = (LevySignFlow) objects[0]; LevySignDTO levySignDTO = (LevySignDTO) objects[1]; log.info("httpInvokeLevySignApi_owm 调用服务商api签约 mq异步实现 消息出队,{}_{}_{} levySignDTO={}", ...); long nowTime = System.currentTimeMillis(); // 执行逻辑 try { levyGotoSignAndSendSignQueryQueue(levySignFlow, levySignDTO); }catch (Exception e){ log.error("httpInvokeLevySignApi_own 调用服务商api签约异常,levySignFlow:{}",JSON.toJSONString(levySignFlow),e); } log.info("httpInvokeLevySignApi_owm 耗时={} 调用服务商api签约,签约流水号:{} 服务商名称:{}", System.currentTimeMillis() - nowTime,...); } private void levyGotoSignAndSendSignQueryQueue(LevySignFlow levySignFlow, LevySignDTO levySignDTO) { TaxSignStatusEnum taxSignStatusEnum = levyGotoSignService.syncGotoSign(levySignFlow, levySignDTO); if (TaxSignStatusEnum.isFinalState(taxSignStatusEnum)) { LevySignFlow levySignFlowFinal = levySignFlowManager.getById(levySignFlow.getId()); levySignResultNotifyService.async(levySignFlowFinal); }else { levySignQueryApiInvokerMqBroker.sendLevySignQueryQueue(levySignFlow,5); } } }
注意其中的两个onMessage方法,看到冗余代码了吗?靠谱的程序员觉得别扭!
先介绍一下两个技术点
- @RabbitListener注解的queues是一个数组。 - - - - 当两个或多个的其他属性如concurrency都相同时,此技术点满足。
- 一个onMessage方法可以同时标注两个@RabbitListener。 - - - - - - 当两个或多个的属性各不相同时, 使用此技术点。
So,代码重构就easy了。使用上面的技术点1。将两个onMessage合二为一。(BTW,此时已经没必要有levyGotoSignAndSendSignQueryQueue方法了)
@RabbitHandler @RabbitListener(queues = "#{levySignApiQueueInternal.name}, #{levySignApiQueueExternal.name}", concurrency = "5", containerFactory = "signPreFetchLimitContainerFactory") public void onMessage(Object[] objects) throws Exception { LevySignFlow levySignFlow = (LevySignFlow) objects[0]; LevySignDTO levySignDTO = (LevySignDTO) objects[1]; log.info("httpInvokeLevySignApi 调用服务商api签约 mq异步实现 消息出队,{}_{}_{} levySignDTO={}", ...); long nowTime = System.currentTimeMillis(); // 执行逻辑 try { levyGotoSignAndSendSignQueryQueue(levySignFlow, levySignDTO); } catch (Exception e){ log.error("httpInvokeLevySignApi 调用服务商api签约异常,levySignFlow:{}",JSON.toJSONString(levySignFlow),e); } log.info("httpInvokeLevySignApi 耗时={} 调用服务商api签约,签约流水号:{} 服务商名称:{}", System.currentTimeMillis() - nowTime,...); }
这时,善于思考的同学可能会提问了。 你消费者方法写成一个了,那方法里怎么知道是自有服务商签约还是三方服务商签约呢?我日后可能会添加不同的性能控制策略。
easy,只要思想不滑坡,办法总比困难多。
- 办法1:生产者代码里写入消息时,增加自有服务商签约或三方服务商签约的标识。
- 办法2:消费者代码根据levyId查询LevyConfigCache,即可知道是自有服务商还是三方服务商。
- 办法3:你琢磨。
标签:levySignFlow,RabbitListener,queues,rabbitmq,httpInvokeLevySignApi,api,服务商,levySi From: https://www.cnblogs.com/buguge/p/17917306.html