靠谱的程序员具有注重实效的偏执,对于重复多行的代码,总会想办法消除重复。
我们zhongtai-channel里在调用服务商接口发起签约前,使用了mq进行异步处理。即:zhongtai-channel签约RPCAPI接收到上游的请求后,先同步持久化保存签约请求流水,然后将签约数据放入rabbitmq消息队列,等待程序里的消费者方法消费消息队列中的消息,调用服务商对接Service发起HTTP签约请求。
签约RPCAPI,参数校验,持久化数据,写入消息队列 | → | 消息队列 | → | 监听消息出队,调用服务商对接Service发起HTTP签约请求 |
系统运行过程中遇到一个问题,程序在消费消息时,有些三方服务商接口的响应时效比较长,进而阻塞消息队列,导致整体签约能力下降。鉴于自有服务商的签约量比重大,考虑自有服务商系统是我们自研的,接口响应时效相对可控,因此,为了提高系统生产能力,将自有服务商的签约与三方服务商的签约一分为二,分别放入两个不同的消息队列。
代码:
package com.emax.channel.provider.modules.mq.sign;
/**
* @Description 服务商签约MQ异步实现
* @Author Panda
* @Date 2023/10/11
**/
@Slf4j
@Component
@Configuration
public class LevySignApiInvokerMqBroker {
...省略一堆@Bean定义
public void sendLevyGotoSignQueue(LevySignFlow levySignFlow, LevySignDTO levySignDTO) {
log.info("httpInvokeLevySignApi 调用服务商api签约 mq异步实现 消息入队,{}_{}_{} levySignFlow={}", ...);
Object[] obj = {levySignFlow, levySignDTO};
// 获取服务商协议类型
final LevyChannelConfig levyConfigInfo = levyChannelConfigService.getLevyConfigCache(Long.parseLong(levySignFlow.getTaxSourceId()));
final String apiSuit = levyConfigInfo.getApiSuit();
if (StrUtil.equals(apiSuit, LevyChannelRouteEnum.OWN.getCode())) {
/**
* 消费者参见{@link #onMessageByOwn(Object[])}
**/
rabbitTemplate.convertAndSend(exchangeName, internalQueueName, obj);
} else {
/**
* 消费者参见{@link #onMessage(Object[])}
**/
rabbitTemplate.convertAndSend(exchangeName, externalQueueName, obj);
}
}
@RabbitHandler
@RabbitListener(queues = "#{levySignApiQueueExternal.name}",concurrency = "5", containerFactory = "signPreFetchLimitContainerFactory")
public void onMessage(Object[] objects) throws Exception {
LevySignFlow levySignFlow = (LevySignFlow) objects[0];
LevySignDTO levySignDTO = (LevySignDTO) objects[1];
log.info("httpInvokeLevySignApi 调用服务商api签约 mq异步实现 消息出队,{}_{}_{} levySignDTO={}", ...);
long nowTime = System.currentTimeMillis();
// 执行逻辑
try {
levyGotoSignAndSendSignQueryQueue(levySignFlow, levySignDTO);
} catch (Exception e){
log.error("httpInvokeLevySignApi 调用服务商api签约异常,levySignFlow:{}",JSON.toJSONString(levySignFlow),e);
}
log.info("httpInvokeLevySignApi 耗时={} 调用服务商api签约,签约流水号:{} 服务商名称:{}", System.currentTimeMillis() - nowTime,...);
}
@RabbitHandler
@RabbitListener(queues = "#{levySignApiQueueInternal.name}",concurrency = "5", containerFactory = "signPreFetchLimitContainerFactory")
public void onMessageByOwn(Object[] objects) throws Exception {
LevySignFlow levySignFlow = (LevySignFlow) objects[0];
LevySignDTO levySignDTO = (LevySignDTO) objects[1];
log.info("httpInvokeLevySignApi_owm 调用服务商api签约 mq异步实现 消息出队,{}_{}_{} levySignDTO={}", ...);
long nowTime = System.currentTimeMillis();
// 执行逻辑
try {
levyGotoSignAndSendSignQueryQueue(levySignFlow, levySignDTO);
}catch (Exception e){
log.error("httpInvokeLevySignApi_own 调用服务商api签约异常,levySignFlow:{}",JSON.toJSONString(levySignFlow),e);
}
log.info("httpInvokeLevySignApi_owm 耗时={} 调用服务商api签约,签约流水号:{} 服务商名称:{}", System.currentTimeMillis() - nowTime,...);
}
private void levyGotoSignAndSendSignQueryQueue(LevySignFlow levySignFlow, LevySignDTO levySignDTO) {
TaxSignStatusEnum taxSignStatusEnum = levyGotoSignService.syncGotoSign(levySignFlow, levySignDTO);
if (TaxSignStatusEnum.isFinalState(taxSignStatusEnum)) {
LevySignFlow levySignFlowFinal = levySignFlowManager.getById(levySignFlow.getId());
levySignResultNotifyService.async(levySignFlowFinal);
}else {
levySignQueryApiInvokerMqBroker.sendLevySignQueryQueue(levySignFlow,5);
}
}
}
注意其中的两个onMessage方法,看到冗余代码了吗?靠谱的程序员觉得别扭!
先介绍一下两个技术点
- @RabbitListener注解的queues是一个数组。 - - - - 当两个或多个的其他属性如concurrency都相同时,此技术点满足。
- 一个onMessage方法可以同时标注两个@RabbitListener。 - - - - - - 当两个或多个的属性各不相同时, 使用此技术点。
So,代码重构就easy了。使用上面的技术点1。将两个onMessage合二为一。(BTW,此时已经没必要有levyGotoSignAndSendSignQueryQueue方法了)
@RabbitHandler
@RabbitListener(queues = "#{levySignApiQueueInternal.name}, #{levySignApiQueueExternal.name}", concurrency = "5", containerFactory = "signPreFetchLimitContainerFactory")
public void onMessage(Object[] objects) throws Exception {
LevySignFlow levySignFlow = (LevySignFlow) objects[0];
LevySignDTO levySignDTO = (LevySignDTO) objects[1];
log.info("httpInvokeLevySignApi 调用服务商api签约 mq异步实现 消息出队,{}_{}_{} levySignDTO={}", ...);
long nowTime = System.currentTimeMillis();
// 执行逻辑
try {
levyGotoSignAndSendSignQueryQueue(levySignFlow, levySignDTO);
} catch (Exception e){
log.error("httpInvokeLevySignApi 调用服务商api签约异常,levySignFlow:{}",JSON.toJSONString(levySignFlow),e);
}
log.info("httpInvokeLevySignApi 耗时={} 调用服务商api签约,签约流水号:{} 服务商名称:{}", System.currentTimeMillis() - nowTime,...);
}
这时,善于思考的同学可能会提问了。 你消费者方法写成一个了,那方法里怎么知道是自有服务商签约还是三方服务商签约呢?我日后可能会添加不同的性能控制策略。
easy,只要思想不滑坡,办法总比困难多。
- 办法1:生产者代码里写入消息时,增加自有服务商签约或三方服务商签约的标识。
- 办法2:消费者代码根据levyId查询LevyConfigCache,即可知道是自有服务商还是三方服务商。
- 办法3:你琢磨。
标签:levySignFlow,RabbitListener,queues,rabbitmq,httpInvokeLevySignApi,api,服务商,levySi From: https://blog.51cto.com/u_15708799/8985237