<!--kettle-->
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-core</artifactId>
<version>9.4.0.0-343</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-engine</artifactId>
<version>9.4.0.0-343</version>
</dependency>
<dependency>
<groupId>org.pentaho</groupId>
<artifactId>pentaho-encryption-support</artifactId>
<version>9.4.0.0-343</version>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>metastore</artifactId>
<version>9.4.0.0-343</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-vfs2</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.owasp.encoder</groupId>
<artifactId>encoder</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>
若国内镜像仓库无法下载 commons-vfs2 的 jar 包,可先手动下载该 jar,再用以下命令安装到本地 Maven 仓库:
mvn install:install-file -Dfile=jar包路径\commons-vfs2-2.7.0.jar -DgroupId=org.apache.commons -DartifactId=commons-vfs2 -Dversion=2.7.0 -Dpackaging=jar
<password-encoder-plugins>
<password-encoder-plugin id="Kettle">
<description>Kettle Password Encoder</description>
<classname>org.pentaho.support.encryption.KettleTwoWayPasswordEncoder</classname>
</password-encoder-plugin>
<!--
<password-encoder-plugin id="AES">
<description>AES Password Encoder</description>
<classname>org.pentaho.support.encryption.AESTwoWayPasswordEncoder</classname>
<default-encoder>true</default-encoder>
</password-encoder-plugin>
-->
</password-encoder-plugins>
{
"stepInfos": [
{
"type": "tableInput",
"database": {
"access": "Native",
"database": "kettle-test",
"name": "test",
"password": "***",
"port": "3306",
"server": "localhost",
"type": "MYSQL",
"username": "root"
},
"stepInfo": {
"sql": "select * from user",
"stepName": "1",
"location": {
"x": 100,
"y": 100
}
}
},
{
"type": "tableInput",
"database": {
"access": "Native",
"database": "kettle-test",
"name": "test",
"password": "****",
"port": "3306",
"server": "localhost",
"type": "MYSQL",
"username": "root"
},
"stepInfo": {
"sql": "select * from user",
"stepName": "2",
"location": {
"x": 200,
"y": 100
}
}
},
{
"type": "append",
"headName": "1",
"tailName": "2",
"stepInfo": {
"stepName": "3",
"location": {
"x": 300,
"y": 100
}
}
}
],
"previewInfo": {
"stepName": "1",
"size": 2
},
"connectInfos": [
{
"from": "1",
"to": "3"
},
{
"from": "2",
"to": "3"
}
]
}
import com.alibaba.fastjson.JSONObject;
import com.central.business.dto.KettleTableInputDTO;
import com.central.business.dto.PreviewData;
import com.central.business.service.KettleService;
import com.central.common.exception.BizException;
import com.central.common.model.Result;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import sun.rmi.runtime.Log;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletResponse;
@Slf4j
@RestController
@RequestMapping("kettle")
@Api(tags = "Kettle操作接口")
public class KettleController {
@Resource
private KettleService kettleService;
@PostMapping("previewByJson")
@ApiOperation("通过json数据预览")
public Result<PreviewData> previewByJson(@RequestBody JSONObject jsonObject) {
log.info(jsonObject.toJSONString());
return kettleService.previewByJson(jsonObject);
}
@PostMapping("downloadKtrByJson")
@ApiOperation("通过json数据下载对应ktr文件,可用于验证json格式数据自定义ktr图是否正误")
public void downloadKtrByJson(@RequestBody JSONObject jsonObject, HttpServletResponse response) {
try {
kettleService.downloadKtrByJson(jsonObject, response);
} catch (Exception e) {
e.printStackTrace();
throw new BizException(e.getMessage());
}
}
@GetMapping("/execKtr")
@ApiOperation("执行ktr文件")
public Object runKtr(String filename) throws Exception {
return Result.succeed(kettleService.runTaskKtr(filename, null).toString());
}
@GetMapping("/execKjb")
@ApiOperation("执行kjb文件")
public Object runKjb(String filename) throws Exception {
return Result.succeed(kettleService.runTaskKjb(filename, null).toString());
}
}
对应的 service 层代码此处省略,下面直接给出核心部分——Kettle 操作工具类。
/**
 * Loads a transformation (.ktr) from a stream and previews the rows written by one of its steps.
 *
 * <p>In synchronous mode a row listener is attached to the requested step, the transformation is
 * run to completion, and the captured rows are returned. In asynchronous mode (as reported by
 * {@code isAsync}) the threads are started and the method returns without waiting; the returned
 * {@link PreviewData} has an empty data list.
 *
 * @param in         stream containing the transformation XML; not closed by this method
 * @param previewDTO which step to preview and how; {@code null} or a blank step name yields {@code null}
 * @return the collected preview data, or {@code null} when no step to preview was given
 * @throws BizException if the requested step does not exist or the transformation cannot be
 *                      loaded or executed
 */
