1. 智能调度
在神领物流项目中,采用智能调度的方式对车辆任务、快递员的取派件任务进行调度管理,这样更加有效的进行管理,降低企业运营成本。
1.1 为什么需要调度?
可能你会这样的疑问,用户下单了,快递员上门取件,取件后送回网点,网点有车辆运走,再经过车辆的一系列的运输,最后进行派件,对方就能收到快件,不就是这么简单的流程吗?为什么需要调度?
没错,看起来是简单的流程,实际上通过仔细的分析就会发现这个过程并不简单,甚至会非常的复杂,比如:
- 用户下单后,应该哪个网点的快递员上门呢?
- 这样就需要通过用户的发件人地址信息定位到所属服务范围内的网点进行服务
- 紧接着又会有一个问题,确定了网点后,这个网点有多个快递员,这个取件任务应该是指派谁呢?
- 这里就需要对快递员的工作情况以及排班情况进行分析,才能确定哪个快递员进行服务。
- 快递员把快件拿回到网点后,假设这个快件是从上海寄往北京的,是网点直接开车送到北京吗?
显然不是的,直接送的话成本太高了,怎么样成本最低呢?显然是车辆尽可能的满载,集中化运输(这个车上装的都是从A点→B点的快件,这里的A和B可能代表的网点或转运中心,而非全路线)
- 一般物流公司会有很多的车辆、路线、司机,而每个路线都会设置不同的车次,如何能够将快件合理的分配到车辆上,分配时需要参考车辆的载重、司机的排班,车辆的状态以及车次等信息。
- 快件到收件人地址所在服务范围内的网点了,系统该如何分配快递员?
- 还有一些其他的情况,比如:快件拒收应该怎么处理?车辆故障不能使用怎么处理?一车多个司机,运输任务是如何划分?等等
- 基于以上的问题分析,这就需要系统进行计算处理,这就是我们所说的【智能调度系统】,就是让整个物流流程中的参与者都通过系统的计算,可以井然有序的工作。
1.2 整体核心业务流程
关键流程说明:
- 用户下单后,会产生取件任务,该任务也是由调度中心进行调度的
- 订单转运单后,会发送消息到调度中心,在调度中心中对相同节点的运单进行合并(这里是指最小转运单元)
- 调度中心同样也会对派件任务进行调度,用于生成快递员的派件任务
- 司机的出库和入库操作也是流程中的核心动作,尤其是入库操作,是推动运单流转的关键
2. 订单转运单
快递员上门取件成功后,会将订单转成运单,后续将进行一系列的转运流程。
2.1 业务流程
运单表结构
CREATE TABLE `sl_transport_order` (
`id` varchar(18) CHARACTER SET utf16 COLLATE utf16_general_ci NOT NULL DEFAULT '' COMMENT 'id',
`order_id` bigint NOT NULL COMMENT '订单ID',
`status` int DEFAULT NULL COMMENT '运单状态(1.新建 2.已装车 3.运输中 4.到达终端网点 5.已签收 6.拒收)',
`scheduling_status` int DEFAULT NULL COMMENT '调度状态(1.待调度2.未匹配线路3.已调度)',
`start_agency_id` bigint DEFAULT NULL COMMENT '起始网点id',
`end_agency_id` bigint DEFAULT NULL COMMENT '终点网点id',
`current_agency_id` bigint DEFAULT NULL COMMENT '当前所在机构id',
`next_agency_id` bigint DEFAULT NULL COMMENT '下一个机构id',
`transport_line` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT '完整的运输路线',
`total_volume` decimal(32,4) DEFAULT NULL COMMENT '货品总体积,单位:立方米',
`total_weight` decimal(32,2) DEFAULT NULL COMMENT '货品总重量,单位:kg',
`is_rejection` tinyint(1) DEFAULT NULL COMMENT '是否为拒收运单',
`created` datetime DEFAULT NULL COMMENT '创建时间',
`updated` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE,
KEY `order_id` (`order_id`) USING BTREE,
KEY `created` (`created`) USING BTREE,
KEY `status` (`status`) USING BTREE,
KEY `scheduling_status` (`scheduling_status`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='运单表';
2.2 揽收成功Msg
订单转运单的业务的触发点在于快递员的揽收成功,这个通过是通过消息传递的,之所以通过消息是为了减少系统间的耦合度。
【发送消息】快递员揽件成功后,发出消息,这个逻辑是在 sl-express-ms-web-courier 工程的 com.sl.ms.web.courier.service.impl.TaskServiceImpl#pickup()
方法中实现的。
【消费消息】消息的消费是在 sl-express-ms-work-service 工程中的com.sl.ms.work.mq.CourierMQListener#listenCourierPickupMsg()
方法中完成。
/**
* 快递员取件成功
* @param msg 消息
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = Constants.MQ.Queues.WORK_COURIER_PICKUP_SUCCESS),
exchange = @Exchange(name = Constants.MQ.Exchanges.COURIER, type = ExchangeTypes.TOPIC),
key = Constants.MQ.RoutingKeys.COURIER_PICKUP
))
public void listenCourierPickupMsg(String msg) {
log.info("接收到快递员取件成功的消息 >>> msg = {}", msg);
// 解析消息
CourierMsg courierMsg = JSONUtil.toBean(msg, CourierMsg.class);
// 调用transportOrderService的订单转运单方法
transportOrderService.orderToTransportOrder(courierMsg.getOrderId());
// TODO 发送运单跟踪消息
}
2.3 生成运单号
对于运单号的生成有特殊的要求,格式:SL + 13 位数字,例如:SL1000000000760,对于这个需求,如果采用 MP 提供的雪花 id 生成是 19 位,是不能满足需求的,所以我们需要自己生成 id,并且要确保唯一不能重复。
在这里我们采用美团的 Leaf 作为 id 生成服务,其源码托管于 GitHub:https://github.com/Meituan-Dianping/Leaf
这里有个美团的技术播客,专门介绍了 Leaf:https://tech.meituan.com/2017/04/21/mt-leaf.html
目前 Leaf 覆盖了美团点评公司内部金融、餐饮、外卖、酒店旅游、猫眼电影等众多业务线。在 4C8G VM 基础上,通过公司 RPC 方式调用,QPS 压测结果近 5w/s,TP999 1ms。
Leaf 提供两种生成的 ID 的方式(Segment 模式和 Snowflake 模式),我们采用 Segment 模式(号段)来生成运单号。
a. 号段模式
号段模式采用的是基于 MySQL 数据生成 id 的,它并不是基于 MySQL 表中的自增长实现的,因为基于 MySQL 的自增长方案对于数据库的依赖太大了,性能不好,Leaf 的号段模式是基于一张表来实现,每次获取一个号段,生成 id 时从内存中自增长,当号段用完后再去更新数据库表,如下:
字段说明:
- biz_tag:业务标签,用来区分业务
- max_id:表示该 biz_tag 目前所被分配的 ID 号段的最大值
- step:表示每次分配的号段长度。如果把 step 设置得足够大,比如 1000,那么只有当 1000 个号被消耗完了之后才会去重新读写一次数据库。
- description:描述
- update_time:更新时间
Leaf 架构图:
说明:test_tag 在第一台 Leaf 机器上是 1~1000 的号段,当这个号段用完时,会去加载另一个长度为 step=1000 的号段,假设另外两台号段都没有更新,这个时候第一台机器新加载的号段就应该是 3001~4000。同时数据库对应的 biz_tag 这条数据的 max_id 会从 3000 被更新成 4000,更新号段的 SQL 语句如下:
Leaf 取号段的时机是在号段消耗完的时候进行的,也就意味着号段临界点的 ID 下发时间取决于下一次从 DB 取回号段的时间,并且在这期间进来的请求也会因为 DB 号段没有取回来,导致线程阻塞。如果请求 DB 的网络和 DB 的性能稳定,这种情况对系统的影响是不大的,但是假如取 DB 的时候网络发生抖动,或者 DB 发生慢查询就会导致整个系统的响应时间变慢。Leaf 为此做了优化,增加了双 buffer 优化。
当号段消费到某个点时就异步的把下一个号段加载到内存中。而不需要等到号段用尽的时候才去更新号段。这样做就可以很大程度上的降低系统的 TP999 指标。
采用双 buffer 的方式,Leaf 服务内部有两个号段缓存区 Segment。当前号段已下发 10% 时,如果下一个号段未更新,则另启一个更新线程去更新下一个号段。当前号段全部下发完后,如果下个号段准备好了则切换到下个号段为当前 Segment 接着下发,循环往复。
- 每个 biz-tag 都有消费速度监控,通常推荐 Segment 长度设置为服务高峰期发号 QPS(秒处理事务数)的 600 倍(10 分钟),这样即使 DB 宕机,Leaf 仍能持续发号 10-20 分钟不受影响。
- 每次请求来临时都会判断下个号段的状态,从而更新此号段,所以偶尔的网络抖动不会影响下个号段的更新。
b. 部署服务
我们只用到了号段的方式,并没有使用雪花方式,所以只需要创建数据库表即可,无需安装 ZooKeeper。
Leaf 官方是没有 docker 镜像的,我们将其进行了镜像制作,并且上传到阿里云仓库,可以直接下载使用。目前已经在 101 机器部署完成。
$ docker run \
-d \
-v /itcast/meituan-leaf/leaf.properties:/app/conf/leaf.properties \
--name meituan-leaf \
-p 28838:8080 \
--restart=always \
registry.cn-hangzhou.aliyuncs.com/itheima/meituan-leaf:1.0.1
创建 sl_leaf 数据库脚本:
CREATE TABLE `leaf_alloc` (
`biz_tag` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '',
`max_id` bigint NOT NULL DEFAULT '1',
`step` int NOT NULL,
`description` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`biz_tag`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- 插入运单号生成规划数据
INSERT INTO `leaf_alloc` (`biz_tag`, `max_id`, `step`, `description`, `update_time`) VALUES ('transport_order', 1000000000001, 100, 'Test leaf Segment Mode Get Id', '2022-07-07 11:32:16');
测试:
# transport_order 与 biz_tag字段的值相同
http://192.168.150.101:28838/api/segment/get/transport_order
# 监控
http://192.168.150.101:28838/cache
c. 封装服务
在项目中,已经将 Leaf 集成到 sl-express-common 工程中,代码如下:
package com.sl.transport.common.service;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.sl.transport.common.enums.IdEnum;
import com.sl.transport.common.exception.SLException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
* id服务,用于生成自定义的id
*/
@Service
public class IdService {
@Value("${sl.id.leaf:}")
private String leafUrl;
/**
* 生成自定义id
*
* @param idEnum id配置
* @return id值
*/
public String getId(IdEnum idEnum) {
String idStr = this.doGet(idEnum);
return idEnum.getPrefix() + idStr;
}
private String doGet(IdEnum idEnum) {
if (StrUtil.isEmpty(this.leafUrl)) {
throw new SLException("生成id,sl.id.leaf配置不能为空.");
}
// 访问leaf服务获取id
String url = StrUtil.format("{}/api/{}/get/{}", this.leafUrl, idEnum.getType(), idEnum.getBiz());
// 设置超时时间为10s
HttpResponse httpResponse = HttpRequest.get(url)
.setReadTimeout(10000)
.execute();
if (httpResponse.isOk()) {
return httpResponse.body();
}
throw new SLException(
StrUtil.format("访问leaf服务出错,leafUrl = {}, idEnum = {}", this.leafUrl, idEnum));
}
}
IdEnum
package com.sl.transport.common.enums;
/**
* @author zzj
* @version 1.0
*/
public enum IdEnum implements BaseEnum {
TRANSPORT_ORDER(1, "运单号", "transport_order", "segment", "SL");
private Integer code;
private String value;
/**
* 业务名称
*/
private String biz;
/**
* 类型:自增长(segment),雪花id(snowflake)
*/
private String type;
/**
* id前缀
*/
private String prefix;
IdEnum(Integer code, String value, String biz, String type, String prefix) {
this.code = code;
this.value = value;
this.biz = biz;
this.type = type;
this.prefix = prefix;
}
@Override
public Integer getCode() {
return this.code;
}
@Override
public String getValue() {
return this.value;
}
public String getBiz() {
return biz;
}
public String getType() {
return type;
}
public String getPrefix() {
return prefix;
}
@Override
public String toString() {
final StringBuffer sb = new StringBuffer("IdEnum{");
sb.append("code=").append(code);
sb.append(", value='").append(value).append('\'');
sb.append(", biz='").append(biz).append('\'');
sb.append(", type='").append(type).append('\'');
sb.append(", prefix='").append(prefix).append('\'');
sb.append('}');
return sb.toString();
}
}
3. 完善运单服务
3.1 统计状态的数量
@Override
public List<TransportOrderStatusCountDTO> findStatusCount() {
// 统计不同状态的 运单数量
// 获取所有枚举状态,封装List<TransportOrderStatusCountDTO>集合,每个状态count值都是0
// [新建: 0,已装车: 0,运输中: 0,到达终端网点: 0,拒收: 0]
List<TransportOrderStatusCountDTO> list = Arrays.stream(TransportOrderStatus.values())
.map(transportOrderStatus -> {
TransportOrderStatusCountDTO dto = new TransportOrderStatusCountDTO();
dto.setStatus(transportOrderStatus);
dto.setStatusCode(transportOrderStatus.getCode());
dto.setCount(0L);
return dto;
}).collect(Collectors.toList());
// 执行sql: SELECT `status` AS statusCode,count(1) AS count FROM sl_transport_order GROUP BY `status`
// [新建: 5, 运输中: 10,到达终端网点: 3]
List<TransportOrderStatusCountDTO> dbList = baseMapper.statusCount();
// 根据查询到的 状态数量结果,修改上面完整集合中的状态数量
// [新建: 5,已装车: 10,运输中: 0,到达终端网点: 3,拒收: 0]
for (TransportOrderStatusCountDTO transportOrderStatusCountDTO : list) {
for (TransportOrderStatusCountDTO countDTO : dbList) {
if (ObjectUtil.equal(transportOrderStatusCountDTO.getStatusCode(), countDTO.getStatusCode())) {
transportOrderStatusCountDTO.setCount(countDTO.getCount());
break;
}
}
}
return list;
}
3.2 更新状态
在更新运单状态时需要考虑 3 件事:
- 如果更新运单为拒收状态,需要将快递退回去,也就是原路返回;
- 在更新状态时,需要同步更新物流信息,通过发送消息的方式完成(先 TODO,后面实现);
- 更新状态后需要通知其他系统(消息通知)
接口定义:
/**
* 修改运单状态
*
* @param ids 运单id列表
* @param transportOrderStatus 修改的状态
* @return 是否成功
*/
boolean updateStatus(List<String> ids, TransportOrderStatus transportOrderStatus);
接口实现:
/**
* 修改运单状态
*
* @param ids 运单id列表
* @param transportOrderStatus 修改的状态
* @return
*/
@Override
public boolean updateStatus(List<String> ids, TransportOrderStatus transportOrderStatus) {
// 1. 校验: ids不能为空 状态不能等于CREATED
if (CollUtil.isEmpty(ids)) {
throw new SLException(WorkExceptionEnum.TRANSPORT_TASK_NOT_FOUND);
}
// 2. 声明运单集合变量: List<TransportOrderEntity>
List<TransportOrderEntity> transportOrderEntities = null;
// 判断运单状态
if (transportOrderStatus == TransportOrderStatus.REJECTED) {
// 3 如果是拒收状态,如果是拒收需要重新查询路线,将包裹逆向回去
// 3.1 根据ids查询运单列表
transportOrderEntities = listByIds(ids);
// 3.2 遍历运单列表
for (TransportOrderEntity transportOrderEntity : transportOrderEntities) {
// 3.2.1 设置拒收状态
transportOrderEntity.setStatus(TransportOrderStatus.REJECTED);
// 3.2.2 获取起始网点id 终点网点id (根据起始机构规划运输路线,这里要将起点和终点互换)
Long startAgencyId = transportOrderEntity.getEndAgencyId();
Long endAgencyId = transportOrderEntity.getStartAgencyId();
// 3.2.3 声明变量 isDispatch 默认参与调度
boolean isDispatch = true;
// 判断起始网点id 终点网点id是否相等
if (NumberUtil.equals(startAgencyId, endAgencyId)) {
// 3.2.4 如果相等 isDispatch = false (无需调度,直接生成派件任务)
isDispatch = false;
} else {
// 3.2.5 如果不等:
// 3.2.5.1 根据调度获取运输路线 transportLineNodeDTO tips: transportLineFeign
TransportLineNodeDTO transportLineNodeDTO = transportLineFeign.queryPathByDispatchMethod(startAgencyId, endAgencyId);
// 3.2.5.2 未查询到运输路线抛异常
if (ObjectUtil.isEmpty(transportLineNodeDTO) || CollUtil.isEmpty(transportLineNodeDTO.getNodeList())) {
throw new SLException(WorkExceptionEnum.TRANSPORT_LINE_NOT_FOUND);
}
// 3.2.5.3 删除掉第一个机构,逆向回去的第一个节点就是当前所在节点
transportLineNodeDTO.getNodeList().remove(0);
// 3.2.5.4 设置运单调度状态: 待调度
transportOrderEntity.setSchedulingStatus(TransportOrderSchedulingStatus.TO_BE_SCHEDULED);
// 3.2.5.5 设置当前所在机构ID
transportOrderEntity.setCurrentAgencyId(startAgencyId);
// 3.2.5.6 设置下一站网点机构ID
transportOrderEntity.setNextAgencyId(transportLineNodeDTO.getNodeList().get(0).getId());
// 3.2.5.7 获取运单中原有运输任务信息 将当前线路 追加到原有线路
TransportLineNodeDTO originTransportLineNodeDTO = JSONUtil.toBean(transportOrderEntity.getTransportLine(), TransportLineNodeDTO.class);
originTransportLineNodeDTO.getNodeList().addAll(transportLineNodeDTO.getNodeList());
transportOrderEntity.setTransportLine(JSONUtil.toJsonStr(originTransportLineNodeDTO));
// 3.2.5.8 合并成本 并重新设置运单线路信息
originTransportLineNodeDTO.setCost(NumberUtil.add(originTransportLineNodeDTO.getCost(), transportLineNodeDTO.getCost()));
}
// 3.2.6 判断是否需要调度
if (isDispatch) {
// 3.2.7 判断如果需要调度 发送消息参与调度 sendTransportOrderMsgToDispatch
sendTransportOrderMsgToDispatch(transportOrderEntity);
} else {
// 3.2.8 判断如果不需要调度 运单状态改为到达网点: 发送消息生成派件任务 sendDispatchTaskMsgToDispatch
transportOrderEntity.setStatus(TransportOrderStatus.ARRIVED_END);
sendDispatchTaskMsgToDispatch(transportOrderEntity);
}
}
} else {
// 4 如果不是拒收状态,遍历根据ids列表封装成运单对象列表
transportOrderEntities = ids.stream().map(id -> {
// 4.1 发送物流信息消息 sendTransportOrderInfoMsg info: 快件已发往【$organId】
sendTransportOrderInfoMsg(id, "快件已发往【$organId】");
// 4.2 封装运单对象 TransportOrderEntity 只设置 id status用于修改即可
TransportOrderEntity transportOrderEntity = new TransportOrderEntity();
transportOrderEntity.setId(id);
transportOrderEntity.setStatus(transportOrderStatus);
return transportOrderEntity;
}).collect(Collectors.toList());
}
// 5. 批量更新 运单状态数据
boolean result = updateBatchById(transportOrderEntities);
// 6. 发消息通知其他系统运单状态的变化 sendUpdateStatusMsg
sendUpdateStatusMsg(ids, transportOrderStatus);
return result;
}
private void sendTransportOrderInfoMsg(String id, String info) {
this.sendTransportOrderInfoMsg(this.getById(id), info);
}
private void sendTransportOrderInfoMsg(TransportOrderEntity transportOrder, String info) {
// 获取将发往的目的地机构
Long nextAgencyId = transportOrder.getNextAgencyId();
// 构建消息实体类
String transportInfoMsg = TransportInfoMsg.builder()
// 运单id
.transportOrderId(transportOrder.getId())
// 消息状态
.status("运送中")
// 消息详情
.info(info)
.organId(nextAgencyId)
// 创建时间
.created(DateUtil.current())
.build().toJson();
// 发送运单跟踪消息
this.mqFeign.sendMsg(Constants.MQ.Exchanges.TRANSPORT_INFO,
Constants.MQ.RoutingKeys.TRANSPORT_INFO_APPEND, transportInfoMsg);
}
/**
* 发送运单消息到调度中,参与调度
*/
private void sendTransportOrderMsgToDispatch(TransportOrderEntity transportOrder) {
Map<Object, Object> msg = MapUtil.builder()
.put("transportOrderId", transportOrder.getId())
.put("currentAgencyId", transportOrder.getCurrentAgencyId())
.put("nextAgencyId", transportOrder.getNextAgencyId())
.put("totalWeight", transportOrder.getTotalWeight())
.put("totalVolume", transportOrder.getTotalVolume())
.put("created", System.currentTimeMillis()).build();
String jsonMsg = JSONUtil.toJsonStr(msg);
//发送消息,延迟5秒,确保本地事务已经提交,可以查询到数据
this.mqFeign.sendMsg(Constants.MQ.Exchanges.TRANSPORT_ORDER_DELAYED,
Constants.MQ.RoutingKeys.JOIN_DISPATCH, jsonMsg, Constants.MQ.LOW_DELAY);
}
/**
* 发送生成取派件任务的消息
*
* @param transportOrder 运单对象
*/
private void sendDispatchTaskMsgToDispatch(TransportOrderEntity transportOrder) {
// 预计完成时间,如果是中午12点到的快递,当天22点前,否则,第二天22点前
int offset = 0;
if (LocalDateTime.now().getHour() >= 12) {
offset = 1;
}
LocalDateTime estimatedEndTime = DateUtil.offsetDay(new Date(), offset)
.setField(DateField.HOUR_OF_DAY, 22)
.setField(DateField.MINUTE, 0)
.setField(DateField.SECOND, 0)
.setField(DateField.MILLISECOND, 0).toLocalDateTime();
// 发送分配快递员派件任务的消息
OrderMsg orderMsg = OrderMsg.builder()
.agencyId(transportOrder.getCurrentAgencyId())
.orderId(transportOrder.getOrderId())
.created(DateUtil.current())
.taskType(PickupDispatchTaskType.DISPATCH.getCode())
.mark("系统提示:派件前请与收件人电话联系.")
.estimatedEndTime(estimatedEndTime).build();
// 发送消息
this.sendPickupDispatchTaskMsgToDispatch(transportOrder, orderMsg);
}
4. 合并运单
4.1 实现分析
运单在运输过程中,虽然快件的起点与终点都不一定相同,但是在中间转运环节有一些运输节点是相同的:
可以看出,A→E 与 A→G 的运单,在 A→B 和 B→C 的转运是相同的,所以在做任务调度时,首先要做的事情就是将相同转运的运单进行合并,以供后续调度中心进行调度。
合并之后的结果存储在哪里呢?我们可以想一下,后续处理的需求:
- 先进行合并处理的运单按照顺序进行调度
- 此次运单调度处理完成后就应该将其删除掉,表示已经处理完成
根据以上两点的需求,很容易想到队列的存储方式,先进先出,实现队列的技术方案有很多,在这里我们采用 Redis 的 List 作为队列,将相同节点转运的订单放到同一个队列中,可以使用其 LPUSH
放进去,RPOP
弹出数据,这样就可以确保先进先出,并且弹出后数据将删除,因此符合我们的需求。
4.2 代码实现
合并运单与调度的业务逻辑都放到 sl-express-ms-dispatch-service 工程中。
实现中,需要考虑消息的幂等性,防止重复数据的产生。
package com.sl.ms.dispatch.mq;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.sl.ms.dispatch.dto.DispatchMsgDTO;
import com.sl.transport.common.constant.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 对于待调度运单消息的处理
*/
@Slf4j
@Component
public class TransportOrderDispatchMQListener {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 处理消息,合并运单到Redis队列
*
* @param msg
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = Constants.MQ.Queues.DISPATCH_MERGE_TRANSPORT_ORDER),
exchange = @Exchange(name = Constants.MQ.Exchanges.TRANSPORT_ORDER_DELAYED, type = ExchangeTypes.TOPIC, delayed = Constants.MQ.DELAYED),
key = Constants.MQ.RoutingKeys.JOIN_DISPATCH
))
public void listenDispatchMsg(String msg) {
// {"transportOrderId":"SL1000000000560","currentAgencyId":100280,"nextAgencyId":90001,
// "totalWeight":3.5,"totalVolume":2.1,"created":1652337676330}
log.info("接收到新运单的消息 >>> msg = {}", msg);
DispatchMsgDTO dispatchMsgDTO = JSONUtil.toBean(msg, DispatchMsgDTO.class);
if (ObjectUtil.isEmpty(dispatchMsgDTO)) {
return;
}
// ===== 监听待调度运单队列,进行合并运单操作 =====
// 1. 获取 当前网点id 下一站网点id
Long currentAgencyId = dispatchMsgDTO.getCurrentAgencyId();
Long nextAgencyId = dispatchMsgDTO.getNextAgencyId();
// 2. 获取运单id (说明: 消息幂等性处理,将相同起始节点的运单存放到set结构的redis中,在相应的运单处理完成后将其删除掉)
String transportOrderId = dispatchMsgDTO.getTransportOrderId();
// 3. 获取redis中set集合的key
String setRedisKey = getSetRedisKey(currentAgencyId, nextAgencyId);
// 4. 判断当前的运单任务 在redis的set集合中是否已经存在 tips: 使用set集合的isMember方法
if (stringRedisTemplate.opsForSet().isMember(setRedisKey, transportOrderId)) {
return;
}
// 5. 获取redis中list集合的key
String listRedisKey = getListRedisKey(currentAgencyId, nextAgencyId);
// (说明: 存储数据到redis,采用list结构,从左边写入数据,读取数据时从右边读取)
// (要存的value值格式==>{"transportOrderId":111222, "totalVolume":0.8,
// "totalWeight":2.1, "created":111222223333})
// 6. 构建value值 基于map即可,然后转为json字符串
String listValue = JSONUtil.toJsonStr(MapUtil.builder()
.put("transportOrderId", transportOrderId)
.put("totalVolume", dispatchMsgDTO.getTotalVolume())
.put("totalWeight", dispatchMsgDTO.getTotalWeight())
.put("created", dispatchMsgDTO.getCreated())
.build());
// 7. 将构建的value值 从左push到list集合中
stringRedisTemplate.opsForList().leftPush(listRedisKey, listValue);
// 8. 将构运单id存到 set集合中,用于后续{幂等性}判断
stringRedisTemplate.opsForSet().add(setRedisKey, transportOrderId);
}
public String getListRedisKey(Long startId, Long endId) {
return StrUtil.format("DISPATCH_LIST_{}_{}", startId, endId);
}
public String getSetRedisKey(Long startId, Long endId) {
return StrUtil.format("DISPATCH_SET_{}_{}", startId, endId);
}
}
Quiz:
- 物流项目中你参与了核心的功能开发吗?能说一下核心的业务逻辑吗?
- 你们的运单号是怎么生成的?如何确保性能?
- 能说一下订单转运单的业务逻辑吗?生成运单后如何与调度中心整合在一起的?
- 合并运单为什么使用 Redis 的 List 作为队列?如何确保消息的幂等性的?