Controller
package diit.microservice.midrange.controller;
import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport;
import diit.microservice.midrange.dto.TableDto;
import diit.microservice.midrange.dto.TableFieldDto;
import diit.microservice.midrange.entity.DatasourceConfig;
import diit.microservice.midrange.mapper.DatasourceConfigMapper;
import diit.microservice.midrange.service.TableConfigService;
import io.micrometer.core.instrument.util.StringUtils;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.text.StrBuilder;
import org.apache.tomcat.util.http.fileupload.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* <p>
* 自动写配置文件
* </p>
*
* @author LvJM
* @since 2022-08-11
*/
@Slf4j
@RestController
@Api(value = "AutoConfWriteController", tags = "自动写配置文件")
@RequestMapping("/autoConf")
public class AutoConfWriteController {
@Autowired
private TableConfigService tableConfigService;
@Autowired
private DatasourceConfigMapper datasourceConfigMapper;
@Value("${spring.elasticsearch.rest.uris}")
private String elasticSearchUrl;
@GetMapping(value = "/getAutoConf")
@ApiOperation(value = "自动写conf",produces = "application/octet-stream")
@ApiOperationSupport(author = "LvJM")
@ApiImplicitParams({
@ApiImplicitParam(name = "sourceId", value = "数据库配置id", dataType = "String"),
@ApiImplicitParam(name = "tableName", value = "表名称(用','分割),空为所有表", dataType = "String"),
@ApiImplicitParam(name = "modulePath", value = "模块路径,例:个人中心>我的资料", dataType = "String"),
@ApiImplicitParam(name = "jdbcDriverPath", value = "数据库驱动在logstash的存放路径,例:D:\\logstash-7.13.3\\bin\\databaseDriverJar\\postgresql-42.2.23.jar", dataType = "String"),
@ApiImplicitParam(name = "lastValuePath", value = "增量同步文件路径,例:D:\\logstash-7.13.3\\sql_last_value", dataType = "String"),
})
public void getAutoConf(String sourceId, String tableName, String modulePath,String jdbcDriverPath,String lastValuePath,HttpServletResponse response) throws IOException {
DatasourceConfig datasourceConfig = datasourceConfigMapper.selectById(sourceId);
/**
* 进行配置
*/
String jdbc_connection_string = datasourceConfig.getDatabaseUrl();
StringBuilder conf = new StringBuilder();
String hosts = elasticSearchUrl;
// String module_path = "个人中心>我的资料";
String indexName = "dbms_data";
String jdbc_user = datasourceConfig.getUserName();
String jdbc_password = datasourceConfig.getPassword();
// String jdbc_driver_library = "D:\\ShuHuiShiKong\\ElasticSearch\\ES\\logstash-7.13.3\\bin\\databaseDriverJar\\postgresql-42.2.23.jar";
// String last_run_metadata_path = "D:\\ShuHuiShiKong\\ElasticSearch\\ES\\logstash-7.13.3\\sql_last_value";
String jdbc_driver_class = datasourceConfig.getDriverName();
/**
* 配置完成
*/
File file = getConf(sourceId, tableName, jdbc_connection_string, conf, indexName, hosts, modulePath, jdbc_user, jdbc_password, jdbcDriverPath, lastValuePath, jdbc_driver_class);
download(file,response);
}
private File getConf(String sourceId, String tableName, String jdbc_connection_string, StringBuilder conf, String indexName, String hosts, String moudle_path, String jdbc_user, String jdbc_password, String jdbc_driver_library, String last_run_metadata_path, String jdbc_driver_class) throws FileNotFoundException {
conf.append("input {\n" +
" stdin {}\n");
if (StringUtils.isNotEmpty(tableName)){
String[] tableNameArray = tableName.split(",");
for (String t : tableNameArray) {
Map<String, String> statement = getStatement(sourceId, t,jdbc_connection_string
,jdbc_user,jdbc_password,jdbc_driver_library,last_run_metadata_path,jdbc_driver_class,moudle_path);
if (null != statement){
conf.append(statement.get("statement"));
String key = statement.get("key");
conf.append("}\n");
conf.append(addFilter());
conf.append("output {\n" +
" if [type] == \""+t+"\" {\n" +
" elasticsearch {\n" +
" # 配置ES集群地址\n" +
" hosts => [\""+hosts+"\"]\n" +
" # 索引名字,必须小写\n" +
" index => \""+indexName+"\"\n" +
" #数据唯一索引(建议使用数据库KeyID)\n" +
" document_id => \"%{"+key+"}\"\n" +
" }\n" +
" }\n");
conf.append(addEnd());
}
}
}else {
List<TableDto> allTables = tableConfigService.getAllTables(sourceId);
boolean once = true;
for (TableDto allTable : allTables) {
Map<String, String> statement = getStatement(sourceId, allTable.getTableName(),jdbc_connection_string,
jdbc_user,jdbc_password,jdbc_driver_library,last_run_metadata_path,jdbc_driver_class,moudle_path);
if (null != statement){
conf.append(statement.get("statement"));
}
}
conf.append("}\n");
conf.append(addFilter());
conf.append("output {\n");
for (TableDto allTable : allTables) {
Map<String, String> statement = getStatement(sourceId, allTable.getTableName(),jdbc_connection_string,
jdbc_user,jdbc_password,jdbc_driver_library,last_run_metadata_path,jdbc_driver_class,moudle_path);
if (null != statement){
if (once){
String elasticsearch = getElasticsearch(allTable.getTableName(), indexName, statement.get("key"), hosts, "0");
conf.append(elasticsearch);
once = false;
}else {
String elasticsearch = getElasticsearch(allTable.getTableName(), indexName, statement.get("key"), hosts, "1");
conf.append(elasticsearch);
}
}
}
conf.append(addEnd());
}
String data=conf.toString();
System.out.println(data);
// String path = ResourceUtils.getURL("classpath:").getPath() + "static/";
String path = System.getProperty("user.dir");
File file = new File(path+ "/logstash/database.conf");
File parentFile = file.getParentFile();
if (!parentFile.exists()) {
parentFile.mkdir();
}
System.out.println(path);
try (FileWriter fr = new FileWriter(file)) {
// 以字符流的形式把数据写入到文件中
char[] cs = data.toCharArray();
fr.write(cs);
} catch (IOException e) {
e.printStackTrace();
}
return file;
}
private void download(File file, HttpServletResponse response) throws IOException {
//获取文件地址
//把服务器中文件读取到内存中
FileInputStream fis = new FileInputStream(file.getAbsolutePath());
//设置下载的类型
response.setHeader("content-disposition","attachment;fileName="+ URLEncoder.encode(file.getName(),"UTF-8"));
//获取输出流
ServletOutputStream os = response.getOutputStream();
//复制
IOUtils.copy(fis,os);
//关闭资源
fis.close();
os.close();
file.delete();
}
private String addEnd(){
String s = "stdout {\n" +
" codec => json_lines\n" +
" }\n" +
"}\n";
return s;
}
private String addFilter(){
String s = "filter {\n" +
" json {\n" +
" source => \"message\"\n" +
" remove_field => [\"message\"]\n" +
" }\n" +
" mutate {\n" +
" remove_field => [\"@timestamp\",\"@version\"]\n" +
" }\n" +
"}\n";
return s;
}
private String getElasticsearch(String tableName,String indexName,String key,String hosts,String isElseIf){
StrBuilder elasticsearchConf = new StrBuilder();
if ("1".equals(isElseIf)){
elasticsearchConf.append(" else if [type] == \""+tableName+"\" {\n");
}else {
elasticsearchConf.append(" if [type] == \""+tableName+"\" {\n");
}
elasticsearchConf.append(
" elasticsearch {\n" +
" # 配置ES集群地址\n" +
" hosts => [\""+hosts+"\"]\n" +
" # 索引名字,必须小写\n" +
" index => \""+indexName+"\"\n" +
" #数据唯一索引(建议使用数据库KeyID)\n" +
" document_id => \"%{"+key+"}\"\n" +
" }\n" +
" }\n");
return elasticsearchConf.toString();
}
private Map<String,String> getStatement(String sourceId, String tableName,
String jdbc_connection_string,String jdbc_user,String jdbc_password,
String jdbc_driver_library,String last_run_metadata_path,
String jdbc_driver_class,String moudle_path){
int keyNum = 0;
String key = "";
List<TableFieldDto> tableFieldDtoList = tableConfigService.getTableFields(sourceId, tableName);
for (TableFieldDto tableFieldDto : tableFieldDtoList) {
if (tableFieldDto.getIsPrimary() == 1
|| tableFieldDto.getColumnName().toLowerCase().equals("id")
|| tableFieldDto.getColumnName().toLowerCase().equals("objectid")){
keyNum ++;
key = tableFieldDto.getColumnName();
break;
}
}
if (keyNum != 1){
return null;
}
StringBuilder statement = new StringBuilder();
//
String upStr = " jdbc {\n" +
" type => \""+tableName+"\"\n" +
" jdbc_connection_string => \""+jdbc_connection_string+"\"\n" +
" # 数据库连接账号密码;\n" +
" jdbc_user => \""+jdbc_user+"\"\n" +
" jdbc_password => \""+jdbc_password+"\"\n" +
" # MySQL依赖包路径;\n" +
" jdbc_driver_library => \""+jdbc_driver_library+"\"\n" +
" jdbc_driver_class => \""+jdbc_driver_class+"\"\n" +
" #是否开启分页\n" +
" jdbc_paging_enabled => \"true\"\n" +
" #分页条数\n" +
" jdbc_page_size => \"50000\"\n";
statement.append(upStr);
statement.append(" statement => \"select "+"\n");
statement.append(" '"+moudle_path+"' as moudle_path ," +"\n");
statement.append(" '"+tableName+"' as resource_table ," +"\n");
for (int i = 0; i < tableFieldDtoList.size(); i++) {
String fieldType = tableFieldDtoList.get(i).getUdtName().toLowerCase();
if ("varchar".equals(fieldType) && i != tableFieldDtoList.size() -1){
statement.append(" "+tableFieldDtoList.get(i).getColumnName()+","+"\n");
}else if ("varchar".equals(fieldType)){
statement.append(" "+tableFieldDtoList.get(i).getColumnName()+" "+"\n");
}
else if (i != tableFieldDtoList.size() -1){
statement.append(" "+tableFieldDtoList.get(i).getColumnName()+"::text,"+"\n");
}else {
statement.append(" "+tableFieldDtoList.get(i).getColumnName()+"::text"+"\n");
}
}
statement.append(" "+"from "+tableName+"\""+"\n");
String downStr =
" #每一分钟做一次同步\n" +
" schedule => \"* * * * *\"\n" +
" #是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)\n" +
" lowercase_column_names => true\n" +
" # 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;\n" +
" record_last_run => true\n" +
" # 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值;\n" +
" use_column_value => true\n" +
" # 需要记录的字段,用于增量同步,需是数据库字段\n" +
" tracking_column => \""+key+"\"\n" +
" # record_last_run上次数据存放位置;\n" +
" last_run_metadata_path => \""+last_run_metadata_path+"\"\n" +
" #是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)\n" +
" clean_run => false \n" +
" } \n";
statement.append(downStr);
Map map = new HashMap();
map.put("key",key);
map.put("statement",statement.toString());
return map;
}
}
Implementation of tableConfigService.getAllTables(sourceId), called by the controller above:
/**
 * 获取数据库下的所有表 — list every table of the datasource identified by sourceId.
 *
 * @param sourceId primary key of the DatasourceConfig row describing the connection
 * @return table descriptors; empty when the query fails or the db type is unsupported
 */
