首页 > 编程语言 > kettle 同步组件 synchronizeAfterMerge JAVA API

kettle 同步组件 synchronizeAfterMerge JAVA API

时间:2023-02-09 09:12:19 浏览次数:52
标签:JAVA String di import kettle API pentaho org new

package com.example.fg.kettle;


import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.ObjectLocationSpecificationMethod;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobHopMeta;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.job.entries.special.JobEntrySpecial;
import org.pentaho.di.job.entries.success.JobEntrySuccess;
import org.pentaho.di.job.entries.trans.JobEntryTrans;
import org.pentaho.di.job.entry.JobEntryCopy;
import org.pentaho.di.repository.ObjectId;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepIOMetaInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;
import org.pentaho.di.trans.steps.mergerows.MergeRows;
import org.pentaho.di.trans.steps.mergerows.MergeRowsMeta;
import org.pentaho.di.trans.steps.synchronizeaftermerge.SynchronizeAfterMerge;
import org.pentaho.di.trans.steps.synchronizeaftermerge.SynchronizeAfterMergeMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;

/**
 * Demo of the Kettle (Pentaho Data Integration) Java API.
 *
 * <p>It builds a table-synchronization transformation and a job that runs it. Both are stored in a
 * Kettle database repository if they are not there yet, and then the job is executed.
 *
 * <p>How the transformation works:
 * <ul>
 *   <li>It reads the target table and the source table, both ordered by {@link #KEY_FIELD}.</li>
 *   <li>A "Merge rows" step diffs the two and writes a flag into {@link #FLAG_FIELD}.</li>
 *   <li>A "Synchronize after merge" step applies the new/changed/deleted flags to the target
 *       table.</li>
 * </ul>
 *
 * @author cgm
 * @date 2019-07-08
 */
public class KettleJobSynchronizeAfterMerge {

    /** Key column: merge key of the diff step and lookup key of the synchronize step. */
    private static final String KEY_FIELD = "id";

    /** Field the merge step writes its diff flag into; read back by the synchronize step. */
    private static final String FLAG_FIELD = "bz";

    /** Columns compared by the merge step and written by the synchronize step. */
    private static final String[] COLUMNS = {"id", "eid", "name", "report_year", "report_name", "report_date",
            "bq_shengyubx_je", "dw_je_display", "dw_yanglaobx_je", "bq_shiyebx_je", "dw_yiliaobx_je", "shiyebx_num",
            "dw_yanglaobx_js", "dw_gongshangbx_js", "yiliaobx_num", "dw_shengyubx_je", "dw_js_display",
            "dw_shengyubx_js", "bq_yiliaobx_je", "bq_gongshangbx_je", "dw_gongshangbx_je", "shengyubx_num",
            "dw_shiyebx_js", "dw_shiyebx_je", "bq_je_display", "gongshangbx_num", "yanglaobx_num", "bq_yanglaobx_je",
            "dw_yiliaobx_js", "currency", "created_time", "row_update_time", "local_row_update_time"};

    /** Matches a "W=<n>" counter in the Kettle log text and captures the number. */
    private static final Pattern WRITTEN_COUNTER = Pattern.compile("W=(\\d+)");

    public static void main(String[] args) throws Exception {
        runSyncJob();
    }

    /**
     * Ensures the synchronization transformation and its job exist in the repository, then runs the
     * job and prints the Kettle log, the job status, the last "W=" counter and the error count.
     *
     * <p>The repository connection is always released before returning.
     *
     * @throws KettleException if Kettle cannot be initialized, the repository is unreachable, or
     *     saving/loading the transformation or job fails
     */
    public static void runSyncJob() throws KettleException {
        String transName = "cgmTransName";
        String repositoryName = "Repository";
        String jobName = "cgmTransName";
        String sourceTableName = "xy_social_security";
        String targetTableName = "xy_social_security_qfzx_test";

        KettleDatabaseRepository repository = RepositoryCon(repositoryName);
        try {
            RepositoryDirectoryInterface dir = repository.loadRepositoryDirectoryTree().findDirectory("/");

            if (repository.getTransformationID(transName, dir) == null) {
                TransMeta transMeta = buildTransMeta(transName, sourceTableName, targetTableName);
                transMeta.setRepositoryDirectory(dir);
                repository.save(transMeta, null);
            }

            ObjectId jobId = repository.getJobId(jobName, dir);
            if (jobId == null) {
                JobMeta jobMeta = buildJobMeta(jobName, transName, repository);
                jobMeta.setRepositoryDirectory(dir);
                repository.save(jobMeta, null);
                // The lookup above returned null because the job did not exist yet; fetch the id
                // of the job that was just saved so it can be loaded below.
                jobId = repository.getJobId(jobName, dir);
            }

            runJob(repository, jobId);
        } finally {
            repository.disconnect();
        }
    }

