首页 > 编程语言 >java操作kettle

java操作kettle

时间:2024-06-18 15:32:31浏览次数:16  
标签:return String stepInfo java jsonObject kettle new 操作 public

  1. 新建springboot项目

  2. 引入相关依赖

<!--kettle-->
        <dependency>
            <groupId>pentaho-kettle</groupId>
            <artifactId>kettle-core</artifactId>
            <version>9.4.0.0-343</version>
        </dependency>
        <dependency>
            <groupId>pentaho-kettle</groupId>
            <artifactId>kettle-engine</artifactId>
            <version>9.4.0.0-343</version>
        </dependency>
        <dependency>
            <groupId>org.pentaho</groupId>
            <artifactId>pentaho-encryption-support</artifactId>
            <version>9.4.0.0-343</version>
        </dependency>
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>metastore</artifactId>
            <version>9.4.0.0-343</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-vfs2</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>org.owasp.encoder</groupId>
            <artifactId>encoder</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
        </dependency>

国内仓库无法加载该项目jar包,可以采用以下方式进行安装

mvn install:install-file -Dfile=jar包路劲\commons-vfs2-2.7.0.jar -DgroupId=org.apache.commons  -DartifactId=commons-vfs2 -Dversion=2.7.0 -Dpackaging=jar
  1. 新建kettle-password-encoder-plugins.xml文件,否则启动将会报错,文件内容为

<password-encoder-plugins>

  <password-encoder-plugin id="Kettle">
    <description>Kettle Password Encoder</description>
    <classname>org.pentaho.support.encryption.KettleTwoWayPasswordEncoder</classname>
  </password-encoder-plugin>
  <!--
  <password-encoder-plugin id="AES">
    <description>AES Password Encoder</description>
    <classname>org.pentaho.support.encryption.AESTwoWayPasswordEncoder</classname>
    <default-encoder>true</default-encoder>
  </password-encoder-plugin>
  -->

</password-encoder-plugins>
  1. 新建接口传参json格式参数

{
  "stepInfos": [
    {
      "type": "tableInput",
      "database": {
        "access": "Native",
        "database": "kettle-test",
        "name": "test",
        "password": "***",
        "port": "3306",
        "server": "localhost",
        "type": "MYSQL",
        "username": "root"
      },
      "stepInfo": {
        "sql": "select * from user",
        "stepName": "1",
        "location": {
          "x": 100,
          "y": 100
        }
      }
    },
    {
      "type": "tableInput",
      "database": {
        "access": "Native",
        "database": "kettle-test",
        "name": "test",
        "password": "****",
        "port": "3306",
        "server": "localhost",
        "type": "MYSQL",
        "username": "root"
      },
      "stepInfo": {
        "sql": "select * from user",
        "stepName": "2",
        "location": {
          "x": 200,
          "y": 100
        }
      }
    },
    {
      "type": "append",
      "headName": "1",
      "tailName": "2",
      "stepInfo": {
        "stepName": "3",
        "location": {
          "x": 300,
          "y": 100
        }
      }
    }
  ],
  "previewInfo": {
    "stepName": "1",
    "size": 2
  },
  "connectInfos": [
    {
      "from": "1",
      "to": "3"
    },
    {
      "from": "2",
      "to": "3"
    }
  ]
}
  1. 新建controller调用接口

import com.alibaba.fastjson.JSONObject;
import com.central.business.dto.KettleTableInputDTO;
import com.central.business.dto.PreviewData;
import com.central.business.service.KettleService;
import com.central.common.exception.BizException;
import com.central.common.model.Result;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import sun.rmi.runtime.Log;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletResponse;


@Slf4j
@RestController
@RequestMapping("kettle")
@Api(tags = "Kettle操作接口")
public class KettleController {

    @Resource
    private KettleService kettleService;
    
    @PostMapping("previewByJson")
    @ApiOperation("通过json数据预览")
    public Result<PreviewData> previewByJson(@RequestBody JSONObject jsonObject) {
        log.info(jsonObject.toJSONString());
        return kettleService.previewByJson(jsonObject);
    }

