首页 > 其他分享 >通过MVEL表达式和Apache Chain职责链模式解耦MQ消息处理节点的实践应用

通过MVEL表达式和Apache Chain职责链模式解耦MQ消息处理节点的实践应用

时间:2024-05-16 16:41:29浏览次数:29  
标签:return String Chain 处理 MVEL MQ context false

导读

本文主要讲解了MVEL表达式和责任链设计模式相结合一起的消息处理解决方案设计、解耦消息处理节点以及方便代码维护扩展。通过“订单拆单消息”的接入作为具体实践案例,简要阐述了MVEL表达式和Apache Chain职责链设计模式应用场景。希望通过本文,读者可以对MVEL表达式和责任链模式相关概念有一定的认识,并且能够将它们应用到具体的业务场景之中,帮助大家在实际代码研发的时候,降低代码复杂度和提升代码的复用率。

1、背景

互联网的头部公司,各个后台系统应用交互主链路之中,会下发大量MQ消息给分支业务差异化应用。业务系统应用收到MQ消息后结合实际业务处理,但是往往大家在处理逻辑代码的时候会进行不断的叠加代码,造成代码臃肿、复杂和可读性差等问题。例如:

public void handleMessage(String message) throws Exception {
    CallerInfo callerInfo = Profiler.registerInfo(UmpKey.KEY_BD_DLOK_FLAG_GHOST_HANDLER, "xxx", false, true);
    try {
        DeliveredMessage msg = parseMessage(message);
        if (null == msg) {
            return;
        }
        String id = msg.getOrderId();
        if (null == id) {
            //监听到的订单消息 id不应为空
            return;
        }
        String sendPay = msg.getSendPay();
        //是否XXX
        boolean isShop = CAR_O2O.equals(String.valueOf(sendPay.charAt(XXX)));
        //是否是XXX
        boolean isCar = CAR_ADDED_SERVICE.equals(String.valueOf(sendPay.charAt(XXX)));
        String waybillSign = msg.getWaybillSign();
        //是否是XX
        boolean isSelf = SELF_ORDER.equals(String.valueOf(waybillSign.charAt(XXX)));

        long tid = System.nanoTime();
        Long orderId = Long.parseLong(id);

        //监听到订单后,更改订单状态表中的订单状态
        if (isCar && isSelf) {
            verOrderCarService.updateVerOrderCarStatusByOrderId(tid, orderIdLong, UPDATE_PIN);
        }
        if (isShop && isCar) {
            if (isSelf) {
                // 若在新表ver_order_sms_car中存在发送模板1短信,否则,发送原短信(模板3)
                List<VerOrderSmsCar> verOrderSmsCarList = verOrderSmsCarDao.getCarOrderListByOrderId(orderIdLong);
                if (CollectionUtils.isEmpty(verOrderSmsCarList)) {
                    dealTemplateThreeOrder(tid,orderId);
                } else {
                    dealTemplateOneOrder(tid, orderIdLong, verOrderSmsCarList);
                    this.sendShopSms(verOrderSmsCarList);
                }
            } else {
                // 满足条件的订单  即原订单流程没有走完,发送模板3
                List<VerOrderSmsCar> verOrderSmsCarList = verOrderSmsCarDao.getSmsCarOrderByOrderId(orderId);
                //返回数据字段id
                if (CollectionUtils.isNotEmpty(verOrderSmsCarList)) {
                    return;
                }
                dealTemplateThreeOrder(tid, orderId);
            }
        }
        // 发送状态变更消息
        if(isCar){
            this.sendVerStore(orderId, isShop ? 1 : 0);
        }
    } catch (Exception e) {
        LOGGER.error("监听MQ消息处理异常 : {}", e);
        Profiler.functionError(callerInfo);
    } finally {
        Profiler.registerInfoEnd(callerInfo);
    }
}

总结:代码片段逻辑嵌套复杂、各个处理节点耦合(例如:dealTemplateThreeOrder方法、sendShopSms方法)、新增节点不方便(例如:dealTemplateOneOrder(tid, orderId, verOrderSmsCarList))以及代码行数1000+等一系列问题。

