首页 > 其他分享 >自动生成logstash导入配置文件

自动生成logstash导入配置文件

时间:2022-09-08 10:44:06 浏览次数:119
标签:jdbc name 配置文件 导入 statement table import logstash String

Controller

package diit.microservice.midrange.controller;

import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport;
import diit.microservice.midrange.dto.TableDto;
import diit.microservice.midrange.dto.TableFieldDto;
import diit.microservice.midrange.entity.DatasourceConfig;
import diit.microservice.midrange.mapper.DatasourceConfigMapper;
import diit.microservice.midrange.service.TableConfigService;
import io.micrometer.core.instrument.util.StringUtils;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.text.StrBuilder;
import org.apache.tomcat.util.http.fileupload.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Generates a Logstash pipeline configuration that imports database tables into Elasticsearch
 * through the {@code jdbc} input plugin and serves it as a {@code database.conf} download.
 *
 * <p>Only tables for which a key column can be determined (see {@code getStatement}) are part
 * of the generated configuration; all other tables are silently skipped.
 *
 * @author LvJM
 * @since 2022-08-11
 */
@Slf4j
@RestController
@Api(value = "AutoConfWriteController", tags = "自动写配置文件")
@RequestMapping("/autoConf")
public class AutoConfWriteController {

    /** Elasticsearch index every generated output section writes to (must be lower case). */
    private static final String INDEX_NAME = "dbms_data";

    /** File name offered to the client for the generated configuration. */
    private static final String CONF_FILE_NAME = "database.conf";

    @Autowired
    private TableConfigService tableConfigService;
    @Autowired
    private DatasourceConfigMapper datasourceConfigMapper;

    @Value("${spring.elasticsearch.rest.uris}")
    private String elasticSearchUrl;


