首页 > 其他分享 >flinkcdc 实现数据监听

flinkcdc 实现数据监听

时间:2025-01-07 23:01:07浏览次数:8  
标签:import flink flinkcdc mysql apache org null 数据 监听

1.概述

Flink CDC 是一个用于实时数据和批处理数据的分布式数据集成工具。他可以监听数据库表的变化。实现将数据变化写到其他的数据源中。

我们可以使用java 实现自定义的数据写出。下面是实现细节。

2.实现代码

2.1 项目依赖

<dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>

    </dependencies>

2.2 实现Sink

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CustomSink extends RichSinkFunction<String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        System.err.println(value);
    }
}

2.3 执行CDC相关代码

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MainCdc {

    public static void main(String[] args) throws Exception {
        MySqlSource<String> source = MySqlSource.builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("demo2")
                .tableList("demo2.customer")
                .username("root")
                .password("root")
                .deserializer((DebeziumDeserializationSchema)new JsonDebeziumDeserializationSchema())
                .includeSchemaChanges(true)
                .build();

        Configuration configuration =new Configuration();
        configuration.setInteger(RestOptions.PORT,8081);
        StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        env.enableCheckpointing(5000);
        DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(),"Mysql Source")
                .addSink(new CustomSink());
        env.execute();

    }
}

2.4 对表进行操作

继续增加删除更新

  • 删除数据
{"before":{"id":"1","name":"张飞"},"after":null,"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1736261166000,"snapshot":"false","db":"demo2","sequence":null,"table":"customer","server_id":1,"gtid":null,"file":"mysql-bin.001770","pos":2755,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1736261166689,"transaction":null}
  • 增加数据
{"before":null,"after":{"id":"9","name":"小王"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1736261283000,"snapshot":"false","db":"demo2","sequence":null,"table":"customer","server_id":1,"gtid":null,"file":"mysql-bin.001770","pos":3047,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1736261283678,"transaction":null}
  • 更新数据
{"before":{"id":"9","name":"小王"},"after":{"id":"9","name":"赵云"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1736261452000,"snapshot":"false","db":"demo2","sequence":null,"table":"customer","server_id":1,"gtid":null,"file":"mysql-bin.001770","pos":3348,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1736261452180,"transaction":null}

更新的数据有更新前后的数据。

2.5 web管理界面

image

标签:import,flink,flinkcdc,mysql,apache,org,null,数据,监听
From: https://www.cnblogs.com/yg_zhang/p/18658608

相关文章

  • Easy.Admin:基于 .NET 8 和 Vue3 的后台管理系统,支持多种数据库和服务端渲染(SSR)
     ......
  • C语言数据结构与算法(二叉树)
    1.二叉树的概念及结构1.1概念一棵二叉树是结点的一个有限集合,该集合:1.或者为空2.由一个根节点加上两棵别称为左子树和右子树的二叉树组成特性:1.二叉树不存在度大于2的结点2.二叉树的子树有左右之分,次序不能颠倒,因此二叉树是有序树1.2特殊的二叉树满二叉树:每......
  • Python应用指南:高德交通态势数据(二)
    本篇文章是对上篇内容的一个深化探讨,通过生成多个矩形来实现一定范围的道路交通态势查询,在上一篇文章中,我们详细介绍了如何利用单个矩形区域查询功能来获取特定区域内的实时交通状况。然而,在实际应用中,城市交通网络复杂多变,单一矩形往往难以覆盖广泛的地理范围或满足更精细的......
  • 数据库设计是构建高效、可靠和可扩展数据存储系统的关键过程
    数据库设计是构建高效、可靠和可扩展数据存储系统的关键过程。以下是数据库设计的一些核心原则:规范化(Normalization):确保数据以最少的冗余存储。消除数据更新异常和删除异常。通常分为第一范式(1NF)、第二范式(2NF)、第三范式(3NF)以及BC范式(BCNF)。反规范化(Denormalization):......
  • 深度学习目标检测中_如何使用Yolov5训练变电站各种仪表数据集等共6000余张 ,yolo标签,构
    深度学习目标检测中如何使用Yolov5训练变电站各种仪表数据集等共6000余张,并且都已打上标签,构建一个各种仪表数据集检测的项目。图像信息清晰yolo格式yolov5目标检测变电站各种仪表数据集等共6000余张,并且都已打上标签,图像信息清晰以下所有代码仅供参考!构建一个基......
  • 使用凯斯西储大学轴承数据集来构建一个完整的分析流程,包括原始信号图、时域频域对比图
    如何利用凯斯西储大学轴承数据图更换数据集,包括原始信号图,➕时域频域对比图,➕模态分解图+包络谱图,怎么做?使用凯斯西储大学轴承数据集来构建一个完整的分析流程,包括原始信号图、时域频域对比图、模态分解图和包络谱图。以下是详细的步骤和代码示例。文章代码仅供参考......
  • 构建基于yolov10麦穗目标检测系统 小麦麦头数据集检测 实现对麦穗4000张数据的处理 深
    yolov10麦穗目标检测项目,附h代码和麦穗数据集的检测麦穗目标检测数据集4000张左右yolov8,yolov10系列图像分辨率为1024x1024麦穗数据集标签有yolo格式(txt文件标签)和coco格式(json文件标签)如何水处理这些数据声明:文章内所有代码仅供参考!帮助你使用YOLOv8来训练麦穗......
  • 操作手册:集成钉钉审批实例消息监听配置
    此文档将记录在慧集通平台怎么实现钉钉审批实例结束或发起或取消时,能够实时的将对应的实例数据抓取出来送入第三方系统集成平台配置1、配置中心库,存储钉钉发送的消息,可以忽略,若不配置,则钉钉的消息将不再记录到中心库2、创建存储方案,发布平台对象到中心库。3、打开......
  • MySQL 数据库的备份和恢复
    想象一下,如果一份关键的数据库数据因意外丢失,公司将面临怎样的危机?对技术人员而言,备份与恢复是守护数据安全的最后一道防线。今天,就带你深入了解MySQL数据库备份与恢复的核心方法,让你从容应对数据丢失的挑战。如何高效备份MySQL数据库?当数据丢失时,又该如何快速恢复? ......
  • [数据结构学习笔记8] 二叉查找树(Binary Search Trees)
    二叉查找树,它是一类特殊的二叉树,除了基本的二叉树规则外,还要满足:1.左边的子节点要小于父节点值2.右边的子节点要大于父节点值 图示:添加节点:        42       |   |      24  99      |    | ......