public static PreviewData previewKtrByStream(InputStream in, KettlePreviewDTO previewDTO) {
    if (Objects.isNull(previewDTO) || StringUtils.isBlank(previewDTO.getStepName())) {
        return null;
    }
    try {
        KettleEnvironment.init();
        // Register custom step plugins here to extend Kettle for project-specific needs.
        registerPlugins();
        TransMeta transMeta = new TransMeta(in, null, true, null, null);
        // Validate the requested step first. Previously this ran after the PreviewMap entry and
        // listener were registered and after prepareExecution(), so a bad step name left that
        // state and work behind.
        if (!isExist(transMeta, previewDTO)) {
            throw new BizException("预览信息中的步骤" + previewDTO.getStepName() + "不存在,请指定存在的步骤");
        }
        Trans trans = new Trans(transMeta);
        trans.setPreview(true);
        if (needSendLog(previewDTO)) {
            PreviewMap.setPreviewDTO(trans.getLogChannelId(), previewDTO);
            // Listener hook for custom actions during the transformation's lifecycle.
            trans.addTransListener(new AsyncLogListener());
        }
        trans.prepareExecution(null);
        PreviewData previewData = new PreviewData();
        previewData.setData(new LinkedList<>());
        boolean async = isAsync(previewDTO);
        if (!async) {
            // Capture rows written by the requested step, like Spoon's preview.
            trans.getSteps().stream()
                    .filter(c -> previewDTO.getStepName().equals(c.step.getStepname()))
                    .findFirst()
                    .ifPresent(n -> n.step.addRowListener(new RowAdapter() {
                        @Override
                        public void rowWrittenEvent(RowMetaInterface rowMeta, Object[] row) {
                            addToPreviewData(new PreviewParam(rowMeta, row, previewData, previewDTO, n));
                        }
                    }));
        }
        trans.startThreads();
        if (!async) {
            trans.waitUntilFinished();
        }
        return previewData;
    } catch (BizException e) {
        // Already a business error with a user-facing message; do not rewrap it.
        throw e;
    } catch (Exception e) {
        e.printStackTrace();
        throw new BizException(e.getMessage());
    }
}
/**
 * Registers the project's custom step plugins with Kettle's {@link PluginRegistry}.
 *
 * @throws KettlePluginException if the registry rejects a plugin
 */
private static void registerPlugins() throws KettlePluginException {
    // "Append" is available in Spoon but not when driving Kettle from Java. This only registers
    // it; the behaviour lives in AppendStepMeta / AppendStep.
    registerStepPlugin("Append", AppendStepMeta.class);
    // The built-in FieldSplitter's character splitting was insufficient, so a rewritten
    // component is registered under its own id.
    registerStepPlugin("FieldSplitter2", FieldSplitter2Meta.class);
}

/**
 * Registers one step plugin whose StepMetaInterface implementation is {@code metaClass},
 * under the single id {@code pluginId}.
 */
private static void registerStepPlugin(String pluginId, Class<?> metaClass) throws KettlePluginException {
    Map<Class<?>, String> classMap = new HashMap<>(2);
    classMap.put(StepMetaInterface.class, metaClass.getName());
    Plugin plugin = new Plugin(
            new String[]{pluginId}, StepPluginType.class, StepMetaInterface.class,
            "", "", "", "",
            false, false, classMap,
            new LinkedList<>(), null, null);
    PluginRegistry.getInstance().registerPlugin(StepPluginType.class, plugin);
}
/**
 * Converts one row written by the previewed step into a field-name to value map and appends it
 * to the preview data.
 *
 * <p>Does nothing when the row metadata or preview data is missing, when isReachSize reports the
 * preview limit as reached, or when the row metadata has no fields.
 *
 * <p>Otherwise it:
 * <ul>
 *   <li>always records the field names on the preview data, then passes them to removeJoinFields;</li>
 *   <li>leaves the fields reported by getJoinFields out of the row map;</li>
 *   <li>drops rows whose map dateMapIsAllNull reports as all-null.</li>
 * </ul>
 *
 * @param previewParam the written row together with its metadata and the preview target
 */