    @PostMapping("downloadKtrByJson")
    @ApiOperation("通过json数据下载对应ktr文件,可用于验证json格式数据自定义ktr图是否正误")
    public void downloadKtrByJson(@RequestBody JSONObject jsonObject, HttpServletResponse response) {
        try {
            kettleService.downloadKtrByJson(jsonObject, response);
        } catch (Exception e) {
            e.printStackTrace();
            throw new BizException(e.getMessage());
        }
    }

    @GetMapping("/execKtr")
    @ApiOperation("执行ktr文件")
    public Object runKtr(String filename) throws Exception {
        return Result.succeed(kettleService.runTaskKtr(filename, null).toString());
    }

    @GetMapping("/execKjb")
    @ApiOperation("执行kjb文件")
    public Object runKjb(String filename) throws Exception {
        return Result.succeed(kettleService.runTaskKjb(filename, null).toString());
    }
}
  1. kettle核心操作类

对应的service就直接不写了,直接写核心部分,操作工具类

	/**
     * 以流的方式调用和预览 kettle ktr
     *
     * @param in 文件流
     */
    public static PreviewData previewKtrByStream(InputStream in, KettlePreviewDTO previewDTO) {
        if (Objects.isNull(previewDTO) || StringUtils.isBlank(previewDTO.getStepName())) {
            return null;
        }
        try {
            KettleEnvironment.init();
            // 此处可以注册自定义插件,用于扩展定制个人需求
            registerPlugins();
            TransMeta transMeta = new TransMeta(in, null, true, null, null);
            Trans trans = new Trans(transMeta);
            trans.setPreview(true);
            if (needSendLog(previewDTO)) {
                PreviewMap.setPreviewDTO(trans.getLogChannelId(), previewDTO);
                // 新增执行监听插件,可用于在kettle执行ktl文件的生命周期内定制动作
                trans.addTransListener(new AsyncLogListener());
            }
            trans.prepareExecution(null);
            PreviewData previewData = new PreviewData();
            previewData.setData(new LinkedList<>());

            if (!isExist(transMeta, previewDTO)) {
                throw new BizException("预览信息中的步骤" + previewDTO.getStepName() + "不存在,请指定存在的步骤");
            }
            if (!isAsync(previewDTO)) {
            	// 支持同spoon一样的preview操作
                trans.getSteps().stream().filter(c -> previewDTO.getStepName().equals(c.step.getStepname()))
                        .findFirst()
                        .ifPresent(n -> n.step.addRowListener(new RowAdapter() {
                            @Override
                            public void rowWrittenEvent(RowMetaInterface rowMeta, Object[] row) {
                                addToPreviewData(new PreviewParam(rowMeta, row, previewData, previewDTO, n));
                            }
                        }));
            }
            trans.startThreads();
            if (!isAsync(previewDTO)) {
                trans.waitUntilFinished();
            }
            return previewData;
        } catch (Exception e) {
            e.printStackTrace();
            throw new BizException(e.getMessage());
        }
    }



	/**
	注册个人自定义插件
	*/
    private static void registerPlugins() throws KettlePluginException {
        Map<Class<?>, String> hashMap = new HashMap<>(2);
        hashMap.put(StepMetaInterface.class, AppendStepMeta.class.getName());
        // 注册spoon上存在,而java操作kettle上不存在的append组件,此处仅为注册,具体操作往下看
        PluginRegistry.getInstance().registerPlugin(StepPluginType.class, new Plugin(
                new String[]{"Append"}, StepPluginType.class, StepMetaInterface.class,
                "", "", "", "",
                false, false, hashMap,
                new LinkedList<>(), null, null
        ));
        Map<Class<?>, String> hashMap2 = new HashMap<>(2);
        hashMap2.put(StepMetaInterface.class, FieldSplitter2Meta.class.getName());
        // 此处为原有的FieldSplitter组件的字符分隔功能无法满足使用,通过注册重写一个满足使用的组件
        PluginRegistry.getInstance().registerPlugin(StepPluginType.class, new Plugin(
                new String[]{"FieldSplitter2"}, StepPluginType.class, StepMetaInterface.class,
                "", "", "", "",
                false, false, hashMap2,
                new LinkedList<>(), null, null
        ));
    }


	private static void addToPreviewData(PreviewParam previewParam) {
        if (Objects.isNull(previewParam.getRowMeta()) || Objects.isNull(previewParam.getPreviewData())
                || isReachSize(previewParam.getPreviewDTO(), previewParam.getPreviewData())) {
            return;
        }
        List<ValueMetaInterface> valueMetaList = previewParam.getRowMeta().getValueMetaList();
        if (isEmpty(valueMetaList)) {
            return;
        }
        List<String> joinFields = getJoinFields(previewParam);
        List<String> fieldName = valueMetaList.stream()
                .map(ValueMetaInterface::getName)
                .collect(toCollection(LinkedList::new));
        previewParam.getPreviewData().setFieldName(fieldName);
        if (ArrayUtils.isEmpty(previewParam.getRow())) {
            return;
        }
        Map<String, Object> dataMap = new LinkedHashMap<>();
        for (int i = 0; i < fieldName.size(); i++) {
            if (isContains(joinFields, fieldName.get(i))) {
                continue;
            }
            dataMap.put(fieldName.get(i), getValue(previewParam, i));
        }
        removeJoinFields(previewParam, joinFields, fieldName);
        if (dateMapIsAllNull(dataMap)) {
            return;
        }
        previewParam.getPreviewData().getData().add(dataMap);
    }
  1. kettle组件生成核心代码