    /**
     * Builds the transformation "source table + target table -> merge rows -> synchronize".
     *
     * @param transName transformation name
     * @param sourceTableName table holding the new data
     * @param targetTableName table to be brought in line with the source table
     * @return the unsaved transformation metadata
     */
    private static TransMeta buildTransMeta(String transName, String sourceTableName, String targetTableName) {
        TransMeta transMeta = new TransMeta();
        transMeta.setName(transName);

        // NOTE(review): connection credentials are hard-coded for this demo; load them from
        // configuration in real use.
        DatabaseMeta sourceDatabaseMeta = new DatabaseMeta("fromDbName", "mysql", "Native(JDBC)", "127.0.0.1",
                "test?useSSL=false", "3306", "root", "toor@1234");
        transMeta.addDatabase(sourceDatabaseMeta);
        DatabaseMeta targetDatabaseMeta = new DatabaseMeta("toDbName", "mysql", "Native(JDBC)", "127.0.0.1",
                "test?useSSL=false", "3306", "root", "toor@1234");
        transMeta.addDatabase(targetDatabaseMeta);

        // Both inputs must be sorted on the merge key for the merge step to pair rows correctly.
        TableInputMeta sourceTableInputMeta = new TableInputMeta();
        sourceTableInputMeta.setDatabaseMeta(sourceDatabaseMeta);
        sourceTableInputMeta.setSQL("SELECT * FROM " + sourceTableName + " ORDER BY " + KEY_FIELD);
        StepMeta sourceStep = new StepMeta("源数据", sourceTableInputMeta);
        transMeta.addStep(sourceStep);

        // The target table is read through the target connection. Its step name must differ from
        // the source step's, because step names identify steps within a transformation.
        TableInputMeta targetTableInputMeta = new TableInputMeta();
        targetTableInputMeta.setDatabaseMeta(targetDatabaseMeta);
        targetTableInputMeta.setSQL("SELECT * FROM " + targetTableName + " ORDER BY " + KEY_FIELD);
        StepMeta targetStep = new StepMeta("目标数据", targetTableInputMeta);
        transMeta.addStep(targetStep);

        MergeRowsMeta mergeRowsMeta = new MergeRowsMeta();
        StepIOMetaInterface stepIOMeta = mergeRowsMeta.getStepIOMeta();
        // Info stream 0 holds the "reference" (old) rows and stream 1 the "compare" (new) rows.
        // The target table is the old state and the source table the new one.
        stepIOMeta.getInfoStreams().get(0).setStepMeta(targetStep);
        stepIOMeta.getInfoStreams().get(1).setStepMeta(sourceStep);
        mergeRowsMeta.setFlagField(FLAG_FIELD);
        mergeRowsMeta.setKeyFields(new String[]{KEY_FIELD});
        mergeRowsMeta.setValueFields(COLUMNS.clone());
        StepMeta mergeStepMeta = new StepMeta("合并记录", mergeRowsMeta);
        transMeta.addStep(mergeStepMeta);

        transMeta.addTransHop(new TransHopMeta(targetStep, mergeStepMeta));
        transMeta.addTransHop(new TransHopMeta(sourceStep, mergeStepMeta));

        SynchronizeAfterMergeMeta synchronizeAfterMergeMeta = new SynchronizeAfterMergeMeta();
        synchronizeAfterMergeMeta.setCommitSize(10000);
        synchronizeAfterMergeMeta.setDatabaseMeta(targetDatabaseMeta);
        synchronizeAfterMergeMeta.setSchemaName("");
        synchronizeAfterMergeMeta.setTableName(targetTableName);
        synchronizeAfterMergeMeta.setUseBatchUpdate(true);

        synchronizeAfterMergeMeta.setKeyLookup(new String[]{KEY_FIELD});   // table column used for the lookup
        synchronizeAfterMergeMeta.setKeyStream(new String[]{KEY_FIELD});   // stream field compared with it
        // Must be set even though it is empty; presumably it is only consulted by BETWEEN
        // conditions — TODO confirm.
        synchronizeAfterMergeMeta.setKeyStream2(new String[]{""});
        synchronizeAfterMergeMeta.setKeyCondition(new String[]{"="});      // comparison operator

        // One update flag per update column: the key column is never rewritten, every other
        // column is updated.
        Boolean[] update = new Boolean[COLUMNS.length];
        for (int i = 0; i < COLUMNS.length; i++) {
            update[i] = !KEY_FIELD.equals(COLUMNS[i]);
        }
        synchronizeAfterMergeMeta.setUpdateLookup(COLUMNS.clone());
        synchronizeAfterMergeMeta.setUpdateStream(COLUMNS.clone());
        synchronizeAfterMergeMeta.setUpdate(update);

        // Map the merge step's flag values to insert/update/delete operations.
        synchronizeAfterMergeMeta.setOperationOrderField(FLAG_FIELD);
        synchronizeAfterMergeMeta.setOrderInsert("new");
        synchronizeAfterMergeMeta.setOrderUpdate("changed");
        synchronizeAfterMergeMeta.setOrderDelete("deleted");
        StepMeta synStepMeta = new StepMeta("数据同步", synchronizeAfterMergeMeta);
        transMeta.addStep(synStepMeta);

        // Without this hop the synchronize step would receive no rows.
        transMeta.addTransHop(new TransHopMeta(mergeStepMeta, synStepMeta));

        return transMeta;
    }

