SpringBoot-web环境
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
springboot--启动类
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class DemoApplication {
public static void main(String[] args) {
SpringApplication sp = new SpringApplication(DemoApplication.class);
sp.addListeners(new ApplicationPidFileWriter());
ConfigurableApplicationContext applicationContext = sp.run(args);
Environment env = applicationContext.getEnvironment();
String port = env.getProperty("server.port");
System.out.print("\n----------------------------------------------------------\n\t" +
"Application is running! Access URL:\n\t" +
"Local: \t\thttp://localhost:" + port + "/\n\t" +
"----------------------------------------------------------\n\t");
}
}
RestController接口
package com.example.demo.demos.web;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.io.BufferedReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("/datahub")
public class SqlLineageController {
@Value("${datahub.sqllineage.shell.path:/home/datahub/sqlShell}")
private String sqlShellPath;
@Value("${datahub.sqllineage.shell.python.name:lineage_emitter_dataset_finegrained_sample.py}")
private String pythonShellName;
@Value("${datahub.sqllineage.shell.python.version:python3}")
private String pythonShellVersion;
private String sqlTextName = "/sql.txt";
@RequestMapping(value = "/sqlEncodeBase64", method = {RequestMethod.GET, RequestMethod.POST})
public String createTableLineage(@RequestParam("sql") String sql) throws IOException {
byte[] encode = Base64.getEncoder().encode(sql.getBytes(StandardCharsets.UTF_8));
return new String(encode,StandardCharsets.UTF_8);
}
@RequestMapping(value = "/createTableLineage", method = {RequestMethod.GET, RequestMethod.POST})
public Map<String, Object> createTableLineage(@RequestBody Map<String, Object> sql, HttpServletRequest request) {
Map<String, Object> resp = new HashMap<>();
resp.put("code", 500);
try {
Object insertLineageSql = null;
if (sql.containsKey("sql")) {
insertLineageSql = sql.get("sql");
} else {
resp.put("msg", "SQl必传....");
return resp;
}
String executeSqlBase64 = insertLineageSql.toString();
byte[] decodedBytes = Base64.getDecoder().decode(executeSqlBase64);
String executeSql = new String(decodedBytes, StandardCharsets.UTF_8);
String categorySql = URLDecoder.decode(executeSql, "UTF-8").replace("\t"," ");
int i = executePythonSql(sqlShellPath, categorySql);
System.out.println(executeSql + "执行结果:" + i);
if (i == 0) {
resp.put("code", 200);
resp.put("msg", "success");
return resp;
}
resp.put("msg", "服务器错误....");
return resp;
} catch (IOException e) {
resp.put("msg", e.getMessage());
return resp;
}
}
public int executePythonSql(String sqlPythonLocalValue, String sql) throws IOException {
FileWriter fileWriter = new FileWriter(sqlPythonLocalValue + sqlTextName);
fileWriter.write("");
fileWriter.write(sql);
fileWriter.close();
// 定义要执行的Shell命令
String command = pythonShellVersion + " " + sqlPythonLocalValue + "/" + pythonShellName;
System.out.println(command);
try {
// 开始执行Shell命令并获取输入流
Process process = new ProcessBuilder()
.command("bash", "-c", command)
.redirectErrorStream(true)
.start();
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
StringBuilder output = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
output.append(line).append("\n");
}
int exitCode = process.waitFor();
System.out.println("Exit Code: " + exitCode);
System.out.println("Output:\n" + output.toString());
return exitCode;
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
}
配置 命令执行的Python脚本位置
datahub.sqllineage.shell.path=/home/datahub/sqlShell
datahub.sqllineage.shell.python.name=lineage_emitter_dataset_finegrained_sample.py
datahub.sqllineage.shell.python.version=python3