设置到数据库
transMeta.addDatabase(getDatabaseMeta(config));
TransLogTable origTransLogTable = transMeta.getTransLogTable();
origTransLogTable.setConnectionName(config.getName());
origTransLogTable.setTableName("r_log_trans");
origTransLogTable.setLogFieldUsed(true);
transMeta.setTransLogTable(origTransLogTable);
StepLogTable origStepLogTable = transMeta.getStepLogTable();
origStepLogTable.setConnectionName(config.getName());
origStepLogTable.setTableName("r_log_trans_step");
transMeta.setStepLogTable(origStepLogTable);
PerformanceLogTable origPerformanceLogTable = transMeta.getPerformanceLogTable();
origPerformanceLogTable.setConnectionName(config.getName());
origPerformanceLogTable.setTableName("r_log_trans_running");
transMeta.setPerformanceLogTable(origPerformanceLogTable);
ChannelLogTable origChannelLogTable = transMeta.getChannelLogTable();
origChannelLogTable.setConnectionName(config.getName());
origChannelLogTable.setTableName("r_log_channel");
transMeta.setChannelLogTable(origChannelLogTable);
MetricsLogTable origMetricsLogTable = transMeta.getMetricsLogTable();
origMetricsLogTable.setConnectionName(config.getName());
origMetricsLogTable.setTableName("r_log_trans_metrics");
transMeta.setMetricsLogTable(origMetricsLogTable);
完整操作服务文件
package com.sugon.dataexchangeplatform.services;
import cn.hutool.core.io.resource.ClassPathResource;
import com.sugon.dataexchangeplatform.domain.DataBaseMetaConfig;
import com.sugon.dataexchangeplatform.domain.RequestExecuteTrans;
import com.sugon.dataexchangeplatform.domain.TransJobEntryDomain;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.*;
import org.pentaho.di.job.JobHopMeta;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.job.entries.special.JobEntrySpecial;
import org.pentaho.di.job.entries.success.JobEntrySuccess;
import org.pentaho.di.job.entry.JobEntryCopy;
import org.pentaho.di.repository.ObjectId;
import org.pentaho.di.repository.Repository;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryCreationHelper;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepIOMetaInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.mergerows.MergeRowsMeta;
import org.pentaho.di.trans.steps.synchronizeaftermerge.SynchronizeAfterMergeMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Optional;
import static org.pentaho.di.core.logging.LogLevel.BASIC;
@Service
public class KettleService {
public void RunTransByName(RequestExecuteTrans executeTrans) throws KettleException {
KettleEnvironment.init();
KettleDatabaseRepository repository = RepositoryCon();
RepositoryDirectoryInterface dir = repository.loadRepositoryDirectoryTree().findDirectory(executeTrans.getDirectory());
ObjectId objectId = repository.getTransformationID(executeTrans.getTransName(), dir);
if (objectId == null) {
System.out.println("找不到任务!");
}
TransMeta transMeta = repository.loadTransformation(objectId, null);
setTransLog(transMeta);
Trans trans = new Trans(transMeta);
trans.setLogLevel(BASIC);
trans.execute(null);
trans.waitUntilFinished();
// JobResult result = new JobResult();
// result.setBatchId(job.getBatchId());
// result.setErrors(job.getErrors());
// result.setPassedBatchId(job.getPassedBatchId());
// result.setStatus(job.getStatus());
// result.setParams(en);
// return result;
if (trans.getErrors() > 0) {
throw new RuntimeException("There were errors during transformation execution.");
}
}
public void setTransLog(TransMeta transMeta) {
transMeta.addDatabase(getDatabaseMeta(config));
TransLogTable origTransLogTable = transMeta.getTransLogTable();
origTransLogTable.setConnectionName(config.getName());
origTransLogTable.setTableName("r_log_trans");
origTransLogTable.setLogFieldUsed(true);
transMeta.setTransLogTable(origTransLogTable);
StepLogTable origStepLogTable = transMeta.getStepLogTable();
origStepLogTable.setConnectionName(config.getName());
origStepLogTable.setTableName("r_log_trans_step");
transMeta.setStepLogTable(origStepLogTable);
PerformanceLogTable origPerformanceLogTable = transMeta.getPerformanceLogTable();
origPerformanceLogTable.setConnectionName(config.getName());
origPerformanceLogTable.setTableName("r_log_trans_running");
transMeta.setPerformanceLogTable(origPerformanceLogTable);
ChannelLogTable origChannelLogTable = transMeta.getChannelLogTable();
origChannelLogTable.setConnectionName(config.getName());
origChannelLogTable.setTableName("r_log_channel");
transMeta.setChannelLogTable(origChannelLogTable);
MetricsLogTable origMetricsLogTable = transMeta.getMetricsLogTable();
origMetricsLogTable.setConnectionName(config.getName());
origMetricsLogTable.setTableName("r_log_trans_metrics");
transMeta.setMetricsLogTable(origMetricsLogTable);
}
public void TransStatus(String transName, String path) {
try {
KettleEnvironment.init();
KettleDatabaseRepository repository = RepositoryCon();
RepositoryDirectoryInterface dir = repository.loadRepositoryDirectoryTree().findDirectory(path);
ObjectId objectId = repository.getTransformationID(transName, dir);
} catch (KettleException e) {
throw new RuntimeException(e);
}
}
public void SaveXmlStreamTransToRepo(InputStream xmlStream, String repoPath) {
try {
KettleEnvironment.init();
KettleDatabaseRepository repository = RepositoryCon();
TransMeta transMeta = new TransMeta(xmlStream, repository, false, null, null);
RepositoryDirectoryInterface dir = repository.loadRepositoryDirectoryTree().findDirectory(repoPath);
RepositoryDirectoryInterface rootDir = repository.loadRepositoryDirectoryTree().findDirectory("/");
String dirName = "task";
RepositoryDirectoryInterface subDir = repository.findDirectory(dirName);
if (!Optional.ofNullable(subDir).isPresent()) {
RepositoryDirectoryInterface newDir = repository.createRepositoryDirectory(rootDir, dirName);
repository.createRepositoryDirectory(newDir, "trans");
}
String transName = transMeta.getName();
if (!Optional.ofNullable(transName).isPresent()) {
throw new RuntimeException("xml格式错误!");
}
ObjectId objectId = repository.getTransformationID(transMeta.getName(), dir);
if (Optional.ofNullable(objectId).isPresent()) {
throw new RuntimeException("任务已经存在!");
}
transMeta.setRepositoryDirectory(dir);
repository.save(transMeta, "");
} catch (KettleException e) {
throw new RuntimeException(e);
}
}
public void SaveXmlFileTransToRepo(String fileName) {
try {
ClassPathResource resource = new ClassPathResource("trans/template.ktr");
String path = resource.getAbsolutePath();
KettleEnvironment.init();
TransMeta transMeta = new TransMeta(path);
KettleDatabaseRepository repository = RepositoryCon();
repository.save(transMeta, "");
} catch (KettleException e) {
throw new RuntimeException(e);
}
}
@Async
public void RunJob(TransJobEntryDomain job) {
try {
KettleEnvironment.init();
KettleDatabaseRepository repository = RepositoryCon();
RepositoryDirectoryInterface dir = repository.loadRepositoryDirectoryTree().findDirectory("/");
ObjectId objectId = repository.getTransformationID(job.getTransName(), dir);
TransMeta transMeta;
if (objectId == null) {
transMeta = GenerateTransformation(job, repository);
repository.save(transMeta, "");
} else {
transMeta = repository.loadTransformation(objectId, null);
}
Trans trans = new Trans(transMeta);
trans.execute(null);
trans.waitUntilFinished();
String strLog = KettleLogStore.getAppender().getBuffer().toString();
System.out.println("==========开始打印日志==========");
System.out.println(KettleLogStore.getAppender().getBuffer().toString());
System.out.println("==========日志打印结束==========");
System.out.println("getLastProcessed:" + trans.getStatus());
String substring = strLog.substring(strLog.lastIndexOf("I=") + 1);
String successCount = substring.substring(substring.lastIndexOf("W=") + 2, substring.lastIndexOf("W=") + 3);
System.out.println("成功数:" + successCount);
System.out.println("errors:" + trans.getErrors());
if (trans.getErrors() != 0) {
System.out.println("执行失败!");
}
} catch (KettleException e) {
throw new RuntimeException(e);
}
}
public DataBaseMetaConfig getConfig() {
return config;
}
@Autowired
DataBaseMetaConfig config;
public void CreateRepo() throws KettleException {
if (!KettleEnvironment.isInitialized()) {
KettleEnvironment.init();
}
KettleDatabaseRepositoryMeta kettleDatabaseRepositoryMeta = new KettleDatabaseRepositoryMeta();
kettleDatabaseRepositoryMeta.setConnection(getDatabaseMeta(config));
KettleDatabaseRepository repository = new KettleDatabaseRepository();
repository.init(kettleDatabaseRepositoryMeta);
repository.connectionDelegate.connect(true, true);
KettleDatabaseRepositoryCreationHelper helper = new KettleDatabaseRepositoryCreationHelper(repository);
helper.createRepositorySchema(null, false, new ArrayList<>(), false);
}
public TransMeta GenerateTransformation(TransJobEntryDomain job, KettleDatabaseRepository repository) throws KettleException {
DatabaseMeta targetDatabaseMeta = getDatabaseMeta(job.getTargetDatabaseMeta());
DatabaseMeta sourceDatabaseMeta = getDatabaseMeta(job.getSourceDatabaseMeta());
TransMeta transMeta = buildTransMeta(repository, job.getTransName(), sourceDatabaseMeta, targetDatabaseMeta);
StepMeta targetTableInput = buildTableInputStep(transMeta, job.getTargetDatabaseMeta().getName(), job.getTargetTableName(), job.getTargetInputName(), job.getTargetInputUseDraw(), job.getTargetInputLocationX(), job.getTargetInputLocationY());
transMeta.addStep(targetTableInput);
StepMeta sourceTableInput = buildTableInputStep(transMeta, job.getSourceDatabaseMeta().getName(), job.getSourceTableName(), job.getSourceInputName(), job.getSourceInputUseDraw(), job.getSourceInputLocationX(), job.getSourceInputLocationY());
transMeta.addStep(sourceTableInput);
StepMeta mergeStepMeta = buildMergeRowsMeta(job, sourceTableInput, targetTableInput, job.getMergeRowUseDraw(), job.getMergeRowLocationX(), job.getMergeRowLocationY());
transMeta.addStep(mergeStepMeta);
transMeta.addTransHop(new TransHopMeta(targetTableInput, mergeStepMeta));
transMeta.addTransHop(new TransHopMeta(sourceTableInput, mergeStepMeta));
StepMeta synStepMeta = buildSynchronizeAfterMergeMeta(job, targetDatabaseMeta, job.getSynchronizeUseDraw(), job.getSynchronizeLocationX(), job.getSynchronizeLocationY());
transMeta.addStep(synStepMeta);
transMeta.addTransHop(new TransHopMeta(mergeStepMeta, synStepMeta));
return transMeta;
}
public JobMeta GenerateJob(TransJobEntryDomain job, Repository repository) throws KettleException {
JobMeta jobMeta = buildJobMeta(job);
JobEntryCopy jobMetaStart = buildStartJobEntry(job);
jobMeta.addJobEntry(jobMetaStart);
JobEntryCopy transJobEntry = buildTransJobEntry(job, repository);
jobMeta.addJobEntry(transJobEntry);
JobEntryCopy successJobEntry = buildSuccessJobEntry(job);
jobMeta.addJobEntry(successJobEntry);
jobMeta.addJobHop(new JobHopMeta(jobMetaStart, transJobEntry));
jobMeta.addJobHop(new JobHopMeta(transJobEntry, successJobEntry));
// JobMeta jobMeta = buildJobMeta(job);
// JobEntryCopy start = buildStartJobEntry(job);
// JobEntryCopy tran = buildTransJobEntry(job, repository);
// JobEntryCopy success = buildSuccessJobEntry(job);
//
// jobMeta.addJobEntry(start);
// jobMeta.addJobEntry(tran);
// jobMeta.addJobEntry(success);
//
// jobMeta.addJobHop(new JobHopMeta(start, tran));
// jobMeta.addJobHop(new JobHopMeta(tran, success));
return jobMeta;
}
public TransMeta buildTransMeta(KettleDatabaseRepository repository, String transName, DatabaseMeta newDatabaseMeta, DatabaseMeta oldDatabaseMeta) throws KettleException {
TransMeta transMeta = new TransMeta();
transMeta.setName(transName);
transMeta.setRepository(repository);
transMeta.setName(transName);
transMeta.setRepositoryDirectory(repository.findDirectory("/"));
transMeta.addDatabase(repository.getDatabaseMeta());
transMeta.addDatabase(newDatabaseMeta);
transMeta.addDatabase(oldDatabaseMeta);
return transMeta;
}
public JobMeta buildJobMeta(TransJobEntryDomain job) {
JobMeta jobMeta = new JobMeta();
jobMeta.setName(job.getJobName());
jobMeta.setJobstatus(0);
return jobMeta;
}
public StepMeta buildTableInputStep(TransMeta transMeta, String inputDataBaseName, String inputTableName, String inputName, Boolean inputUseDraw, Integer inputLocationX, Integer inputLocationY) {
TableInputMeta targetTableInput = new TableInputMeta();
DatabaseMeta database = transMeta.findDatabase(inputDataBaseName);
targetTableInput.setDatabaseMeta(database);
String old_select_sql = "SELECT * FROM " + inputTableName + " order by id";
targetTableInput.setSQL(old_select_sql);
StepMeta inputMetaStep = new StepMeta(inputName, targetTableInput);
if (inputUseDraw) {
inputMetaStep.setDraw(true);
inputMetaStep.setLocation(inputLocationX, inputLocationY);
}
return inputMetaStep;
}
public static DatabaseMeta getDatabaseMeta(DataBaseMetaConfig meta) {
DatabaseMeta databaseMeta = new DatabaseMeta(meta.getName(), meta.getType(), meta.getAccess(), meta.getHost(), meta.getDb(), meta.getPort(), meta.getUser(), meta.getPass());
databaseMeta.addExtraOption(meta.getType(), "useSSL", "false");
databaseMeta.addExtraOption(meta.getType(), "serverTimezone", "Asia/Shanghai");
databaseMeta.addExtraOption(meta.getType(), "characterEncoding", "utf8");
return databaseMeta;
}
public StepMeta buildMergeRowsMeta(TransJobEntryDomain job, StepMeta sourceStepMeta, StepMeta targetStepMeta, Boolean inputUseDraw, Integer inputLocationX, Integer inputLocationY) {
MergeRowsMeta mergeRowsMeta = new MergeRowsMeta();
StepIOMetaInterface stepIOMeta = mergeRowsMeta.getStepIOMeta();
stepIOMeta.getInfoStreams().get(0).setStepMeta(sourceStepMeta);
stepIOMeta.getInfoStreams().get(1).setStepMeta(targetStepMeta);
mergeRowsMeta.setFlagField(job.getMergeRowFlagField());
mergeRowsMeta.setKeyFields(job.getMergeRowKeyFields());
mergeRowsMeta.setValueFields(job.getColumns());
StepMeta mergeRowsStep = new StepMeta(job.getMergeRowName(), mergeRowsMeta);
if (inputUseDraw) {
mergeRowsStep.setDraw(true);
mergeRowsStep.setLocation(inputLocationX, inputLocationY);
}
return mergeRowsStep;
}
public StepMeta buildSynchronizeAfterMergeMeta(TransJobEntryDomain job, DatabaseMeta targetDatabaseMeta, Boolean inputUseDraw, Integer inputLocationX, Integer inputLocationY) {
SynchronizeAfterMergeMeta synchronizeAfterMergeMeta = new SynchronizeAfterMergeMeta();
synchronizeAfterMergeMeta.setCommitSize(job.getSynchronizeCommitSize());
synchronizeAfterMergeMeta.setDatabaseMeta(targetDatabaseMeta);
synchronizeAfterMergeMeta.setSchemaName(job.getSynchronizeSchemaName());
synchronizeAfterMergeMeta.setTableName(job.getTargetTableName());
synchronizeAfterMergeMeta.setUseBatchUpdate(job.getSynchronizeUseBatchUpdate());
synchronizeAfterMergeMeta.setKeyLookup(job.getSynchronizeKeyLookup());
synchronizeAfterMergeMeta.setKeyStream(job.getSynchronizeKeyStream());
synchronizeAfterMergeMeta.setKeyStream2(job.getSynchronizeKeyStream2());
synchronizeAfterMergeMeta.setKeyCondition(job.getSynchronizeKeyCondition());
Boolean[] updateOrNot = new Boolean[job.getColumns().length];
for (int i = 0; i < job.getColumns().length; i++) {
updateOrNot[i] = true;
}
updateOrNot[0] = false;
synchronizeAfterMergeMeta.setUpdateLookup(job.getColumns());
synchronizeAfterMergeMeta.setUpdateStream(job.getColumns());
synchronizeAfterMergeMeta.setUpdate(updateOrNot);
synchronizeAfterMergeMeta.setOperationOrderField(job.getSynchronizeOperationOrderField());
synchronizeAfterMergeMeta.setOrderInsert(job.getSynchronizeOrderInsert());
synchronizeAfterMergeMeta.setOrderUpdate(job.getSynchronizeOrderUpdate());
synchronizeAfterMergeMeta.setOrderDelete(job.getSynchronizeOrderDelete());
StepMeta mergeRowsStep = new StepMeta(job.getSynchronizeName(), synchronizeAfterMergeMeta);
if (inputUseDraw) {
mergeRowsStep.setDraw(true);
mergeRowsStep.setLocation(inputLocationX, inputLocationY);
}
return mergeRowsStep;
}
public JobEntryCopy buildStartJobEntry(TransJobEntryDomain job) {
JobEntrySpecial jobEntrySpecial = new JobEntrySpecial();
jobEntrySpecial.setName(job.getJobEntryStartName());
jobEntrySpecial.setStart(job.getJobEntryStartUseStart());
if (job.getSynchronizeUseRepeat()) {
jobEntrySpecial.setRepeat(job.getSynchronizeUseRepeat());
jobEntrySpecial.setIntervalSeconds(job.getSynchronizeIntervalSeconds());
}
JobEntryCopy jobEntryCopy = new JobEntryCopy(jobEntrySpecial);
if (job.getSynchronizeUseDraw()) {
jobEntryCopy.setDrawn();
jobEntryCopy.setLocation(job.getSynchronizeLocationX(), job.getSynchronizeLocationY());
}
return jobEntryCopy;
}
public JobEntryCopy buildSuccessJobEntry(TransJobEntryDomain job) {
JobEntrySuccess jobEntrySuccess = new JobEntrySuccess();
jobEntrySuccess.setName(job.getJobEntrySuccessName());
JobEntryCopy jobEntryCopy = new JobEntryCopy(jobEntrySuccess);
if (job.getJobEntrySuccessUseDraw()) {
jobEntryCopy.setDrawn();
jobEntryCopy.setLocation(job.getJobEntrySuccessLocationX(), job.getJobEntrySuccessLocationY());
}
return jobEntryCopy;
}
public JobEntryCopy buildTransJobEntry(TransJobEntryDomain job, Repository repository) {
JobEntrySpecial jobEntrySpecial = new JobEntrySpecial();
jobEntrySpecial.setName(job.getJobEntryStartName());
jobEntrySpecial.setStart(job.getJobEntryStartUseStart());
if (job.getSynchronizeUseRepeat()) {
jobEntrySpecial.setRepeat(true);
jobEntrySpecial.setIntervalSeconds(job.getSynchronizeIntervalSeconds());
}
JobEntryCopy jobEntryCopy = new JobEntryCopy(jobEntrySpecial);
jobEntryCopy.setName(job.getJobEntryTransName());
if (job.getJobEntryTransUseDraw()) {
jobEntryCopy.setDrawn();
jobEntryCopy.setLocation(job.getJobEntryTransLocationX(), job.getJobEntryTransLocationY());
}
return jobEntryCopy;
}
public KettleDatabaseRepository RepositoryCon() throws KettleException {
if (!KettleEnvironment.isInitialized()) {
try {
KettleEnvironment.init();
} catch (KettleException e) {
e.printStackTrace();
}
}
KettleDatabaseRepositoryMeta kettleDatabaseRepositoryMeta = new KettleDatabaseRepositoryMeta();
kettleDatabaseRepositoryMeta.getRepositoryCapabilities();
DatabaseMeta kettleDatabase = getDatabaseMeta(config);
kettleDatabaseRepositoryMeta.setConnection(kettleDatabase);
KettleDatabaseRepository kettleDatabaseRepository = new KettleDatabaseRepository();
kettleDatabaseRepository.init(kettleDatabaseRepositoryMeta);
// kettleDatabaseRepository.connect("admin", "sugon@666#");
kettleDatabaseRepository.connect("admin", "admin");
if (kettleDatabaseRepository.isConnected()) {
System.out.println("连接成功");
return kettleDatabaseRepository;
} else {
System.out.println("连接失败");
return null;
}
}
}
标签:java,repository,kettle,job,api,transMeta,import,new,trans
From: https://www.cnblogs.com/guanchaoguo/p/17445885.html