public List<TableDto> getAllTables(String sourceId){
    List<TableDto> list = new ArrayList<>();
    // Load the stored connection settings for this datasource.
    QueryWrapper<DatasourceConfig> queryWrapper = new QueryWrapper<>();
    queryWrapper.eq("id", sourceId);
    DatasourceConfig config = datasourceConfigMapper.selectOne(queryWrapper);
    SQLHelper helper = new SQLHelper(config.getUserName(), config.getPassword(), config.getDriverName(), config.getDatabaseUrl());
    ResultSet resultSet = null;
    try {
        resultSet = getAllTablesSet(config.getType(), helper);
        if (resultSet == null) {
            // Unsupported database type — the original code NPE'd on next() here.
            log.warn("unsupported datasource type: {}", config.getType());
            return list;
        }
        while (resultSet.next()) {
            TableDto dto = new TableDto();
            dto.setTableName(resultSet.getString("table_name"));
            dto.setObjDescription(resultSet.getString("obj_description"));
            dto.setTableSize(resultSet.getString("table_size"));
            dto.setTableCatalog(resultSet.getString("table_catalog"));
            dto.setTableSchema(resultSet.getString("table_schema"));
            list.add(dto);
        }
    } catch (Exception e) {
        // Pass the throwable itself so the stack trace lands in the log
        // (the original only logged e.getMessage() after printStackTrace).
        log.error("执行出现错误", e);
    } finally {
        // Close in finally — the original leaked statement/connection when
        // iteration threw, because close() was only reached on success.
        SQLHelper.close(resultSet, helper.getPs(), helper.getCt());
    }
    return list;
}
/**
 * Select and run the vendor-specific query that lists all tables.
 *
 * @param type   database vendor name (ORACLE / POSTGRESQL / KINGBASE8, case-insensitive)
 * @param helper connection helper the query is executed through
 * @return the open ResultSet, or null for an unsupported vendor
 */
