添加RMQ配置
<!-- 使用高级消息队列来解决分布式事务一致性 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- application.properties
# ===== RabbitMQ配置 ======
spring.rabbitmq.host=192.168.10.10
spring.rabbitmq.port=5672
# 虚拟主机配置
spring.rabbitmq.virtual-host=/
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
创建RMQ配置文件
@Configuration
public class MyRabbitMQConfig {
/**
* 使用JSON序列化机制,进行消息转换
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
// @RabbitListener(queues = "stock.release.stock.queue")
// public void handle(Message message) {
//
// }
/**
* 库存服务默认的交换机
* @return
*/
@Bean
public Exchange stockEventExchange() {
//String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
TopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false);
return topicExchange;
}
/**
* 普通队列
* @return
*/
@Bean
public Queue stockReleaseStockQueue() {
//String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
Queue queue = new Queue("stock.release.stock.queue", true, false, false);
return queue;
}
/**
* 延迟队列
* @return
*/
@Bean
public Queue stockDelay() {
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "stock-event-exchange");
arguments.put("x-dead-letter-routing-key", "stock.release");
// 消息过期时间 2分钟
arguments.put("x-message-ttl", 120000);
Queue queue = new Queue("stock.delay.queue", true, false, false,arguments);
return queue;
}
/**
* 交换机与普通队列绑定
* @return
*/
@Bean
public Binding stockLocked() {
//String destination, DestinationType destinationType, String exchange, String routingKey,
// Map<String, Object> arguments
Binding binding = new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.release.#",
null);
return binding;
}
/**
* 交换机与延迟队列绑定
* @return
*/
@Bean
public Binding stockLockedBinding() {
return new Binding("stock.delay.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.locked",
null);
}
}
- 解锁库存流程
可以看到,在锁定库存时,我们增加了库存工作单,用来记录库存锁定的明细记录,如果库存锁定异常,则会回滚,该表不会有数据记录,如果锁定成功,则会有具体的锁定记录,锁定成功后会发送消息到延时队列,过段时间会根据订单创建的状态(订单取消或订单未创建成功)来解锁库存。
- 解锁库存具体步骤
业务代码锁库存
@Service("wareSkuService")
public class WareSkuServiceImpl extends ServiceImpl<WareSkuDao, WareSkuEntity> implements WareSkuService {
@Autowired
WareSkuDao wareSkuDao;
@Autowired
ProductFeignService productFeignService;
@Autowired
private WareOrderTaskService wareOrderTaskService;
@Autowired
private WareOrderTaskDetailService wareOrderTaskDetailService;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private OrderFeignService orderFeignService;
/**
* 为某个订单锁定库存
* @param vo
* @return
*/
@Transactional(rollbackFor = Exception.class)
@Override
public boolean orderLockStock(WareSkuLockVo vo) {
/**
* 保存库存工作单详情信息
* 追溯
*/
WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity();
wareOrderTaskEntity.setOrderSn(vo.getOrderSn());
wareOrderTaskEntity.setCreateTime(new Date());
wareOrderTaskService.save(wareOrderTaskEntity);
//1、按照下单的收货地址,找到一个就近仓库,锁定库存
//2、找到每个商品在哪个仓库都有库存
List<OrderItemVo> locks = vo.getLocks();
List<SkuWareHasStock> collect = locks.stream().map((item) -> {
SkuWareHasStock stock = new SkuWareHasStock();
Long skuId = item.getSkuId();
stock.setSkuId(skuId);
stock.setNum(item.getCount());
//查询这个商品在哪个仓库有库存
List<Long> wareIdList = wareSkuDao.listWareIdHasSkuStock(skuId);
stock.setWareId(wareIdList);
return stock;
}).collect(Collectors.toList());
//2、锁定库存
for (SkuWareHasStock hasStock : collect) {
boolean skuStocked = false;
Long skuId = hasStock.getSkuId();
List<Long> wareIds = hasStock.getWareId();
if (org.springframework.util.StringUtils.isEmpty(wareIds)) {
//没有任何仓库有这个商品的库存,抛出异常,前边已经锁定的库存也一起会回滚
throw new NoStockException(skuId);
}
//1、如果每一个商品都锁定成功,将当前商品锁定了几件的工作单记录发给MQ
//2、锁定失败。前面保存的工作单信息都回滚了。发送出去的消息,即使要解锁库存,由于在数据库查不到指定的id,所有就不用解锁
for (Long wareId : wareIds) {
//锁定成功就返回1,失败就返回0
Long count = wareSkuDao.lockSkuStock(skuId,wareId,hasStock.getNum());
if (count == 1) {
skuStocked = true;
WareOrderTaskDetailEntity taskDetailEntity = WareOrderTaskDetailEntity.builder()
.skuId(skuId)
.skuName("")
.skuNum(hasStock.getNum())
.taskId(wareOrderTaskEntity.getId())
.wareId(wareId)
.lockStatus(1)
.build();
wareOrderTaskDetailService.save(taskDetailEntity);
//TODO 告诉MQ库存锁定成功
StockLockedTo lockedTo = new StockLockedTo();
lockedTo.setId(wareOrderTaskEntity.getId());
StockDetailTo detailTo = new StockDetailTo();
BeanUtils.copyProperties(taskDetailEntity,detailTo);
lockedTo.setDetailTo(detailTo);
rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",lockedTo);
break;
} else {
//当前仓库锁失败,重试下一个仓库
}
}
if (skuStocked == false) {
//当前商品所有仓库都没有锁住
throw new NoStockException(skuId);
}
}
//3、肯定全部都是锁定成功的
return true;
}
@Override
public void unlockStock(StockLockedTo to) {
//库存工作单的id
StockDetailTo detail = to.getDetailTo();
Long detailId = detail.getId();
/**
* 解锁
* 1、查询数据库关于这个订单锁定库存信息
* 有:证明库存锁定成功了
* 解锁:订单状况
* 1、没有这个订单,必须解锁库存
* 2、有这个订单,不一定解锁库存
* 订单状态:已取消:解锁库存
* 已支付:不能解锁库存
*/
WareOrderTaskDetailEntity taskDetailInfo = wareOrderTaskDetailService.getById(detailId);
if (taskDetailInfo != null) {
//查出wms_ware_order_task工作单的信息
Long id = to.getId();
WareOrderTaskEntity orderTaskInfo = wareOrderTaskService.getById(id);
//获取订单号查询订单状态
String orderSn = orderTaskInfo.getOrderSn();
//远程查询订单信息
R orderData = orderFeignService.getOrderStatus(orderSn);
if (orderData.getCode() == 0) {
//订单数据返回成功
OrderVo orderInfo = orderData.getData("data", new TypeReference<OrderVo>() {});
//判断订单状态是否已取消或者支付或者订单不存在
if (orderInfo == null || orderInfo.getStatus() == 4) {
//订单已被取消,才能解锁库存
if (taskDetailInfo.getLockStatus() == 1) {
//当前库存工作单详情状态1,已锁定,但是未解锁才可以解锁
unLockStock(detail.getSkuId(),detail.getWareId(),detail.getSkuNum(),detailId);
}
}
} else {
//消息拒绝以后重新放在队列里面,让别人继续消费解锁
//远程调用服务失败
throw new RuntimeException("远程调用服务失败");
}
} else {
//无需解锁
}
}
/**
* 解锁库存的方法
* @param skuId
* @param wareId
* @param num
* @param taskDetailId
*/
public void unLockStock(Long skuId,Long wareId,Integer num,Long taskDetailId) {
//库存解锁
wareSkuDao.unLockStock(skuId,wareId,num);
//更新工作单的状态
WareOrderTaskDetailEntity taskDetailEntity = new WareOrderTaskDetailEntity();
taskDetailEntity.setId(taskDetailId);
taskDetailEntity.setLockStatus(2); //变为已解锁
wareOrderTaskDetailService.updateById(taskDetailEntity);
}
@Data
class SkuWareHasStock {
private Long skuId;
private Integer num;
private List<Long> wareId;
}
}
监听队列
/**
* 库存解锁监听
*
* @desc
* 库存锁定成功发送消息到延时队列 stock.locked(路由key),超时TTL,消息进入私信路由,然后转发到解锁库存的队列。
*
*/
@Slf4j
@RabbitListener(queues = "stock.release.stock.queue")
@Service
public class StockReleaseListener {
@Autowired
private WareSkuService wareSkuService;
/**
* 1、库存自动解锁
* 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁
*
* 2、订单失败
* 库存锁定失败
*
* 只要解锁库存的消息失败,一定要告诉服务解锁失败
*/
@RabbitHandler
public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {
log.info("******收到解锁库存的信息******");
try {
//当前消息是否被第二次及以后(重新)派发过来了
// Boolean redelivered = message.getMessageProperties().getRedelivered();
//解锁库存
wareSkuService.unlockStock(to);
// 手动删除消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
// 解锁失败 将消息重新放回队列,让别人消费
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
然后等到消息过期进入死信路由,TTL后客户端监听消息判断是否释放库存,消息在判断的时候先根据生成的订单号远程查询gulimall-order是否存在对应的订单,如果不存在,则直接释放锁定的库存,因为在生成订单的时候抛出异常生成的订单回滚了,所以 oms_order 表不存在订单,这时监听的消息拿到延时消息后,做完判断后会触发解锁库存的动作。
过了几分钟后,我们可以看到消息已经被消费了,并且锁定的库存也释放了,变回原来的 3 件。
可以看到,RMQ在解决分布式事务一致性问题上非常强大,不仅实现了解耦,而且还保证了可靠消息+最终一致性。
标签:库存,return,解锁,RabbitMQ,关单,new,锁定,stock From: https://blog.51cto.com/u_15993308/6451694