public static void main(String[] args) throws IOException, KettleException {
        // 初始化
        kettleInit("merge data test");

        // 添加需要的组件
        getAppendStep(KettleAppendInfo.builder()
                        .headName("字段选择")
                        .tailName("字段选择 2")
                        .build(),
                KettleStepInfo.builder()
                        .stepName("追加流")
                        .location(KettleLocation.builder()
                                .x(450)
                                .y(175)
                                .build())
                        .build());

        // 输出
        String transXml = getTransMeta().getXML();
        File file = new File("merge_test.ktr");
        FileUtils.writeStringToFile(file, transXml, "UTF-8");
    }

	// kettle初始化
	public static void kettleInit(String name) {
        try {
            KettleEnvironment.init();
            setTransMeta(new TransMeta());
            getTransMeta().setName(name);
            // registry是给每个步骤生成一个标识Id用
            setRegistry(PluginRegistry.getInstance());
        } catch (KettleException e) {
            e.printStackTrace();
        }
    }
    
    // 合并两个步骤使用
    public static void combineTwoStep(StepMeta from, StepMeta to) {
        if (Objects.isNull(from) || Objects.isNull(to)) {
            throw new BizException("开头from或者结尾to参数不可为空");
        }
        getTransMeta().addTransHop(new TransHopMeta(from, to));
    }
    
    public static StepMeta getTableOutputStep(KettleTableOutputInfo outputInfo, KettleDatabase database,
                                              KettleStepInfo stepInfo) {
        assertParam(outputInfo);
        TableOutputMeta tableOutputMeta = new TableOutputMeta();
        tableOutputMeta.setDatabaseMeta(getDatabaseMeta(database));
        tableOutputMeta.setTableName(outputInfo.getTableName());
        tableOutputMeta.setSpecifyFields(outputInfo.getSpecifyFields());
        tableOutputMeta.setTruncateTable(outputInfo.getTruncateTable());
        tableOutputMeta.setFieldDatabase(outputInfo.getFieldDatabase());
        tableOutputMeta.setFieldStream(outputInfo.getFieldStream());

        return getStepMeta(stepInfo, tableOutputMeta);
    }
    
    public static StepMeta getSelectValuesStep(KettleSelectValuesInfo selectValuesInfo, KettleStepInfo stepInfo) {
        assertParam(selectValuesInfo);
        SelectValuesMeta setValueFieldMeta = new SelectValuesMeta();
        if (Objects.nonNull(selectValuesInfo.getSelectName())) {
            setValueFieldMeta.setSelectName(selectValuesInfo.getSelectName());
        }
        if (Objects.nonNull(selectValuesInfo.getSelectRename())) {
            setValueFieldMeta.setSelectRename(selectValuesInfo.getSelectRename());
        }
        setValueFieldMeta.setDeleteName(selectValuesInfo.getDeleteName());
        if (ArrayUtils.isNotEmpty(selectValuesInfo.getMetas())) {
            SelectMetadataChange[] metas = new SelectMetadataChange[selectValuesInfo.getMetas().length];
            for (int i = 0; i < selectValuesInfo.getMetas().length; i++) {
                SelectMetadata current = selectValuesInfo.getMetas()[i];
                metas[i] = new SelectMetadataChange(setValueFieldMeta, current.getName(), current.getRename(),
                        current.getType(), current.getLength(), current.getPrecision(), current.getStorageType(),
                        current.getConversionMask(), current.isDateFormatLenient(), current.getDateFormatLocale(),
                        current.getDateFormatTimeZone(), current.isLenientStringToNumber(), current.getDecimalSymbol(),
                        current.getGroupingSymbol(), current.getCurrencySymbol());
            }
            setValueFieldMeta.setMeta(metas);
        }
        return getStepMeta(stepInfo, setValueFieldMeta);
    }
    
    private static DatabaseMeta getDatabaseMeta(KettleDatabase database) {
        // 添加转换的数据库连接
        DatabaseMeta databaseMeta;
        try {
            databaseMeta = new DatabaseMeta(getDatabaseXml(database));
        } catch (Exception e) {
            e.printStackTrace();
            throw new BizException(e.getMessage());
        }
        getTransMeta().addDatabase(databaseMeta);
        // 给表输入添加一个DatabaseMeta连接数据库
        return getTransMeta().findDatabase(database.getName());
    }
    
    private static StepMeta getStepMeta(KettleStepInfo stepInfo, StepMetaInterface stepMetaInterface) {
        String pluginId = getRegistry().getPluginId(StepPluginType.class, stepMetaInterface);
        // 添加SortRowsMeta到转换中
        StepMeta stepMeta = new StepMeta(pluginId, stepInfo.getStepName(), stepMetaInterface);
        // 给步骤添加在spoon工具中的显示位置
        stepMeta.setDraw(true);
        stepMeta.setLocation(stepInfo.getLocation().getX(), stepInfo.getLocation().getY());
        getTransMeta().addStep(stepMeta);
        return stepMeta;
    }
    
    private static String getDatabaseXml(KettleDatabase kettleDatabase) {
        if (StringUtils.isBlank(kettleDatabase.getName())) {
            throw new BizException("database中的name不可为空");
        }
        return "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
                "<connection>" +
                "<name>" + kettleDatabase.getName() + "</name>" +
                "<server>" + kettleDatabase.getServer() + "</server>" +
                "<type>" + kettleDatabase.getType() + "</type>" +
                "<access>" + kettleDatabase.getAccess() + "</access>" +
                "<database>" + kettleDatabase.getDatabase() + "</database>" +
                "<port>" + kettleDatabase.getPort() + "</port>" +
                "<username>" + kettleDatabase.getUsername() + "</username>" +
                "<password>" + kettleDatabase.getPassword() + "</password>" +
                "</connection>";
    }
  1. 新增kettle执行ktr文件过程中定制自定义动作的监听器

    • 实现 TranListener接口
    • 重写需要的生命周期函数,如本例子执行了ktr执行完成后的动作