private ResultSet getAllTablesSet(String type, SQLHelper helper) {
    // First pick the SQL text for the vendor, then run it once at the end.
    final String sql;
    switch (type.toUpperCase()) {
        case "ORACLE":
            sql = "select \n" +
                    " a.table_name,\n" +
                    " a.tablespace_name as table_catalog,\n" +
                    " a.tablespace_name as table_schema,\n" +
                    " a.sample_size as table_size,\n" +
                    " b.COMMENTS as obj_description\n" +
                    "from user_tables a,user_tab_comments b\n" +
                    "where a.TABLE_NAME=b.TABLE_NAME\n" +
                    "ORDER BY TABLE_NAME";
            break;
        case "POSTGRESQL":
            sql = "\n" +
                    "SELECT\n" +
                    "\t\"table_name\",\n" +
                    "\ttable_catalog,\n" +
                    "\ttable_schema,\n" +
                    "pg_size_pretty (pg_total_relation_size(('\"' || table_schema || '\".\"' || table_name || '\"'))) as table_size," +
                    "\tobj_description ( oid, 'pg_class' ) \n" +
                    "FROM\n" +
                    "\tinformation_schema.tables t1,\n" +
                    "\tpg_class t2 \n" +
                    "WHERE\n" +
                    "\ttable_schema = 'public' \n" +
                    "\tAND t1.\"table_name\" = t2.relname;";
            break;
        case "KINGBASE8":
            sql = "SELECT \n" +
                    "\ttable_name,\n" +
                    "\t'' as table_catalog,\n" +
                    "\t'' as table_schema,\n" +
                    "\t'' as table_size,\n" +
                    "\t'' as obj_description\n" +
                    "from information_schema.TABLES WHERE \n" +
                    "table_schema='public';";
            break;
        default:
            sql = null;
            break;
    }
    // Unsupported vendor keeps the original contract: return null.
    return sql == null ? null : helper.executeQuery(sql);
}
Implementation of tableConfigService.getTableFields(sourceId, tableName), called by the controller above:
/**
 * 获取表中的字段设计 — list the column metadata of one table in the datasource.
 *
 * @param sourceId  primary key of the DatasourceConfig row describing the connection
 * @param tableName table whose columns are described
 * @return column descriptors; empty when the query fails or the db type is unsupported
 */
