首页 > 数据库 >Flink与mysql结合

Flink与mysql结合

时间:2023-04-01 16:34:32浏览次数:46  
标签:load ## spring Flink 结合 hikari datasource mysql dataSource

  在流式计算中,不是有时候需要和mysql进行结合做一些处理。

 

1.调用其他方法进行

  

 

 

2.更快的处理

  使用guava本地缓存

  对msql的操作是new对象过来

    private final static RuleService ruleService = new RuleService();

    final static Cache<Long, Map<Long,CustomerVerifyConfig>> verifyConfigCache = CacheBuilder.newBuilder()
            //设置cache的初始大小为10,要合理设置该值
            .initialCapacity(1000)
            //设置并发数为5,即同一时间最多只能有5个线程往cache执行写入操作
            .concurrencyLevel(5)
            //设置cache中的数据在写入之后的存活时间为10秒
            .expireAfterWrite(5, TimeUnit.MINUTES)
            //设置缓存最大容量为9999,超过100之后就会按照LRU最近虽少使用算法来移除缓存项
            .maximumSize(9999)
            //构建cache实例
            .build();


    public static CustomerVerifyConfig getVerifyConfigByCustomerId(Long customerId){
        Map<Long,CustomerVerifyConfig> customerVerifyConfigMap = verifyConfigCache.getIfPresent(CUSTOMER_VERIFY_CONFIG_KEY);
        if (customerVerifyConfigMap != null){
            return customerVerifyConfigMap.get(customerId);
        } else {
            List<CustomerVerifyConfig> customerVerifyConfigList = ruleService.queryCustomerVerifyConfig();
            if (CollectionUtils.isEmpty(customerVerifyConfigList)){
                verifyConfigCache.put(CUSTOMER_VERIFY_CONFIG_KEY, new HashMap<>());
                return null;
            } else {
                Map<Long, CustomerVerifyConfig> configMap = customerVerifyConfigList.stream().collect(Collectors.toMap(CustomerVerifyConfig::getCustomerId, Function.identity(), (k1, k2) -> k2));
                verifyConfigCache.put(CUSTOMER_VERIFY_CONFIG_KEY, configMap);
                return configMap.get(customerId);
            }
        }
    }

 

3.引入依赖

        <!--数据库连接池-->
        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
            <version>4.0.3</version>
        </dependency>


        <!--MYSQL驱动-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

 

4.配置

application.properties

## 数据库配置
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.driverClassName = com.mysql.jdbc.Driver
spring.datasource.url = jdbc:mysql://localhost:3306/ssm?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username = root
spring.datasource.password = root
##  Hikari 连接池配置 ------ 详细配置请访问:https://github.com/brettwooldridge/HikariCP
## 最小空闲连接数量
spring.datasource.hikari.minimum-idle=5
## 空闲连接存活最大时间,默认600000(10分钟)
spring.datasource.hikari.idle-timeout=180000
## 连接池最大连接数,默认是10
spring.datasource.hikari.maximum-pool-size=10
## 此属性控制从池返回的连接的默认自动提交行为,默认值:true
spring.datasource.hikari.auto-commit=true
## 连接池名称
spring.datasource.hikari.pool-name=MyHikariCP
## 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
spring.datasource.hikari.max-lifetime=1800000
## 数据库连接超时时间,默认30秒,即30000
spring.datasource.hikari.connection-timeout=30000
spring.datasource.hikari.connection-test-query=SELECT 1
spring.datasource.hikari.pool-name=hikariXXXXDbPool

 

Manager配置

@Slf4j
public class HikariPoolManager {

    private static final HikariDataSource dataSource;

    static {
        Config load = ConfigFactory.load();
        dataSource = new HikariDataSource();
        dataSource.setDriverClassName(load.getString("datasource.driver-class-name"));
        dataSource.setJdbcUrl(load.getString("datasource.url"));
        dataSource.setUsername(load.getString("datasource.username"));
        dataSource.setPassword(load.getString("datasource.password"));
        dataSource.setMinimumIdle(load.getInt("datasource.hikari.minimum-idle"));
        dataSource.setMaximumPoolSize(load.getInt("datasource.hikari.maximum-pool-size"));
        dataSource.setAutoCommit(load.getBoolean("datasource.hikari.auto-commit"));
        dataSource.setIdleTimeout(load.getInt("datasource.hikari.idle-timeout"));
        dataSource.setPoolName(load.getString("datasource.hikari.pool-name"));
        dataSource.setMaxLifetime(load.getInt("datasource.hikari.max-lifetime"));
        dataSource.setConnectionTimeout(load.getInt("datasource.hikari.connection-timeout"));
        dataSource.setConnectionTestQuery(load.getString("datasource.hikari.connection-test-query"));
    }

    public static Connection getConnection() {
        try {
            return dataSource.getConnection();
        } catch (SQLException e) {
            log.error("连接数据库失败", e);
        }
        return null;
    }

    public static void close(Connection con) {
        try {
            if (con != null) {
                con.close();
            }
        } catch (Exception e) {
            log.error("=======>close Mysql connection cause error,es={}", e.getMessage());
        }

    }

}

 

