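1. Overview
Flink CDC is a distributed data integration tool for real-time and batch data. It monitors changes to database tables and writes those changes to other data stores.
Custom output logic can be implemented in Java. The details follow.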
2. Implementation
2.1 Project dependencies
<dependencies>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
</dependencies>
2.2 Implementing the sink
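import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CustomSink extends RichSinkFunction<String> {
    // Called once before any record is processed; open connections or other resources here.
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    // Called once on shutdown; release the resources acquired in open().
    @Override
    public void close() throws Exception {
        super.close();
    }

    // Called once per change event; this demo only prints the JSON string.
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.err.println(value);
    }
}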
2.3 Running the CDC job
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MainCdc {
    public static void main(String[] args) throws Exception {
        // Capture changes from demo2.customer and emit each one as a JSON string.
        // The explicit <String> type argument avoids the raw-type cast on the deserializer.
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("demo2")
                .tableList("demo2.customer")
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .includeSchemaChanges(true) // also emit schema change (DDL) events
                .build();

        // Set the REST port used by the execution environment.
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT, 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        // Take a checkpoint every 5000 ms.
        env.enableCheckpointing(5000);

        // Wire the MySQL source to the custom sink; no watermarks are needed here.
        DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Mysql Source")
                .addSink(new CustomSink());
        env.execute();
    }
}
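2.4 Operating on the table
With the job running, insert, delete, and update rows in demo2.customer. The sink prints one change event per operation.
- Delete a row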
{"before":{"id":"1","name":"张飞"},"after":null,"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1736261166000,"snapshot":"false","db":"demo2","sequence":null,"table":"customer","server_id":1,"gtid":null,"file":"mysql-bin.001770","pos":2755,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1736261166689,"transaction":null}
- Insert a row
{"before":null,"after":{"id":"9","name":"小王"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1736261283000,"snapshot":"false","db":"demo2","sequence":null,"table":"customer","server_id":1,"gtid":null,"file":"mysql-bin.001770","pos":3047,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1736261283678,"transaction":null}
- Update a row
{"before":{"id":"9","name":"小王"},"after":{"id":"9","name":"赵云"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1736261452000,"snapshot":"false","db":"demo2","sequence":null,"table":"customer","server_id":1,"gtid":null,"file":"mysql-bin.001770","pos":3348,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1736261452180,"transaction":null}
An update event carries the row both before and after the change, in the before and after fields.
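The events above can be told apart by their op field: "d" for the delete, "c" for the insert, and "u" for the update. The following is a minimal sketch of how a sink might interpret them, assuming the before and after objects have already been parsed into maps by a JSON library of your choice. The names ChangeEvents, Op, rowImage, and changedColumns are hypothetical and are not part of Flink CDC or Debezium. The "r" code, which Debezium uses for rows read during the initial snapshot, does not appear in the samples above and is included here as an assumption about what a full job would see.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper for interpreting change events; not a Flink CDC or Debezium class. */
final class ChangeEvents {

    /** Operation types, keyed by the value of the "op" field. */
    enum Op {
        CREATE("c"), UPDATE("u"), DELETE("d"), READ("r"); // "r": row read during the initial snapshot

        private final String code;

        Op(String code) {
            this.code = code;
        }

        /** Maps an "op" value to its operation type, rejecting unknown codes. */
        static Op fromCode(String code) {
            for (Op op : values()) {
                if (op.code.equals(code)) {
                    return op;
                }
            }
            throw new IllegalArgumentException("Unknown op code: " + code);
        }
    }

    /** Returns the row image that carries data: "before" for deletes, "after" otherwise. */
    static Map<String, String> rowImage(Op op, Map<String, String> before, Map<String, String> after) {
        return op == Op.DELETE ? before : after;
    }

    /** Returns the sorted names of columns whose values differ between the two row images. */
    static Set<String> changedColumns(Map<String, String> before, Map<String, String> after) {
        Set<String> allColumns = new TreeSet<>(before.keySet());
        allColumns.addAll(after.keySet());
        Set<String> changed = new TreeSet<>();
        for (String column : allColumns) {
            if (!Objects.equals(before.get(column), after.get(column))) {
                changed.add(column);
            }
        }
        return changed;
    }
}
```

For the update event shown above, changedColumns would report only name, since id is "9" in both images. A real sink would call these helpers from invoke after parsing the JSON string.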