首页 > 编程语言 >kettle 更新/插入组件 JAVA API

kettle 更新/插入组件 JAVA API

时间:2023-02-09 09:12:02浏览次数:37  
标签:JAVA String di kettle new API pentaho org import

package com.example.fg.kettle;


import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.ObjectLocationSpecificationMethod;
import org.pentaho.di.core.RowMetaAndData;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobHopMeta;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.job.entries.special.JobEntrySpecial;
import org.pentaho.di.job.entries.success.JobEntrySuccess;
import org.pentaho.di.job.entries.trans.JobEntryTrans;
import org.pentaho.di.job.entry.JobEntryCopy;
import org.pentaho.di.repository.ObjectId;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;

import javax.validation.constraints.Null;
import java.util.List;

/**
 * @author cgm
 * @date 2019年7月8日
 */
public class KettleJobCronTest {
    public static void main(String[] args) throws Exception {
        runSyncJob();

    }

    public static void runSyncJob() throws KettleException {
        String transName = "cgmTransName";
        String repositoryName = "Repository";
        String jobName = "cgmTransName";
        KettleDatabaseRepository kettleDatabaseRepository  = RepositoryCon(repositoryName);
        RepositoryDirectoryInterface dir = kettleDatabaseRepository.loadRepositoryDirectoryTree().findDirectory("/");
        ObjectId objectId = kettleDatabaseRepository.getTransformationID(transName, dir);
        TransMeta transMeta;
        if (objectId == null) {
            transMeta = new TransMeta();
            // 设置转化的名称
            transMeta.setName(transName);
            // 添加转换的数据库连接
            transMeta.addDatabase(new DatabaseMeta("fromDbName", "mysql", "Native(JDBC)", "127.0.0.1",
                    "test?useSSL=false", "3306", "root", "toor@1234"));
            transMeta.addDatabase(new DatabaseMeta("toDbName", "mysql", "Native(JDBC)", "127.0.0.1", "test?useSSL=false",
                    "3306", "root", "toor@1234"));

            // 新建一个表输入步骤(TableInputMeta)
            TableInputMeta tableInputMeta = new TableInputMeta();
            // 设置步骤1的数据库连接
            tableInputMeta.setDatabaseMeta(transMeta.findDatabase("fromDbName"));
            // 设置步骤1中的sql
            tableInputMeta.setSQL("SELECT id1 as id2 ,name1 as name2 FROM from_user");
            // 设置步骤名称
            StepMeta step1 = new StepMeta("step1name", tableInputMeta);
            transMeta.addStep(step1);

            // 新建一个插入/更新步骤
            InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();
            // 设置步骤2的数据库连接
            insertUpdateMeta.setDatabaseMeta(transMeta.findDatabase("toDbName"));
            // 设置目标表
            insertUpdateMeta.setTableName("to_user");
            // 设置用来查询的关键字
            insertUpdateMeta.setKeyLookup(new String[]{"id2"});
            insertUpdateMeta.setKeyCondition(new String[]{"="});
            insertUpdateMeta.setKeyStream(new String[]{"id2"});
            insertUpdateMeta.setKeyStream2(new String[]{""});// 一定要加上
            // 设置要更新的字段
            String[] updatelookup = {"id2", "name2"};
            String[] updateStream = {"id2", "name2"};
            Boolean[] updateOrNot = {false, true};
            // 设置表字段
            insertUpdateMeta.setUpdateLookup(updatelookup);
            // 设置流字段
            insertUpdateMeta.setUpdateStream(updateStream);
            // 设置是否更新
            insertUpdateMeta.setUpdate(updateOrNot);
            // 设置步骤2的名称
            StepMeta step2 = new StepMeta("step2name", insertUpdateMeta);
            transMeta.addStep(step2);

            TransHopMeta transHopMeta = new TransHopMeta(step1, step2);
            transMeta.addTransHop(transHopMeta);


            transMeta.setRepositoryDirectory(dir);
            kettleDatabaseRepository.save(transMeta, null);
        }

        ObjectId id = kettleDatabaseRepository.getJobId(jobName, dir);
        if (id == null) {
            JobMeta jobMeta = new JobMeta();
            jobMeta.setName(jobName);
            jobMeta.setJobstatus(0);

            JobEntrySpecial jobEntrySpecial = new JobEntrySpecial();
            jobEntrySpecial.setName("START");
            jobEntrySpecial.setStart(true);
            jobEntrySpecial.setRepeat(true);
            jobEntrySpecial.setIntervalSeconds(3);
            JobEntryCopy start = new JobEntryCopy(jobEntrySpecial);
            start.setDrawn();
            start.setLocation(100, 200);

            JobEntryTrans jobEntryTrans = new JobEntryTrans();
            ObjectLocationSpecificationMethod SpecMethod = ObjectLocationSpecificationMethod.getSpecificationMethodByCode(repositoryName);
            jobEntryTrans.setSpecificationMethod(SpecMethod);
            jobEntryTrans.setRepository(kettleDatabaseRepository);
            jobEntryTrans.setDirectory("/");
            jobEntryTrans.setTransname(transName);
            JobEntryCopy trans1 = new JobEntryCopy(jobEntryTrans);
            trans1.setName("JTrans");
            trans1.setDrawn(true);
            trans1.setLocation(200, 300);

            JobEntrySuccess jobEntrySuccess = new JobEntrySuccess();
            jobEntrySuccess.setName("Success");
            JobEntryCopy success = new JobEntryCopy(jobEntrySuccess);
            success.setDrawn();
            success.setLocation(300, 400);

            jobMeta.addJobEntry(start);
            jobMeta.addJobEntry(trans1);
            jobMeta.addJobEntry(success);
            
            jobMeta.addJobHop(new JobHopMeta(start, trans1));
            jobMeta.addJobHop(new JobHopMeta(trans1, success));

            jobMeta.setRepositoryDirectory(dir);
            kettleDatabaseRepository.save(jobMeta, null);
        }

        JobMeta jobMetaObj = kettleDatabaseRepository.loadJob(id, null);
        Job job = new Job(kettleDatabaseRepository, jobMetaObj);
        job.run();
        job.waitUntilFinished();
        String strLog = KettleLogStore.getAppender().getBuffer().toString();
        System.out.println("==========开始打印日志==========");

        System.out.println(KettleLogStore.getAppender().getBuffer().toString());
        System.out.println("==========日志打印结束==========");
        System.out.println("getLastProcessed:" + job.getStatus());

        String substring = strLog.substring(strLog.lastIndexOf("I=") + 1);
        String successCount = substring.substring(substring.lastIndexOf("W=") + 2, substring.lastIndexOf("W=") + 3);
        System.out.println("成功数:" + successCount);

        System.out.println("errors:" + job.getErrors());
        if (job.getErrors() != 0) {
            System.out.println("执行失败!");
        }


    }