2、MVEL表达式

MVEL为 MVFLEX Expression Language(MVFLEX表达式语言)的缩写,它是一种动态/静态的可嵌入的表达式语言和为Java平台提供Runtime(运行时)的语言。它也可以用来解析简单的JavaBean表达式。Runtime(运行时)允许MVEL表达式通过解释执行或者预编译生成字节码后执行。简单一句话,MVEL可以将字符串内容,转化为Java程序来运行,具体细节内容大家可以参考 https://blog.51cto.com/u_16091571/6271830。

3、责任链设计模式

定义:

责任链模式(Chain of Responsibility)又名 职责链模式,是一种行为设计模式,它允许你构建一个由多个对象组成的链,每个对象都有机会处理请求,或者将请求传递给链中的下一个对象。这种模式常用于处理请求的对象之间存在 层次关系 的情况。责任链模式的主要目的是解耦发送者和接收者,使多个对象都有机会处理请求,而不是将请求发送者与接收者硬编码在一起。

结构:

抽象处理者(Handler): 定义一个处理请求的接口,包含抽象处理方法并维护一个对下一个处理者的引用。

具体处理者(Concrete Handler): 实现处理请求的接口,判断能否处理本次请求,如果能够处理则处理,否则将请求传递给下一个处理者。

客户端类(Client): 创建处理链,并向链头的具体处理者对象提交请求,它不关心处理细节和请求的传递过程。

优缺点:

1)优点

a.松散耦合: 责任链模式使得请求发送者和接收者解耦,每个处理者仅需关心自己能否处理请求,而不需要知道整个处理流程的细节。

b.灵活性: 可以动态地改变处理者之间的关系和顺序,新增或删除处理者,以适应不同的需求和场景。

c.可扩展性: 容易添加新的处理者,无需修改现有的代码,符合开闭原则。

d.单一职责原则: 每个具体处理者只负责处理特定类型的请求,符合单一职责原则,使得代码更清晰和可维护。

2)缺点

a.性能问题: 在责任链比较长的情况下,请求可能需要遍历整个链条才能找到合适的处理者,可能影响性能。

Apache Chain 职责链:

整个Apache Chain职责链,包括Context、Command和Filter三个核心组件以及ChainBase类。

1)Context 接口

Context 表示命令执行的上下文,在命令间实现共享信息的传递,父接口是 Map,它只是一个标记接口。

2)Command 接口

Commons Chain 中最重要的接口,表示在 Chain 中的具体某一步要执行的命令。它只有一个方法:boolean execute(Context context),如果返回 true,那么表示 Chain 的处理结束,Chain 中的其他命令不会被调用;返回 false,则 Chain 会继续调用下一个 Command,直到 Chain 的末尾或抛出异常。

3)Filter 接口

它是一种特殊的 Command,除了 Command 的 execute 方法之外,还包括了一个方法:boolean postProcess(Context context, Exception exception),Commons Chain 会在执行了 Filter 的 execute 方法之后,执行 postprocess(不论 Chain 以何种方式结束);Filter 执行 execute 的顺序与 Filter 出现在 Chain 中出现的位置一致,但是执行 postprocess 顺序与之相反。如:execute 的执行顺序是:filter1 -> filter2;而 postprocess 的执行顺序是:filter2 -> filter1。

4)ChainBase

ChainBase 实现 Chain 接口。Chain表示“命令链”,要在其中执行的命令,需要先添加到 Chain 中,Chain 的父接口是 Command。ChainBase类可以直接在Spring使用。它的具体执行方法:

public boolean execute(Context context) throws Exception {
    if (context == null) {
        throw new IllegalArgumentException();
    } else {
        this.frozen = true;
        boolean saveResult = false;
        Exception saveException = null;
        int i = false;
        int n = this.commands.length;
        int i;
        for(i = 0; i < n; ++i) {
            try {
                saveResult = this.commands[i].execute(context);
                if (saveResult) {
                    break;
                }
            } catch (Exception var11) {
                saveException = var11;
                break;
            }
        }
        if (i >= n) {
            --i;
        }
        boolean handled = false;
        boolean result = false;
        for(int j = i; j >= 0; --j) {
            if (this.commands[j] instanceof Filter) {
                try {
                    result = ((Filter)this.commands[j]).postprocess(context, saveException);
                    if (result) {
                        handled = true;
                    }
                } catch (Exception var10) {
                }
            }
        }
        if (saveException != null && !handled) {
            throw saveException;
        } else {
            return saveResult;
        }
    }
}