private static void addToPreviewData(PreviewParam previewParam) {
    boolean skip = Objects.isNull(previewParam.getRowMeta())
            || Objects.isNull(previewParam.getPreviewData())
            || isReachSize(previewParam.getPreviewDTO(), previewParam.getPreviewData());
    if (skip) {
        return;
    }
    List<ValueMetaInterface> valueMetas = previewParam.getRowMeta().getValueMetaList();
    if (isEmpty(valueMetas)) {
        return;
    }
    List<String> joinFields = getJoinFields(previewParam);
    List<String> names = new LinkedList<>();
    for (ValueMetaInterface valueMeta : valueMetas) {
        names.add(valueMeta.getName());
    }
    previewParam.getPreviewData().setFieldName(names);
    // Field names are recorded even when the row itself carries no values.
    if (ArrayUtils.isEmpty(previewParam.getRow())) {
        return;
    }
    Map<String, Object> rowValues = new LinkedHashMap<>();
    int col = 0;
    while (col < names.size()) {
        String name = names.get(col);
        if (!isContains(joinFields, name)) {
            rowValues.put(name, getValue(previewParam, col));
        }
        col++;
    }
    removeJoinFields(previewParam, joinFields, names);
    if (!dateMapIsAllNull(rowValues)) {
        previewParam.getPreviewData().getData().add(rowValues);
    }
}
/**
 * Demo entry point: builds a transformation containing an Append step and writes its XML to
 * merge_test.ktr in the working directory.
 */
public static void main(String[] args) throws IOException, KettleException {
    // Initialise Kettle and start a new transformation.
    kettleInit("merge data test");

    // Add the Append step that joins the two "字段选择" streams.
    KettleAppendInfo appendInfo = KettleAppendInfo.builder()
            .headName("字段选择")
            .tailName("字段选择 2")
            .build();
    KettleStepInfo appendStepInfo = KettleStepInfo.builder()
            .stepName("追加流")
            .location(KettleLocation.builder().x(450).y(175).build())
            .build();
    getAppendStep(appendInfo, appendStepInfo);

    // Serialise the transformation and save it as a .ktr file.
    String xml = getTransMeta().getXML();
    FileUtils.writeStringToFile(new File("merge_test.ktr"), xml, "UTF-8");
}
/**
 * Initialises the Kettle environment and starts a fresh, empty transformation named {@code name}.
 *
 * <p>The new TransMeta and the plugin registry are stored through setTransMeta / setRegistry for
 * use by the step-building helpers. The registry is used to look up each step's plugin id.
 *
 * @param name name given to the new transformation
 * @throws BizException if the Kettle environment cannot be initialised
 */
public static void kettleInit(String name) {
    try {
        KettleEnvironment.init();
    } catch (KettleException e) {
        // Fail fast: setTransMeta() is only reached after a successful init, so swallowing this
        // error left getTransMeta() pointing at whatever transformation was set earlier (or null).
        throw new BizException("kettle初始化失败:" + e.getMessage());
    }
    setTransMeta(new TransMeta());
    getTransMeta().setName(name);
    setRegistry(PluginRegistry.getInstance());
}
/**
 * Connects two steps of the current transformation with a hop from {@code from} to {@code to}.
 *
 * @param from source step of the hop
 * @param to   target step of the hop
 * @throws BizException if either step is null
 */
public static void combineTwoStep(StepMeta from, StepMeta to) {
    boolean missingEndpoint = from == null || to == null;
    if (missingEndpoint) {
        throw new BizException("开头from或者结尾to参数不可为空");
    }
    TransHopMeta hop = new TransHopMeta(from, to);
    getTransMeta().addTransHop(hop);
}
/**
 * Builds a "Table output" step from {@code outputInfo} and adds it to the current transformation.
 *
 * <p>The connection described by {@code database} is registered on the transformation as well.
 *
 * @param outputInfo target table name, field lists and truncate option
 * @param database   connection settings for the output table
 * @param stepInfo   step name and canvas location
 * @return the StepMeta that was added to the transformation
 */
public static StepMeta getTableOutputStep(KettleTableOutputInfo outputInfo, KettleDatabase database,
                                          KettleStepInfo stepInfo) {
    assertParam(outputInfo);
    TableOutputMeta output = new TableOutputMeta();
    output.setDatabaseMeta(getDatabaseMeta(database));

    // Target table and loading options.
    output.setTableName(outputInfo.getTableName());
    output.setSpecifyFields(outputInfo.getSpecifyFields());
    output.setTruncateTable(outputInfo.getTruncateTable());

    // Column / stream field lists.
    output.setFieldDatabase(outputInfo.getFieldDatabase());
    output.setFieldStream(outputInfo.getFieldStream());

    return getStepMeta(stepInfo, output);
}
/**
 * Builds a "Select values" step from {@code selectValuesInfo} and adds it to the current
 * transformation.
 *
 * <p>Selected names, renames and deleted names are only copied when non-null, so a missing part
 * does not overwrite SelectValuesMeta's own value with null. Each entry of the metas array
 * becomes one SelectMetadataChange.
 *
 * @param selectValuesInfo selection / rename / delete / metadata-change settings
 * @param stepInfo         step name and canvas location
 * @return the StepMeta that was added to the transformation
 */
