package com.example.fg.kettle;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.ObjectLocationSpecificationMethod;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobHopMeta;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.job.entries.special.JobEntrySpecial;
import org.pentaho.di.job.entries.success.JobEntrySuccess;
import org.pentaho.di.job.entries.trans.JobEntryTrans;
import org.pentaho.di.job.entry.JobEntryCopy;
import org.pentaho.di.repository.ObjectId;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepIOMetaInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;
import org.pentaho.di.trans.steps.mergerows.MergeRows;
import org.pentaho.di.trans.steps.mergerows.MergeRowsMeta;
import org.pentaho.di.trans.steps.synchronizeaftermerge.SynchronizeAfterMerge;
import org.pentaho.di.trans.steps.synchronizeaftermerge.SynchronizeAfterMergeMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
/**
 * Creates (on first run) and executes a Kettle job that keeps a target MySQL table in sync with
 * a source table, using the "Merge Rows (diff)" and "Synchronize after merge" steps.
 *
 * <p>The transformation and the job are stored in a Kettle database repository. Each is only
 * built when no object with the same name exists yet in the repository root directory.
 *
 * <p>NOTE(review): host, database names and credentials are hard-coded for a local test setup;
 * move them to configuration before using this outside a sandbox.
 *
 * @author cgm
 * @date 2019年7月8日
 */
public class KettleJobSynchronizeAfterMerge {

    /** Key column shared by the source and target tables; used for sorting, merging and lookup. */
    private static final String KEY_COLUMN = "id";

    /** Field added by Merge Rows that tells Synchronize after merge which operation to apply. */
    private static final String FLAG_FIELD = "bz";

    public static void main(String[] args) throws Exception {
        runSyncJob();
    }

    /**
     * Ensures the sync transformation and job exist in the repository, runs the job and prints
     * its log and a short summary. The repository connection is always released afterwards.
     *
     * @throws KettleException if the repository cannot be reached, or Kettle fails to save, load
     *     or run the job
     */
    public static void runSyncJob() throws KettleException {
        String transName = "cgmTransName";
        String repositoryName = "Repository";
        String jobName = "cgmTransName";
        String sourceTableName = "xy_social_security";
        String targetTableName = "xy_social_security_qfzx_test";

        KettleDatabaseRepository repository = RepositoryCon(repositoryName);
        try {
            RepositoryDirectoryInterface dir =
                    repository.loadRepositoryDirectoryTree().findDirectory("/");

            if (repository.getTransformationID(transName, dir) == null) {
                TransMeta transMeta =
                        buildSyncTransMeta(transName, sourceTableName, targetTableName);
                transMeta.setRepositoryDirectory(dir);
                repository.save(transMeta, null);
            }

            ObjectId jobId = repository.getJobId(jobName, dir);
            if (jobId == null) {
                JobMeta jobMeta = buildJobMeta(jobName, transName, repository);
                jobMeta.setRepositoryDirectory(dir);
                repository.save(jobMeta, null);
                // The lookup above returned null: fetch the id of the job just saved so that
                // loadJob() below receives a real id.
                jobId = repository.getJobId(jobName, dir);
            }

            Job job = new Job(repository, repository.loadJob(jobId, null));
            // start() runs the job on its own thread; waitUntilFinished() blocks until it ends.
            // NOTE(review): the START entry is configured to repeat every 3 seconds (see
            // buildJobMeta); if Kettle honours that, this only returns once the job is stopped.
            job.start();
            job.waitUntilFinished();
            printSummary(job);
        } finally {
            repository.disconnect();
        }
    }