4、实践案例(订单MQ消息处理流程)

在汽车线下安装履约服务的业务场景之中,除开主站黄金流量流程以外,需要在接到中台订单拆单消息、订单出库消息之后给门店技师派单、发送核销码短信等定制化业务流程。此过程中存在接入多个消息处理同一个事件的相同点,也有同一个消息处理不同事件差异点。具体处理层级结构图如下:

 


 

相关类图

 


 

实现代码

基于SpringBoot框架实现,消息处理链路中,核心内容包含三部分。第一部分消息处理Handler,接收到消息后将消息内容转化为Java Bean,例如:订单拆单消息(需要拆分订单)OdcDivideOrderhandler。第二部分处理节点Handler,它是职责链的处理节点,按照业务需求进行具体业务代码的实现,例如:技师派单消息发送节点(AddedTechDispatchHandler)。第三部分职责链配置文件,application-chain.xml,以下用订单拆分消息(拆单)处理流程为例。

第一部分(OdcDivideOrderHandler.java):

/**
 * 订单拆分消息(拆单消息)
 
 * @author xxx
 * @date xxxx-xx-xx xx:xx
 */
@Service("odcDivideOrderHandler")
public class OdcDivideOrderHandler extends BaseOrderHandler implements MqMessageHandler<List<VerOrder>> {
    /**
     * 消息分派处理
     */
    @Resource(name="odcDivideOrderChain")
    private Chain odcDivideOrderChain;
    /**
     * 基于MVEL表达式过滤执行器的筛选规则
     */
    private final Map<String, String> expressionMap = new HashMap<String, String>() {
        {
            //派单过滤规则
            put("tech_dispatch_rule", "return sendPayMap.get(\"XXX\") == X && sendPayMap.get(\"XXX\") == X;");
        }
    };
    /**
     * @param tid        处理事件
     * @param messageDTO MQ消息
     * @return 处理结果
     * @throws Exception 处理异常
     */
    private boolean handleMessage(long tid, MqMessageDTO<List<VerOrder>> messageDTO) throws Exception {
         List<VerOrder> verOrderList = messageDTO.getObject();
        try {
            //上下文内容
            Context context = new ContextBase();
            //1.处理时间
            context.put(Constants.TID, tid);
            //2.派单列表
            context.put(Constants.VER_ORDER_LIST,carOrderList);
            //3.操作过滤规则
            context.put(Constants.EXPRESSION_RULE_MAP,expressionMap);
            odcDivideOrderChain.execute(context);
        } catch (Exception ex) {
            //此次代码省略........
        }
        return true;
    }
}

解析:消息处理Handler主要是将接收到消息转化Java Bean,再将具体的上下文内容下传给后续事件处理Handler。参数expressionMap存储的是MVEL表达式需要处理的内容,具体内容结合实际业务场景差异化设置,对于后续节点处理Handler扩展性有很大帮助。odcDivideOrderChain职责链的命令链类,后续各个节点的流转全靠它。

第二部分(AddedTechDispatchHandler.java):

/**
 * 派单Handler
 *
 * @author xxx 
 * @date xxxx-xx-xx xx:xx
 */
