首页 > 数据库 >大数据(Flink)—数据写入数据库篇

大数据(Flink)—数据写入数据库篇

时间:2022-09-23 17:23:35浏览次数:75  
标签:ps Exception Flink flink 写入 env new 数据 public

Flink写入mysql的几种方式,废话不多说直接上代码:

  相关jar包
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.14.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>            

第一种:flink  JDBC Connector

用法示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
        .fromElements(...)
        .addSink(JdbcSink.sink(
                "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
                (ps, t) -> {
                    ps.setInt(1, t.id);
                    ps.setString(2, t.title);
                    ps.setString(3, t.author);
                    ps.setDouble(4, t.price);
                    ps.setInt(5, t.qty);
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(getDbMetadata().getUrl())
                        .withDriverName(getDbMetadata().getDriverClass())
                        .build()));
env.execute();

第二种:Mysql JDBCOutputFormat

用法实例:

 FlinkKafkaConsumer011<String> consumer011 = new FlinkKafkaConsumer011<String>("flink_order", new SimpleStringSchema(), props);
        DataStream<String> stream = env.addSource(consumer011);
        stream.map(new MapFunction<String, Row>() {

            @Override
            public Row map(String s) throws Exception {
                System.out.println(s);
                Row row = new Row(3);
                int va=1;
                row.setField(0, va);
                row.setField(1, va);
                row.setField(2, va);
                return row;
            }
        }).writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://xx")
                .setUsername("xx")
                .setPassword("xx")
                .setQuery("insert into order_cnt(cnt,user,num) values(?,?,?)")
                .setSqlTypes(new int[]{Types.INTEGER, Types.INTEGER, Types.INTEGER})
                .finish());

        try {
            env.execute("flink-test");
        } catch (Exception e) {
            e.printStackTrace();
        }

第三种:自定义数据源(mybatis整合)

 Flink JOB代码:
     DataStreamSource<HttpFailEntity> dataStreamSource = env.addSource(new Source());
        dataStreamSource.addSink(new MybatisSink("com.example.springbootflink.dao.mapper.save"));
        JobExecutionResult result = env.execute("My Flink ");
        System.out.println("The job Mybatis took " + result.getNetRuntime() + " to execute");
自定义数据类:
public class MybatisSink extends RichSinkFunction<HttpFailEntity> {

    private SqlSessionFactory sqlSessionFactory;

    private SqlSession sqlSession;

    /**
     * 执行sql的id
     */
    private String sqlId;

    public MybatisSink(String sqlId) {
        this.sqlId = sqlId;

    }

    @Override
    public void open(Configuration parameters) throws Exception {
        sqlSessionFactory = MybatisSqlSessionFactory.sqlSessionFactory;
        sqlSession = sqlSessionFactory.openSession(true);
    }

    @Override
    public void close() throws Exception {
        sqlSession.close();
    }
    /**
     * 执行任务
     * @param value
     * @param context
     */
    @Override
    public void invoke(HttpFailEntity value, Context context) {
        try {
            sqlSession.insert(sqlId, value);
            sqlSession.commit();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

}
mybtis的配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD SQL Map Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <properties resource="数据库配置文件"></properties>
    <settings>
        <setting name="useGeneratedKeys" value="true"/>
        <setting name="defaultExecutorType" value="REUSE"/>
        <!-- <setting name="logImpl" value="STDOUT_LOGGING"/>  打印查询语句 -->
    </settings>

    <environments default="default">
        <environment id="default">
            <transactionManager type="JDBC"/>
            <dataSource type="com.example.springbootflink.flink.DruidDataSourceFactory">
                <property name="driverClassName" value="${db.driver}"/>
                <property name="url" value="${mysql.master.url}"/>
                <property name="username" value="${mysql.master.username}"/>
                <property name="password" value="${mysql.master.password}"/>
            </dataSource>
        </environment>
    </environments>
    <mappers>
<!--        <mapper class="com.ctid.maap.mapper.MessageDownMapper"></mapper>-->
        <mapper resource="mapper/HttpFailEntity.xml"/>
    </mappers>
</configuration>


mybatis相关类:
public class DruidDataSourceFactory extends PooledDataSourceFactory {
    public DruidDataSourceFactory() {
        this.dataSource = new DruidDataSource();
    }

}



public class MybatisSqlSessionFactory {


    public static SqlSessionFactory sqlSessionFactory;

    static {
        try(InputStream in = Resources.getResourceAsStream("mybatis_conf.xml")) {
            sqlSessionFactory = new SqlSessionFactoryBuilder().build(in);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

标签:ps,Exception,Flink,flink,写入,env,new,数据,public
From: https://www.cnblogs.com/javabianhua/p/16723461.html

相关文章

  • 对象数组去重(保留最后的数据)
    constarrayUnique=(arr:any,name:any)=>{varhash:any={}returnarr.reduce((acc:any,cru:any,index:any)=>{if(!hash[cru[name]]){......
  • Centos7设置postgresql数据库开机自启动
    前言PostgreSQL的开机自启动脚本位于PostgreSQL源码目录的contrib/start-scripts路径下如果不知道具体的路径,可以用find命令进行查找。命令如下:[root@admin~]#find/-......
  • JavaWeb--MySQL约束、数据库设计、多表查询、事务--2022年9月22日
    第一节  约束1、概念A、约束是什么约束是作用于表中列上的规则,用于限制加入表的数据约束的存在保证了数据库中数据的正确性、......
  • 尚硅谷大数据项目之电商数仓(3数仓数据同步策略)
    尚硅谷大数据项目之电商数仓(3数仓数据同步策略)(作者:尚硅谷研究院) 版本:V5.0 第1章实时数仓同步数据实时数仓由Flink源源不断从Kafka当中读数据计算,所以不需要手动同......
  • Murano数据设计
    1.运行数据表environment:环境定义,当前有效的运行上下文,对应Heat的Stacksession:会话定义,用户编辑环境时的临时存储,支持服务的增量变更,是部署的前提准备;用户完成sessi......
  • layui 数据表格使用python django提供的数据接口
    数据库新建表 fromdjango.dbimportmodels#Createyourmodelshere.classHost(models.Model):hostname=models.CharField(max_length=32,verbose_name......
  • list对象中的数据如何去重呢?
    下文笔者讲述list对象的去重方法分享,list的实现类是我们存储数据的容器,当里面存储的对象存在重复值时,我们该如何对其进行去重操作呢?下文笔者将一一道来,首先我们需了解对......
  • ABAP链接FTP把txt文件数据获取到内表
    啥都不说,直接上代码*******如果无法链接FTP,可能需要往表SAPFTP_SERVERS加入IP地址和端口(21)即可DATA:p_hostTYPEchar64VALUE'IP',"IPp_unameTYPEc......
  • 我眼中的大数据(三)——MapReduce
    ​这次来聊聊Hadoop中使用广泛的分布式计算方案——MapReduce。MapReduce是一种编程模型,还是一个分布式计算框架。MapReduce作为一种编程模型功能强大,使用简单。运算内容......
  • 简化数据结构的初始化过程
    避免在每新建类时,都要重复实现构造器,因此可以定义一个公共基类,在基类中实现实例属性的初始化规则,此后在派生类中,只需要指定属性字段即可1classInit:2_fields=......