    /**
     * Builds the job "START -> run transformation by repository name -> Success".
     *
     * @param jobName job name
     * @param transName name of the transformation (in the root directory) the job runs
     * @param repository repository the transformation is loaded from
     * @return the unsaved job metadata
     */
    private static JobMeta buildJobMeta(String jobName, String transName, KettleDatabaseRepository repository) {
        JobMeta jobMeta = new JobMeta();
        jobMeta.setName(jobName);
        jobMeta.setJobstatus(0);

        JobEntrySpecial jobEntrySpecial = new JobEntrySpecial();
        jobEntrySpecial.setName("START");
        jobEntrySpecial.setStart(true);
        // NOTE(review): a repeating START entry is expected to re-run the job every interval until
        // it is stopped, in which case waitUntilFinished() in runJob never returns and the log
        // report is not printed — confirm this is intended.
        jobEntrySpecial.setRepeat(true);
        jobEntrySpecial.setIntervalSeconds(3);
        JobEntryCopy start = new JobEntryCopy(jobEntrySpecial);
        start.setDrawn();
        start.setLocation(100, 200);

        JobEntryTrans jobEntryTrans = new JobEntryTrans();
        // Locate the transformation by directory + name inside the repository.
        jobEntryTrans.setSpecificationMethod(ObjectLocationSpecificationMethod.REPOSITORY_BY_NAME);
        jobEntryTrans.setRepository(repository);
        jobEntryTrans.setDirectory("/");
        jobEntryTrans.setTransname(transName);
        JobEntryCopy transEntry = new JobEntryCopy(jobEntryTrans);
        transEntry.setName("JTrans");
        transEntry.setDrawn(true);
        transEntry.setLocation(200, 300);

        JobEntrySuccess jobEntrySuccess = new JobEntrySuccess();
        jobEntrySuccess.setName("Success");
        JobEntryCopy success = new JobEntryCopy(jobEntrySuccess);
        success.setDrawn();
        success.setLocation(300, 400);

        jobMeta.addJobEntry(start);
        jobMeta.addJobEntry(transEntry);
        jobMeta.addJobEntry(success);

        jobMeta.addJobHop(new JobHopMeta(start, transEntry));
        jobMeta.addJobHop(new JobHopMeta(transEntry, success));
        return jobMeta;
    }

    /**
     * Loads the job, runs it in the current thread and prints the Kettle log and result summary.
     *
     * @param repository connected repository holding the job
     * @param jobId id of the job to run
     * @throws KettleException if the job cannot be loaded
     */
    private static void runJob(KettleDatabaseRepository repository, ObjectId jobId) throws KettleException {
        JobMeta jobMetaObj = repository.loadJob(jobId, null);
        Job job = new Job(repository, jobMetaObj);
        job.run();
        job.waitUntilFinished();

        String strLog = KettleLogStore.getAppender().getBuffer().toString();
        System.out.println("==========开始打印日志==========");
        System.out.println(strLog);
        System.out.println("==========日志打印结束==========");
        System.out.println("getLastProcessed:" + job.getStatus());
        System.out.println("成功数:" + lastWrittenCount(strLog));

        System.out.println("errors:" + job.getErrors());
        if (job.getErrors() != 0) {
            System.out.println("执行失败!");
        }
    }

