使用 Java 程序调用 Kettle
pom.xml
完整的 pom.xml 如下：
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!--
        Spring Boot application that embeds the Pentaho Kettle (PDI) engine.
        The Kettle jars are referenced as system-scoped dependencies from src/main/resources/lib;
        the spring-boot-maven-plugin below is configured to include them when repackaging.
    -->
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>fucking-great-kettle</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>fucking-great-kettle</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.2.6.RELEASE</spring-boot.version>
        <!-- Version of the Kettle jars stored in src/main/resources/lib (also part of their file names). -->
        <kettle.version>9.3.0.0-428</kettle.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Kettle libraries, loaded from local jars. -->
        <dependency>
            <groupId>org.pentaho</groupId>
            <artifactId>kettle-core</artifactId>
            <version>${kettle.version}</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/src/main/resources/lib/kettle-core-${kettle.version}.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.pentaho</groupId>
            <artifactId>kettle-engine</artifactId>
            <version>${kettle.version}</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/src/main/resources/lib/kettle-engine-${kettle.version}.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.pentaho</groupId>
            <artifactId>metastore</artifactId>
            <version>${kettle.version}</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/src/main/resources/lib/metastore-${kettle.version}.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.pentaho</groupId>
            <artifactId>pentaho-encryption-support</artifactId>
            <version>${kettle.version}</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/src/main/resources/lib/pentaho-encryption-support-${kettle.version}.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.pentaho</groupId>
            <artifactId>kettle-dbdialog</artifactId>
            <version>${kettle.version}</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/src/main/resources/lib/kettle-dbdialog-${kettle.version}.jar</systemPath>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-vfs2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-vfs2</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
            <version>1.20</version>
        </dependency>
        <dependency>
            <groupId>commons-net</groupId>
            <artifactId>commons-net</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.9</version>
        </dependency>
        <dependency>
            <groupId>com.jcraft</groupId>
            <artifactId>jsch</artifactId>
            <version>0.1.54</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.apache.jackrabbit</groupId>
            <artifactId>jackrabbit-jcr2dav</artifactId>
            <version>2.4.1</version>
        </dependency>
        -->
        <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
        <!-- The -jre flavour is the one intended for server-side Java 8+; -android targets Android. -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.0-jre</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.7</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <!--
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.10</version>
        </dependency>
        -->
        <!-- https://mvnrepository.com/artifact/commons-lang/commons-lang -->
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.14</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <!-- The single source of truth for the driver version (the previous 5.1.49 management entry was overridden by this). -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.20</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Pinned: the RELEASE meta-version makes builds non-reproducible. -->
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>7.4.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <!-- Spring Boot BOM, imported because this project does not use spring-boot-starter-parent. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <!-- System-scoped dependencies (the Kettle jars above) are left out of the
                         repackaged jar unless this is enabled. -->
                    <includeSystemScope>true</includeSystemScope>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
完整代码
KettleClient.java 如下：
package com.example.fg.kettle;
import lombok.extern.slf4j.Slf4j;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta;
import javax.servlet.http.HttpServletRequest;
import java.io.*;
/**
 * Example client that builds a Kettle (Pentaho Data Integration) transformation in code and runs it.
 *
 * <p>Flow used by {@link #main(String[])}: initialise the Kettle environment, create a
 * {@link TransMeta} carrying the {@code "test-data"} connection from {@link #getDatabaseMeta()},
 * add a table-input step and a table-output step, link them with a hop, then execute.
 */
@Slf4j
public class KettleClient {

    /**
     * Demo entry point: reads {@code id,name} from table {@code stu1} and writes the rows to table
     * {@code stu2}, both through the {@code "test-data"} connection.
     */
    public static void main(String[] args) {
        try {
            KettleClient client = new KettleClient();
            client.initKettleEnvironment(null);
            TransMeta meta = client.buildTransMeta("kettle");
            PluginRegistry registry = client.getRegistry();
            StepMeta step1 = client.setTableInputStep(meta, registry,
                    "test-data", "select id,name from stu1", "table input");
            StepMeta step2 = client.setTableOutput(meta, registry,
                    "test-data", "stu2", "table insert");
            client.addTransHop(meta, step1, step2);
            client.executeTrans(meta, "test");
        } catch (KettleException e) {
            log.error("Kettle transformation setup failed", e);
        }
    }