@Slf4j
public class AsyncLogListener implements TransListener {
    @Override
    public void transStarted(Trans trans) throws KettleException {

    }

    @Override
    public void transActive(Trans trans) {

    }

    @Override
    public void transFinished(Trans trans) throws KettleException {
        String logChannelId = trans.getLogChannelId();
        LoggingBuffer appender = KettleLogStore.getAppender();
        // 执行后的日志信息
        String logMsg = appender.getBuffer(logChannelId, true).toString();
        if (StringUtils.isBlank(logMsg)) {
            return;
        }
        doSomethingYouWant(logMsg, trans);
    }
}
  1. 自定义kettle组件的三要素

    1. 继承BaseStep,实现StepInterface
    2. 继承BaseStepData,实现StepDataInterface
    3. 继承BaseStepMeta, 实现StepMetaInterface
// 第一步
@Data
public class AppendStep extends BaseStep implements StepInterface {

    private static final Class<?> PKG = FieldSplitter2Meta.class;

    private String headName;
    private String tailName;


    public AppendStep(StepMeta s, StepDataInterface stepDataInterface, int c, TransMeta t, Trans dis) {
        super(s, stepDataInterface, c, t, dis);
    }

    /**
     * TODO 初始化方法,可以建立数据库链接、获取文件句柄等操作,会被PDI调用​​​​​​​。
     */
    @Override
    public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
        AppendStepMeta meta = (AppendStepMeta) smi;
        AppendStepData data = (AppendStepData) sdi;
        if (!super.init(meta, data)) {
            return false;
        }
        //在这里添加特定的初始化代码
        headName = meta.getHeadName();
        tailName = meta.getTailName();
        return true;
    }

    /**
     * 读取行的业务逻辑,会被PDI调用,当此方法返回false时,完成行读取。
     */
    @Override
    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        AppendStepMeta meta = (AppendStepMeta) smi;
        AppendStepData data = (AppendStepData) sdi;
        //从输入流中读取一行
        Object[] r = getRow();
        if (r == null) {
            setOutputDone();
            return false;
        }
        if (first) {
            first = false;
            //如果是第一行则保存数据行元信息到data类中,后续使用
            data.outputRowMeta = getInputRowMeta().clone();
        }
        //将行放入输出行流
        putRow(data.outputRowMeta, r);

        //如有需要,可以进行日志记录
        if (checkFeedback(getLinesRead())) {
            logBasic(BaseMessages.getString(PKG, "AppendStep.Linenr", getLinesRead()));
        }

        //返回true则表示还应继续使用processRow()读取下一行
        return true;
    }

    /**
     * 析构函数,用来释放资源,会被PDI调用。
     */
    @Override
    public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
        AppendStepMeta meta = (AppendStepMeta) smi;
        AppendStepData data = (AppendStepData) sdi;
        super.dispose(meta, data);
    }

}

