首页 > 数据库 >SpringBoot整合Flink CDC,实时追踪mysql数据变动

SpringBoot整合Flink CDC,实时追踪mysql数据变动

时间:2024-07-25 08:59:25浏览次数:21  
标签:SpringBoot CDC Spring Flink 实时 MySQL org

❃博主首页 : 「码到三十五」 ,同名公众号 :「码到三十五」,wx号 : 「liwu0213」
☠博主专栏 : <mysql高手> <elasticsearch高手> <源码解读> <java核心> <面试攻关>
♝博主的话 : 搬的每块砖,皆为峰峦之基;公众号搜索「码到三十五」关注这个爱发技术干货的coder,一起筑基


我们将整合Spring Boot和Apache Flink CDC(Change Data Capture)来实现实时数据追踪。下面是一个基本的实践流程代码,包括搭建Spring Boot项目、整合Flink CDC以及实现数据变动的实时追踪。

文章目录

前言

Flink CDC(Flink Change Data Capture)是一种基于数据库日志的CDC技术,它实现了一个全增量一体化的数据集成框架。与Flink计算框架相结合,Flink CDC能够高效地实现海量数据的实时集成。其核心功能在于实时监视数据库或数据流中的数据变动,并将这些变动抽取出来,以便进行进一步的处理和分析。借助Flink CDC,用户可以轻松地构建实时数据管道,实时响应和处理数据变动,为实时分析、实时报表和实时决策等场景提供有力支持。

Flink CDC的应用场景广泛,包括但不限于实时数据仓库更新、实时数据同步和迁移以及实时数据处理等。它还能确保数据一致性,并在数据发生变更时准确地进行捕获和处理。此外,Flink CDC支持与多种数据源进行集成,如MySQL、PostgreSQL、Oracle等,并提供了相应的连接器,便于数据的捕获和处理。

接下来,将详细介绍MySQL CDC的使用。MySQL CDC连接器允许从MySQL数据库中读取快照数据和增量数据。

1. MySQL开启Binlog

MySQL中开启binlog功能,需要修改配置文件中(如Linux的/etc/my.cnf或Windows的\my.ini)的[mysqld]部分设置相关参数:

[mysqld]
server-id=1
# 设置日志格式为行级格式
binlog-format=Row
# 设置binlog日志文件的前缀
log-bin=mysql-bin
# 指定需要记录二进制日志的数据库
binlog_do_db=testjpa

除了开启binlog功能外,还需要为Flink CDC配置相应的权限,以确保其能够正常连接到MySQL并读取数据。这包括授予Flink CDC连接MySQL的用户必要的权限,如SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。这些权限是Flink CDC读取数据和元数据所必需的。

检查是否已开启binlog功能:

mysql> SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+

至此,MySQL的相关配置已完成。

2. 创建Spring Boot项目