    /**
     * Initialises the Kettle runtime; does nothing if {@link KettleEnvironment} is already initialised.
     *
     * @param request when {@code null}, Kettle is initialised with the current system properties.
     *                Otherwise {@code user.dir} and {@code KETTLE_HOME} are pointed at the web
     *                application's {@code WEB-INF} directory for initialisation; {@code user.dir}
     *                is restored afterwards, {@code KETTLE_HOME} stays set
     * @throws KettleException if Kettle fails to initialise, or if {@code WEB-INF} has no real
     *                         file-system path (e.g. an unexploded WAR)
     */
    public void initKettleEnvironment(HttpServletRequest request) throws KettleException {
        if (KettleEnvironment.isInitialized()) {
            return;
        }
        if (request == null) {
            KettleEnvironment.init();
            return;
        }
        // getServletContext() avoids creating an HTTP session just to reach the context, and servlet
        // virtual paths are always '/'-separated (File.separator would be '\' on Windows).
        String kettleHome = request.getServletContext().getRealPath("/WEB-INF");
        if (kettleHome == null) {
            throw new KettleException("Cannot resolve a real path for /WEB-INF; Kettle home cannot be set");
        }
        String userDir = System.getProperty("user.dir");
        // Point the user directory and Kettle home at WEB-INF while Kettle initialises.
        System.setProperty("user.dir", kettleHome);
        System.setProperty("KETTLE_HOME", kettleHome);
        try {
            KettleEnvironment.init();
        } finally {
            // Restore user.dir even when init() fails, so other code in the JVM is not affected.
            System.setProperty("user.dir", userDir);
        }
    }

    /**
     * Creates an empty transformation with the given name and registers the {@code "test-data"}
     * connection from {@link #getDatabaseMeta()} on it.
     *
     * @param metaName name of the transformation
     * @param transXML currently ignored
     * @return the new transformation metadata
     * @throws KettleXMLException kept in the signature for compatibility with existing callers
     */
    public TransMeta buildTransMeta(String metaName, String... transXML) throws KettleXMLException {
        TransMeta transMeta = new TransMeta();
        transMeta.setName(metaName);
        transMeta.addDatabase(getDatabaseMeta());
        return transMeta;
    }

    /**
     * Adds a table-input step that executes {@code sql} on the named connection.
     *
     * @param transMeta    transformation to add the step to
     * @param registry     plugin registry used to resolve the step plugin id
     * @param sourceDbName name of a connection already registered on {@code transMeta}
     * @param sql          SQL query the step executes
     * @param stepName     name of the new step
     * @return the added step, drawn at (100, 100)
     * @throws IllegalArgumentException if {@code transMeta} has no connection named {@code sourceDbName}
     */
    public StepMeta setTableInputStep(TransMeta transMeta, PluginRegistry registry, String sourceDbName, String sql, String stepName) {
        TableInputMeta tableInputMeta = new TableInputMeta();
        String pluginId = registry.getPluginId(StepPluginType.class, tableInputMeta);
        tableInputMeta.setDatabaseMeta(requireDatabase(transMeta, sourceDbName));
        tableInputMeta.setSQL(sql);
        return addDrawnStep(transMeta, new StepMeta(pluginId, stepName, tableInputMeta), 100, 100);
    }

    /**
     * Adds a table-output step that writes incoming rows to {@code targetTableName}; used for
     * whole-table extraction.
     *
     * @param transMeta       transformation to add the step to
     * @param registry        plugin registry used to resolve the step plugin id
     * @param targetDbName    name of a connection already registered on {@code transMeta}
     * @param targetTableName table to write to
     * @param stepName        name of the new step
     * @return the added step, drawn at (250, 100)
     * @throws IllegalArgumentException if {@code transMeta} has no connection named {@code targetDbName}
     */
    public StepMeta setTableOutput(TransMeta transMeta, PluginRegistry registry, String targetDbName, String targetTableName, String stepName) {
        TableOutputMeta tableOutputMeta = new TableOutputMeta();
        String pluginId = registry.getPluginId(StepPluginType.class, tableOutputMeta);
        tableOutputMeta.setDatabaseMeta(requireDatabase(transMeta, targetDbName));
        tableOutputMeta.setTableName(targetTableName);
        // Give the step an explicit Spoon position, consistent with the other step builders.
        return addDrawnStep(transMeta, new StepMeta(pluginId, stepName, tableOutputMeta), 250, 100);
    }

    /**
     * Adds an insert/update step, used when only some columns of the target table are updated.
     *
     * @param transMeta       transformation to add the step to
     * @param registry        plugin registry used to resolve the step plugin id
     * @param targetDbName    name of a connection already registered on {@code transMeta}
     * @param targetTableName table to insert into / update
     * @param updatelookup    table columns used as lookup keys (also used as the update columns)
     * @param updateStream    stream fields matched against {@code updatelookup} (also the update values)
     * @param updateStream2   second stream field per key condition
     * @param conditions      comparison per key, in the format expected by {@code setKeyCondition}
     * @param updateOrNot     per update column, whether it should be updated
     * @param stepName        name of the new step
     * @return the added step, drawn at (250, 100)
     * @throws IllegalArgumentException if {@code transMeta} has no connection named {@code targetDbName}
     */
    public StepMeta setInsertUpdateMeta(TransMeta transMeta, PluginRegistry registry, String targetDbName, String targetTableName, String[] updatelookup, String[] updateStream, String[] updateStream2, String[] conditions, Boolean[] updateOrNot, String stepName) {
        InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();
        String pluginId = registry.getPluginId(StepPluginType.class, insertUpdateMeta);
        insertUpdateMeta.setDatabaseMeta(requireDatabase(transMeta, targetDbName));
        insertUpdateMeta.setTableName(targetTableName);
        // Key used to look up existing rows.
        insertUpdateMeta.setKeyLookup(updatelookup);
        insertUpdateMeta.setKeyStream(updateStream);
        insertUpdateMeta.setKeyStream2(updateStream2); // This call must not be omitted.
        insertUpdateMeta.setKeyCondition(conditions);
        // Columns to update.
        // NOTE(review): the key arrays are reused as the update arrays, so the updated columns are
        // exactly the key columns; confirm this is intended.
        insertUpdateMeta.setUpdateLookup(updatelookup);
        insertUpdateMeta.setUpdateStream(updateStream);
        insertUpdateMeta.setUpdate(updateOrNot);
        return addDrawnStep(transMeta, new StepMeta(pluginId, stepName, insertUpdateMeta), 250, 100);
    }

