Java 调用 kettle,难的不是怎么调用,而是解决 maven 依赖冲突问题,
直接将 kettle 依赖,添加到我们的 maven 工程,可能会导致代码大范围报错;
解决方案也很简单,就是直接从 spoon 的 lib 目录下,复制我们所需的 jar 包,按需导入我们的工程。
主要用到的jar包如下,这些足以调起 kettle 脚本,剩下的还有 ftp、http 等服务,用啥加啥。
比如:下面没有 ftp 的依赖,如果 kettle 脚本是处理 ftp 的,那就会报错,
这时候就要根据报错信息,去 kettle 工程的pom.xml文件中,把 ftp 相关的依赖找出来。(去lib目录找也一样)
这里就不提供工具包了,换成 apache 的 commons,照着意思改一改。
import cn.seaboot.commons.core.Converter; import cn.seaboot.commons.exception.SystemError; import cn.seaboot.commons.file.FileUtils; import cn.seaboot.commons.file.IOUtils; import cn.seaboot.commons.file.PropertiesUtils; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.logging.ChannelLogTable; import org.pentaho.di.core.logging.JobEntryLogTable; import org.pentaho.di.core.logging.JobLogTable; import org.pentaho.di.job.Job; import org.pentaho.di.job.JobMeta; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.util.HashMap; import java.util.Map; import java.util.Properties; /** * Kettle桥接器,通过这个类,调用使用Kettle提供的jar包运行脚本 * <p> * KettleBridge kettleBridge = new KettleBridge(); * kettleBridge.init(); * kettleBridge.execute(filepath, null); * * @author Mr.css * @version 2022-03-28 14:44 */ @Service public class KettleBridge { private Logger logger = LoggerFactory.getLogger(KettleBridge.class); /** * Kettle 环境初始化 */ @PostConstruct public void init() { try { KettleEnvironment.init(); logger.debug("【Configuration】Kettle environment init succeed!"); } catch (KettleException e) { throw new SystemError("【Configuration】Kettle Environment build failed!", e); } } /** * 设置输出日志 * * @param jobMeta - */ private void initDatabase(JobMeta jobMeta) { DatabaseMeta databaseMeta = new DatabaseMeta(); databaseMeta.setName("med"); databaseMeta.setDatabaseType("MySQL"); databaseMeta.setAccessType(DatabaseMeta.TYPE_ACCESS_NATIVE); databaseMeta.setHostname("localhost"); databaseMeta.setDBName("med"); databaseMeta.setDBPort("3306"); databaseMeta.setUsername("root"); databaseMeta.setPassword("root"); jobMeta.addDatabase(databaseMeta); // databaseMeta.name 对应于下面的 connectionName // 任务日志 JobLogTable jobLogTable = JobLogTable.getDefault(jobMeta, jobMeta); jobLogTable.setConnectionName("med"); jobLogTable.setSchemaName("med"); jobLogTable.setTableName("t_kettle_job_log"); jobMeta.setJobLogTable(jobLogTable); // 任务节点日志 JobEntryLogTable jobEntryLogTable = JobEntryLogTable.getDefault(jobMeta, jobMeta); jobEntryLogTable.setConnectionName("med"); jobEntryLogTable.setSchemaName("med"); jobEntryLogTable.setTableName("t_kettle_item_log"); jobMeta.setJobEntryLogTable(jobEntryLogTable); // 任通道日志 ChannelLogTable channelLogTable = ChannelLogTable.getDefault(jobMeta, jobMeta); channelLogTable.setConnectionName("med"); channelLogTable.setSchemaName("med"); channelLogTable.setTableName("t_kettle_channel_log"); jobMeta.setChannelLogTable(channelLogTable); } /** * Kettle 环境销毁 */ @PreDestroy public void shutdown() { KettleEnvironment.shutdown(); } /** * 执行kettle脚本,Kettle以文件作为脚本的最小单位,提供脚本所在的绝对路即可 * * @param path 脚本路径 * @param params 脚本运行所需的参数变量 * @return 执行结果 * @throws KettleException run kettle cause any exception * @throws IOException can not read kettle.properties */ public JobResult execute(String path, Map<String, Object> params) throws KettleException, IOException { // 初始化job路径 JobMeta jobMeta = new JobMeta(path, null); Job job = new Job(null, jobMeta); // 设置环境变量 Map<String, String> en = this.loadVariable(params); for (Map.Entry<String, String> entry : en.entrySet()) { job.setVariable(entry.getKey(), entry.getValue()); } // 日志设置 this.initDatabase(jobMeta); // 启动等待直到结束 job.start(); job.waitUntilFinished(); // 为了避免出现意外的编程,通过对象打包执行结果,不返回Job对象 JobResult result = new JobResult(); result.setBatchId(job.getBatchId()); result.setErrors(job.getErrors()); result.setPassedBatchId(job.getPassedBatchId()); result.setStatus(job.getStatus()); result.setParams(en); return result; } /** * 载入环境变量 * 这里会损耗一部分性能,主要为了方便整理代码 * * @param params 用户指定的变量 * @return 最终使用的变量 * @throws IOException 读取配置异常 */ private Map<String, String> loadVariable(Map<String, Object> params) throws IOException { Map<String, String> result = new HashMap<>(); // 用户文件夹下的kettle.properties Properties properties = this.readKettleProperties(); if (properties != null) { for (Map.Entry<Object, Object> entry : properties.entrySet()) { result.put(Converter.toString(entry.getKey()), Converter.toString(entry.getValue())); } } // 用户指定的环境变量 if (params != null) { for (Map.Entry<String, Object> entry : params.entrySet()) { result.put(entry.getKey(), Converter.toString(entry.getValue())); } } return result; } /** * 获取kettle配置参数,读取用户文件夹下的kettle.properties * * @return 参数 * @throws IOException can not read kettle.properties */ private Properties readKettleProperties() throws IOException { String home = FileUtils.getUserDirectoryPath(); if (home != null) { File file = new File(home, ".kettle/kettle.properties"); if (file.exists()) { try (InputStream is = IOUtils.openFileInputStream(file)) { return PropertiesUtils.load(is); } } } else { logger.debug("Can not found user.home, had batter find out why!"); } return null; } }
标签:Kettle,Java,kettle,job,调用,result,org,import,jobMeta From: https://www.cnblogs.com/chenss15060100790/p/16789378.html