架构支持
根据实际业务场景,分析集群分流的具体处理方案,假设基金购买接口单次处理时间为500ms,tomcat使用默认线程数200,则单个tomcat处理基金购买接口的QPS=1000/500*200=400。
场景1-4000QPS
要求实现4000QPS的并发量,可以部署10个tomcat集群应用,使用nginx做负载均衡,轮询分配到tomcat上。
场景2-10w QPS
要求实现10w QPS的并发量,nginx一般可支持5w QPS,可使用多个nginx进行集群部署,每个nginx轮询多个tomcat应用,nginx的入口流量使用硬件(Lvs/F5)进行负载均衡,可支持低于100w级别的QPS。
场景3-100w级别QPS
针对该级别的并发量,可按地域部署机房,通过DNS域名解析将网络请求分配到所属管辖区域的机房,实现流量分割。
业务实现
在基金购买交易过程中添加一个中间态,后端接收到购买交易请求时,先对请求做简单的过滤,并生成唯一的业务标识,然后向RocketMQ发送一条购买交易消息,之后向终端用户返回交易的中间态,友情提示用户主动查询交易状态;交易的真实处理逻辑放在RocketMq消费者中进行处理。这种方式可以更大限度的接收终端用户的请求数量。
业务接口处理终端请求的代码:
public void buyFundProdSpikes(TradeDTO param) {
//1.为当前交易生成申请单号
String reqNo = TransactionIdGenerator.generateReqTransactionId();
// 创建 RocketMQ 的 交易申请消息
Message<String> smsMessage = MessageBuilder
.withPayload(JSON.toJSONString(param))
.setHeader(RocketMQHeaders.KEYS, "SMS_" + reqNo)
.build();
//异步发送交易通知消息
rocketMQTemplate.asyncSend("fundTradeTopic:tradeBuyTag", smsMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
logger.info("产品预约申请消息已经发送");
}
@Override
public void onException(Throwable throwable) {
logger.info("产品预约申请消息发送失败");
}
});
}
可在单独的服务器部署交易核心的处理代码
消费者监听部分:
@Component
@RocketMQMessageListener(
topic = "fundTradeTopic",
consumerGroup = "buyTradeConsumerGroup"
)
public class FundTradeTopicListener implements RocketMQListener<MessageExt> {
private static final Logger logger = LoggerFactory.getLogger(FundTradeTopicListener.class);
@Autowired
private TradeService tradeService;
@Override
public void onMessage(MessageExt msgExt) {
// 执行本地事务逻辑,返回事务状态
try {
String bodyStr = new String(msgExt.getBody());
logger.info("消息消费开始:{}", bodyStr);
TradeDTO param = JSON.parseObject(bodyStr, TradeDTO.class);
tradeService.buyFundProdHandle(param, msgExt.getKeys().replace("SMS_", ""));
logger.info("消息消费结束");
} catch (Exception e) {
logger.error("交易消息消费失败!", e);
throw e;
}
}
}
业务逻辑处理部分:
public void buyFundProdHandle(TradeDTO param, String reqNo) {
logger.info("创建订单服务调用开始。");
//1. 校验消息是否已经消费
boolean locked = redisUtils.setnx(reqNo, 60*10);
if (!locked) {
logger.info("订单:{}的创建消息已经消费", reqNo);
return;
}
//2. 处理业务代码
TradeEntity entity = BeanConverter.dataConvert(param, TradeEntity.class);
FundInfoParam fundParam = new FundInfoParam();
fundParam.setCode(param.getFundCode());
fundParam.setTotalShr(param.getReqShr());
//调用基金服务减少基金池份额
logger.info("调用基金服务减少基金池份额开始。");
R<String> r = fundService.buyFund(fundParam);
logger.info("调用基金服务减少基金池份额结束。");
if (R.ERROR.equals(r.getCode())) {
throw new RuntimeException(r.getMsg());
}
if (param.getReqShr().compareTo(BigDecimal.valueOf(20000000)) == 0) {
throw new RuntimeException("手动抛出异常");
}
//插入交易信息
entity.setReqNo(reqNo);
tradeMapper.insertTradeInfo(entity);
}
标签:info,reqNo,param,秒杀,购买,QPS,logger,public,RocketMQ
From: https://www.cnblogs.com/zly1015/p/17997987