首页 > 其他分享 >Pipeline模式应用

Pipeline模式应用

时间:2023-12-08 09:45:38浏览次数:23  
标签:Pipeline return demo 模式 DemoPipelineProduct 应用 import com example

本文记录Pipeline设计模式在业务流程编排中的应用

前言

Pipeline模式意为管道模式,又称为流水线模式。旨在通过预先设定好的一系列阶段来处理输入的数据,每个阶段的输出即是下一阶段的输入。

本案例通过定义PipelineProduct(管道产品),PipelineJob(管道任务),PipelineNode(管道节点),完成一整条流水线的组装,并将“原材料”加工为“商品”。其中管道产品负责承载各个阶段的产品信息;管道任务负责不同阶段对产品的加工;管道节点约束了管道产品及任务的关系,通过信号量定义了任务的执行方式。

依赖

工具依赖如下

            <!-- 工具类大全 -->
            <dependency>
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-all</artifactId>
                <version>最新版本</version>
            </dependency>



编程示例

1. 管道产品定义

package com.example.demo.pipeline.model;

/**
 * 管道产品接口
 *
 * @param <S> 信号量
 * @author 
 * @date 2023/05/15 11:49
 */
public interface PipelineProduct<S> {
}




2. 管道任务定义

package com.example.demo.pipeline.model;

/**
 * 管道任务接口
 *
 * @param <P> 管道产品
 * @author 
 * @date 2023/05/15 11:52
 */
@FunctionalInterface
public interface PipelineJob<P> {
    /**
     * 执行任务
     *
     * @param product 管道产品
     * @return {@link P}
     */
    P execute(P product);
}



3. 管道节点定义

package com.jd.baoxian.mall.market.service.pipeline.model;

import java.util.function.Predicate;

/**
 * 管道节点定义
 *
 * @param <S> 信号量
 * @param <P> 管道产品
 * @author 
 * @date 2023/05/15 11:54
 */
public interface PipelineNode<S, P extends PipelineProduct<S>> {
    /**
     * 节点组装,按照上个管道任务传递的信号,执行 pipelineJob
     *
     * @param pipelineJob 管道任务
     * @return {@link PipelineNode}<{@link S},  {@link P}>
     */
    PipelineNode<S, P> flax(PipelineJob<P> pipelineJob);

    /**
     * 节点组装,按照传递的信号,判断当前管道的信号是否相等,执行 pipelineJob
     *
     * @param signal      信号
     * @param pipelineJob 管道任务
     * @return {@link PipelineNode}<{@link S},  {@link P}>
     */
    PipelineNode<S, P> flax(S signal, PipelineJob<P> pipelineJob);

    /**
     * 节点组装,按照传递的信号,判断当前管道的信号是否相等,执行 pipelineJob
     *
     * @param predicate   信号
     * @param pipelineJob 管道任务
     * @return {@link PipelineNode}<{@link S},  {@link P}>
     */
    PipelineNode<S, P> flax(Predicate<S> predicate, PipelineJob<P> pipelineJob);

    /**
     * 管道节点-任务执行
     *
     * @param product 管道产品
     * @return {@link P}
     */
    P execute(P product);
}




4. 管道产品、任务,节点的实现

4.1 管道产品

package com.example.demo.pipeline.factory;


import com.example.demo.model.request.DemoReq;
import com.example.demo.model.response.DemoResp;
import com.example.demo.pipeline.model.PipelineProduct;
import lombok.*;

/**
 * 样例-管道产品
 *
 * @author 
 * @date 2023/05/15 14:04
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DemoPipelineProduct implements PipelineProduct<DemoPipelineProduct.DemoSignalEnum> {
    /**
     * 信号量
     */
    private DemoSignalEnum signal;

    /**
     * 产品-入参及回参
     */
    private DemoProductData productData;

    /**
     * 异常信息
     */
    private Exception exception;

    /**
     * 流程Id
     */
    private String tradeId;

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class DemoProductData {
        /**
         * 待验证入参
         */
        private DemoReq userRequestData;

        /**
         * 待验证回参
         */
        private DemoResp userResponseData;
    }

    /**
     * 产品-信号量
     *
     * @author 
     * @date 2023/05/15 13:54
     */
    @Getter
    public enum DemoSignalEnum {
        /**
         *
         */
        NORMAL(0, "正常"),
        /**
         *
         */
        CHECK_NOT_PASS(1, "校验不通过"),
        /**
         *
         */
        BUSINESS_ERROR(2, "业务异常"),
        /**
         *
         */
        LOCK_ERROR(3, "锁处理异常"),
        /**
         *
         */
        DB_ERROR(4, "事务处理异常"),

        ;
        /**
         * 枚举码值
         */
        private final int code;
        /**
         * 枚举描述
         */
        private final String desc;

        /**
         * 构造器
         *
         * @param code
         * @param desc
         */
        DemoSignalEnum(int code, String desc) {
            this.code = code;
            this.desc = desc;
        }
    }
}





