
ChatGPT: Flink CDC from MySQL to Kafka and back to MySQL


Flink CDC: MySQL to Kafka

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Properties;

public class FlinkCDCDemo {
    // Kafka topic name and Flink checkpoint directory
    private static final String KAFKA_TOPIC = "test";
    private static final String CHECKPOINT_DIR = "file:///tmp/flink-cdc";

    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment and enable checkpointing
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointStorage(CHECKPOINT_DIR);
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Connection settings for the MySQL CDC source (option names follow the
        // flink-connector-mysql-cdc table options)
        Properties props = new Properties();
        props.setProperty("hostname", "localhost");
        props.setProperty("port", "3306");
        props.setProperty("username", "root");
        props.setProperty("password", "password");
        // server-id must be unique among all clients replicating from this MySQL server
        props.setProperty("server-id", "5400");
        props.setProperty("database-name", "test_db");
        props.setProperty("table-name", "test_table");
        
        // Create the Flink CDC source table that reads change data from MySQL.
        // The primary key lets downstream upsert sinks key the changelog correctly.
        tEnv.executeSql(String.format(
                "CREATE TABLE mysql_source (\n" +
                        " id INT,\n" +
                        " name STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'mysql-cdc',\n" +
                        // snapshot the table first, then follow the binlog
                        " 'scan.startup.mode' = 'initial',\n" +
                        " 'hostname' = '%s',\n" +
                        " 'port' = '%s',\n" +
                        " 'username' = '%s',\n" +
                        " 'password' = '%s',\n" +
                        " 'server-id' = '%s',\n" +
                        " 'database-name' = '%s',\n" +
                        " 'table-name' = '%s'\n" +
                        ")",
                props.getProperty("hostname"),
                props.getProperty("port"),
                props.getProperty("username"),
                props.getProperty("password"),
                props.getProperty("server-id"),
                props.getProperty("database-name"),
                props.getProperty("table-name")
        ));
        
        // Create the Kafka sink table that receives the change data. upsert-kafka
        // accepts the updating changelog produced by the CDC source and writes
        // JSON records keyed by id; a hand-rolled FlinkKafkaProducer cannot
        // consume a Table API changelog directly.
        tEnv.executeSql(String.format(
                "CREATE TABLE kafka_sink (\n" +
                        " id INT,\n" +
                        " name STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'upsert-kafka',\n" +
                        " 'topic' = '%s',\n" +
                        " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
                        " 'key.format' = 'json',\n" +
                        " 'value.format' = 'json'\n" +
                        ")",
                KAFKA_TOPIC
        ));

        // Copy the change stream from the CDC source into the Kafka topic.
        // executeSql() submits the job itself, so no env.execute() call is needed.
        tEnv.executeSql(
                "INSERT INTO kafka_sink\n" +
                        "SELECT id, name\n" +
                        "FROM mysql_source");
    }
}
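
To sanity-check the first job, you can tail the topic with a plain Kafka consumer. The class below is a minimal sketch, not part of the pipeline, and assumes the defaults used above (topic "test", broker at localhost:9092). With the upsert-kafka sink, keys and values arrive as JSON strings and deletes appear as null-value tombstones.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TopicSanityCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "sanity-check");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // key = JSON-encoded primary key, value = JSON row (null on delete)
                    System.out.printf("key=%s value=%s%n", record.key(), record.value());
                }
            }
        }
    }
}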

From Kafka back to MySQL



import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.types.Row;

public class FlinkKafkaToMySQLDemo {
    private static final String KAFKA_TOPIC = "test";
    private static final String MYSQL_JDBC_URL = "jdbc:mysql://localhost:3306/test";
    private static final String MYSQL_USERNAME = "root";
    private static final String MYSQL_PASSWORD = "password";

    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Configure checkpointing (exactly-once, checkpoint every 5s)
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

        // Build the MySQL sink. Flink has no dedicated MySQL streaming connector;
        // the generic JDBC sink from flink-connector-jdbc fills that role, and the
        // ON DUPLICATE KEY UPDATE clause turns the insert into an upsert on id.
        SinkFunction<Row> mysqlSink = JdbcSink.sink(
                "INSERT INTO test_table (id, name, update_time) VALUES (?, ?, ?) " +
                        "ON DUPLICATE KEY UPDATE name = VALUES(name), update_time = VALUES(update_time)",
                (JdbcStatementBuilder<Row>) (PreparedStatement stmt, Row row) -> {
                    stmt.setInt(1, (Integer) row.getField(0));
                    stmt.setString(2, (String) row.getField(1));
                    stmt.setTimestamp(3, (Timestamp) row.getField(2));
                },
                JdbcExecutionOptions.builder()
                        .withBatchIntervalMs(1000)   // flush buffered rows at least once per second
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(MYSQL_JDBC_URL)
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername(MYSQL_USERNAME)
                        .withPassword(MYSQL_PASSWORD)
                        .build()
        );

        // Create the Kafka consumer that reads from the topic
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test-consumer-group");
        props.setProperty("auto.offset.reset", "earliest");
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                KAFKA_TOPIC,
                new SimpleStringSchema(),
                props
        );

        // Parse each Kafka record into a Row (assumes records are "id,value" CSV)
        DataStream<Row> kafkaStream = env.addSource(kafkaConsumer)
                .map((MapFunction<String, Row>) value -> {
                    String[] fields = value.split(",");
                    return Row.of(Integer.parseInt(fields[0]), fields[1], new Timestamp(System.currentTimeMillis()));
                })
                // Row is a generic container, so declare the field types explicitly
                .returns(Types.ROW(Types.INT, Types.STRING, Types.SQL_TIMESTAMP));

        // Write the rows to MySQL
        kafkaStream.addSink(mysqlSink).name("MySQL Upsert Sink");

        // Launch the Flink job
        env.execute("Flink Kafka to MySQL Demo");
    }
}
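
For the ON DUPLICATE KEY UPDATE statement above to behave as an upsert, test_table must have a primary key on id. The one-off helper below is a sketch of the assumed target schema; the column names name and update_time are illustrative assumptions chosen to match the job's insert statement.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateTargetTable {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "root", "password");
             Statement stmt = conn.createStatement()) {
            // PRIMARY KEY (id) is what makes ON DUPLICATE KEY UPDATE act as an upsert
            stmt.executeUpdate(
                    "CREATE TABLE IF NOT EXISTS test_table (" +
                            " id INT NOT NULL," +
                            " name VARCHAR(255)," +
                            " update_time TIMESTAMP," +
                            " PRIMARY KEY (id))");
        }
    }
}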


From: https://www.cnblogs.com/hbym/p/17304157.html