首先,你需要创建一个Spring Boot项目。可以使用Spring Initializr(https://start.spring.io/)来快速生成项目。

3. 添加依赖

pom.xml中添加Apache Flink和Flink CDC的依赖。以下是必要的依赖:

<dependencies>
    <!-- Flink dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
    <!-- Spring Boot dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
</dependencies>

4. 配置Flink和MySQL CDC

在Spring Boot的application.ymlapplication.properties文件中配置Flink和MySQL数据库连接:

flink:
  checkpoint:
    interval: 10000
  parallelism: 1

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/your_database
    username: your_username
    password: your_password

5. 实现数据实时追踪

创建一个服务类来实现数据的实时追踪:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.springframework.stereotype.Service;

@Service
public class FlinkCdcService {

    public void startDataStreaming() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 使用Flink CDC连接MySQL
        String name = "inventory";
        tableEnv.executeSql("CREATE TABLE " + name + " (" +
            "  id INT," +
            "  name STRING," +
            "  description STRING," +
            "  weight DECIMAL(10, 3)" +
            ") WITH (" +
            "  'connector' = 'mysql-cdc'," +
            "  'hostname' = 'localhost'," +
            "  'port' = '3306'," +
            "  'username' = 'your_username'," +
            "  'password' = 'your_password'," +
            "  'database-name' = 'your_database'," +
            "  'table-name' = 'your_table'" +
            ")");

        // 查询并打印结果
        DataStream<String> dataStream = tableEnv.sqlQuery("SELECT * FROM " + name).execute().print();

        try {
            env.execute("Flink CDC Demo");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

6. 启动Spring Boot应用

在你的Spring Boot应用的启动类中调用FlinkCdcServicestartDataStreaming方法来启动数据追踪:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FlinkCdcApplication implements CommandLineRunner {

    @Autowired
    private FlinkCdcService flinkCdcService;

    public static void main(String[] args) {
        SpringApplication.run(FlinkCdcApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        flinkCdcService.startDataStreaming();
    }
}

7. 运行并测试

运行Spring Boot应用,并在MySQL数据库中做出一些数据变动。你应该能在控制台看到实时打印的数据变动。


关注公众号[码到三十五]获取更多技术干货 !

标签:SpringBoot,CDC,Spring,Flink,实时,MySQL,org
From: https://blog.csdn.net/qq_26664043/article/details/140572554

相关文章

  • 基于微信小程序+协同过滤推荐算法+SpringBoot+数据可视化的校园顺路代送平台设计和实
    博主介绍:✌全网粉丝50W+,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行前辈交流✌技术范围:SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、P......
  • 计算机专业论文 (SpringBoot/SpringCloud+Vue+MySql)
    (可辅导论文)需要源码dd毕业设计(论文)  论文题目:基于Vue和SpringCloud的旅游网站设计与实现 摘 要 本论文主要介绍了基于Vue和SpringCloud的旅游网站的设计与实现。如今,旅游业已成为社会发展中的重要组成部分,旅游平台聚集多种多样的旅行方案以及攻略,越来越多的人......
  • SpringBoot 配置文件详解:properties 和 yml
    目录一、配置文件的作用二、配置文件的格式三、properties配置文件说明 3.1 properties基本语法3.2读取配置文件四、yml配置文件说明4.1yml基本语法4.2yml读取文件4.3yml使用进阶4.3.1配置对象4.3.2配置集合4.3.3配置Map一、配置文件的作用配置文......
  • SpringBoot自动配置(面试重点)
    自动配置是指:自动配置是指在应用程序启动时,SpringBoot根据classpath路径下的jar包自动配置应用程序所需的一系列bean和组件,从而减少开发者的配置工作,提高开发效率。一:ConditionCondition是spring4.0之后添加的条件判断功能,通过这个功能可以实现选择性的创建Bean操作。Condit......
  • SpringBoot整合Swagger2,代码文档一手抓
    文章目录引言什么是swaggerSwagger的优势SpringBoot整合Swagger2添加Swagger依赖application.yml配置配置类SwaggerConfig启动类配置RESTful实战案例参考常见swagger注解说明页面访问效果常见错误引言什么是swaggerSwagger是一个规范且完整的框架,用于生成、描......
  • Springboot整合redis
    引入依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>修改配置文件//单机模式配置spring.redis.host=172.16.7.21 //ip地址spring.redis.port=6379 //端口号s......
  • Java学习 - Springboot 集成 Security 入门小实例
    前言SpringSecurity是Spring家族中一个强大可定制的身份验证和访问控制框架,和Shiro一样,它们都具有认证、授权、加密等用于权限管理的功能。但相比于Shiro,SpringSecurity的功能无疑更加强大。而且作为Spring家族中的一份子,配合家族中的其它兄弟-SpringBoot、S......
  • Springboot的n多注解(自用)
    pojocontrollerservice(无)service.implmapper  ——————pojo@Dataget/set等@NoArgsConstructor无参构造@AllArgsConstructor有参构造 ——————controller@Slf4j......
  • 基于Java+SpringBoot+Vue的卓越导师双选系统的设计与开发(源码+lw+部署文档+讲解等)
    文章目录前言项目背景介绍技术栈后端框架SpringBoot前端框架Vue数据库MySQL(MyStructuredQueryLanguage)具体实现截图详细视频演示系统测试系统测试目的系统功能测试系统测试结论代码参考数据库参考源码获取前言......
  • 基于springboot的助农服务平台
    基于springboot的助农服务app介绍2024届软件工程毕业设计 该项目是基于springboot的助农App的设计及实现,主要实现了管理员,用户,商家三个端的设计,其中主要实现的功能有产品模块,订单模块,购物车模块,以及相关联的管理模块,秒杀等,帮助农民出售农作物,提高农业水平的发展,提高农民......