4.2 管道任务(抽象类)

package com.example.demo.pipeline.factory.job;

import cn.hutool.core.util.ClassUtil;
import cn.hutool.json.JSONUtil;
import com.example.demo.pipeline.factory.DemoPipelineProduct;
import com.example.demo.pipeline.model.PipelineJob;
import lombok.extern.slf4j.Slf4j;

/**
 * 管道任务-抽象层
 *
 * @author 
 * @date 2023/05/15 19:48
 */
@Slf4j
public abstract class AbstractDemoJob implements PipelineJob<DemoPipelineProduct> {

    /**
     * 公共执行逻辑
     *
     * @param product 产品
     * @return
     */
    @Override
    public DemoPipelineProduct execute(DemoPipelineProduct product) {
        DemoPipelineProduct.DemoSignalEnum newSignal;
        try {
            newSignal = execute(product.getTradeId(), product.getProductData());
        } catch (Exception e) {
            product.setException(e);
            newSignal = DemoPipelineProduct.DemoSignalEnum.BUSINESS_ERROR;
        }
        product.setSignal(newSignal);
        defaultLogPrint(product.getTradeId(), product);
        return product;
    }

    /**
     * 子类执行逻辑
     *
     * @param tradeId     流程Id
     * @param productData 请求数据
     * @return
     * @throws Exception 异常
     */
    abstract DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception;

    /**
     * 默认的日志打印
     */
    public void defaultLogPrint(String tradeId, DemoPipelineProduct product) {
        if (!DemoPipelineProduct.DemoSignalEnum.NORMAL.equals(product.getSignal())) {
            log.info("流水线任务处理异常:流程Id=【{}】,信号量=【{}】,任务=【{}】,参数=【{}】", tradeId, product.getSignal(),
                    ClassUtil.getClassName(this, true), JSONUtil.toJsonStr(product.getProductData()), product.getException());
        }
    }

}




4.3 管道节点

package com.example.demo.pipeline.factory;


import cn.hutool.core.util.ClassUtil;
import cn.hutool.json.JSONUtil;
import com.example.demo.pipeline.model.PipelineJob;
import com.example.demo.pipeline.model.PipelineNode;
import lombok.extern.slf4j.Slf4j;

import java.util.function.Predicate;

/**
 * 审核-管道节点
 *
 * @author 
 * @date 2023/05/15 14:32
 */
@Slf4j
public class DemoPipelineNode implements PipelineNode<DemoPipelineProduct.DemoSignalEnum, DemoPipelineProduct> {

    /**
     * 下一管道节点
     */
    private DemoPipelineNode next;

    /**
     * 当前管道任务
     */
    private PipelineJob<DemoPipelineProduct> job;

    /**
     * 节点组装,按照上个管道任务传递的信号,执行 pipelineJob
     *
     * @param pipelineJob 管道任务
     * @return {@link DemoPipelineNode}
     */
    @Override
    public DemoPipelineNode flax(PipelineJob<DemoPipelineProduct> pipelineJob) {
        return flax(DemoPipelineProduct.DemoSignalEnum.NORMAL, pipelineJob);
    }

    /**
     * 节点组装,按照传递的信号,判断当前管道的信号是否相等,执行 pipelineJob
     *
     * @param signal      信号
     * @param pipelineJob 管道任务
     * @return {@link DemoPipelineNode}
     */
    @Override
    public DemoPipelineNode flax(DemoPipelineProduct.DemoSignalEnum signal, PipelineJob<DemoPipelineProduct> pipelineJob) {
        return flax(signal::equals, pipelineJob);
    }

    /**
     * 节点组装,上个管道过来的信号运行 predicate 后是true的话,执行 pipelineJob
     *
     * @param predicate
     * @param pipelineJob
     * @return
     */
    @Override
    public DemoPipelineNode flax(Predicate<DemoPipelineProduct.DemoSignalEnum> predicate,
                                 PipelineJob<DemoPipelineProduct> pipelineJob) {
        this.next = new DemoPipelineNode();
        this.job = (job) -> {
            if (predicate.test(job.getSignal())) {
                return pipelineJob.execute(job);
            } else {
                return job;
            }
        };
        return next;
    }

