首页 > 编程语言 >Java调用Kettle

Java调用Kettle

时间:2022-10-13 19:33:59浏览次数:47  
标签:Kettle Java kettle job 调用 result org import jobMeta

Java 调用 kettle,难的不是怎么调用,而是解决 maven 依赖冲突问题,
直接将 kettle 依赖,添加到我们的 maven 工程,可能会导致代码大范围报错;
解决方案也很简单,就是直接从 spoon 的 lib 目录下,复制我们所需的 jar 包,按需导入我们的工程。

主要用到的jar包如下,这些足以调起 kettle 脚本,剩下的还有 ftp、http 等服务,用啥加啥。

比如:下面没有 ftp 的依赖,如果 kettle 脚本是处理 ftp 的,那就会报错,
这时候就要根据报错信息,去 kettle 工程的pom.xml文件中,把 ftp 相关的依赖找出来。(去lib目录找也一样)

 

 

 这里就不提供工具包了,换成 apache 的 commons,照着意思改一改。

import cn.seaboot.commons.core.Converter;
import cn.seaboot.commons.exception.SystemError;
import cn.seaboot.commons.file.FileUtils;
import cn.seaboot.commons.file.IOUtils;
import cn.seaboot.commons.file.PropertiesUtils;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.ChannelLogTable;
import org.pentaho.di.core.logging.JobEntryLogTable;
import org.pentaho.di.core.logging.JobLogTable;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Kettle桥接器,通过这个类,调用使用Kettle提供的jar包运行脚本
 * <p>
 * KettleBridge kettleBridge = new KettleBridge();
 * kettleBridge.init();
 * kettleBridge.execute(filepath, null);
 *
 * @author Mr.css
 * @version 2022-03-28 14:44
 */
@Service
public class KettleBridge {
    private Logger logger = LoggerFactory.getLogger(KettleBridge.class);

    /**
     * Kettle 环境初始化
     */
    @PostConstruct
    public void init() {
        try {
            KettleEnvironment.init();
            logger.debug("【Configuration】Kettle environment init succeed!");
        } catch (KettleException e) {
            throw new SystemError("【Configuration】Kettle Environment build failed!", e);
        }
    }

    /**
     * 设置输出日志
     *
     * @param jobMeta -
     */
    private void initDatabase(JobMeta jobMeta) {
        DatabaseMeta databaseMeta = new DatabaseMeta();
        databaseMeta.setName("med");
        databaseMeta.setDatabaseType("MySQL");
        databaseMeta.setAccessType(DatabaseMeta.TYPE_ACCESS_NATIVE);
        databaseMeta.setHostname("localhost");
        databaseMeta.setDBName("med");
        databaseMeta.setDBPort("3306");
        databaseMeta.setUsername("root");
        databaseMeta.setPassword("root");
        jobMeta.addDatabase(databaseMeta);

        // databaseMeta.name 对应于下面的 connectionName

        // 任务日志
        JobLogTable jobLogTable = JobLogTable.getDefault(jobMeta, jobMeta);
        jobLogTable.setConnectionName("med");
        jobLogTable.setSchemaName("med");
        jobLogTable.setTableName("t_kettle_job_log");
        jobMeta.setJobLogTable(jobLogTable);

        // 任务节点日志
        JobEntryLogTable jobEntryLogTable = JobEntryLogTable.getDefault(jobMeta, jobMeta);
        jobEntryLogTable.setConnectionName("med");
        jobEntryLogTable.setSchemaName("med");
        jobEntryLogTable.setTableName("t_kettle_item_log");
        jobMeta.setJobEntryLogTable(jobEntryLogTable);

        // 任通道日志
        ChannelLogTable channelLogTable = ChannelLogTable.getDefault(jobMeta, jobMeta);
        channelLogTable.setConnectionName("med");
        channelLogTable.setSchemaName("med");
        channelLogTable.setTableName("t_kettle_channel_log");
        jobMeta.setChannelLogTable(channelLogTable);
    }

    /**
     * Kettle 环境销毁
     */
    @PreDestroy
    public void shutdown() {
        KettleEnvironment.shutdown();
    }

    /**
     * 执行kettle脚本,Kettle以文件作为脚本的最小单位,提供脚本所在的绝对路即可
     *
     * @param path   脚本路径
     * @param params 脚本运行所需的参数变量
     * @return 执行结果
     * @throws KettleException run kettle cause any exception
     * @throws IOException     can not read kettle.properties
     */
    public JobResult execute(String path, Map<String, Object> params) throws KettleException, IOException {
        // 初始化job路径
        JobMeta jobMeta = new JobMeta(path, null);
        Job job = new Job(null, jobMeta);

        // 设置环境变量
        Map<String, String> en = this.loadVariable(params);
        for (Map.Entry<String, String> entry : en.entrySet()) {
            job.setVariable(entry.getKey(), entry.getValue());
        }

        // 日志设置
        this.initDatabase(jobMeta);

        // 启动等待直到结束
        job.start();
        job.waitUntilFinished();

        // 为了避免出现意外的编程,通过对象打包执行结果,不返回Job对象
        JobResult result = new JobResult();
        result.setBatchId(job.getBatchId());
        result.setErrors(job.getErrors());
        result.setPassedBatchId(job.getPassedBatchId());
        result.setStatus(job.getStatus());
        result.setParams(en);
        return result;
    }