// 第二步
public class AppendStepData extends BaseStepData implements StepDataInterface {

    RowMetaInterface outputRowMeta;

    public AppendStepData() {
        super();
    }

}

// 第三步
@Data
public class AppendStepMeta extends BaseStepMeta implements StepMetaInterface {

    private static final Class<?> PKG = AppendStepMeta.class;

    private String headName;
    private String tailName;

    public AppendStepMeta() {
        super();
    }

    @Override
    public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int cnr, TransMeta transMeta, Trans disp) {
        return new AppendStep(stepMeta, stepDataInterface, cnr, transMeta, disp);
    }

    @Override
    public void loadXML(Node stepnode, List<DatabaseMeta> databases, IMetaStore metaStore) throws KettleXMLException {
        try {
            setHeadName(XMLHandler.getNodeValue(XMLHandler.getSubNode(stepnode, "head_name")));
            setTailName(XMLHandler.getNodeValue(XMLHandler.getSubNode(stepnode, "tail_name")));
        } catch (Exception e) {
            throw new KettleXMLException("Demo plugin unable to read step info from XML node", e);
        }
    }

    @Override
    public StepDataInterface getStepData() {
        return new AppendStepData();
    }


    @Override
    public Object clone() {
        return super.clone();
    }

    @Override
    public void setDefault() {
        setHeadName("");
        setTailName("");
    }

    @Override
    public String getXML() {
        return XMLHandler.addTagValue("type", "Append") +
                XMLHandler.addTagValue("head_name", getHeadName()) +
                XMLHandler.addTagValue("tail_name", getTailName());
    }

}
  1. 解析前端传输的json格式参数代码

	public interface StepParse {


    /**
     * 是否可以处理,命名规则 step的type + StepParse
     */
    	default boolean match(String type) {
        	String simpleName = this.getClass().getSimpleName();
       	 // 去掉该类名的StepParse后缀
        	String substring = simpleName.substring(0, simpleName.length() - 9);
       	 // 首字母小写
        	return StringUtils.equals(type, Character.toLowerCase(substring.charAt(0)) + substring.substring(1));
    	}

    /**
     * 解析
     *
     * @param jsonObj      jsonObj
     * @param stepInfo     stepInfo
     * @param connectInfos connectInfos

     */
    	StepMeta parse(JSONObject jsonObj, KettleStepInfo stepInfo, List<ConnectInfo> connectInfos);
	}

	private JsonXmlDTO getXmlByJson(JSONObject jsonObject) {
        String transXml;
        JsonInfo jsonInfo;
        synchronized (this) {
            KettleCompUtils.kettleInit(UUID.randomUUID().toString());
            // 1,解析json数据
            jsonInfo = KettleExec.parse(jsonObject);
            if (Objects.isNull(jsonInfo)) {
                throw new BizException("json内容不可为空");
            }
            // 2,生成对应xml文档
            transXml = KettleExec.getTransXml(jsonInfo);
        }
        return JsonXmlDTO.builder().transXml(transXml).jsonInfo(jsonInfo).build();
    }
    
    public static JsonInfo parse(JSONObject jsonObject) {
        if (Objects.isNull(jsonObject)) {
            throw new BizException("请输入对应的json格式数据");
        }
        return JsonParse.parse(jsonObject);
    }
    
    static JsonInfo parse(JSONObject jsonObject) {
        List<StepMeta> stepMetas;
        List<JSONObject> delayParses = new ArrayList<>();
        List<ConnectInfo> connectInfos = parseConnectInfos(jsonObject);
        try {
            stepMetas = parseStepInfos(jsonObject, connectInfos, delayParses);
            KettleExec.setStepMetas(stepMetas);
            parseDelay(delayParses, stepMetas, connectInfos);
        } catch (Exception e) {
            e.printStackTrace();
            throw new BizException(e.getMessage());
        }
        return JsonInfo.builder()
                .stepInfos(stepMetas)
                .previewInfo(parsePreviewInfo(jsonObject))
                .connectInfos(connectInfos)
                .build();
    }
    
    private static List<StepMeta> parseStepInfos(JSONObject jsonObject, List<ConnectInfo> connectInfos,
                                                 List<JSONObject> delayParses) {
        JSONArray jsonArray = jsonObject.getJSONArray("stepInfos");
        if (CollectionUtils.isEmpty(jsonArray)) {
            throw new BizException("stepInfos信息不可为空");
        }
        List<StepMeta> stepMetas = new LinkedList<>();
        for (int i = 0; i < jsonArray.size(); i++) {
            doParse(connectInfos, delayParses, stepMetas, jsonArray.getJSONObject(i));
        }
        return stepMetas;
    }
    
    private static void doParse(List<ConnectInfo> connectInfos, List<JSONObject> delayParses,
                                List<StepMeta> stepMetas, JSONObject jsonObject) {
        if (Objects.isNull(jsonObject) || StringUtils.isBlank(jsonObject.toString())) {
            throw new BizException("stepInfos内的信息不可为空");
        }
        List<StepParse> stepParses = StepParseManager.getAllStepParse();
        if (CollectionUtils.isEmpty(stepParses)) {
            throw new BizException("StepParse不可为空");
        }
        String type = jsonObject.getString("type");
        if (StringUtils.isBlank(type)) {
            throw new BizException("步骤:"
                    + jsonObject.toString(SerializerFeature.PrettyFormat)
                    + "内的type信息不可为空");
        }
        boolean finished = false;
        for (StepParse stepPars : stepParses) {
            if (Objects.nonNull(delayParses) && needDelay(type)) {
                delayParses.add(jsonObject);
                finished = true;
                break;
            }
            if (StepParseManager.match(stepPars, type)) {
                stepMetas.add(StepParseManager.parse(stepPars, jsonObject, connectInfos));
                finished = true;
                break;
            }
        }
        if (!finished) {
            throw new BizException("该step对应的type为"
                    + type
                    + "无法处理,请查正或者联系开发人员新增");
        }
    }
    
    public static StepMeta parse(StepParse stepPars, JSONObject jsonObject, List<ConnectInfo> connectInfos) {
        JSONObject stepInfo = jsonObject.getJSONObject("stepInfo");
        if (Objects.isNull(stepInfo)) {
            throw new BizException("step内的stepInfo信息不可为空");
        }
        KettleStepInfo stepInfo1 = stepInfo.toJavaObject(KettleStepInfo.class);
        if (StringUtils.isBlank(stepInfo1.getStepName())) {
            throw new BizException("stepInfo内的stepName不可为空");
        }
        return stepPars.parse(jsonObject, stepInfo1, connectInfos);
    }