    /**
     * 管道节点-任务执行
     *
     * @param product 管道产品
     * @return
     */
    @Override
    public DemoPipelineProduct execute(DemoPipelineProduct product) {
        // 执行当前任务
        try {
            product = job == null ? product : job.execute(product);
            return next == null ? product : next.execute(product);
        } catch (Exception e) {
            log.error("流水线处理异常:流程Id=【{}】,任务=【{}】,参数=【{}】", product.getTradeId(), ClassUtil.getClassName(job, true), JSONUtil.toJsonStr(product.getProductData()), product.getException());
            return null;
        }

    }
}





5. 业务实现

通过之前的定义,我们已经可以通过Pipeline完成流水线的搭建,接下来以“审核信息提交”这一业务场景,完成应用。

5.1 定义Api、入参、回参

package com.example.demo.api;

import com.example.demo.model.request.DemoReq;
import com.example.demo.model.response.DemoResp;
import com.example.demo.pipeline.factory.PipelineForManagerSubmit;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * 演示-API
 *
 * @author 
 * @date 2023/08/06 16:27
 */
@Service
public class DemoManagerApi {

    /**
     * 管道-审核提交
     */
    @Resource
    private PipelineForManagerSubmit pipelineForManagerSubmit;

    /**
     * 审核提交
     *
     * @param requestData 请求数据
     * @return {@link DemoResp}
     */
    public DemoResp managerSubmit(DemoReq requestData) {
        return pipelineForManagerSubmit.managerSubmitCheck(requestData);
    }
}


package com.example.demo.model.request;

/**
 * 演示入参
 *
 * @author 
 * @date 2023/08/06 16:33
 */
public class DemoReq {
}


package com.example.demo.model.response;

import lombok.Data;

/**
 * 演示回参
 *
 * @author 
 * @date 2023/08/06 16:33
 */
@Data
public class DemoResp {
    /**
     * 成功标识
     */
    private Boolean success = false;

    /**
     * 结果信息
     */
    private String resultMsg;

    /**
     * 构造方法
     *
     * @param message 消息
     * @return {@link DemoResp}
     */
    public static DemoResp buildRes(String message) {
        DemoResp response = new DemoResp();
        response.setResultMsg(message);
        return response;
    }
}





5.2 定义具体任务

假定审核提交的流程需要包含:参数验证、加锁、解锁、事务提交

package com.example.demo.pipeline.factory.job;

import cn.hutool.json.JSONUtil;
import com.example.demo.model.request.DemoReq;
import com.example.demo.pipeline.factory.DemoPipelineProduct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * 加锁-实现层
 *
 * @author 
 * @date 2023/05/17 17:00
 */
@Service
@Slf4j
public class CheckRequestLockJob extends AbstractDemoJob {

    /**
     * 子类执行逻辑
     *
     * @param tradeId     流程Id
     * @param productData 请求数据
     * @return
     * @throws Exception 异常
     */
    @Override
    DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception {
        DemoReq userRequestData = productData.getUserRequestData();
        log.info("任务[{}]加锁,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId);
        return DemoPipelineProduct.DemoSignalEnum.NORMAL;
    }
}


package com.example.demo.pipeline.factory.job;

import cn.hutool.json.JSONUtil;
import com.example.demo.model.request.DemoReq;
import com.example.demo.pipeline.factory.DemoPipelineProduct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * 解锁-实现层
 *
 * @author 
 * @date 2023/05/17 17:00
 */
@Service
@Slf4j
public class CheckRequestUnLockJob extends AbstractDemoJob {

    /**
     * 子类执行逻辑
     *
     * @param tradeId     流程Id
     * @param productData 请求数据
     * @return
     * @throws Exception 异常
     */
    @Override
    DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception {
        DemoReq userRequestData = productData.getUserRequestData();
        log.info("任务[{}]解锁,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId);
        return DemoPipelineProduct.DemoSignalEnum.NORMAL;
    }
}

package com.example.demo.pipeline.factory.job;

import cn.hutool.json.JSONUtil;
import com.example.demo.model.request.DemoReq;
import com.example.demo.pipeline.factory.DemoPipelineProduct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;


/**
 * 审核-参数验证-实现类
 *
 * @author 
 * @date 2023/05/15 19:50
 */
@Slf4j
@Component
public class ManagerCheckParamJob extends AbstractDemoJob {