    /**
     * Builds the Logstash configuration for one datasource and streams it to the client as an
     * attachment.
     *
     * @param sourceId       id of the stored datasource configuration
     * @param tableName      comma separated table names; {@code null}/blank means all tables
     * @param modulePath     module path written into every imported row as {@code moudle_path}
     * @param jdbcDriverPath location of the JDBC driver jar on the Logstash host
     * @param lastValuePath  location of the Logstash {@code last_run_metadata_path} file
     * @param response       servlet response the configuration is written to
     * @throws IOException if the response cannot be written
     */
    @GetMapping(value = "/getAutoConf")
    @ApiOperation(value = "自动写conf",produces = "application/octet-stream")
    @ApiOperationSupport(author = "LvJM")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "sourceId", value = "数据库配置id", dataType = "String"),
            @ApiImplicitParam(name = "tableName", value = "表名称(用','分割),空为所有表", dataType = "String"),
            @ApiImplicitParam(name = "modulePath", value = "模块路径,例:个人中心>我的资料", dataType = "String"),
            @ApiImplicitParam(name = "jdbcDriverPath", value = "数据库驱动在logstash的存放路径,例:D:\\logstash-7.13.3\\bin\\databaseDriverJar\\postgresql-42.2.23.jar", dataType = "String"),
            @ApiImplicitParam(name = "lastValuePath", value = "增量同步文件路径,例:D:\\logstash-7.13.3\\sql_last_value", dataType = "String"),
    })
    public void getAutoConf(String sourceId, String tableName, String modulePath,String jdbcDriverPath,String lastValuePath,HttpServletResponse response) throws IOException {
        DatasourceConfig datasourceConfig = datasourceConfigMapper.selectById(sourceId);
        if (datasourceConfig == null) {
            // Previously this ended in a NullPointerException on the first getter call.
            response.sendError(HttpServletResponse.SC_NOT_FOUND, "Unknown datasource id");
            return;
        }
        String conf = getConf(sourceId, tableName, datasourceConfig, elasticSearchUrl,
                modulePath, jdbcDriverPath, lastValuePath);
        download(conf, response);
    }

    /**
     * Assembles the complete pipeline: one {@code input} section holding a {@code jdbc} block per
     * table, the shared {@code filter} section and one {@code output} section that routes each
     * table to Elasticsearch through an {@code if / else if} chain on {@code [type]}.
     *
     * <p>The metadata of every table is read exactly once, and the sections are emitted once
     * regardless of how many tables were requested, so the result is a well-formed configuration
     * even for several (or zero) usable tables.
     *
     * @return the configuration text
     */
    private String getConf(String sourceId, String tableName, DatasourceConfig datasourceConfig, String hosts,
                           String modulePath, String jdbcDriverPath, String lastValuePath) {
        // LinkedHashMap: the jdbc inputs and the output conditions must appear in the same order.
        Map<String, Map<String, String>> statements = new LinkedHashMap<>();
        for (String table : resolveTableNames(sourceId, tableName)) {
            Map<String, String> statement = getStatement(sourceId, table, datasourceConfig,
                    jdbcDriverPath, lastValuePath, modulePath);
            if (statement != null) {
                statements.put(table, statement);
            }
        }

        StringBuilder conf = new StringBuilder();
        conf.append("input {\n" +
                "        stdin {}\n");
        for (Map<String, String> statement : statements.values()) {
            conf.append(statement.get("statement"));
        }
        conf.append("}\n");
        conf.append(addFilter());
        conf.append("output {\n");
        String hostList = quoteHosts(hosts);
        boolean first = true;
        for (Map.Entry<String, Map<String, String>> entry : statements.entrySet()) {
            conf.append(getElasticsearch(entry.getKey(), INDEX_NAME, entry.getValue().get("key"), hostList, !first));
            first = false;
        }
        conf.append(addEnd());

        // Never log the configuration itself: it contains the JDBC password.
        log.debug("Generated logstash conf for datasource {} covering {} table(s)", sourceId, statements.size());
        return conf.toString();
    }

    /**
     * Determines which tables to export: the trimmed, de-duplicated entries of the comma separated
     * {@code tableName}, or every table of the datasource when {@code tableName} is null/blank.
     */
    private Set<String> resolveTableNames(String sourceId, String tableName) {
        Set<String> names = new LinkedHashSet<>();
        if (tableName != null && !tableName.trim().isEmpty()) {
            for (String part : tableName.split(",")) {
                String name = part.trim();
                if (!name.isEmpty()) {
                    names.add(name);
                }
            }
        } else {
            for (TableDto table : tableConfigService.getAllTables(sourceId)) {
                if (table.getTableName() != null) {
                    names.add(table.getTableName());
                }
            }
        }
        return names;
    }

    /**
     * Sends the configuration as a UTF-8 encoded attachment named {@code database.conf}.
     *
     * <p>The content is streamed from memory. Going through a fixed file on disk allowed
     * concurrent requests to overwrite, read or delete each other's configuration.
     *
     * @throws IOException if writing to the response fails
     */
    private void download(String conf, HttpServletResponse response) throws IOException {
        byte[] body = conf.getBytes(StandardCharsets.UTF_8);
        response.setContentType("application/octet-stream");
        response.setContentLength(body.length);
        response.setHeader("content-disposition","attachment;fileName="+ URLEncoder.encode(CONF_FILE_NAME,"UTF-8"));
        ServletOutputStream os = response.getOutputStream();
        os.write(body);
        os.flush();
    }


    /** Returns the {@code stdout} output plus the brace that closes the {@code output} section. */
    private String addEnd(){
        return "stdout {\n" +
                "             codec => json_lines\n" +
                "        }\n" +
                "}\n";
    }

    /** Returns the {@code filter} section shared by all tables. */
    private String addFilter(){
        return "filter {\n" +
                "    json {\n" +
                "        source => \"message\"\n" +
                "        remove_field => [\"message\"]\n" +
                "    }\n" +
                "    mutate  {\n" +
                "        remove_field => [\"@timestamp\",\"@version\"]\n" +
                "    }\n" +
                "}\n";
    }

    /**
     * Turns a comma separated URI list into the body of a Logstash array, e.g.
     * {@code a,b} becomes {@code "a", "b"}. A single URI yields {@code "a"}.
     */
    private static String quoteHosts(String uris) {
        StringBuilder quoted = new StringBuilder();
        for (String uri : String.valueOf(uris).split(",")) {
            String trimmed = uri.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            if (quoted.length() > 0) {
                quoted.append(", ");
            }
            quoted.append('"').append(trimmed).append('"');
        }
        return quoted.toString();
    }

    /**
     * Renders the conditional {@code elasticsearch} output for one table.
     *
     * @param tableName value of the event's {@code type} field this branch matches
     * @param indexName target index
     * @param key       column whose value becomes the document id
     * @param hostList  already quoted, comma separated host entries (see {@code quoteHosts})
     * @param elseIf    {@code true} to continue an existing chain with {@code else if}
     */
    private String getElasticsearch(String tableName,String indexName,String key,String hostList,boolean elseIf){
        String condition = elseIf ? "        else if [type] == \"" : "        if [type] == \"";
        return condition + tableName + "\" {\n" +
                "            elasticsearch {\n" +
                "                     # 配置ES集群地址\n" +
                "                    hosts => ["+hostList+"]\n" +
                "                     # 索引名字,必须小写\n" +
                "                    index => \""+indexName+"\"\n" +
                "                    #数据唯一索引(建议使用数据库KeyID)\n" +
                "                    document_id => \"%{"+key+"}\"\n" +
                "            }\n" +
                "        }\n";
    }

    /**
     * Returns the name of the first column that is flagged as primary key or is called
     * {@code id} / {@code objectid} (case-insensitive), or {@code null} when there is none.
     */
    private static String findKeyColumn(List<TableFieldDto> fields) {
        for (TableFieldDto field : fields) {
            String column = field.getColumnName();
            if (field.getIsPrimary() == 1
                    || "id".equalsIgnoreCase(column)
                    || "objectid".equalsIgnoreCase(column)) {
                return column;
            }
        }
        return null;
    }

    /** Doubles single quotes so the value can be embedded in a SQL string literal. */
    private static String sqlLiteral(String value) {
        return String.valueOf(value).replace("'", "''");
    }

    /**
     * Renders the {@code jdbc} input block for one table.
     *
     * <p>The generated select returns {@code varchar} columns as they are and casts every other
     * column with {@code ::text}, which is PostgreSQL syntax.
     *
     * @return a map with {@code "key"} (the key column) and {@code "statement"} (the rendered
     *         block), or {@code null} when the table has no usable key column
     */
    private Map<String,String> getStatement(String sourceId, String tableName, DatasourceConfig datasourceConfig,
                                            String jdbcDriverPath, String lastValuePath, String modulePath){
        List<TableFieldDto> fields = tableConfigService.getTableFields(sourceId, tableName);
        String key = findKeyColumn(fields);
        if (key == null){
            return null;
        }

        StringBuilder statement = new StringBuilder();
        statement.append("        jdbc {\n" +
                "            type => \""+tableName+"\"\n" +
                "            jdbc_connection_string => \""+datasourceConfig.getDatabaseUrl()+"\"\n" +
                "            # 数据库连接账号密码;\n" +
                "            jdbc_user => \""+datasourceConfig.getUserName()+"\"\n" +
                "            jdbc_password => \""+datasourceConfig.getPassword()+"\"\n" +
                "            # MySQL依赖包路径;\n" +
                "            jdbc_driver_library => \""+jdbcDriverPath+"\"\n" +
                "            jdbc_driver_class => \""+datasourceConfig.getDriverName()+"\"\n" +
                "            #是否开启分页\n" +
                "            jdbc_paging_enabled => \"true\"\n" +
                "            #分页条数\n" +
                "            jdbc_page_size => \"50000\"\n");
        statement.append("            statement => \"select \n");
        statement.append("                        '"+sqlLiteral(modulePath)+"' as moudle_path ,\n");
        statement.append("                        '"+sqlLiteral(tableName)+"' as resource_table ,\n");
        for (int i = 0; i < fields.size(); i++) {
            TableFieldDto field = fields.get(i);
            boolean last = i == fields.size() - 1;
            String suffix;
            if ("varchar".equalsIgnoreCase(field.getUdtName())) {
                suffix = last ? " " : ",";
            } else {
                suffix = last ? "::text" : "::text,";
            }
            statement.append("                        ").append(field.getColumnName()).append(suffix).append("\n");
        }
        statement.append("                    from "+tableName+"\"\n");

        statement.append(
                        "            #每一分钟做一次同步\n" +
                        "            schedule => \"* * * * *\"\n" +
                        "            #是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)\n" +
                        "            lowercase_column_names => true\n" +
                        "            # 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;\n" +
                        "            record_last_run => true\n" +
                        "            # 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值;\n" +
                        "            use_column_value => true\n" +
                        "            # 需要记录的字段,用于增量同步,需是数据库字段\n" +
                        "            tracking_column => \""+key+"\"\n" +
                        "            # record_last_run上次数据存放位置;\n" +
                        "            last_run_metadata_path => \""+lastValuePath+"\"\n" +
                        "            #是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)\n" +
                        "            clean_run => false \n" +
                        "        } \n");

        Map<String, String> result = new HashMap<>();
        result.put("key",key);
        result.put("statement",statement.toString());
        return result;
    }
}