@Service("addedTechDispatchHandler")
public class AddedTechDispatchHandler implements Command {
    /**
     * 派单消息topic
     */
    @Value("${xxx}")
    private String topic;
    /**
     * 消息生产
     */
    @Resource(name = "xxxxx")
    private MessageProducer messageProducer;
    @Override
    public boolean execute(Context context) throws Exception {
        Object tid = context.get(Constants.TID);
        Object object = context.get(Constants.VER_ORDER_LIST);
        if (!(object instanceof List)) {
            return false;
        }
        //订单列表
        List<VerOrder> orders = (List<VerOrder>) object;
        //列表为空
        if (CollectionUtils.isEmpty(orders)) {
            return false;
        }
        //过滤规则
        Object ruleObj = context.get(Constants.EXPRESSION_RULE_MAP);
        if (!(ruleObj instanceof Map)) {
            return false;
        }
        //派单规则
        Object obj = ((Map) ruleObj).get(Constants.TECH_DISPATCH_RULE);
        //没有配置规则直接返回
        if (!(obj instanceof String)) {
            return false;
        }
        String expression = (String) obj;
        if (StringUtils.isBlank(expression)) {
            return false;
        }
        for (VerOrder verOrder : orders) {
            //发送派单消息
            this.sendTechDispatch(tid, verOrder, expression);
        }
        return false;
    }

    /**
     * 发送技师派单消息
     *
     * @param tid      处理时间
     * @param verOrder 订单
     */
    public void sendTechDispatch(Object tid, VerOrder verOrder, String expression) {
        try {
            //派单规则判断,false-不派单,true-需要派单
            if (!SendPayUtil.isExpression(expression, verOrder.getSendPayMap())) {
                return;
            }
            String cxt = JSON.toJSONString(verOrder);
            Message message = new Message(topic, cxt, verOrder.getOrderId().toString());
            messageProducer.send(message);
        } catch (JMQException e) {
           //此次代码省略........
        } catch (Exception e) {
           //此次代码省略........
        } finally {
           //此次代码省略........
        }
    }
}

解析:事件节点Handler主要是解析上下内容,执行需要处理的事项内容。特别是SendPayUtil.isExpression(expression, verOrder.getSendPayMap())方法,识别了MVEL表达式,使得即使同一个事件处理节点(例如:派单节点)也可以根据不同MQ消息,设置不同的规则。

/**
 *  sendPayMap表达式解析
 * @param expression 表达式
 * @param sendPayMap 订单SendPayMap值
 * @return 解析结果
 */
public static boolean isExpression(String expression,String sendPayMap){
    //sendPayMap为空
    if (StringUtils.isBlank(sendPayMap)) {
        return false;
    }
    Map map = null;
    try {
        map = JSON.parseObject(sendPayMap, Map.class);
    } catch (Exception ex) {
        LOGGER.error("sendPayMap格式转化错误", ex);
    }
    //map
    if (map == null || map.isEmpty()) {
        return false;
    }
    Map<String,Map> param = new HashMap<>(1);
    param.put(Constant.SEND_PAY_MAP,map);
    return (Boolean)MVEL.eval(expression,param);
}

第三部分(application-chain.xml):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
       default-lazy-init="false" default-autowire="byName">
    <bean id="odcOutStockFullOrderChain" class="org.apache.commons.chain.impl.ChainBase">
        <constructor-arg>
            <array>
                <ref bean="addedOrderWangShiFuHandler"/>
                <ref bean="addedTechDispatchHandler"/>
            </array>
        </constructor-arg>
    </bean>
    <bean id="odcDivideOrderChain" class="org.apache.commons.chain.impl.ChainBase">
        <constructor-arg>
            <array>
                <ref bean="addedTechDispatchHandler"/>
            </array>
        </constructor-arg>
    </bean>
    <bean id="odcUndividedOrderChain" class="org.apache.commons.chain.impl.ChainBase">
        <constructor-arg>
            <array>
                <ref bean="addedTechDispatchHandler"/>
            </array>
        </constructor-arg>
    </bean>
</beans>

解析:命令链配置文件,实现各个事件处理节点进行配置化,聚合各个分散的节点业务逻辑内,后续注入到对应的消息解析Handler。

5、总结

整个消息处理过程中采用Apache Chain职责链模式来降低代码层面的耦合度以及可以动态地改变处理者之间的关系和顺序,新增或删除处理者,以适应不同的需求和场景。MVEL表达式增强了同一事件处理节点的复用性,最大限度的提升了代码的简洁性。希望此文对大家后续设计类似场景有一定的帮助和启发。

作者:京东零售 张强