public static StepMeta getSelectValuesStep(KettleSelectValuesInfo selectValuesInfo, KettleStepInfo stepInfo) {
    assertParam(selectValuesInfo);
    SelectValuesMeta selectValuesMeta = new SelectValuesMeta();
    if (Objects.nonNull(selectValuesInfo.getSelectName())) {
        selectValuesMeta.setSelectName(selectValuesInfo.getSelectName());
    }
    if (Objects.nonNull(selectValuesInfo.getSelectRename())) {
        selectValuesMeta.setSelectRename(selectValuesInfo.getSelectRename());
    }
    // Guarded like the two setters above. Previously this was unconditional, so a missing
    // deleteName replaced the meta's value with null.
    if (Objects.nonNull(selectValuesInfo.getDeleteName())) {
        selectValuesMeta.setDeleteName(selectValuesInfo.getDeleteName());
    }
    SelectMetadata[] requested = selectValuesInfo.getMetas();
    if (ArrayUtils.isNotEmpty(requested)) {
        SelectMetadataChange[] changes = new SelectMetadataChange[requested.length];
        for (int i = 0; i < requested.length; i++) {
            SelectMetadata current = requested[i];
            changes[i] = new SelectMetadataChange(selectValuesMeta, current.getName(), current.getRename(),
                    current.getType(), current.getLength(), current.getPrecision(), current.getStorageType(),
                    current.getConversionMask(), current.isDateFormatLenient(), current.getDateFormatLocale(),
                    current.getDateFormatTimeZone(), current.isLenientStringToNumber(), current.getDecimalSymbol(),
                    current.getGroupingSymbol(), current.getCurrencySymbol());
        }
        selectValuesMeta.setMeta(changes);
    }
    return getStepMeta(stepInfo, selectValuesMeta);
}
/**
 * Creates a DatabaseMeta from {@code database}, adds it to the current transformation, and
 * returns the connection the transformation finds under {@code database.getName()}.
 *
 * <p>NOTE(review): the returned instance comes from a lookup by name. If a connection with the
 * same name was already added (e.g. two steps declaring the same "name" with different
 * settings), it may not be the one just created. Confirm against TransMeta.addDatabase /
 * findDatabase.
 *
 * @param database connection settings
 * @return the connection registered on the transformation under that name
 * @throws BizException if the connection XML cannot be built or parsed
 */
private static DatabaseMeta getDatabaseMeta(KettleDatabase database) {
    final DatabaseMeta created;
    try {
        created = new DatabaseMeta(getDatabaseXml(database));
    } catch (Exception e) {
        e.printStackTrace();
        throw new BizException(e.getMessage());
    }
    // Register the connection on the transformation, then hand back the registered one so
    // steps (e.g. table input/output) reference the transformation's connection.
    getTransMeta().addDatabase(created);
    return getTransMeta().findDatabase(database.getName());
}
/**
 * Wraps {@code stepMetaInterface} in a StepMeta, places it on the canvas and adds it to the
 * current transformation.
 *
 * <p>The step is named after {@code stepInfo} and marked to be drawn. It is only positioned when
 * {@code stepInfo} carries a location.
 *
 * @param stepInfo          step name and optional canvas location
 * @param stepMetaInterface the step-type specific metadata
 * @return the StepMeta that was added to the transformation
 */
private static StepMeta getStepMeta(KettleStepInfo stepInfo, StepMetaInterface stepMetaInterface) {
    // Plugin id of this step type (e.g. "Append" for the custom plugin registered above).
    String pluginId = getRegistry().getPluginId(StepPluginType.class, stepMetaInterface);
    StepMeta stepMeta = new StepMeta(pluginId, stepInfo.getStepName(), stepMetaInterface);
    // Show the step in Spoon at the given position.
    stepMeta.setDraw(true);
    // stepInfo may come from JSON without a "location" object; dereferencing it unconditionally
    // threw a NullPointerException.
    if (stepInfo.getLocation() != null) {
        stepMeta.setLocation(stepInfo.getLocation().getX(), stepInfo.getLocation().getY());
    }
    getTransMeta().addStep(stepMeta);
    return stepMeta;
}
/**
 * Builds the {@code <connection>} XML document from which a DatabaseMeta is created.
 *
 * <p>Every value is XML-escaped. These values can originate from request JSON, and characters
 * such as {@code &} or {@code <} (common in passwords) would otherwise make the document
 * unparseable or let a value inject additional connection elements. A null value is still
 * rendered as the text "null", as plain string concatenation did before.
 *
 * @param kettleDatabase connection settings
 * @return the connection XML
 * @throws BizException if the connection name is blank
 */