tableConfigService.getAllTables(sourceId)

/**
     * Lists the tables of the database described by the datasource configuration
     * {@code sourceId}.
     *
     * <p>Failures are logged and not propagated: an unknown {@code sourceId}, an unsupported
     * database type or a query error all produce the rows read so far (possibly none).
     *
     * @param sourceId id of the stored datasource configuration
     * @return the tables found; never {@code null}
     */
    public List<TableDto> getAllTables(String sourceId){
        List<TableDto> list = new ArrayList<>();
        // Look up the connection settings of the datasource.
        QueryWrapper<DatasourceConfig> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("id",sourceId);
        DatasourceConfig config = datasourceConfigMapper.selectOne(queryWrapper);
        if (config == null) {
            log.error("No datasource config found for id {}", sourceId);
            return list;
        }
        SQLHelper helper = new SQLHelper(config.getUserName(),config.getPassword(),config.getDriverName(),config.getDatabaseUrl());
        ResultSet resultSet = null;
        try {
            resultSet = getAllTablesSet(config.getType(), helper);
            if (resultSet == null) {
                // getAllTablesSet returns null for database types it has no query for.
                log.warn("Unsupported database type {}", config.getType());
                return list;
            }
            while (resultSet.next()) {
                TableDto dto = new TableDto();
                dto.setTableName(resultSet.getString("table_name"));
                dto.setObjDescription(resultSet.getString("obj_description"));
                dto.setTableSize(resultSet.getString("table_size"));
                dto.setTableCatalog(resultSet.getString("table_catalog"));
                dto.setTableSchema(resultSet.getString("table_schema"));
                list.add(dto);
            }
        }catch (Exception e){
            log.error("执行出现错误{}",e.getMessage(),e);
        }finally {
            // Release the JDBC resources on every path, including a failure while reading rows.
            SQLHelper.close(resultSet,helper.getPs(),helper.getCt());
        }
        return list;
    }