来源:京东云开发者社区

 

标签:return,String,Chain,处理,MVEL,MQ,context,false
From: https://www.cnblogs.com/Jcloud/p/18196231

相关文章

  • RabbitMQ消息堆积
    根据搜索结果中提供的信息,处理RabbitMQ消息堆积的问题可以采取以下几种策略:增加消费者数量:通过增加消费者的数量来提升消息的处理能力,分担消息消费的负载,缓解消息队列的堆积问题[^5^]。优化消费者的处理逻辑:检查消费者的代码是否存在性能瓶颈或是复杂的处理逻辑。可以通过优......
  • Mac (Intel) brew 安装 rabbitMQ
    一、安装##rabbitmq依赖erlang环境,先安装erlangbrewinstallerlangbrewinstallrabbitmq##安装主要日志(备份)==>Caveats==>erlangManpagescanbefoundin:/usr/local/opt/erlang/lib/erlang/manAccessthemwith`erl-man`,oraddthisdirectoryto......
  • LangChain 进阶历史对话管理
    自动历史管理前面的示例将消息显式地传递给链。这是一种完全可接受的方法,但确实需要外部管理新消息。LangChain还包括一个名为RunnableWithMessageHistory的包裹器,能够自动处理这个过程。为了展示其工作原理,我们稍微修改上面的提示,增加一个最终输入变量,该变量在聊天历史记录之后......
  • Docker安装Rabbitmq
    step1:安装必要的一些系统工具yuminstall-yyum-utilsdevice-mapper-persistent-datalvm2Step2:添加软件源信息yum-config-manager--add-repohttps://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repoStep3:更新并安装Docker-CEyummakecachefasty......
  • MultiPromptChain--场景切换
    fromlangchain_community.llmsimportOllamafromlangchain.chains.routerimportMultiPromptChainfromlangchain.chainsimportConversationChainfromlangchain.chains.llmimportLLMChainfromlangchain.promptsimportPromptTemplatellm=Ollama(base_url='htt......
  • MultiPromptChain--精简版
    fromlangchain_community.llmsimportOllamafromlangchain.chains.routerimportMultiPromptChainfromlangchain.chainsimportConversationChainfromlangchain.chains.llmimportLLMChainfromlangchain.promptsimportPromptTemplate#physics_template="&q......
  • Java开发微服务SpringCloudAlibaba+Nginx+Vue+Mysql+RabbitMQ
    项目介绍随着互联网技术的飞速发展和移动设备的普及,自媒体平台已经成为人们获取信息、传播观点、实现自我价值的重要途径。自媒体平台的设计与实现,不仅需要考虑如何提供便捷的内容发布、编辑和管理功能,还需要考虑如何构建健康的内容生态,保证信息的真实性和可靠性,防止虚假信息的传......
  • MQ消息积压,把我整吐血了
    前言我之前在一家餐饮公司待过两年,每天中午和晚上用餐高峰期,系统的并发量不容小觑。为了保险起见,公司规定各部门都要在吃饭的时间轮流值班,防止出现线上问题时能够及时处理。我当时在后厨显示系统团队,该系统属于订单的下游业务。用户点完菜下单后,订单系统会通过发kafka消息给我......
  • STM32Cube-10 | 使用ADC读取气体传感器数据(MQ-2)
    本篇详细的记录了如何使用STM32CubeMX配置STM32L431RCT6的ADC外设,读取MQ-2气体传感器的数据并通过串口发送本质就是ADC采集MQ-2的原理图如下: 生成MDK工程选择芯片型号打开STM32CubeMX,打开MCU选择器:搜索并选中芯片STM32L431RCT6:配置时钟源如果选择使用外......
  • 在linux中下载安装rabbitmq
    在linux CentOS7.6中首先,添加Erlang下载rpm-Uvherlang-23.2.1-1.el7.x86_64.rpm安装Erlang:yuminstallerlang检查是否安装Erlangerl-v安装socatyuminstall-ysocat安装RabbitMQ下载rpm-Uvhrabbitmq-server-3.8.3-1.el7.noarch.rpm yuminstall......