首页 > 数据库 >大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka

大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka

时间:2022-12-08 12:00:59浏览次数:62  
标签:flink String MySQL new FlinkCDC org apache import 数据

image

目录 作用
app 产生各层数据的 flink 任务
bean 数据对象
common 公共常量
utils 工具类

app.ods.FlinkCDC.java

package com.atguigu.app.ods;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.atguigu.app.function.CustomerDeserialization;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {

        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 设置CK&状态后端
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck"));
        //env.enableCheckpointing(5000L);
        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(10000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);

        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart());

        //2.通过FlinkCDC构建SourceFunction并读取数据
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-210325-flink")
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.latest())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        //3.打印数据并将数据写入Kafka
        streamSource.print();
        String sinkTopic = "ods_base_db";
        streamSource.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

        //4.启动任务
        env.execute("FlinkCDC");
    }

}

CustomerDeserialization

package com.atguigu.app.function;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    /**
     * 封装的数据格式
     * {
     * "database":"",
     * "tableName":"",
     * "before":{"id":"","tm_name":""....},
     * "after":{"id":"","tm_name":""....},
     * "type":"c u d",
     * //"ts":156456135615
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //1.创建JSON对象用于存储最终数据
        JSONObject result = new JSONObject();

        //2.获取库名&表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct value = (Struct) sourceRecord.value();
        //3.获取"before"数据
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        //4.获取"after"数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        //5.获取操作类型  CREATE UPDATE DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        //6.将字段写入JSON对象
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        //7.输出数据
        collector.collect(result.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

MyKafkaUtil

package com.atguigu.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.util.Properties;

public class MyKafkaUtil {

    private static String brokers = "hadoop102:9092,hadoop103:9092,hadoop104:9092";
    private static String default_topic = "DWD_DEFAULT_TOPIC";

    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        return new FlinkKafkaProducer<String>(brokers,
                topic,
                new SimpleStringSchema());
    }

    public static <T> FlinkKafkaProducer<T> getKafkaProducer(KafkaSerializationSchema<T> kafkaSerializationSchema) {

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

        return new FlinkKafkaProducer<T>(default_topic,
                kafkaSerializationSchema,
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {

        Properties properties = new Properties();

        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

        return new FlinkKafkaConsumer<String>(topic,
                new SimpleStringSchema(),
                properties);

    }

    //拼接Kafka相关属性到DDL
    public static String getKafkaDDL(String topic, String groupId) {
        return  " 'connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = '" + brokers + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'latest-offset'  ";
    }

}

尚硅谷 源代码
https://gitee.com/wh-alex/gmall-flink-210325

标签:flink,String,MySQL,new,FlinkCDC,org,apache,import,数据
From: https://www.cnblogs.com/vipsoft/p/16965598.html

相关文章

  • 时间序列数据分析 tsfresh 平稳性
    参考文章:点这里平稳性:  通常来说,一个平稳的时间序列指的是这个时间序列在一段时间内=具有稳定的统计值,如均值、方差。由于我们对于一个数据是否平稳是有自己的直觉的,所......
  • centos7安装mysql6.5
    1、配置mysql6.5yum源wget-P.http://repo.mysql.com/mysql-community-release-el6-5.noarch.rpmrpm-ivhmysql-community-release-el6-5.noarch.rpm清理yum源缓......
  • mysql的qps与tps等指标监控
    1.SQL概念StructuredQueryLanguage---结构化查询语言有数据定义语言(DDL),例如:CREATE、DROP、ALTER等语句;数据操作语言(DML),例如:INSERT(插入)、UPDATE(修改)、DELETE(删除)语......
  • mysql分组取每组前几条记录(排序)
    首先来造一部分数据,表mygoods为商品表,cat_id为分类id,goods_id为商品id,status为商品当前的状态位(1:有效,0:无效)CREATETABLE`mygoods`(`goods_id`int(11)unsigned......
  • 数据库定时异地备份与还原软件
    数据同步备份与还原软件,主要是实际现实生活中要将生成环境下的数据备份到公司进行相关历史数据的统计,也同时为了保证生产环境下的数据的安全性,做到生产环境下的定时备份,同......
  • 数据挖掘算法—SVM算法
    ✅作者简介:热爱科研的算法开发者,Python、Matlab项目可交流、沟通、学习。......
  • bulk insert批量数据导入
    在SQLServer中,BULKINSERT是用来将外部文件以一种特定的格式加载到数据库表的T-SQL命令。该命令使开发人员能够直接将数据加载到数据库表中,而不需要使用类似于Integration......
  • 5、mybatis连接sqlserver数据库
    1          在idea的maven项目下,使用mybatis连接sqlserver数据库 2          下载Sqljdbc4.jar包2.1         地址:​​http://mvnrepository......
  • 仲裁服务器的作用,仲裁:见证服务器如何影响数据库可用性
    每当为数据库镜像会话设置见证服务器时,都需要“仲裁”。仲裁是数据库镜像会话中两个或多个服务器实例彼此连接时存在的一种关系。仲裁通常包括三个互连的服务器实例。设......
  • [转]MySQL数据类型详解
    原文地址:https://www.cnblogs.com/lteal/archive/2013/03/04/2943061.htmlMySQL数据类型,可以被分为3类:数值类型、日期和时间类型以及字符串(字符)类型 方括号(“[”和......