    /**
     * Builds the sync transformation.
     *
     * <p>It has two Table Input steps (source rows and target rows, both ordered by the key), a
     * Merge Rows (diff) step that flags every row, and a Synchronize after merge step. That last
     * step applies the flagged inserts, updates and deletes to the target table.
     *
     * @param transName name of the transformation
     * @param sourceTableName table whose contents are authoritative
     * @param targetTableName table that is brought in line with the source
     * @return the (unsaved) transformation metadata
     */
    private static TransMeta buildSyncTransMeta(String transName, String sourceTableName,
            String targetTableName) throws KettleException {
        TransMeta transMeta = new TransMeta();
        transMeta.setName(transName);

        DatabaseMeta sourceDatabaseMeta = new DatabaseMeta("fromDbName", "mysql", "Native(JDBC)",
                "127.0.0.1", "test?useSSL=false", "3306", "root", "toor@1234");
        transMeta.addDatabase(sourceDatabaseMeta);
        DatabaseMeta targetDatabaseMeta = new DatabaseMeta("toDbName", "mysql", "Native(JDBC)",
                "127.0.0.1", "test?useSSL=false", "3306", "root", "toor@1234");
        transMeta.addDatabase(targetDatabaseMeta);

        // Merge Rows (diff) expects both inputs sorted on the key, hence the ORDER BY.
        TableInputMeta sourceInputMeta = new TableInputMeta();
        sourceInputMeta.setDatabaseMeta(sourceDatabaseMeta);
        sourceInputMeta.setSQL("SELECT * FROM " + sourceTableName + " order by " + KEY_COLUMN);
        StepMeta sourceStep = new StepMeta("源数据", sourceInputMeta);
        transMeta.addStep(sourceStep);

        // The target rows are read through the target connection.
        TableInputMeta targetInputMeta = new TableInputMeta();
        targetInputMeta.setDatabaseMeta(targetDatabaseMeta);
        targetInputMeta.setSQL("SELECT * FROM " + targetTableName + " order by " + KEY_COLUMN);
        // Step names must be unique within a transformation.
        StepMeta targetStep = new StepMeta("目标数据", targetInputMeta);
        transMeta.addStep(targetStep);

        String[] columns = new String[]{"id", "eid", "name", "report_year", "report_name",
                "report_date", "bq_shengyubx_je", "dw_je_display", "dw_yanglaobx_je",
                "bq_shiyebx_je", "dw_yiliaobx_je", "shiyebx_num", "dw_yanglaobx_js",
                "dw_gongshangbx_js", "yiliaobx_num", "dw_shengyubx_je", "dw_js_display",
                "dw_shengyubx_js", "bq_yiliaobx_je", "bq_gongshangbx_je", "dw_gongshangbx_je",
                "shengyubx_num", "dw_shiyebx_js", "dw_shiyebx_je", "bq_je_display",
                "gongshangbx_num", "yanglaobx_num", "bq_yanglaobx_je", "dw_yiliaobx_js",
                "currency", "created_time", "row_update_time", "local_row_update_time"};

        MergeRowsMeta mergeRowsMeta = new MergeRowsMeta();
        StepIOMetaInterface stepIOMeta = mergeRowsMeta.getStepIOMeta();
        // Info stream 0 is the reference (old) input and stream 1 the compare (new) input.
        // Keys found only in the compare stream are flagged "new", keys found only in the
        // reference stream are flagged "deleted", and "changed" rows carry the compare values.
        // The target table must therefore be the reference and the source the compare stream.
        stepIOMeta.getInfoStreams().get(0).setStepMeta(targetStep);
        stepIOMeta.getInfoStreams().get(1).setStepMeta(sourceStep);
        mergeRowsMeta.setFlagField(FLAG_FIELD);
        mergeRowsMeta.setKeyFields(new String[]{KEY_COLUMN});
        mergeRowsMeta.setValueFields(columns);
        StepMeta mergeStep = new StepMeta("合并记录", mergeRowsMeta);
        transMeta.addStep(mergeStep);
        transMeta.addTransHop(new TransHopMeta(sourceStep, mergeStep));
        transMeta.addTransHop(new TransHopMeta(targetStep, mergeStep));

        SynchronizeAfterMergeMeta syncMeta = new SynchronizeAfterMergeMeta();
        syncMeta.setCommitSize(10000);
        syncMeta.setDatabaseMeta(targetDatabaseMeta);
        syncMeta.setSchemaName("");
        syncMeta.setTableName(targetTableName);
        syncMeta.setUseBatchUpdate(true);
        syncMeta.setKeyLookup(new String[]{KEY_COLUMN});   // table column used for the lookup
        syncMeta.setKeyStream(new String[]{KEY_COLUMN});   // stream field compared against it
        // Required: one entry per key lookup; left empty because "=" needs no second value.
        syncMeta.setKeyStream2(new String[]{""});
        syncMeta.setKeyCondition(new String[]{"="});       // comparison operator per key
        syncMeta.setUpdateLookup(columns);
        syncMeta.setUpdateStream(columns);
        // One flag per update column (the arrays must have equal length); the key is not updated.
        syncMeta.setUpdate(buildUpdateFlags(columns));
        syncMeta.setOperationOrderField(FLAG_FIELD);       // field holding the operation flag
        syncMeta.setOrderInsert("new");
        syncMeta.setOrderUpdate("changed");
        syncMeta.setOrderDelete("deleted");
        StepMeta syncStep = new StepMeta("数据同步", syncMeta);
        transMeta.addStep(syncStep);
        // Without this hop the sync step receives no rows and nothing reaches the target table.
        transMeta.addTransHop(new TransHopMeta(mergeStep, syncStep));

        return transMeta;
    }

    /**
     * Returns one "update" flag per column: true for every column except {@link #KEY_COLUMN},
     * which only identifies the row.
     */
    private static Boolean[] buildUpdateFlags(String[] columns) {
        Boolean[] flags = new Boolean[columns.length];
        for (int i = 0; i < columns.length; i++) {
            flags[i] = !KEY_COLUMN.equalsIgnoreCase(columns[i]);
        }
        return flags;
    }