public List<TableFieldDto> getTableFields(String sourceId, String tableName){
    List<TableFieldDto> list = new ArrayList<>();
    // Load the stored connection settings for this datasource.
    QueryWrapper<DatasourceConfig> queryWrapper = new QueryWrapper<>();
    queryWrapper.eq("id", sourceId);
    DatasourceConfig config = datasourceConfigMapper.selectOne(queryWrapper);
    SQLHelper helper = new SQLHelper(config.getUserName(), config.getPassword(), config.getDriverName(), config.getDatabaseUrl());
    ResultSet resultSet = null;
    try {
        resultSet = getTableFieldsSet(config.getType(), tableName, helper);
        if (resultSet == null) {
            // Unsupported database type — the original code NPE'd on next() here.
            log.warn("unsupported datasource type: {}", config.getType());
            return list;
        }
        while (resultSet.next()) {
            TableFieldDto dto = new TableFieldDto();
            dto.setColumnName(resultSet.getString("column_name"));
            dto.setColDescription(resultSet.getString("col_description"));
            dto.setUdtName(resultSet.getString("udt_name"));
            dto.setField_size(resultSet.getString("field_size"));
            dto.setIsPrimary(resultSet.getInt("is_primary"));
            list.add(dto);
        }
    } catch (Exception e) {
        // Pass the throwable itself so the stack trace lands in the log
        // (the original only logged e.getMessage() after printStackTrace).
        log.error("执行出现错误", e);
    } finally {
        // Close in finally — the original leaked statement/connection when
        // iteration threw, because close() was only reached on success.
        SQLHelper.close(resultSet, helper.getPs(), helper.getCt());
    }
    return list;
}
/**
 * Select and run the vendor-specific column-metadata query for one table.
 * The table name is bound as a prepared-statement parameter; the original
 * concatenated it straight into the SQL (injection risk, since tableName can
 * arrive via the controller's HTTP request parameter).
 *
 * @param type      database vendor name (ORACLE / POSTGRESQL / KINGBASE8, case-insensitive)
 * @param tableName table whose columns are described
 * @param helper    connection helper the query is executed through
 * @return the open ResultSet, or null for an unsupported vendor
 */