private static String getDatabaseXml(KettleDatabase kettleDatabase) {
    if (StringUtils.isBlank(kettleDatabase.getName())) {
        throw new BizException("database中的name不可为空");
    }
    return "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
            "<connection>" +
            "<name>" + escapeXmlText(kettleDatabase.getName()) + "</name>" +
            "<server>" + escapeXmlText(kettleDatabase.getServer()) + "</server>" +
            "<type>" + escapeXmlText(kettleDatabase.getType()) + "</type>" +
            "<access>" + escapeXmlText(kettleDatabase.getAccess()) + "</access>" +
            "<database>" + escapeXmlText(kettleDatabase.getDatabase()) + "</database>" +
            "<port>" + escapeXmlText(kettleDatabase.getPort()) + "</port>" +
            "<username>" + escapeXmlText(kettleDatabase.getUsername()) + "</username>" +
            "<password>" + escapeXmlText(kettleDatabase.getPassword()) + "</password>" +
            "</connection>";
}

/** Escapes the five XML special characters in {@code String.valueOf(value)}. */
private static String escapeXmlText(Object value) {
    String text = String.valueOf(value);
    StringBuilder escaped = new StringBuilder(text.length());
    for (int i = 0; i < text.length(); i++) {
        char c = text.charAt(i);
        switch (c) {
            case '&':
                escaped.append("&amp;");
                break;
            case '<':
                escaped.append("&lt;");
                break;
            case '>':
                escaped.append("&gt;");
                break;
            case '"':
                escaped.append("&quot;");
                break;
            case '\'':
                escaped.append("&apos;");
                break;
            default:
                escaped.append(c);
        }
    }
    return escaped.toString();
}
/**
 * Transformation listener that, when the transformation finishes, reads the buffered log text
 * for the transformation's log channel and passes it to doSomethingYouWant.
 *
 * <p>Start and active events are ignored.
 */
@Slf4j
public class AsyncLogListener implements TransListener {

    @Override
    public void transStarted(Trans trans) throws KettleException {
        // Nothing to do when the transformation starts.
    }

    @Override
    public void transActive(Trans trans) {
        // Nothing to do while the transformation is running.
    }

    @Override
    public void transFinished(Trans trans) throws KettleException {
        String channelId = trans.getLogChannelId();
        LoggingBuffer logBuffer = KettleLogStore.getAppender();
        // Log text recorded for this run's channel.
        String runLog = logBuffer.getBuffer(channelId, true).toString();
        if (!StringUtils.isBlank(runLog)) {
            doSomethingYouWant(runLog, trans);
        }
    }
}
自定义 Kettle 组件的三要素:
- 步骤类:继承 BaseStep,实现 StepInterface
- 数据类:继承 BaseStepData,实现 StepDataInterface
- 元数据类:继承 BaseStepMeta,实现 StepMetaInterface
// Step 1: the step implementation (extends BaseStep, implements StepInterface)
/**
 * Runtime implementation of the custom "Append" step.
 *
 * <p>Each row read from the step's input is written unchanged to the output. The row metadata
 * captured from the first row is used for all rows.
 *
 * <p>NOTE(review): headName/tailName are copied from the meta in init() but are not used by
 * processRow(). Rows are taken with getRow() from whichever input provides them, so head rows are
 * not guaranteed to precede tail rows the way Spoon's "Append streams" step orders them. If the
 * two inputs have different layouts, the first row's metadata is still used for every row.
 */
@Data
public class AppendStep extends BaseStep implements StepInterface {
    // Message-bundle anchor. Previously FieldSplitter2Meta.class (copy-paste); now the same class
    // AppendStepMeta uses for its own PKG.
    private static final Class<?> PKG = AppendStepMeta.class;
    private String headName;
    private String tailName;

    public AppendStep(StepMeta s, StepDataInterface stepDataInterface, int c, TransMeta t, Trans dis) {
        super(s, stepDataInterface, c, t, dis);
    }