    /**
     * 执行基本入参验证
     *
     * @param tradeId
     * @param productData 请求数据
     * @return
     */
    @Override
    DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) {
        /*
         * 入参验证
         */
        DemoReq userRequestData = productData.getUserRequestData();
        log.info("任务[{}]入参验证,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId);
        // 非空验证

        // 有效验证

        // 校验通过,退出
        return DemoPipelineProduct.DemoSignalEnum.NORMAL;
    }

}


package com.example.demo.pipeline.factory.job;

import cn.hutool.json.JSONUtil;
import com.example.demo.model.request.DemoReq;
import com.example.demo.model.response.DemoResp;
import com.example.demo.pipeline.factory.DemoPipelineProduct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * 审核-信息提交-业务实现
 *
 * @author 
 * @date 2023/05/12 14:36
 */
@Service
@Slf4j
public class ManagerSubmitJob extends AbstractDemoJob {

    /**
     * 子类执行逻辑
     *
     * @param tradeId     流程Id
     * @param productData 请求数据
     * @return
     * @throws Exception 异常
     */
    @Override
    DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception {
        DemoReq userRequestData = productData.getUserRequestData();
        try {
            /*
             * DB操作
             */
            log.info("任务[{}]信息提交,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId);
            productData.setUserResponseData(DemoResp.buildRes("成功"));
        } catch (Exception ex) {
            log.error("审核-信息提交-DB操作失败,入参:{}", JSONUtil.toJsonStr(userRequestData), ex);
            throw ex;
        }
        return DemoPipelineProduct.DemoSignalEnum.NORMAL;
    }
}





5.3 完成流水线组装

针对入回参转换,管道任务执行顺序及执行信号量的构建

package com.example.demo.pipeline.factory;

import com.example.demo.model.request.DemoReq;
import com.example.demo.model.response.DemoResp;
import com.example.demo.pipeline.factory.job.CheckRequestLockJob;
import com.example.demo.pipeline.factory.job.CheckRequestUnLockJob;
import com.example.demo.pipeline.factory.job.ManagerCheckParamJob;
import com.example.demo.pipeline.factory.job.ManagerSubmitJob;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Objects;
import java.util.UUID;

/**
 * 管道工厂入口-审核流水线
 *
 * @author 
 * @date 2023/05/15 19:52
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class PipelineForManagerSubmit {

    /**
     * 审核-管道节点
     */
    private final DemoPipelineNode managerSubmitNode = new DemoPipelineNode();


    /**
     * 审核-管道任务-提交-防刷锁-加锁
     */
    private final CheckRequestLockJob checkRequestLockJob;

    /**
     * 审核-管道任务-提交-防刷锁-解锁
     */
    private final CheckRequestUnLockJob checkRequestUnLockJob;

    /**
     * 审核-管道任务-参数验证
     */
    private final ManagerCheckParamJob managerCheckParamJob;

    /**
     * 审核-管道任务-事务操作
     */
    private final ManagerSubmitJob managerSubmitJob;


    /**
     * 组装审核的处理链
     */
    @PostConstruct
    private void assembly() {
        assemblyManagerSubmit();
    }

    /**
     * 组装处理链
     */
    private void assemblyManagerSubmit() {

        managerSubmitNode
                // 参数验证及填充
                .flax(managerCheckParamJob)
                // 防刷锁
                .flax(checkRequestLockJob)
                // 事务操作
                .flax(managerSubmitJob)
                // 锁释放
                .flax((ignore) -> true, checkRequestUnLockJob);
    }

    /**
     * 审核-提交处理
     *
     * @param requestData 入参
     * @return
     */
    public DemoResp managerSubmitCheck(DemoReq requestData) {
        DemoPipelineProduct initialProduct = managerSubmitCheckInitial(requestData);
        DemoPipelineProduct finalProduct = managerSubmitNode.execute(initialProduct);
        if (Objects.isNull(finalProduct) || Objects.nonNull(finalProduct.getException())) {
            return DemoResp.buildRes("未知异常");
        }
        return finalProduct.getProductData().getUserResponseData();
    }

    /**
     * 审核-初始化申请的流水线数据
     *
     * @param requestData 入参
     * @return 初始的流水线数据
     */
    private DemoPipelineProduct managerSubmitCheckInitial(DemoReq requestData) {
        // 初始化
        return DemoPipelineProduct.builder()
                .signal(DemoPipelineProduct.DemoSignalEnum.NORMAL)
                .tradeId(UUID.randomUUID().toString())
                .productData(DemoPipelineProduct.DemoProductData.builder().userRequestData(requestData).build())
                .build();
    }
}




总结

本文重点为管道模式的抽象与应用,上述示例仅为个人理解。实际应用中,此案例长于应对各种规则冗杂的业务场景,便于规则编排。
待改进点:

  1. 各个任务其实隐含了执行的先后顺序,此项内容可进一步实现;

  2. 针对最后“流水线组装”这一步,可通过配置描述的方式,进一步抽象,从而将变动控制在每个“管道任务”的描述上,针对规则项做到“可插拔”式处理。

作者:京东保险 侯亚东

来源:京东云开发者社区 转载请注明来源

标签:Pipeline,return,demo,模式,DemoPipelineProduct,应用,import,com,example
From: https://www.cnblogs.com/Jcloud/p/17884476.html

相关文章

  • GOF23--23种设计模式(二)
    一.建造者模式建造者模式也是属于建造型模式,它提供了一种创建对象的最佳方式定义:将一个复杂的对象的构建和它的表示分离,使得同样的构建过程可以创建不同的表示主要作用:在用户不知道对象的构建细节的情况下,就可以创建复杂的对象这里需要注意一下,建造者模式都都是用来创建复杂对......
  • BFD技术与应用
    BFD双向转发检测   作用:检测网络故障,实现快速检测/收敛        提供了一个通用的、标准化的、介质无关和协议无关的快速故障检测机制    1.硬件检测,使用同一条链路相连的两个之间可以通过接口硬件检测功能检测故障        ......
  • InnoDB的应用场景
    InnoDB是MySQL数据库引擎的一种,具有许多特点,适用于特定的使用场景。以下是InnoDB的主要特点和适用场景:InnoDB的特点:事务支持:InnoDB支持事务处理,实现了ACID特性(原子性、一致性、隔离性、持久性),确保了数据库的数据完整性和一致性。行级锁定:InnoDB使用行级锁定机制,而不是......
  • Splay 伸展树扩展应用
    Update2023.5.27好吧,lxl好像已经发明过这种数据结构了(悲)。前言谈谈扩展Splay。(下文用KzSplay代替)前置知识:1.Splay,以及文艺平衡树。2.线段树。问题引入请你设计一种数据结构,支持在线处理以下操作:给定一个长度为\(n\)的序列\(a\)。1.支持序列的区间翻转。2.支持......
  • 23种设计模式——建造者模式
    今天我们要学习的是23种设计模式中的第四种——建造者模式。建造者模式主要适用于一些基本部件不会变,而其组合经常变化的情况,下面我们开始进入正题。概念:建造者模式(BuilderPattern)又叫生成器模式,是一种对象构建模式。建造者模式是将一个复杂对象的构造与它的表示分离,使同样的......
  • 【Nginx/IIS】解决uniapp/Vue history模式下页面刷新404
    uniapp/Vue开启History模式本地开发:二级页面刷新或者通过链接进入二级页面是正常的打包部署后:二级页面刷新或者通过链接进入二级页面会报错404页面找不到 解决方案:Nginx配置:在nginx.conf的对应location里配置一行代码try_files$uri$uri//index.html;location/h5{......
  • uml设计模式语言
    UML(UnifiedModelingLanguage)是一种广泛应用于软件开发和系统设计的建模语言。在软件工程中,UML图用于可视化、规划和共享系统设计的概念,其中类图是其中最核心、最常用的一种图示。本文将简述UML模型设计的基本概念,重点关注类图及其关系的详细解释。1.UML概述:UML作为一种标准化......
  • 2024年安防视频监控发展趋势预测及LiteCVR视频技术应用
    随着科技的快速发展,安防视频技术已经成为了各个领域中不可或缺的一部分。为了更好地应对各种安全挑战,安防视频技术也在不断地升级和改进。本文将预测2024年安防视频技术的几个发展趋势。首先,高清化将是未来安防视频技术的一个重要方向。随着人们对安全需求的不断提高,对视频清晰度......
  • 开发APP应用程序到底是选ios好还是Android好?
    哈喽大家好,我是咕噜老尼,现在我们都知道,APP应用已经覆盖了我们的生活,成为我们生活中不可缺少的一部分,手机系统主要分两种,分别是安卓和ios系统,不少APP开发公司在制作手机APP时,都需要将同一款APP做成两种,分别适应安卓和iOS系统。那么,开发APP应用程序到底是选ios好还是Android好,我们一......
  • 如何使用Visual Studio 2022创建基本Vue.js.Web应用程序
    最近接了个物联网项目,需要用到   VUEAnt-Design 对于vue没有概念 只能查找相关  vue.js的知识。 了解vue.js的前提条件 是要对  HTML+CSS+Jscript有一定的知识储备。所以又去看了看对应的三剑客(HTML+CSS+Jscript)。跟着vue.js官网学习了一下,就......