    /**
     * Returns the full number following the last "W=" in {@code log}, or "0" if there is none.
     *
     * <p>This is a heuristic: which step the last counter belongs to depends on log ordering.
     */
    private static String lastWrittenCount(String log) {
        Matcher matcher = WRITTEN_COUNTER.matcher(log);
        String count = "0";
        while (matcher.find()) {
            count = matcher.group(1);
        }
        return count;
    }

    /**
     * Initializes the Kettle environment if needed and connects to the database repository.
     *
     * @param repositoryName name given to the repository's database connection
     * @return a connected repository; never {@code null}
     * @throws KettleException if Kettle cannot be initialized or the connection fails
     */
    public static KettleDatabaseRepository RepositoryCon(String repositoryName) throws KettleException {
        if (!KettleEnvironment.isInitialized()) {
            // Propagate init failures instead of printing them and continuing with an unusable
            // environment.
            KettleEnvironment.init();
        }
        // Repository database connection: (connection name, database type, access method, host,
        // database name, port, user, password).
        DatabaseMeta databaseMeta = new DatabaseMeta(repositoryName, "mysql", "Native(JDBC)", "127.0.0.1", "test",
                "3306", "root", "toor@1234");
        KettleDatabaseRepositoryMeta repositoryMeta = new KettleDatabaseRepositoryMeta();
        repositoryMeta.setConnection(databaseMeta);
        KettleDatabaseRepository repository = new KettleDatabaseRepository();
        repository.init(repositoryMeta);
        repository.connect("admin", "admin");
        if (!repository.isConnected()) {
            System.out.println("连接失败");
            throw new KettleException("Could not connect to Kettle repository '" + repositoryName + "'");
        }
        System.out.println("连接成功");
        return repository;
    }
}

标签:JAVA,String,di,import,kettle,API,pentaho,org,new
From: https://www.cnblogs.com/guanchaoguo/p/17104043.html

相关文章

  • kettle 更新/插入组件 JAVA API
    package com.example.fg.kettle; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.ObjectLocationSpecificationMethod; import org.pentaho......
  • Java笔记
    编译型语言使用专门的编译器一次性编译所有代码,并包装成该平台能识别的可执行程序格式;运行效率高,可移植性差,如 C、C++、Objective-C。解释型语言使用解释器逐行......
  • 对线面试官:浅聊一下 Java 虚拟机栈?
    对于JVM(Java虚拟机)来说,它有两个非常重要的区域,一个是栈(Java虚拟机栈),另一个是堆。堆是JVM的存储单位,所有的对象和数组都是存储在此区域的;而栈是JVM的运行单位,它主管......
  • 基于APIView写接口
    一、视图层代码 """基于APIView实现接口的编写,用的是同一个模型表,路由也没变;这次做了解耦合,序列化类与视图类分开写了""" from rest_framework.views import API......
  • 断点调试 认证权限频率源码执行流程 自定义频率SimpleRateThrottle 基于APIView编写分
    目录回顾断点调试的使用认证权限频率源码分析权限类的执行源码分析认证源码执行流程分析频率源码执行流程分析自定义频率类SimpleRateThrottle基于APIView编写分页全局异......
  • 第七十章 使用 REST API 监控 IRIS - 互操作性指标
    第七十章 使用 REST API 监控 IRIS - 互操作性指标。除了上一节中描述的指标外,IRIS实例还可以记录有关活动互操作性产品的指标,并将它们包含在/metrics端点的输出中。默......
  • 读Java实战(第二版)笔记05_Collection API的增强功能
    1. 集合工厂 1.1. Arrays.asList() 1.1.1. 创建了一个固定大小的列表 1.1.2. 列表的元素可以更新 1.1.3. 不能增加或者删除 1.1.4. 大小固定的可变数组 1.2. 没有A......
  • Java + SikuliX 基于图像实现自动化测试
    转载请注明出处❤️作者:测试蔡坨坨原文链接:caituotuo.top/6d2908e8.html你好,我是测试蔡坨坨。由于目前大多数GUI工具均需要依赖于程序类型进行特征属性识别,例如:Selenium......
  • 关于netcore webapi 前后端分离跨域配置
    最近做一个后台管理系统,但是期间遇到了跨域的问题,所以在此记录一下。这些问题都是很初级的基础知识。后台配置需要先配置指定域名跨域,这也是为了防止安全。一、关于netco......
  • java基础面试题
    java基础面试题 1. Java有哪些数据类型?Java中有8种基本数据类型,分别为:6种数字类型(四个整数型,两个浮点型):byte、short、int、long、float、double,1种字符类型:char,1......