    /**
     * Connects two steps with a hop (e.g. the table-input step to the output step).
     */
    public void addTransHop(TransMeta transMeta, StepMeta from, StepMeta to) {
        transMeta.addTransHop(new TransHopMeta(from, to));
    }

    /**
     * Runs the transformation and blocks until it finishes.
     *
     * <p>A database repository is connected first and attached to the transformation; it is always
     * disconnected afterwards. Kettle failures are logged, not rethrown.
     *
     * @param transMeta    transformation to run
     * @param targetDbName currently ignored
     * @throws RuntimeException if the finished transformation reports errors
     */
    public void executeTrans(TransMeta transMeta, String targetDbName) {
        KettleDatabaseRepository repository = null;
        try {
            repository = connectRepository();
            transMeta.setRepository(repository);
            Trans trans = new Trans(transMeta);
            // This array is passed to Trans.execute as the transformation's arguments.
            trans.execute(new String[]{"start..."});
            trans.waitUntilFinished();
            if (trans.getErrors() > 0) {
                throw new RuntimeException("There were errors during transformation execution.");
            }
        } catch (KettleException e) {
            log.error("Failed to execute transformation '{}'", transMeta.getName(), e);
        } finally {
            // Release the repository connection opened by connectRepository().
            if (repository != null) {
                repository.disconnect();
            }
        }
    }

    /**
     * Builds the {@code "test-data"} MySQL connection (native JDBC, localhost:3306, schema {@code test}).
     * NOTE(review): host, schema and credentials are hard-coded; consider externalising them.
     */
    private static DatabaseMeta getDatabaseMeta() {
        DatabaseMeta databaseMeta = new DatabaseMeta("test-data", "MYSQL", "Native", "localhost",
                "test", "3306", "root", "toor@1234");
        // Disable SSL to silence MySQL's "SSL connection recommended" warning.
        databaseMeta.addExtraOption("MYSQL", "useSSL", "false");
        return databaseMeta;
    }

    /**
     * Looks up a connection by name, failing fast instead of handing a null connection to a step.
     */
    private static DatabaseMeta requireDatabase(TransMeta transMeta, String dbName) {
        DatabaseMeta databaseMeta = transMeta.findDatabase(dbName);
        if (databaseMeta == null) {
            throw new IllegalArgumentException("No database connection named '" + dbName
                    + "' in transformation '" + transMeta.getName() + "'");
        }
        return databaseMeta;
    }

    /**
     * Marks the step as drawn at (x, y) in Spoon and adds it to the transformation.
     */
    private static StepMeta addDrawnStep(TransMeta transMeta, StepMeta stepMeta, int x, int y) {
        stepMeta.setDraw(true);
        stepMeta.setLocation(x, y);
        transMeta.addStep(stepMeta);
        return stepMeta;
    }

    /**
     * Connects to the database repository backed by {@link #getDatabaseMeta()}.
     *
     * @return the connected repository, or {@code null} if {@code isConnected()} is false after connecting
     * @throws KettleException if Kettle cannot be initialised or the connection attempt fails
     */
    private static KettleDatabaseRepository connectRepository() throws KettleException {
        if (!KettleEnvironment.isInitialized()) {
            // Let init failures propagate: continuing with an uninitialised environment only defers the error.
            KettleEnvironment.init();
        }
        KettleDatabaseRepositoryMeta repositoryMeta = new KettleDatabaseRepositoryMeta();
        repositoryMeta.setConnection(getDatabaseMeta());
        KettleDatabaseRepository repository = new KettleDatabaseRepository();
        repository.init(repositoryMeta);
        // Default repository user name and password.
        repository.connect("admin", "admin");
        if (repository.isConnected()) {
            log.info("连接成功");
            return repository;
        }
        log.warn("连接失败");
        return null;
    }

    /**
     * Returns the plugin registry used to resolve plugin ids for the transformation's steps.
     */
    public PluginRegistry getRegistry() {
        return PluginRegistry.getInstance();
    }
}