private ResultSet getTableFieldsSet(String type, String tableName, SQLHelper helper){
    ResultSet resultSet = null;
    switch (type.toUpperCase()){
        case "ORACLE":
            // NOTE(review): b.data_precision aliased as is_primary looks wrong
            // for primary-key detection — confirm against the Oracle catalog.
            resultSet = helper.executeQuery("SELECT \n" +
                    "\n" +
                    " lower(b.column_name) column_name \n" +
                    " ,a.comments col_description \n" +
                    " ,b.data_type udt_name \n" +
                    " ,b.data_length field_size \n" +
                    " ,b.data_precision is_primary \n" +
                    " \n" +
                    "FROM all_col_comments a \n" +
                    " ,all_tab_columns b \n" +
                    "WHERE a.table_name = b.table_name \n" +
                    "and a.OWNER = b.OWNER \n" +
                    "and a.Column_name = b.Column_name \n" +
                    " and a.table_name = ?", tableName);
            break;
        case "POSTGRESQL":
            resultSet = helper.executeQuery("\n" +
                    "SELECT\n" +
                    "base.\"column_name\",\n" +
                    "col_description ( t1.oid, t2.attnum ),\n" +
                    "base.udt_name,\n" +
                    "COALESCE(character_maximum_length, numeric_precision, datetime_precision) as field_size,\n" +
                    "(CASE\n" +
                    "\tWHEN ( SELECT t2.attnum = ANY ( conkey ) FROM pg_constraint WHERE conrelid = t1.oid AND contype = 'p' ) = 't' \n" +
                    "\tTHEN 1 ELSE 0 \n" +
                    "END ) as is_primary\n" +
                    "FROM\n" +
                    "information_schema.COLUMNS base,\n" +
                    "pg_class t1,\n" +
                    "pg_attribute t2 \n" +
                    "WHERE\n" +
                    "base.\"table_name\" = ? \n" +
                    "AND t1.relname = base.\"table_name\" \n" +
                    "AND t2.attname = base.\"column_name\" \n" +
                    "AND t1.oid = t2.attrelid \n" +
                    "AND t2.attnum > 0;\n", tableName);
            break;
        case "KINGBASE8":
            resultSet = helper.executeQuery("SELECT \n" +
                    "\n" +
                    " lower(b.column_name) column_name \n" +
                    " ,a.comments col_description \n" +
                    " ,b.data_type udt_name \n" +
                    " ,b.data_length field_size \n" +
                    " ,b.data_precision is_primary \n" +
                    " \n" +
                    "FROM all_col_comments a \n" +
                    " ,all_tab_columns b \n" +
                    "WHERE a.table_name = b.table_name \n" +
                    "and a.OWNER = b.OWNER \n" +
                    "and a.Column_name = b.Column_name \n" +
                    " and UPPER(a.table_name) = ?", tableName.toUpperCase());
            break;
        default:
            // Unknown vendor: caller is responsible for handling the null.
            break;
    }
    return resultSet;
}
SQLHelper
package diit.microservice.midrange.utils;
import lombok.extern.slf4j.Slf4j;
import java.sql.*;
@Slf4j
public class SQLHelper {
//定义三个变量
private Connection ct=null;
private PreparedStatement ps=null;
private ResultSet rs=null;
//连接数据库的用户名,密码,url,驱动
//说明:在实际开发中,我们往往把这些变量写到一个外部文件中
//当程序启动时,我们读入这些配置信息。java.util.Properites
private String username;
private String password;
private String driver;
private String url;
//使用静态块加载驱动(驱动只需要加载一次)
public SQLHelper(String username, String password, String driver, String url){
//获取dbinfo.properties文件内信息
this.username=username;
this.password=password;
this.driver=driver;
this.url=url;
try{
//获得驱动
Class.forName(driver);
}catch (Exception e){
e.printStackTrace();
log.error("加载驱动和链接信息出错:{}",e.getMessage());
}
}
//统一的curd操作
public void executeUpdate(String sql,String[] parameters){
try {
ct=DriverManager.getConnection(url,username,password);
ps=ct.prepareStatement(sql);
if(parameters!=null){
for(int i=0;i<parameters.length;i++){
ps.setString(i+1, parameters[i]);
}
}
//执行
ps.executeUpdate();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}finally{
close(rs,ps,ct);
}
}
//写一个方法,完成查询任务
//sql表示要执行的sql语句
//sql select * from emp where ename=?
public ResultSet executeQuery(String sql,String ... parameters){
try {
//根据实际情况我们对sql语句?赋值
//得到连接
ct=DriverManager.getConnection(url,username,password);
//创建ps对象,得到sql语句对象
ps=ct.prepareStatement(sql);
//如果parameters不为null,才赋值
if(parameters!=null){
for(int i=0;i<parameters.length;i++){
ps.setString(i+1, parameters[i]);
}
}
rs=ps.executeQuery();
} catch (SQLException e) {
e.printStackTrace();
log.error("执行查询出错:{}",e.getMessage());
//抛出运行异常
throw new RuntimeException(e.getMessage());
} finally{
// close(rs,ps,ct);
}
return rs;
}
//把关闭资源写成函数
public static void close(ResultSet rs,Statement ps,Connection ct){
//关闭资源
if(rs!=null){
try {
rs.close();
} catch (SQLException e) {
e.printStackTrace();
}
rs=null;
}
if(ps!=null){
try {
ps.close();
} catch (SQLException e) {
e.printStackTrace();
}
ps=null;
}
if(ct!=null){
try {
ct.close();
} catch (SQLException e) {
e.printStackTrace();
}
ct=null;
}
}
public Connection getCt() {
return ct;
}
public PreparedStatement getPs() {
return ps;
}
}
标签:jdbc,name,配置文件,导入,statement,table,import,logstash,String
From: https://www.cnblogs.com/ideaAI/p/16668655.html