    /**
     * Initialisation hook called by PDI (a place to open connections, file handles, etc.).
     * After BaseStep's own init succeeds, copies headName/tailName from the step meta.
     *
     * @return false if BaseStep initialisation failed, true otherwise
     */
    @Override
    public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
        AppendStepMeta meta = (AppendStepMeta) smi;
        AppendStepData data = (AppendStepData) sdi;
        if (!super.init(meta, data)) {
            return false;
        }
        // Step-specific initialisation.
        headName = meta.getHeadName();
        tailName = meta.getTailName();
        return true;
    }

    /**
     * Row-processing hook called repeatedly by PDI; returning false means no more rows.
     */
    @Override
    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        AppendStepData data = (AppendStepData) sdi;
        // Read one row from the input.
        Object[] r = getRow();
        if (r == null) {
            // Input exhausted: signal downstream steps and stop.
            setOutputDone();
            return false;
        }
        if (first) {
            first = false;
            // Keep the first row's metadata in the data object for all subsequent output rows.
            data.outputRowMeta = getInputRowMeta().clone();
        }
        // Pass the row on to the output.
        putRow(data.outputRowMeta, r);
        // Periodic progress logging.
        if (checkFeedback(getLinesRead())) {
            logBasic(BaseMessages.getString(PKG, "AppendStep.Linenr", getLinesRead()));
        }
        // true asks PDI to call processRow() again for the next row.
        return true;
    }

    /**
     * Cleanup hook called by PDI to release resources.
     */
    @Override
    public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
        super.dispose(smi, sdi);
    }
}
// Step 2: the step's runtime data (extends BaseStepData, implements StepDataInterface)
/**
 * Per-run state of the custom "Append" step.
 */
public class AppendStepData extends BaseStepData implements StepDataInterface {

    /** Output row layout; AppendStep assigns it from the first input row's metadata. */
    RowMetaInterface outputRowMeta;

    public AppendStepData() {
        // Nothing beyond BaseStepData's own initialisation.
        super();
    }
}
// Step 3: the step metadata (extends BaseStepMeta, implements StepMetaInterface)
/**
 * Metadata of the custom "Append" step.
 *
 * <p>Holds the names of the head and tail steps, reads and writes them as {@code head_name} /
 * {@code tail_name} XML tags, and creates the runtime step and data objects.
 */
@Data
public class AppendStepMeta extends BaseStepMeta implements StepMetaInterface {
    private static final Class<?> PKG = AppendStepMeta.class;

    /** Name of the head step. */
    private String headName;
    /** Name of the tail step. */
    private String tailName;

    public AppendStepMeta() {
        super();
    }

    /** Creates the runtime AppendStep for this metadata. */
    @Override
    public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int cnr, TransMeta transMeta, Trans disp) {
        AppendStep step = new AppendStep(stepMeta, stepDataInterface, cnr, transMeta, disp);
        return step;
    }

    /**
     * Reads {@code head_name} and {@code tail_name} from the step's XML node.
     *
     * @throws KettleXMLException wrapping any error raised while reading the node
     */
    @Override
    public void loadXML(Node stepnode, List<DatabaseMeta> databases, IMetaStore metaStore) throws KettleXMLException {
        try {
            String head = XMLHandler.getNodeValue(XMLHandler.getSubNode(stepnode, "head_name"));
            setHeadName(head);
            String tail = XMLHandler.getNodeValue(XMLHandler.getSubNode(stepnode, "tail_name"));
            setTailName(tail);
        } catch (Exception e) {
            throw new KettleXMLException("Demo plugin unable to read step info from XML node", e);
        }
    }

    /** Creates a fresh per-run data object. */
    @Override
    public StepDataInterface getStepData() {
        AppendStepData stepData = new AppendStepData();
        return stepData;
    }

    @Override
    public Object clone() {
        return super.clone();
    }

    /** Resets both step names to the empty string. */
    @Override
    public void setDefault() {
        setHeadName("");
        setTailName("");
    }

    /** Serialises the step as {@code type}, {@code head_name} and {@code tail_name} tags. */
    @Override
    public String getXML() {
        StringBuilder xml = new StringBuilder();
        xml.append(XMLHandler.addTagValue("type", "Append"));
        xml.append(XMLHandler.addTagValue("head_name", getHeadName()));
        xml.append(XMLHandler.addTagValue("tail_name", getTailName()));
        return xml.toString();
    }
}
/**
 * Parser that turns one JSON step description into a Kettle {@link StepMeta}.
 *
 * <p>By default, a class named {@code <Type>StepParse} handles the JSON step type {@code <type>}
 * (the same name with its first letter lower-cased). For example, {@code TableInputStepParse}
 * handles {@code "tableInput"}.
 */