**个人原创,转载请标明来源 **

END

标签:return,String,stepInfo,java,jsonObject,kettle,new,操作,public
From: https://blog.csdn.net/weixin_40194030/article/details/139772431

相关文章

  • 【java】为什么高并发下数据写入不推荐关系数据库?
    一、问题解析说到高并发写,就不得不提及新分布式数据库HTAP,它实现了OLAP和OLTP的融合,可以同时提供数据分析挖掘和关系查询。事实上,HTAP的OLAP并不是大数据,或者说它并不是我们印象中每天拿几T的日志过来用于离线分析计算的那个大数据。这里更多的是指数据挖掘的最后一环,也就是......
  • HTML5期末考核大作业——学生网页设计作业源码HTML+CSS+JavaScript 中华美德6页面带音
    ......
  • 初学JavaScript之console 应用
    console对象在JavaScript中用于向开发者控制台输出信息,这对于调试和日志记录非常有用。以下是一些常用的console方法及其用法:1.console.log()最常用的方法,用于输出一般信息。console.log('Hello,World!');console.log('Theansweris',42);2.console.error()用......
  • [javascript] JS增强HTML媒体资源的音量
    pre有些页面声音总是太小,又不想调整系统音量,而video标签的volume属性最高只能调到1。于是在网上找到了一个方案:ref:https://atjiu.github.io/2021/05/10/video-above-1.0/ref:https://cwestblog.com/2017/08/17/html5-getting-more-volume-from-the-web-audio-api/codefunc......
  • eclipse maven打包报错: 致命错误: 在类路径或引导类路径中找不到程序包 java.lang的
    还是上来帖张图:1、系统之前是运行在mac上的,打包一切正常,但是现在在win11的eclipse打包就报错了。2、致命错误:在类路径或引导类路径中找不到程序包java.lang,上面的问题应该是找不到java中的jar中的class导致。解决:1)java,运行直接提示找不到命令。发现以管理员运行是......
  • tomcat9 启动时报错:java.lang.IllegalStateException: Malformed \uxxxx encoding的
    1、启动tomcat9springboot项目的时候,直接报下面的错误。2024-06-1809:38:20ApacheCommonsDaemonprocrunstdoutinitialized.09:38:35.597[main]ERRORorg.springframework.boot.SpringApplication-Applicationrunfailedjava.lang.IllegalStateException:Malf......
  • Java项目:springboot优咪商城(计算机毕业设计)
    作者主页:Java毕设网 简介:Java领域优质创作者、Java项目、学习资料、技术互助文末获取源码一、项目介绍优咪网上购物体验系统1.该平台主要有两大功能:(1)浏览平台官方和认证作者提供的篮球相关信息,信息类型包括:视频,新闻,评论类文章,比赛结果(2)篮球周边商城,商品分类球......
  • Javaweb实现简易记事簿 jdbc实现Java连接数据库
    //注册-[]获取register的数据,从表单传过来将(账户,密码,用户名)上面的数据写入数据库中,用jdbc(插入)加载数据库驱动,连接数据库,发送SQL加载数据库有可能失败保险起见抛一个异常返回判断,如果注册成功则提醒用户注册成功,并且跳转到登录页面进行登录。如果注册失败则提醒用户注册失败,......
  • java之sql注入代码审计
    java之sql注入代码审计前言其实找到漏洞原因很简单,主要想学习一下JDBCsql的过程JDBC简单介绍Java通过java.sql.DriverManager来管理所有数据库的驱动注册,所以如果想要建立数据库连接需要先在java.sql.DriverManager中注册对应的驱动类,然后调用getConnection方法才能连接上数......
  • vm17Pro17.5.1安装操作系统
    vm17Pro17.5.1安装操作系统前言一、windows1.DVD安装1.1[操作系统下载]1.2操作系统版本1.3[windows虚拟机创建]1.4操作系统安装1.4.1虚拟机设置1.4.2客户机启动1.4.3安装设置1.4.4磁盘设置1.4.5区域设置1.4.6键盘布局1.4.7账户设置1.4.8历史及隐私设置1.4......