首页 > 其他分享 >flink-cdc学习记录

flink-cdc学习记录

时间:2023-11-02 15:01:55浏览次数:40  
标签:记录 flink boot cdc apache org com

添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.14</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.xwq</groupId>
	<artifactId>cdc</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>cdc</name>
	<description>cdc</description>
	<properties>
		<java.version>1.8</java.version>
		<flink-version>1.13.0</flink-version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink-version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_2.12</artifactId>
			<version>${flink-version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.12</artifactId>
			<version>${flink-version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>3.1.3</version>
		</dependency>

		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>8.0.26</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner-blink_2.12</artifactId>
			<version>${flink-version}</version>
		</dependency>

		<dependency>
			<groupId>com.ververica</groupId>
			<artifactId>flink-connector-mysql-cdc</artifactId>
			<version>2.0.0</version>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.75</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>3.0.0</version>
				<configuration>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

示例代码

package com.xwq;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Main {
    public static void main(String[] args) throws Exception {
        //1、获取Flink-cdc运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);
        //通过flink-cdc构建SourceFunction
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .username("root")
                .password("root")
                .databaseList("cdc_test")
                .tableList()//db.tableName
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
        dataStreamSource.print();
        env.execute("cdc_test");
    }
}

问题及解决

  1. bin_log未开启

问题:Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured

解决:my.ini增加配置

log_bin=mysql-bin
binlog-format=ROW
  1. 未设置时区

问题:The server time zone value '�й���׼ʱ��' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver

解决:增加sourceFunction参数

.serverTimeZone("UTC")

标签:记录,flink,boot,cdc,apache,org,com
From: https://blog.51cto.com/u_16338251/8147063

相关文章

  • 【转载】CUDA编程学习记录 C++
    参考Yuezero的CUDA编程基础(https://blog.csdn.net/weixin_54338498/article/details/127947551)CUDA编程模型host指代CPU及其内存,包含host程序device指代GPU及其内存,包含device程序经典CUDA程序的执行流程如下:分配host内存,并进行数据初始化;分配device内存,并从host将......
  • 日常记录--2023-11月1日--周三
    日程:今天只有上午有课,7点起床,吃了个早饭去上课,早上第一节数据结构,学习了队列,还讲了相关应用。中午午休一个小时,下午起来干了点别的,完善了之前的代码,晚上7-9点听了下代码随想路,学了会javaweb。学了什么:可恶的Javaweb,复习了数据结构。PS:不想学习,想要成为月饼盒;......
  • [20231023]为什么刷新缓存后输出记录顺序发生变化6.txt
    [20231023]为什么刷新缓存后输出记录顺序发生变化6.txt--//前几天做了单表刷新缓存后输出记录顺序发生变化的情况,测试2个表的情况时遇到一个奇怪的现象。--//我前面的测试18c,如果使用10046跟踪看不到我遇到的情况,我想使用strace跟踪,发现该机器配置使用asm,strace跟踪无法看到一--/......
  • 20231101构造题记录
    20231101构造题记录A.人生的经验可以对于每个长度为\(l-1\)的串建一个点,每个点有\(c\)个后继状态,也有\(c\)个入边,所以一定可以找到一个欧拉路因此答案为\(c^l+l-1\)即所有可能的串首尾相接拼起来的长度考虑用一个圈套圈求欧拉路,即每次拓展一个点,用栈维护,如果不能继......
  • C++ 记录
    STL队列(queue),一个先进先出的容器,需要用到头文件queue。函数成员名功能返回值类型que.empty()判断队列是否为空,空返回真,非空返回假boolque.size()返回队列中元素个数unsignedlonglongque.push()将元素x放进队尾voidque.front()返回队首元素qu......
  • linux学习记录:进程管理
    1.进程:正在运行的程序,包括这个程序所占用的系统资源。每个进程都有唯一的进程标识pid,一个pid只能识别一个进程,ppid是父进程id。进程状态:就绪、运行、阻塞。2.查看进程静态查看进程:psaux(捕捉某一瞬间某一个进程的状态)-a:显示所有用户的进程,包括完整路径-u:显示使用者的名......
  • 深度学习相关问题的记录:验证集loss上升,准确率却上升
    验证集loss上升,准确率却上升验证集loss上升,acc也上升这种现象很常见,原因是过拟合或者训练验证数据分布不一致导致,即在训练后期,预测的结果趋向于极端,使少数预测错的样本主导了loss,但同时少数样本不影响整体的验证acc情况。ICML2020发表了一篇文章:《DoWeNeedZeroTrainingLossAf......
  • linux 安装rabbitmq流程记录
    Linux系统:CentOS7.x(如果是CentOS8.x的话,需要修改下面两个环境版本号中的el7为el8)Erlang:erlang-22.3.4.12-1.el7.x86_64.rpmRabbitMQ:rabbitmq-server-3.8.13-1.el7.noarch.rpm1安装erlangLinux系统:CentOS7.x(如果是CentOS8.x的话,需要修改下面两个环境版本号中的el7为el8......
  • 批处理BAT使用记录
    combin.batrem@echooffclssetsrcdir=%~1setoutdir=%~d1%~p1setoutname=%~n1setoutpath="%outdir%%outname%.txt"%~d1cd%srcdir%del%outpath%echooutpath:%outpath%echoloadfilelistpleasewait...for/f"delims="%%ain(�......
  • nginx 全局变量记录
    remote_addr客户端ip,如:192.168.4.2binary_remote_addr客户端ip(二进制)remote_port客户端port,如:50472remote_user已经经过AuthBasicModule验证的用户名host请求主机头字段,否则为服务器名称,如:dwz.stamhe.comrequest用户请求信息,如:GET/?_a=index&_m=show&count=10HT......