public interface StepParse {

    /**
     * Returns whether this parser handles steps of the given JSON {@code type}.
     *
     * <p>Naming rule: the simple class name with the {@code StepParse} suffix removed and its
     * first letter lower-cased must equal {@code type}. Anything from a {@code $$} marker onwards
     * (as in generated proxy class names such as {@code XxxStepParse$$EnhancerBySpringCGLIB$$...})
     * is ignored.
     *
     * <p>Classes that do not follow the rule never match. This includes anonymous classes, whose
     * simple name is empty. Previously the last 9 characters were stripped unconditionally, which
     * threw StringIndexOutOfBoundsException for short names.
     *
     * @param type the step's {@code type} value from the JSON; may be null
     * @return true if this parser should handle {@code type}
     */
    default boolean match(String type) {
        final String suffix = "StepParse";
        String simpleName = this.getClass().getSimpleName();
        int generatedMarker = simpleName.indexOf("$$");
        if (generatedMarker >= 0) {
            simpleName = simpleName.substring(0, generatedMarker);
        }
        if (!simpleName.endsWith(suffix) || simpleName.length() == suffix.length()) {
            return false;
        }
        String prefix = simpleName.substring(0, simpleName.length() - suffix.length());
        // Lower-case only the first letter: "TableInput" -> "tableInput".
        String expected = Character.toLowerCase(prefix.charAt(0)) + prefix.substring(1);
        return expected.equals(type);
    }

    /**
     * Parses one step description.
     *
     * @param jsonObj      the step's JSON object
     * @param stepInfo     the step's parsed "stepInfo" section (name, location)
     * @param connectInfos the hops declared in the document
     * @return the resulting StepMeta
     */
    StepMeta parse(JSONObject jsonObj, KettleStepInfo stepInfo, List<ConnectInfo> connectInfos);
}
/**
 * Converts a JSON transformation description into transformation XML.
 *
 * <p>The work runs under a lock on this instance. kettleInit and KettleExec.parse store their
 * results through static setters (setTransMeta, KettleExec.setStepMetas), so concurrent callers
 * would interfere. NOTE(review): the lock only serialises callers of this same instance.
 *
 * @param jsonObject the transformation description
 * @return the generated XML together with the parsed JSON info
 * @throws BizException if parsing yields no content
 */
private JsonXmlDTO getXmlByJson(JSONObject jsonObject) {
    final JsonInfo jsonInfo;
    final String transXml;
    synchronized (this) {
        // Start a fresh transformation with a unique name.
        KettleCompUtils.kettleInit(UUID.randomUUID().toString());
        // 1. Parse the JSON document.
        jsonInfo = KettleExec.parse(jsonObject);
        if (jsonInfo == null) {
            throw new BizException("json内容不可为空");
        }
        // 2. Render the corresponding transformation XML.
        transXml = KettleExec.getTransXml(jsonInfo);
    }
    return JsonXmlDTO.builder()
            .transXml(transXml)
            .jsonInfo(jsonInfo)
            .build();
}
/**
 * Entry point for parsing a JSON transformation description; rejects a missing document and
 * delegates to JsonParse.parse.
 *
 * @param jsonObject the transformation description
 * @return the parsed description
 * @throws BizException if {@code jsonObject} is null
 */
public static JsonInfo parse(JSONObject jsonObject) {
    if (jsonObject == null) {
        throw new BizException("请输入对应的json格式数据");
    }
    return JsonParse.parse(jsonObject);
}
/**
 * Parses a complete JSON transformation description.
 *
 * <p>Parsing happens in this order:
 * <ol>
 *   <li>the connectInfos (hops);</li>
 *   <li>the stepInfos, some of which may be queued for a deferred pass;</li>
 *   <li>publishing the parsed steps via KettleExec.setStepMetas;</li>
 *   <li>the deferred steps;</li>
 *   <li>the previewInfo.</li>
 * </ol>
 *
 * @param jsonObject the transformation description
 * @return steps, preview info and hops
 * @throws BizException wrapping the message of any error raised while parsing the steps
 */
