flinkcdc版本:1.14.0
mysql版本:5.7
1、开启MySQL中binlog日志
修改我们的配置文件 my.cnf,增加:
server_id=1 log_bin=mysql-bin binlog_format=ROW expire_logs_days=30
重启mysql
查看MySQL是否开启日志成功
show variables like '%log_bin%'
2、引入pom依赖
<repositories> <repository> <id>aliyun</id> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> </repository> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> <repository> <id>jboss</id> <url>http://repository.jboss.com/nexus/content/groups/public</url> </repository> </repositories> <dependencies> <!--flink connector连接器基础包--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>1.14.0</version> </dependency> <!--flink cdc mysql源--> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-mysql-cdc</artifactId> <version>2.3.0</version> </dependency> <!--flink 的DataStream数据流api--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.14.0</version> </dependency> <!--flink java客户端--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.14.0</version> </dependency> <!--开启webUI支持,端口8081,默认没有开启--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_2.12</artifactId> <version>1.14.0</version> </dependency> <!--flink的table api&sql程序可以连接到其他外部系统,用于读写批处理表和流式表--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime_2.12</artifactId> <version>1.14.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build>View Code
3、java代码
CustomSink:
package com.flinkcdc.mysql; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; /** * @author * @date 2023/3/15 16:37 */ public class CustomSink extends RichSinkFunction<String> { @Override public void invoke(String json,Context context) throws Exception { //OP字段:该字段也有4种取值,分别是C(create),U(update), D(delete) //对于u操作,其数据部分包含了before和after System.out.println(">>>"+json); } /** * 打开连接 * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { } /** * 关闭连接 * @throws Exception */ @Override public void close() throws Exception { } }View Code
MySqlSourceExample:
package com.flinkcdc.mysql; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @author * @date 2023/3/15 16:40 */ public class MySqlSourceExample { public static void main(String[] args) throws Exception { MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname("192.168.100.105") .port(3306) .databaseList("test") // set captured database .tableList("test.Course") // set captured table .username("root") .password("123456") // 自定义反序列化方式 .deserializer(new JsonDebeziumDeserializationSchema()) // StartupOptions.initial() 先全量后增量 .startupOptions(StartupOptions.initial()) // StartupOptions.latest() 从最新binlog读取,增量方式 // .startupOptions(StartupOptions.latest()) .build(); Configuration config = new Configuration(); config.setInteger(RestOptions.PORT, 8081); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); env.enableCheckpointing(5000); env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source") .addSink(new CustomSink()); env.execute("flinkcdc"); } }View Code
标签:cdc,mysql,flink,import,apache,org,com From: https://www.cnblogs.com/chong-zuo3322/p/17221410.html