    /**
     * Builds a job START -> transformation (looked up by name in the repository) -> Success.
     * The START entry is configured to repeat every 3 seconds.
     *
     * @param jobName name of the job
     * @param transName name of the repository transformation the job runs
     * @param repository repository the transformation entry resolves the transformation from
     * @return the (unsaved) job metadata
     */
    private static JobMeta buildJobMeta(String jobName, String transName,
            KettleDatabaseRepository repository) throws KettleException {
        JobMeta jobMeta = new JobMeta();
        jobMeta.setName(jobName);
        jobMeta.setJobstatus(0);

        JobEntrySpecial startEntry = new JobEntrySpecial();
        startEntry.setName("START");
        startEntry.setStart(true);
        startEntry.setRepeat(true);
        startEntry.setIntervalSeconds(3);
        JobEntryCopy start = new JobEntryCopy(startEntry);
        start.setDrawn();
        start.setLocation(100, 200);

        JobEntryTrans transEntry = new JobEntryTrans();
        // The transformation is located by name + directory in the repository.
        transEntry.setSpecificationMethod(ObjectLocationSpecificationMethod.REPOSITORY_BY_NAME);
        transEntry.setRepository(repository);
        transEntry.setDirectory("/");
        transEntry.setTransname(transName);
        JobEntryCopy trans = new JobEntryCopy(transEntry);
        trans.setName("JTrans");
        trans.setDrawn(true);
        trans.setLocation(200, 300);

        JobEntrySuccess successEntry = new JobEntrySuccess();
        successEntry.setName("Success");
        JobEntryCopy success = new JobEntryCopy(successEntry);
        success.setDrawn();
        success.setLocation(300, 400);

        jobMeta.addJobEntry(start);
        jobMeta.addJobEntry(trans);
        jobMeta.addJobEntry(success);
        jobMeta.addJobHop(new JobHopMeta(start, trans));
        jobMeta.addJobHop(new JobHopMeta(trans, success));
        return jobMeta;
    }

    /** Prints the buffered Kettle log, the job status, the written-row count and the error count. */
    private static void printSummary(Job job) {
        // Take one snapshot of the log so the printed text and the parsed count agree.
        String strLog = KettleLogStore.getAppender().getBuffer().toString();
        System.out.println("==========开始打印日志==========");
        System.out.println(strLog);
        System.out.println("==========日志打印结束==========");
        System.out.println("getLastProcessed:" + job.getStatus());
        // Assumes the log contains step status counters of the form "W=<n>" (rows written).
        System.out.println("成功数:" + lastCounter(strLog, "W="));
        System.out.println("errors:" + job.getErrors());
        if (job.getErrors() != 0) {
            System.out.println("执行失败!");
        }
    }

    /**
     * Returns all decimal digits that follow the last occurrence of {@code key} in {@code log}.
     * Returns "0" when the key is absent or not followed by a digit.
     */
    private static String lastCounter(String log, String key) {
        int start = log.lastIndexOf(key);
        if (start < 0) {
            return "0";
        }
        start += key.length();
        int end = start;
        while (end < log.length() && log.charAt(end) >= '0' && log.charAt(end) <= '9') {
            end++;
        }
        return end > start ? log.substring(start, end) : "0";
    }

    /**
     * Initializes the Kettle environment (once) and connects to the MySQL-backed Kettle database
     * repository as admin/admin.
     *
     * @param repositoryName name given to the repository's database connection
     * @return a connected repository; never null
     * @throws KettleException if Kettle cannot be initialized or the connection fails
     */
    public static KettleDatabaseRepository RepositoryCon(String repositoryName) throws KettleException {
        if (!KettleEnvironment.isInitialized()) {
            // Let failures propagate: continuing without an initialized environment only fails
            // later with a far less useful error.
            KettleEnvironment.init();
        }
        // Connection metadata: (connection name, database type, access method, host,
        // database name, port, user, password).
        DatabaseMeta databaseMeta = new DatabaseMeta(repositoryName, "mysql", "Native(JDBC)",
                "127.0.0.1", "test", "3306", "root", "toor@1234");
        KettleDatabaseRepositoryMeta repositoryMeta = new KettleDatabaseRepositoryMeta();
        repositoryMeta.setConnection(databaseMeta);
        KettleDatabaseRepository repository = new KettleDatabaseRepository();
        repository.init(repositoryMeta);
        repository.connect("admin", "admin");
        if (!repository.isConnected()) {
            System.out.println("连接失败");
            throw new KettleException(
                    "Unable to connect to Kettle repository '" + repositoryName + "'");
        }
        System.out.println("连接成功");
        return repository;
    }
}
// Tags: JAVA, String, di, import, kettle, API, pentaho, org, new
// Source: https://www.cnblogs.com/guanchaoguo/p/17104043.html