/**
     * Runs the "list all tables" query that matches the database type.
     *
     * <p>Every query yields the columns {@code table_name}, {@code table_catalog},
     * {@code table_schema}, {@code table_size} and {@code obj_description}. The caller owns the
     * returned result set and must close it together with the helper's statement and connection.
     *
     * @param type   database type, matched case-insensitively against ORACLE, POSTGRESQL and
     *               KINGBASE8
     * @param helper helper used to execute the query
     * @return the open result set, or {@code null} when {@code type} is null or not supported
     */
private ResultSet getAllTablesSet(String type,SQLHelper helper){
        if (type == null) {
            return null;
        }
        final String sql;
        // Locale.ROOT: with the default locale "kingbase8" upper-cases to "KİNGBASE8" on Turkish systems.
        switch (type.toUpperCase(java.util.Locale.ROOT)){
            case "ORACLE":
                sql = "select \n" +
                        "    a.table_name,\n" +
                        "    a.tablespace_name as table_catalog,\n" +
                        "    a.tablespace_name as table_schema,\n" +
                        "    a.sample_size as table_size,\n" +
                        "    b.COMMENTS as obj_description\n" +
                        "from user_tables a,user_tab_comments b\n" +
                        "where a.TABLE_NAME=b.TABLE_NAME\n" +
                        "ORDER BY TABLE_NAME";
                break;
            case "POSTGRESQL":
                // NOTE(review): pg_class is joined on relname only, so a table name that also exists
                // outside the public schema may be listed more than once - confirm.
                sql = "\n" +
                        "SELECT\n" +
                        "\t\"table_name\",\n" +
                        "\ttable_catalog,\n" +
                        "\ttable_schema,\n" +
                        "pg_size_pretty (pg_total_relation_size(('\"' || table_schema || '\".\"' || table_name || '\"'))) as table_size,"+
                        "\tobj_description ( oid, 'pg_class' ) \n" +
                        "FROM\n" +
                        "\tinformation_schema.tables t1,\n" +
                        "\tpg_class t2 \n" +
                        "WHERE\n" +
                        "\ttable_schema = 'public' \n" +
                        "\tAND t1.\"table_name\" = t2.relname;";
                break;
            case "KINGBASE8":
                sql = "SELECT \n" +
                        "\ttable_name,\n" +
                        "\t'' as table_catalog,\n" +
                        "\t'' as table_schema,\n" +
                        "\t'' as table_size,\n" +
                        "\t'' as obj_description\n" +
                        "from information_schema.TABLES WHERE  \n" +
                        "table_schema='public';";
                break;
            default:
                return null;
        }
        return helper.executeQuery(sql);
    }

