由于实现程序实现kettle的自动化作业
- kettle =9.3
- java = 1.8.0_202
java 代码 总体思路
- 可以使用 ktr / kjb 的文件
- 使用springboot 解析调用 kettle
代码核模块
- kettle-core:kettle的核心模块,包括一些数据处理等。
- kettle-dbdialog:kettle数据库连接界面逻辑。
- kettle-engine:kettle的引擎,负责执行kettle的具体作业和转换的逻辑,并会调用core模块。
- kettle-ui-swt:用户界面模块,包括用户界面显示的xul文件,通过后端代码编写的Dialog以及国际化等。
- pdi-assemblies:该模块用于项目的生成,里面主要包括各个工具启动的脚本、静态资源、帮助文档、组件简单的事例(ktr/kjb)、第三方包引用等。
- pdi-engine-ext:kettle引擎扩展模块。
- pdi-plugins:kettle的核心插件模块,如果我们要自定义组件,可以参考该模块的组件。
- integrantion:集成测试模块。
数据定义
- ${user.home}/./kettle/repositories.xml
* 用户设置的所有资源库信息连接参数等
* spoon 启动后出现的选择资源库下拉列表中 - ${user.home}/./kettle/kettle.properties
* 转换或作业中需要的变量,
* spoon 启动后会自动加载该文件 - ${user.home}/./kettle/shared.xml
* 保存了共享对象,共享对象可以是。对象共享实质上就是将对象序列化的过程
* 类型: Database 、connectionsSteps、 SlaveserversPartition schemas、Cluster schemas
* spoon 启动时,会加载 shared.xml 文件中定义的所有对象
库文件配置信息
- lib\kettle-engine.jar\kettle-jobs.xml
- Spoon 启动时需要加载的作业项
- lib\kettle-engine.jar\kettle-partition-plugins.xml
- Spoon 启动时需要加载的分区插件
- lib\kettle-engine.jar\kettle-plugins.xml
* Spoon 启动时步骤和作业项插件的加载路径 - lib\kettle-engine.jar\kettle-steps.xml
- Spoon 启动时需要加载的转换步骤
连接资源库
点击查看代码
public static KettleDatabaseRepository repositoryConnect() throws KettleException {
if (!KettleEnvironment.isInitialized()) {
try {
KettleEnvironment.init();
} catch (Exception e) {
e.printStackTrace();
}
}
// 数据库元连接
// 参数介绍
// kettle资源库名称,数据库类型,连接类型,ip,数据库名称,端口,用户名,密码
DatabaseMeta databaseMeta = new DatabaseMeta("test-data","MYSQL","Native","localhost",
"test","3306","root","toor@1234");
//数据库形式的资源库元对象
KettleDatabaseRepositoryMeta repository = new KettleDatabaseRepositoryMeta();
//保存数据库连接至元对象种
repository.setConnection(databaseMeta);
//数据库形式的资源库对象
KettleDatabaseRepository databaseRepository = new KettleDatabaseRepository();
//用资源库元对象初始化资源库对象
databaseRepository.init(repository);
//连接到资源库 默认为 root root
databaseRepository.connect("admin", "admin");
//判断状态为false | true
if (databaseRepository.isConnected()) {
System.out.println("连接成功,数据库名称为:" + databaseRepository.getDatabaseMeta());
return databaseRepository;
} else {
System.out.println("连接失败,数据库名称为:" + databaseRepository.getDatabaseMeta());
return null;
}
}
设置转换信息
点击查看代码
public TransMeta buildTransMeta(String metaName, String... transXML) throws KettleXMLException {
TransMeta transMeta = new TransMeta();
// 设置转化元的名称
transMeta.setName(metaName);
// 添加转换的数据库连接
transMeta.addDatabase(getDatabaseMeta());
return transMeta;
}
点击查看代码
public StepMeta setTableInputStep(TransMeta transMeta, PluginRegistry registry, String sourceDbName, String sql, String stepName) {
// 创建表输入
TableInputMeta tableInputMeta = new TableInputMeta();
String pluginId = registry.getPluginId(StepPluginType.class, tableInputMeta);
// 指定数据源数据库配置名
DatabaseMeta source = transMeta.findDatabase(sourceDbName);
tableInputMeta.setDatabaseMeta(source);
tableInputMeta.setSQL(sql);
// 将表输入添加到转换中
StepMeta stepMeta = new StepMeta(pluginId, stepName, tableInputMeta);
// 给步骤添加在spoon工具中的显示位置
stepMeta.setDraw(true);
stepMeta.setLocation(100, 100);
// 将表输入添加到步骤中
transMeta.addStep(stepMeta);
return stepMeta;
}
点击查看代码
public StepMeta setTableOutput(TransMeta transMeta, PluginRegistry registry, String targetDbName, String targetTableName, String stepName) {
// 创建表输出
TableOutputMeta tableOutputMeta = new TableOutputMeta();
String pluginId = registry.getPluginId(StepPluginType.class, tableOutputMeta);
// 配置表输出的目标数据库配置名
DatabaseMeta targetDb = transMeta.findDatabase(targetDbName);
tableOutputMeta.setDatabaseMeta(targetDb);
tableOutputMeta.setTableName(targetTableName);
// 将表输出添加到转换中
StepMeta stepMeta = new StepMeta(pluginId, stepName, tableOutputMeta);
transMeta.addStep(stepMeta);
return stepMeta;
}
关联单元
点击查看代码
public StepMeta setInsertUpdateMeta(TransMeta transMeta, PluginRegistry registry, String targetDbName, String targetTableName, String[] updatelookup, String[] updateStream, String[] updateStream2, String[] conditions, Boolean[] updateOrNot, String stepName) {
// 创建插入与更新
InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();
String pluginId = registry.getPluginId(StepPluginType.class, insertUpdateMeta);
// 配置目标数据库配置名
DatabaseMeta database_target = transMeta.findDatabase(targetDbName);
insertUpdateMeta.setDatabaseMeta(database_target);
// 设置目标表名
insertUpdateMeta.setTableName(targetTableName);
// 设置用来查询的关键字
insertUpdateMeta.setKeyLookup(updatelookup);
insertUpdateMeta.setKeyStream(updateStream);
insertUpdateMeta.setKeyStream2(updateStream2);// 这一步不能省略
insertUpdateMeta.setKeyCondition(conditions);
// 设置要更新的字段
insertUpdateMeta.setUpdateLookup(updatelookup);
insertUpdateMeta.setUpdateStream(updateStream);
insertUpdateMeta.setUpdate(updateOrNot);
// 添加步骤到转换中
StepMeta stepMeta = new StepMeta(pluginId, stepName, insertUpdateMeta);
stepMeta.setDraw(true);
stepMeta.setLocation(250, 100);
transMeta.addStep(stepMeta);
return stepMeta;
}
点击查看代码
public void addTransHop(TransMeta transMeta, StepMeta from, StepMeta to) {
transMeta.addTransHop(new TransHopMeta(from, to));
}
执行转换
点击查看代码
public void executeTrans(TransMeta transMeta, String targetDbName) {
try {
KettleDatabaseRepository repository = RepositoryCon();
transMeta.setRepository(repository);
Trans trans = new Trans(transMeta);
trans.execute(new String[]{"start..."});
trans.waitUntilFinished();
// 关闭数据库连接
if (trans.getErrors() > 0) {
throw new RuntimeException("There were errors during transformation execution.");
}
} catch (KettleException e) {
e.printStackTrace();
}
}