flink-cdc
Overview
Documentation: https://ververica.github.io/flink-cdc-connectors/master/content/about.html#
Dependencies
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.17.Final</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.4.13</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>1.4.13</version>
    <type>pom</type>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>0.20.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.4.13</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.4.13</version>
</dependency>
<dependency>
    <groupId>org.antlr</groupId>
    <artifactId>antlr4-runtime</artifactId>
    <version>4.7</version>
</dependency>
<dependency>
    <groupId>commons-cli</groupId>
    <artifactId>commons-cli</artifactId>
    <version>1.4</version>
</dependency>
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.16</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpcore</artifactId>
    <version>4.4.9</version>
</dependency>
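The dependencies above reference ${flink.version} but also hardcode 1.13.6 (flink-csv, flink-connector-base) and 1.9.0 (flink-parquet); defining the property once keeps them consistent. A minimal properties block, assuming Flink 1.13.6 (the version the hardcoded entries suggest):
<properties>
    <flink.version>1.13.6</flink.version>
</properties>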
API approach
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import org.apache.flink.api.common.eventtime.WatermarkStrategy
val env = StreamExecutionEnvironment.getExecutionEnvironment
val mySqlSource: MySqlSource[String] = MySqlSource.builder[String]()
  .hostname("114.116.44.117")
  .port(3306)
  .databaseList("test")      // monitor all tables under this database; multiple databases and tables are supported
  .tableList("test.student") // restrict capture to a single table
  .username("root")
  .password("123456")
  .deserializer(new StringDebeziumDeserializationSchema()) // converts each SourceRecord to its string form
  .startupOptions(StartupOptions.latest()) // start from the current binlog position, i.e. only capture new changes
  .build()
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks[String](), "mysql-cdc-source").print()
env.execute("flink cdc")
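StringDebeziumDeserializationSchema simply emits SourceRecord.toString, which is awkward to parse downstream. The connector also ships JsonDebeziumDeserializationSchema (assuming it is available in flink-connector-mysql-cdc 2.1.0); a sketch swapping it in:
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
// Same source as above, but every change event arrives as a Debezium-style JSON string.
val jsonSource: MySqlSource[String] = MySqlSource.builder[String]()
  .hostname("114.116.44.117")
  .port(3306)
  .databaseList("test")
  .tableList("test.student")
  .username("root")
  .password("123456")
  .deserializer(new JsonDebeziumDeserializationSchema())
  .startupOptions(StartupOptions.initial()) // full snapshot first, then switch to the binlog
  .build()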
SQL approach
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
val sourceTestEnv = StreamExecutionEnvironment.getExecutionEnvironment // streaming environment
val streamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(sourceTestEnv, streamSettings)
// With the SQL connector there is no need for a custom deserializer.
streamTableEnv.executeSql(
  """CREATE TABLE user_info (
    |  id INT NOT NULL,
    |  name STRING,
    |  score DECIMAL(10,3),
    |  PRIMARY KEY (id) NOT ENFORCED
    |) WITH (
    |  'connector' = 'mysql-cdc',
    |  'hostname' = '114.116.44.117',
    |  'scan.startup.mode' = 'latest-offset',
    |  'port' = '3306',
    |  'username' = 'root',
    |  'password' = '123456',
    |  'database-name' = 'test',
    |  'table-name' = 'student'
    |)""".stripMargin)
val table = streamTableEnv.sqlQuery("select * from user_info")
streamTableEnv.toRetractStream[Row](table).print()
sourceTestEnv.execute("flink cdc")
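Instead of converting to a retract DataStream, the result can also be written out in pure SQL. A minimal sketch using Flink's built-in print connector (the sink table name user_info_print is made up for illustration):
// Sink table backed by the 'print' connector; rows appear on stdout
// with change flags such as +I, -U, +U, -D.
streamTableEnv.executeSql(
  """CREATE TABLE user_info_print (
    |  id INT,
    |  name STRING,
    |  score DECIMAL(10,3)
    |) WITH ('connector' = 'print')""".stripMargin)
// executeSql on an INSERT submits the job by itself; no sourceTestEnv.execute() is needed for it.
streamTableEnv.executeSql("INSERT INTO user_info_print SELECT * FROM user_info")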
Helper commands
cd /opt/mysql/mysql/bin
./mysql -u root -p'123456'
use test;
delete from student;
insert into student values(1,'zhangsan',1);
insert into student values(2,'lisi',2);
insert into student values(3,'wangwu',3);
insert into student values(4,'sunliu',4);
insert into student values(5,'liuqi',5);
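To also exercise update and delete events, which show up as retractions in the toRetractStream output, a couple more statements can be issued (the values here are illustrative):
update student set score=10 where id=1;
delete from student where id=2;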
Changes in 2.x
CDC 1.x pain points
1. Debezium guarantees data consistency by taking locks (a global read lock or table locks), a permission DBAs will usually not grant.
2. Snapshot reading only supports a parallelism of 1.
3. Checkpointing is not supported during the full snapshot read.
CDC 2.x
The data is split into chunks by primary key, which makes the snapshot read parallel and lock-free.
The phases are chunk reading, chunk reporting, chunk assignment, and incremental reading; the sketch below shows the SQL options that control this mechanism.
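In the SQL connector, the chunk-based snapshot is governed by the scan.incremental.snapshot.* options of mysql-cdc 2.x; the table name and option values in this sketch are illustrative:
// Parallel, lock-free snapshot: the table is split into primary-key chunks that
// are distributed across source subtasks. The server-id range should be at least
// as wide as the source parallelism.
streamTableEnv.executeSql(
  """CREATE TABLE user_info_parallel (
    |  id INT NOT NULL,
    |  name STRING,
    |  score DECIMAL(10,3),
    |  PRIMARY KEY (id) NOT ENFORCED
    |) WITH (
    |  'connector' = 'mysql-cdc',
    |  'hostname' = '114.116.44.117',
    |  'port' = '3306',
    |  'username' = 'root',
    |  'password' = '123456',
    |  'database-name' = 'test',
    |  'table-name' = 'student',
    |  'server-id' = '5400-5404',
    |  'scan.incremental.snapshot.enabled' = 'true',
    |  'scan.incremental.snapshot.chunk.size' = '8096'
    |)""".stripMargin)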