static JsonInfo parse(JSONObject jsonObject) {
    List<JSONObject> deferred = new ArrayList<>();
    List<ConnectInfo> hops = parseConnectInfos(jsonObject);
    List<StepMeta> steps;
    try {
        steps = parseStepInfos(jsonObject, hops, deferred);
        // Published before the deferred pass; presumably deferred parsers look up earlier steps
        // through KettleExec (TODO confirm).
        KettleExec.setStepMetas(steps);
        parseDelay(deferred, steps, hops);
    } catch (Exception e) {
        e.printStackTrace();
        throw new BizException(e.getMessage());
    }
    return JsonInfo.builder()
            .stepInfos(steps)
            .previewInfo(parsePreviewInfo(jsonObject))
            .connectInfos(hops)
            .build();
}
/**
 * Parses every entry of the "stepInfos" array, in order, via doParse.
 *
 * @param jsonObject   the transformation description
 * @param connectInfos the hops, passed through to the step parsers
 * @param delayParses  collector for step descriptions whose parsing is deferred
 * @return the StepMetas parsed immediately (deferred ones are not included)
 * @throws BizException if "stepInfos" is missing or empty
 */
private static List<StepMeta> parseStepInfos(JSONObject jsonObject, List<ConnectInfo> connectInfos,
                                             List<JSONObject> delayParses) {
    JSONArray steps = jsonObject.getJSONArray("stepInfos");
    if (CollectionUtils.isEmpty(steps)) {
        throw new BizException("stepInfos信息不可为空");
    }
    List<StepMeta> parsed = new LinkedList<>();
    int idx = 0;
    while (idx < steps.size()) {
        doParse(connectInfos, delayParses, parsed, steps.getJSONObject(idx));
        idx++;
    }
    return parsed;
}
/**
 * Parses one step description.
 *
 * <p>The result is either appended to {@code stepMetas} or, when {@code delayParses} is non-null
 * and needDelay(type) is true, the raw JSON is queued in {@code delayParses} for later.
 *
 * @param connectInfos the hops, passed to the matching parser
 * @param delayParses  collector for deferred step descriptions; null disables deferral
 * @param stepMetas    receives the parsed StepMeta
 * @param jsonObject   the step description
 * @throws BizException if the step JSON is null or empty, no parsers are registered, the type is
 *                      blank, or no parser matches the type
 */
private static void doParse(List<ConnectInfo> connectInfos, List<JSONObject> delayParses,
                            List<StepMeta> stepMetas, JSONObject jsonObject) {
    // isEmpty() instead of isBlank(toString()): an empty JSONObject serialises as "{}", which is
    // never blank, so the previous check could not detect an empty step.
    if (Objects.isNull(jsonObject) || jsonObject.isEmpty()) {
        throw new BizException("stepInfos内的信息不可为空");
    }
    List<StepParse> stepParses = StepParseManager.getAllStepParse();
    if (CollectionUtils.isEmpty(stepParses)) {
        throw new BizException("StepParse不可为空");
    }
    String type = jsonObject.getString("type");
    if (StringUtils.isBlank(type)) {
        throw new BizException("步骤:"
                + jsonObject.toString(SerializerFeature.PrettyFormat)
                + "内的type信息不可为空");
    }
    // Deferral depends only on the type, not on any particular parser. It was previously checked
    // inside the parser loop, where it always fired on the first iteration.
    if (Objects.nonNull(delayParses) && needDelay(type)) {
        delayParses.add(jsonObject);
        return;
    }
    for (StepParse stepParse : stepParses) {
        if (StepParseManager.match(stepParse, type)) {
            stepMetas.add(StepParseManager.parse(stepParse, jsonObject, connectInfos));
            return;
        }
    }
    throw new BizException("该step对应的type为"
            + type
            + "无法处理,请查正或者联系开发人员新增");
}
/**
 * Validates the "stepInfo" section of a step description and lets {@code stepPars} build the
 * StepMeta.
 *
 * @param stepPars     the parser selected for this step's type
 * @param jsonObject   the step description
 * @param connectInfos the hops, passed to the parser
 * @return the StepMeta built by {@code stepPars}
 * @throws BizException if "stepInfo" is missing or its stepName is blank
 */
public static StepMeta parse(StepParse stepPars, JSONObject jsonObject, List<ConnectInfo> connectInfos) {
    JSONObject rawStepInfo = jsonObject.getJSONObject("stepInfo");
    if (rawStepInfo == null) {
        throw new BizException("step内的stepInfo信息不可为空");
    }
    KettleStepInfo stepInfo = rawStepInfo.toJavaObject(KettleStepInfo.class);
    boolean nameMissing = StringUtils.isBlank(stepInfo.getStepName());
    if (nameMissing) {
        throw new BizException("stepInfo内的stepName不可为空");
    }
    return stepPars.parse(jsonObject, stepInfo, connectInfos);
}
**个人原创,转载请标明来源**
END
标签:return,String,stepInfo,java,jsonObject,kettle,new,操作,public From: https://blog.csdn.net/weixin_40194030/article/details/139772431