tableConfigService.getTableFields(sourceId, tableName);

/**
     * Reads the column definitions of one table of the datasource {@code sourceId}.
     *
     * <p>Failures are logged and not propagated: an unknown {@code sourceId}, an unsupported
     * database type or a query error all produce the rows read so far (possibly none).
     *
     * @param sourceId  id of the stored datasource configuration
     * @param tableName table whose columns are read
     * @return the columns found; never {@code null}
     */
    public List<TableFieldDto> getTableFields(String sourceId, String tableName){
        List<TableFieldDto> list = new ArrayList<>();
        // Look up the connection settings of the datasource.
        QueryWrapper<DatasourceConfig> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("id",sourceId);
        DatasourceConfig config = datasourceConfigMapper.selectOne(queryWrapper);
        if (config == null) {
            log.error("No datasource config found for id {}", sourceId);
            return list;
        }
        SQLHelper helper = new SQLHelper(config.getUserName(),config.getPassword(),config.getDriverName(),config.getDatabaseUrl());
        ResultSet resultSet = null;
        try {
            resultSet = getTableFieldsSet(config.getType(), tableName,helper);
            if (resultSet == null) {
                // getTableFieldsSet returns null for database types it has no query for.
                log.warn("Unsupported database type {}", config.getType());
                return list;
            }
            while (resultSet.next()) {
                TableFieldDto dto = new TableFieldDto();
                dto.setColumnName(resultSet.getString("column_name"));
                dto.setColDescription(resultSet.getString("col_description"));
                dto.setUdtName(resultSet.getString("udt_name"));
                dto.setField_size(resultSet.getString("field_size"));
                dto.setIsPrimary(resultSet.getInt("is_primary"));
                list.add(dto);
            }
        }catch (Exception e){
            log.error("执行出现错误{}",e.getMessage(),e);
        }finally {
            // Release the JDBC resources on every path, including a failure while reading rows.
            SQLHelper.close(resultSet,helper.getPs(),helper.getCt());
        }
        return list;
    }

/**
     * Runs the "describe table" query that matches the database type.
     *
     * <p>Every query yields the columns {@code column_name}, {@code col_description},
     * {@code udt_name}, {@code field_size} and {@code is_primary}. The table name is passed as a
     * bound parameter instead of being concatenated into the SQL text, so it cannot alter the
     * statement. The caller owns the returned result set and must close it together with the
     * helper's statement and connection.
     *
     * @param type      database type, matched case-insensitively against ORACLE, POSTGRESQL and
     *                  KINGBASE8
     * @param tableName table to describe
     * @param helper    helper used to execute the query
     * @return the open result set, or {@code null} when {@code type} is null or not supported
     */
private ResultSet getTableFieldsSet(String type,String tableName,SQLHelper helper){
        if (type == null) {
            return null;
        }
        // Shared by the ORACLE and KINGBASE8 branches, which differ only in the table filter.
        // NOTE(review): is_primary is filled from data_precision here, which does not look like
        // a primary-key flag - confirm against the data dictionary before relying on it.
        String dictionaryColumnsSql = "SELECT \n" +
                "\n" +
                "    lower(b.column_name) column_name \n" +
                "    ,a.comments col_description \n" +
                "    ,b.data_type udt_name \n" +
                "    ,b.data_length field_size \n" +
                "    ,b.data_precision is_primary \n" +
                "    \n" +
                "FROM all_col_comments a \n" +
                "    ,all_tab_columns b \n" +
                "WHERE a.table_name = b.table_name \n" +
                "and a.OWNER = b.OWNER \n" +
                "and a.Column_name = b.Column_name \n";
        // Locale.ROOT: with the default locale "kingbase8" upper-cases to "KİNGBASE8" on Turkish systems.
        switch (type.toUpperCase(java.util.Locale.ROOT)){
            case "ORACLE":
                return helper.executeQuery(dictionaryColumnsSql +
                        " and a.table_name = ?", tableName);
            case "POSTGRESQL":
                return helper.executeQuery("\n" +
                        "SELECT\n" +
                        "base.\"column_name\",\n" +
                        "col_description ( t1.oid, t2.attnum ),\n" +
                        "base.udt_name,\n" +
                        "COALESCE(character_maximum_length, numeric_precision, datetime_precision) as field_size,\n" +
                        "(CASE\n" +
                        "\tWHEN ( SELECT t2.attnum = ANY ( conkey ) FROM pg_constraint WHERE conrelid = t1.oid AND contype = 'p' ) = 't' \n" +
                        "\tTHEN 1 ELSE 0 \n" +
                        "END ) as  is_primary\n" +
                        "FROM\n" +
                        "information_schema.COLUMNS base,\n" +
                        "pg_class t1,\n" +
                        "pg_attribute t2 \n" +
                        "WHERE\n" +
                        "base.\"table_name\" = ? \n" +
                        "AND t1.relname = base.\"table_name\" \n" +
                        "AND t2.attname = base.\"column_name\" \n" +
                        "AND t1.oid = t2.attrelid \n" +
                        "AND t2.attnum > 0;\n", tableName);
            case "KINGBASE8":
                // The comparison is case-insensitive: both sides are upper-cased.
                String upperTableName = tableName == null ? null : tableName.toUpperCase(java.util.Locale.ROOT);
                return helper.executeQuery(dictionaryColumnsSql +
                        " and UPPER(a.table_name) = ?", upperTableName);
            default:
                return null;
        }
    }