    /**
     * 载入环境变量
     * 这里会损耗一部分性能,主要为了方便整理代码
     *
     * @param params 用户指定的变量
     * @return 最终使用的变量
     * @throws IOException 读取配置异常
     */
    private Map<String, String> loadVariable(Map<String, Object> params) throws IOException {
        Map<String, String> result = new HashMap<>();

        // 用户文件夹下的kettle.properties
        Properties properties = this.readKettleProperties();
        if (properties != null) {
            for (Map.Entry<Object, Object> entry : properties.entrySet()) {
                result.put(Converter.toString(entry.getKey()), Converter.toString(entry.getValue()));
            }
        }

        // 用户指定的环境变量
        if (params != null) {
            for (Map.Entry<String, Object> entry : params.entrySet()) {
                result.put(entry.getKey(), Converter.toString(entry.getValue()));
            }
        }
        return result;
    }

    /**
     * 获取kettle配置参数,读取用户文件夹下的kettle.properties
     *
     * @return 参数
     * @throws IOException can not read kettle.properties
     */
    private Properties readKettleProperties() throws IOException {
        String home = FileUtils.getUserDirectoryPath();
        if (home != null) {
            File file = new File(home, ".kettle/kettle.properties");
            if (file.exists()) {
                try (InputStream is = IOUtils.openFileInputStream(file)) {
                    return PropertiesUtils.load(is);
                }
            }
        } else {
            logger.debug("Can not found user.home, had batter find out why!");
        }
        return null;
    }
}

 

标签:Kettle,Java,kettle,job,调用,result,org,import,jobMeta
From: https://www.cnblogs.com/chenss15060100790/p/16789378.html

相关文章

  • java并发之synchronized
    java实现同步互斥访问有两种方式,synchronized和Lock。Sychronized是java实现的内置锁,由jvm实现。通过编译Synchronized代码块为字节码可以发现,加锁逻辑被翻译为monitorent......
  • 安装和调用opencv清楚又明白,只看这一篇就够了!
    现在很多人都要学习opencv,但是网上安装教程五花八门,有的还需要安装VS(VisualStudio),搞得初学者一头雾水,因此这篇文章介绍最简单便捷的对opencv的安装,以及如何调用。笔者这......
  • JAVA 多线程
    JVM:1.虚拟机栈和程序计数器每个线程会单独生成2.方法区和堆是多线程共享的 多线程优点: 1、提高计算机系统CPU的利用率2、将既长又复杂的进程分为多个线程,独立运......
  • Java在PDF文档中添加或删除页面
    前言当你编辑一个PDF文档时,有时需要删除文档中多余的页面或向文档中添加新的页面。本文将向您演示如何使用Spire.PDFforJava在PDF文档中添加或删除页面。 程序环境......
  • Java并发编程学习5-对象的组合
    对象的组合前面的博文,我们已经了解了关于线程安全和同步的一些基础知识。本篇博文将介绍一些线程安全的组合模式,来帮助我们确保使用这些模式开发的程序是线程安全的。1.......
  • 入门学习Java必须明确的几点
    在现在对于任何人来说,学一个东西从入门到精通都是需要有一个过程的,我们才能慢慢的掌握。同样学Java也是如此,学到精通必然会得到一份高薪的工作。所以对于任何一个零基础小白......
  • java根据模板excel导出pdf和excel (easypoi)示例
    /***下载带模板的excel*@paramresponse*@parammap数据mapkey需与模板中对应*@paramtemplateUrl模板excel路径*@param......
  • java并发之volatile
    java并发围绕原子性、可见性和有序性展开。volatile可以保证可见性。在说volatile前,需要了解几个概念。1、JMM(javamemorymodel)JMM是个抽象的概念,他是java对底层操作系......
  • 《大话设计模式 Java溢彩加强版》相关主题
    《大话设计模式Java溢彩加强版》读者须知     《大话设计模式Java溢彩加强版》在2022年10月在各大网上书店中有售!源代码与课件下载 《大话设计模式Java溢彩......
  • SuyaUi接口调用示例
    向保管箱投放物品insertinto__Suya_Ui_Center_Tab_Box_Datavalues(@CharID,@Type,@Name,@Code,@Count,@From,@Reason)@CharID必填@CharID@Type......