首页 > 数据库 >flink cdc 读取mysql数据

flink cdc 读取mysql数据

时间:2023-03-16 10:44:32浏览次数:56  
标签:cdc mysql flink import apache org com

flinkcdc版本:1.14.0

mysql版本:5.7

 

1、开启MySQL中binlog日志

修改我们的配置文件 my.cnf,增加:

server_id=1
log_bin=mysql-bin
binlog_format=ROW
expire_logs_days=30

重启mysql 

 

查看MySQL是否开启日志成功

show variables like '%log_bin%'

2、引入pom依赖

<repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <dependencies>
        <!--flink connector连接器基础包-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.14.0</version>
        </dependency>
        <!--flink cdc mysql源-->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!--flink 的DataStream数据流api-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>
        <!--flink java客户端-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>
        <!--开启webUI支持,端口8081,默认没有开启-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>
        <!--flink的table api&sql程序可以连接到其他外部系统,用于读写批处理表和流式表-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
View Code

3、java代码

CustomSink:

package com.flinkcdc.mysql;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author
 * @date 2023/3/15 16:37
 */
public class CustomSink extends RichSinkFunction<String> {

    @Override
    public void invoke(String json,Context context) throws Exception {
        //OP字段:该字段也有4种取值,分别是C(create),U(update), D(delete)
        //对于u操作,其数据部分包含了before和after
        System.out.println(">>>"+json);
    }

    /**
     * 打开连接
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {

    }

    /**
     * 关闭连接
     * @throws Exception
     */
    @Override
    public void close() throws Exception {

    }
}
View Code

MySqlSourceExample:

package com.flinkcdc.mysql;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author 
 * @date 2023/3/15 16:40
 */
public class MySqlSourceExample {

    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.100.105")
                .port(3306)
                .databaseList("test") // set captured database
                .tableList("test.Course") // set captured table
                .username("root")
                .password("123456")
                // 自定义反序列化方式
                .deserializer(new JsonDebeziumDeserializationSchema())
                //           StartupOptions.initial() 先全量后增量
                .startupOptions(StartupOptions.initial())
                //   StartupOptions.latest() 从最新binlog读取,增量方式
//                .startupOptions(StartupOptions.latest())
                .build();

        Configuration config = new Configuration();
        config.setInteger(RestOptions.PORT, 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

        env.enableCheckpointing(5000);
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .addSink(new CustomSink());

        env.execute("flinkcdc");

    }

}
View Code

 

标签:cdc,mysql,flink,import,apache,org,com
From: https://www.cnblogs.com/chong-zuo3322/p/17221410.html

相关文章

  • MySQL中IF语句的使用
    mysql中if语句的使用参考地址:https://www.php.cn/mysql-tutorials-417851.html1IF表达式IF(expr1,expr2,expr3)如果expr1是true,则返回expr2,否则返回expr3select*,if......
  • mysql中单引号双引号的区别
    单独使用时,单引号和双引号没有区别;需要嵌套使用时,双引号和单引号可以互相嵌套。使用的结果是把内部的内容当做整体一个字符串变量不需要成对出现SELECT *FROM `exc......
  • Node向Mysql数据发送请求响应请求
    //导入mysql依赖(想要先安装依赖,在终端输入:npminstallmysql)varmysql=require('mysql');//配置mysql的数据信息varmy=mysql.createConnection({//mysql连接......
  • mysql 数据不存在则插入,存在则更新
    mysql数据不存在则插入,存在则更新,可以用INSERTONDUPLICATEKEYUPDATE实现。INSERTONDUPLICATEKEYUPDATE不可以和WHERE一起使用,使用INSERTONDUPLICATEKEYUP......
  • MySQL数据库30条规范解读
    军规适用场景:并发量大、数据量大的互联网业务军规:介绍内容解读:讲解原因,解读比军规更重要 一、基础规范(1)必须使用InnoDB存储引擎解读:支持事务、行级锁、并发性能更好、CPU及......
  • mysql面试题
    1.binlog的3种格式,类型对比  statement,row,mixed   setsessionbinlog_format='statement';  showvariableslike'binlog_format'; showmasterstatus;......
  • MySQL部署后配置
    默认情况下,mysql是运行在127.0.0.1上,此时是无法远程被访问的root@JumpServer-DB-P01:/opt#netstat-tunlp|grep3306tcp00127.0.0.1:330600.0.......
  • Mysql_base
    基础:sql语句表结构设计调优:索引、慢查询优化配置参数调优核心原理:InnoDb存储引擎(包括隔离级别、事务、锁、缓存池、回滚日志等等)Mysqld(包括连接管理、进程......
  • MYSQL数据库操作语句
    数据库操作登录mysql-uroot-p退出quit/exit显示数据库版本showversion();查看当前使用的数据库selectdatabase();查看所有数据库showdatabases;创建......
  • MySQL学习(四)---->InnoDB数据页结构
    页是InnoDB管理存储空间的基本单位,一个页的大小一般是16KB。InnoDB为了不同的目的而设计了许多种不同类型的页,比如存放表空间头部信息的页,存放InsertBuffer信息的页,......