SQLHelper

package diit.microservice.midrange.utils;
 
import lombok.extern.slf4j.Slf4j;

import java.sql.*;

@Slf4j
public class SQLHelper {
   //定义三个变量
   private Connection ct=null;
   private PreparedStatement ps=null;
   private ResultSet rs=null;

   //连接数据库的用户名,密码,url,驱动
   //说明:在实际开发中,我们往往把这些变量写到一个外部文件中
   //当程序启动时,我们读入这些配置信息。java.util.Properites
   private String username;
   private String password;
   private String driver;
   private String url;

   //使用静态块加载驱动(驱动只需要加载一次)
   public SQLHelper(String username, String password, String driver, String url){
           //获取dbinfo.properties文件内信息
           this.username=username;
           this.password=password;
           this.driver=driver;
           this.url=url;
           try{
               //获得驱动
               Class.forName(driver);
           }catch (Exception e){
               e.printStackTrace();
               log.error("加载驱动和链接信息出错:{}",e.getMessage());
           }

   }

   //统一的curd操作
   public void executeUpdate(String sql,String[] parameters){
       try {
           ct=DriverManager.getConnection(url,username,password);
           ps=ct.prepareStatement(sql);
           if(parameters!=null){
               for(int i=0;i<parameters.length;i++){
                   ps.setString(i+1, parameters[i]);
               }
           }
           //执行
           ps.executeUpdate();

       } catch (Exception e) {
           e.printStackTrace();
           throw new RuntimeException(e.getMessage());
       }finally{
           close(rs,ps,ct);
       }
   }

   //写一个方法,完成查询任务
   //sql表示要执行的sql语句
   //sql select * from emp where ename=?
   public ResultSet executeQuery(String sql,String ... parameters){
       try {
           //根据实际情况我们对sql语句?赋值
           //得到连接
           ct=DriverManager.getConnection(url,username,password);
           //创建ps对象,得到sql语句对象
           ps=ct.prepareStatement(sql);
           //如果parameters不为null,才赋值
           if(parameters!=null){
               for(int i=0;i<parameters.length;i++){
                   ps.setString(i+1, parameters[i]);
               }
           }
           rs=ps.executeQuery();
       } catch (SQLException e) {
           e.printStackTrace();
           log.error("执行查询出错:{}",e.getMessage());
           //抛出运行异常
           throw new RuntimeException(e.getMessage());
       } finally{
//			close(rs,ps,ct);
       }
       return rs;
   }

   //把关闭资源写成函数
   public static void close(ResultSet rs,Statement ps,Connection ct){
       //关闭资源
       if(rs!=null){
           try {
               rs.close();
           } catch (SQLException e) {
               e.printStackTrace();
           }
           rs=null;
       }
       if(ps!=null){
           try {
               ps.close();
           } catch (SQLException e) {
               e.printStackTrace();
           }
           ps=null;
       }
       if(ct!=null){
           try {
               ct.close();
           } catch (SQLException e) {
               e.printStackTrace();
           }
           ct=null;
       }
   }

   public Connection getCt() {
       return ct;
   }

   public PreparedStatement getPs() {
       return ps;
   }
}

标签:jdbc,name,配置文件,导入,statement,table,import,logstash,String
From: https://www.cnblogs.com/ideaAI/p/16668655.html

相关文章