Background
In the day-to-day running of our services we hit plenty of data questions. Some can be answered from application logs, but operations such as updates often leave no trace there; querying the binlog directly is inconvenient, and not every table needs to be tracked, all of which makes investigation harder. We therefore considered capturing the binlog with either Canal or Flink, and went with Flink here; both the Flink program and the client are on version 1.17.1. Note that Flink CDC reads the binlog through an embedded Debezium engine, so the MySQL server must have the binlog enabled with binlog_format=ROW.
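Before any of the code below, that MySQL prerequisite is worth verifying. A minimal hedged sketch that checks binlog_format over JDBC, using the mysql-connector-java declared in the pom; the class name and connection details are placeholders, not from the original post:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogFormatCheck {
    public static void main(String[] args) throws Exception {
        // placeholder connection details; reuse the account the CDC source will use
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://hostName:3306", "username", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE 'binlog_format'")) {
            while (rs.next()) {
                // Flink CDC (via Debezium) requires the value to be ROW
                System.out.println(rs.getString(1) + " = " + rs.getString(2));
            }
        }
    }
}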
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>flink</groupId>
    <artifactId>xhtx-platform-data</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.26</version>
        </dependency>
        <!-- flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <!-- flink -->
        <!-- flink cdc -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.30</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
            <!-- exclude the shaded guava pulled in by the CDC connector, which
                 clashes with the flink-shaded-guava bundled in the Flink distribution -->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-shaded-guava</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
            <version>3.0.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>4.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- build a fat jar: merge META-INF/services so connector factories stay
                 discoverable, and drop signature files that break a repackaged jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.flink.cdc.FlinkCDCMySQL</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
FlinkCDCMySQL
package com.flink.cdc;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkCDCMySQL {

    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkCDCMySQL.class);

    // tables to capture, in "dbName.tableName" form
    static String[] tables = {
            "dbName.tableName"
    };

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint every 3 s with exactly-once semantics, so the binlog offset is recorded
        env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);

        DebeziumSourceFunction<String> build = MySqlSource.<String>builder()
                .hostname("hostName")
                .port(3306)
                // set the captured database
                .databaseList("dbName")
                // without this parameter, every table in the listed databases is captured
                .tableList(tables)
                .username("username")
                .password("password")
                /* initial:   take a full snapshot first, then switch to incremental reading
                 * latest:    incremental only, from the current binlog position (no history)
                 * timestamp: read changes whose timestamp is greater than or equal to the given one
                 */
                .startupOptions(StartupOptions.latest())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .build();

        DataStreamSource<String> stringDataStreamSource = env.addSource(build);
        stringDataStreamSource.setParallelism(1);
        stringDataStreamSource.print();
        LOGGER.info("binlog monitoring started+++++++++++++");
        stringDataStreamSource.addSink(new LotteryCollectSink<>());
        env.execute("flinkcdcmysql");
    }
}
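One caveat: the code above turns on checkpointing but never says where checkpoints are stored, so by default (in-memory JobManager storage) a restarted job cannot pick up from the last recorded binlog offset. A minimal sketch of what could be added inside main(), not from the original post; the storage path is a placeholder:

// requires: import org.apache.flink.streaming.api.environment.CheckpointConfig;
// persist checkpoints so a restarted job can resume from the recorded binlog offset
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-cdc-checkpoints"); // placeholder path
// keep the last checkpoint when the job is cancelled, so it can be restored manually
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);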
The LotteryCollectSink operator. In my case I only record the changes and do not publish them to Kafka; if you want to push the events to a message queue instead, just wire up the corresponding sink operator (a hedged Kafka sketch is at the end of this post).
package com.flink.cdc;

import cn.hutool.json.JSONUtil;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LotteryCollectSink<T> extends RichSinkFunction<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(LotteryCollectSink.class);

    @Override
    public void invoke(String value, Context context) {
        try {
            // map the Debezium JSON envelope onto our own Action bean
            // (Action, TableChangeHandleStrategy and ActivityNewChangeHandle are business classes, not shown)
            Action action = JSONUtil.toBean(value, Action.class);
            // skip snapshot reads (op == "r"); only record real inserts/updates/deletes
            if (action != null && !"r".equals(action.getOp())) {
                TableChangeHandleStrategy tableChangeHandleStrategy = new ActivityNewChangeHandle();
                // recording logic
                tableChangeHandleStrategy.handle(action);
            }
        } catch (Exception e) {
            LOGGER.error("invoke error", e);
        }
    }
}
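For reference, JsonDebeziumDeserializationSchema hands the sink the standard Debezium change-event envelope as a JSON string; that is what the Action bean has to map. A hedged sketch of picking the envelope apart directly with fastjson (already in the pom); the class and method names are mine, not from the original post:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class DebeziumEnvelopeDemo {
    // print the interesting fields of one Debezium change event, i.e. the String
    // that JsonDebeziumDeserializationSchema passes to invoke()
    static void describe(String value) {
        JSONObject event = JSON.parseObject(value);
        String op = event.getString("op");                 // "c" insert, "u" update, "d" delete, "r" snapshot read
        JSONObject before = event.getJSONObject("before"); // row image before the change, null for inserts
        JSONObject after = event.getJSONObject("after");   // row image after the change, null for deletes
        JSONObject source = event.getJSONObject("source"); // metadata: db, table, binlog position, ts_ms
        System.out.println(op + " on " + source.getString("table") + ": " + before + " -> " + after);
    }
}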
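And if you do want a queue instead of direct recording, a hedged sketch using the Flink 1.17 KafkaSink; it assumes you add org.apache.flink:flink-connector-kafka (version matching your Flink release) to the pom, and the broker address and topic name are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// inside main(), instead of addSink(new LotteryCollectSink<>()):
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers("kafka-host:9092")              // placeholder broker address
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("binlog-events")                   // placeholder topic name
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
stringDataStreamSource.sinkTo(kafkaSink);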