    public static KettleDatabaseRepository RepositoryCon(String repositoryName) throws KettleException {
        // 初始化环境
        if (!KettleEnvironment.isInitialized()) {
            try {
                KettleEnvironment.init();
            } catch (KettleException e) {
                e.printStackTrace();
            }
        }
        // 数据库连接元对象
        // (kettle数据库连接名称(KETTLE工具右上角显示),资源库类型,连接方式,IP,数据库名,端口,用户名,密码) //cgmRepositoryConn
        DatabaseMeta databaseMeta = new DatabaseMeta(repositoryName, "mysql", "Native(JDBC)", "127.0.0.1",
                "test", "3306", "root", "toor@1234");
        KettleDatabaseRepositoryMeta kettleDatabaseRepositoryMeta = new KettleDatabaseRepositoryMeta();
        kettleDatabaseRepositoryMeta.setConnection(databaseMeta);
        KettleDatabaseRepository kettleDatabaseRepository = new KettleDatabaseRepository();
        kettleDatabaseRepository.init(kettleDatabaseRepositoryMeta);
        kettleDatabaseRepository.connect("admin", "admin");
        if (kettleDatabaseRepository.isConnected()) {
            System.out.println("连接成功");
            return kettleDatabaseRepository;
        } else {
            System.out.println("连接失败");
            return null;
        }
    }



}

标签:JAVA,String,di,kettle,new,API,pentaho,org,import
From: https://www.cnblogs.com/guanchaoguo/p/17104044.html

相关文章

  • Java笔记
    编译型语言使用专门的编译器一次性编译所有代码,并包装成该平台能识别的可执行性程序的格式运行效率高,可移植性差C,C++,Objective-C解释性语言使用解释器逐行......
  • 对线面试官:浅聊一下 Java 虚拟机栈?
    对于JVM(Java虚拟机)来说,它有两个非常重要的区域,一个是栈(Java虚拟机栈),另一个是堆。堆是JVM的存储单位,所有的对象和数组都是存储在此区域的;而栈是JVM的运行单位,它主管......
  • 基于APIView写接口
    一、视图层代码"""基于APIView实现接口的编写用的是同一个模型表路由也没变这次做了解耦合写了序列化类与视图类分开了"""fromrest_framework.viewsimportAPI......
  • 断点调试 认证权限频率源码执行流程 自定义频率SimpleRateThrottle 基于APIView编写分
    目录回顾断点调试的使用认证权限频率源码分析权限类的执行源码分析认证源码执行流程分析频率源码执行流程分析自定义频率类SimpleRateThrottle基于APIView编写分页全局异......
  • 第七十章 使用 REST API 监控 IRIS - 互操作性指标
    第七十章使用RESTAPI监控IRIS-互操作性指标除了上一节中描述的指标外,IRIS实例还可以记录有关活动互操作性产品的指标,并将它们包含在/metrics端点的输出中。默......
  • 读Java实战(第二版)笔记05_Collection API的增强功能
    1. 集合工厂1.1. Arrays.asList()1.1.1. 创建了一个固定大小的列表1.1.2. 列表的元素可以更新1.1.3. 不能增加或者删除1.1.4. 大小固定的可变数组1.2. 没有A......
  • Java + SikuliX 基于图像实现自动化测试
    转载请注明出处❤️作者:测试蔡坨坨原文链接:caituotuo.top/6d2908e8.html你好,我是测试蔡坨坨。由于目前大多数GUI工具均需要依赖于程序类型进行特征属性识别,例如:Selenium......
  • 关于netcore webapi 前后端分离跨域配置
    最近做一个后台管理系统,但是期间遇到了跨域的问题,所以在此记录一下。这些问题都是很初级的基础知识。后台配置需要先配置指定域名跨域,这也是为了防止安全。一、关于netco......
  • java基础面试题
    java基础面试题1. Java有哪些数据类型?Java中有8种基本数据类型,分别为:6种数字类型(四个整数形,两个浮点型):byte、short、int、long、float、double,1种字符类型:char,1......
  • 认证,权限,频率源码分析 自定义频率类 SimpleRateThrottle缓存频率类 基于APIView编写分
    目录昨日回顾三种位置的token获取三种权限校验方式原生django的cookie+session认证底层原理断点调试使用认证,权限,频率源码分析(了解)权限源码分析认证源码分析频率源码分析自......