5.使用查询

    public List<CustomerVerifyConfig> queryCustomerVerifyConfig(){
        List<CustomerVerifyConfig> customerVerifyConfigList = new ArrayList<>();
        // 查询
        Connection connection = HikariPoolManager.getConnection();
        String sql = "select customer_id, offline_switch from bme_verify_db.t_customer_verify_config";
        log.info("getCustomerVerifyConfig sql == {}", sql);
        try {
            PreparedStatement preparedStatement = connection.prepareStatement(sql);
            ResultSet resultSet = preparedStatement.executeQuery();
            while (resultSet.next()) {
                CustomerVerifyConfig customerVerifyConfig = new CustomerVerifyConfig();
                customerVerifyConfig.setCustomerId(resultSet.getLong(1));
                customerVerifyConfig.setOfflineSwitch(resultSet.getInt(2));
            
                customerVerifyConfigList.add(customerVerifyConfig);
            }

        } catch (Exception e) {
            log.error("获取核查配置异常", e);
        } finally {
            HikariPoolManager.close(connection);
        }
        return customerVerifyConfigList;
    }

 

标签:load,##,spring,Flink,结合,hikari,datasource,mysql,dataSource
From: https://www.cnblogs.com/juncaoit/p/17278816.html

相关文章

  • mysql二进制文件安装方式
    安装进制包如果用户既不想安装最简单却不够灵活的RPM包,又不想安装复杂费时的源码包,那么,已经编泽好的二进制包将是很好的选择具体安装步骤如下。(1用root登录操作系统,增加mysql用户和组,数据库将安装在此用户下:she1l>groupaddmysalshell>useradd-gmysqlmysal(2)解压二进制安......
  • 三天吃透MySQL面试八股文
    什么是MySQLMySQL是一个关系型数据库,它采用表的形式来存储数据。你可以理解成是Excel表格,既然是表的形式存储数据,就有表结构(行和列)。行代表每一行数据,列代表该行中的每个值。列上的值是有数据类型的,比如:整数、字符串、日期等等。数据库的三大范式第一范式1NF确保数据库表字段......
  • 《Mysql基础》【Mysql触发器 新建触发器、修改触发器、删除触发器、举例】 编程入门
     --mysql数据库程序设计笔记:--=========第八章:触发器========================触发器:触发执行特定事件。(关联表对象,当特定事件出现时,触发激活)目的:保护表数据,(保证表数据完整性和一致性。)1、新建触发器:格式:createtrigger数据库名.触发器名称触发时刻inserton表名f......
  • 《Mysql基础》【Mysql表查询、去重、表连接、左连接 右连接、子表查询、排序、分组等
     --mysql数据库程序设计笔记:第三章:查询1、单表查询:1)、简单查询查所有列:格式:select*from表名;举例:mysql>select*fromtb_student;+----+-----------+-------------+------+------------+----------+--------+---------+-------------------+|id|studentNo|s......
  • mysql - 存储过程
    定义存储过程(storedprocedure)是一组为了完成特定功能的SQL语句集合,经编译后存储在服务器端的数据库中,利用存储过程可以加速SQL语句的执行。分类存储过程分为系统存储过程和自定义存储过程。1)系统存储过程在master数据库中,但是在其他的数据库中可以直接调用,并且在调用时不必在存......
  • 《Mysql基础》【Mysql表的基本操作 新建表、修改表、删除表、外键约束、主键约束、完
     --mysql数据库程序设计笔记:表基本操作:1、新建表:格式如:1)、建表加主键:createtable表名(idintNOTNULLauto_incrementcomment'自增主键id',列名类型(范围)comment'列备注',...primarykey(id))engine=InnoDB;2)、建表加候选键副键约束createtable表名......
  • 《Mysql基础》【Mysql删除数据库、新建数据库、修改数据库】 编程入门 学习分享 【公
     --mysql数据库程序设计笔记:数据定义:1、创建数据库:如:createdatabasedb_pro_1defaultcharsetgb2312collategb2312_chinese_ci;QueryOK,1rowaffected(0.00sec)或:createdatabasedb_pro_2defaultcharactersetgb2312defaultcollategb2312_chinese_ci;......
  • 《Mysql基础》【Mysql函数 mysql数据类型】 编程入门 学习分享 【公开免费】
    -- --mysql数据库程序设计笔记:gb2312是国标,中国字库。一个汉字2个字节。utf8国际通用标准。包含gb2312;外键只能引用主键和候选键。外键只可以在InnoDB中使用。字段约束:字段类型后可加:check(多个列判断条件)列为:column用col1、col2....代替一、mysql函数:聚合函数:1、......
  • 力扣607(MySQL)-销售员(简单)
    题目:表: SalesPerson 表: Company 表: Orders编写一个SQL查询,报告没有任何与名为“RED”的公司相关的订单的所有销售人员的姓名。以任意顺序返回结果表。查询结果格式如下所示。示例:  解释:根据表 orders 中的订单'3'和'4',容易看出只有'John'和'......
  • 《Mysql基础》【Mysql添加外键(新增外键)、mysql添加主键、mysql删除外键】 编程入门 学
    --mysql数据库程序设计笔记:--新建表:foreignkey加外键举例:createdatabasedb_test_1defaultcharactersetgb2312defaultcollategb2312_chinese_ci;usedb_test_1;createtablea(idintnotnullauto_incrementcomment'id自增',ainfovarchar(255),primarykey......