首页 > 数据库 >flink监控数据库表

flink监控数据库表

时间:2024-05-14 11:29:52浏览次数:26  
标签:cdc 数据库 flink 监控 import apache org com

背景

在日常服务运行中可能会遇到很多数据上的问题,一些我们可以通过日志查询,但是一些修改等操作日志无法查询到,binlog日志不方便查询而且不是所有表都需要日志,增加了查询的难度,我们考虑使用canal或者flink对binlog进行记录,这里flink,flink程序和客户端版本1.17.1

pom.xml


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <artifactId>xhtx-platform-data</artifactId>
    <groupId>flink</groupId>
    <version>1.0-SNAPSHOT</version>
    <modelVersion>4.0.0</modelVersion>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.26</version>
        </dependency>
        <!-- flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <!-- flink -->

        <!-- flink cdc -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.30</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>com..ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-shaded-guava</artifactId>
                </exclusion>
            </exclusions>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
            <version>3.0.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>4.3</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.flink.cdc.FlinkCDCMySQL</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

FlinkCDCMySQL


package com.flink.cdc;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class FlinkCDCMySQL {

    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkCDCMySQL.class);

    static String[] tables = {
            "库名.表名"
    };

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);

        DebeziumSourceFunction<String> build = MySqlSource.<String>builder()
                .hostname("hostName")
                .port(3306)
                // set captured database
                .databaseList("库名")
                // 如果不添加该参数,则消费指定数据库中的所有表
                .tableList(tables)
                .username("username")
                .password("password")
                /**initial:初始化快照,即全量导入后增量导入(检测更新数据写入)
                 * latest:只进行增量导入(不读取历史变化)
                 * timestamp:指定时间戳进行数据导入(大于等于指定时间错读取数据)
                 */
                .startupOptions(StartupOptions.latest())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .build();
        DataStreamSource<String> stringDataStreamSource = env.addSource(build);
        stringDataStreamSource.setParallelism(1);

        stringDataStreamSource.print();
        LOGGER.info("监控+++++++++++++");
        DataStreamSink<String> stringDataStreamSink = stringDataStreamSource.addSink(new LotteryCollectSink<>());
        env.execute("flinkcdcmysql");
    }
}

算子LotteryCollectSink,我这里只记录没有使用kafka,使用队列的话配置相关算子即可


public class LotteryCollectSink<T> extends RichSinkFunction<String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(LotteryCollectSink.class);

    @Override
    public void invoke(String value, Context context) {
        Action action = JSONUtil.toBean(value, Action.class);
        try {
            if (null != action && !action.getOp().equals("r")) {
                TableChangeHandleStrategy tableChangeHandleStrategy = new ActivityNewChangeHandle();
//记录逻辑
                tableChangeHandleStrategy.handle(action);
            }
        } catch (Exception e) {
            LOGGER.error("invoke错误:"+e);
            return;
        }
    }
}

标签:cdc,数据库,flink,监控,import,apache,org,com
From: https://www.cnblogs.com/shi-hao/p/18190977

相关文章

  • Flink同步mysql到iceberg
    一、如何做一致性保障1、全量数据分片读取,增量数据单并发读取,保证增量阶段不会乱序2、全量阶段写入失败会清空表后重新写入,避免重复数据。3、全量阶段多task并行读取,把每个task开始结束时间提交给FlinkCoordinator,由Coordinator做时间合并后,仅读取一次全量同步区间内变化的binlo......
  • 危化工厂安全监管新篇章:智能化视频监控方案的探索与实践
    一、背景需求分析危化品因其特殊的物理和化学性质,一旦发生事故,往往会造成严重的人员伤亡和财产损失。传统的监管手段往往依赖于人工巡查和定期检测,然而这种方式不仅效率低下,而且难以全面覆盖和实时监控。因此,借助现代信息技术,特别是视频智能监控技术,构建一套高效、智能的危化工厂......
  • prometheus+grafana 使用blackbox_exporte监控站点url
    1.1下载blackbox_exporte插件在https://github.com/prometheus/blackbox_exporter 上下载blackbox_exporte对应版本安装包,并上传到prometheus服务器 /usr/local 目录1.2安装blackbox_exporte[root@rancherlocal]#tarxvfblackbox_exporter-0.25.0.linux-amd64.tar.g......
  • mongodb数据库:手动释放磁盘空间
    平台:阿里云mongoDB云数据库版本:Mongodb4.2数据库集群方案:一主二从三分片需求:手动释放过剩磁盘空间从文档解析可知:delete数据或者做分片数据迁移,并不会释放磁盘空间,而是将这些空间标记为reuse可重用状态,后续新写入的数据会重用这部分空间。需求是手动释放这些空间,使用compact......
  • flink sql
    【案例1】Flink01_Table_BaseUsepublicclassFlink01_Table_BaseUse{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);......
  • Etcd开启grpc请求耗时监控
    针对各种类型grpc请求耗时统计场景,etcd提供监控数据。#etcd容器增加环境变量env:-name:ETCD_METRICSvalue:extensivecurl-shttp://127.0.0.1:2381/metrics|grepgrpc_server_handling_seconds_bucketgrpc_method包含了Alarm/AuthDisable/AuthEnable/Authenticate/......
  • HarmonyOS 使用关系型数据库进行增删改查
    HarmonyOS中的关系型数据库基于SQLite组件,提供了一套完整的对本地数据库进行管理的机制。它支持事务、索引、视图、触发器、外键、参数化查询和预编译SQL语句等特性。关系型数据库适用于存储包含复杂关系数据的场景,例如学生信息或雇员信息,这些数据之间有较强的对应关系。操......
  • 随笔-调试-数据库常用命令
    目录Mysql命令登录命令在命令行直接执行命令查看表数据/修改表数据/清空数据查看表结构/增加列/删除列/删除表wireshark抓mysql的报文Redis命令登录并执行命令/查看数据批量删除数据sqlite命令Mysql命令登录命令mysql-uroot-p'root'-h127.0.0.1-P3360xxxDb在命令行直接......
  • 备份恢复数据库
    #备份数据并且恢复数据#备份数据,要在DOS下执行mysqldump指令,这个指令其实在mysql安装目录\bin#这个备份的文件,就是对应的sql语句mysqldump-uroot-p-Bhsp_db02hsp_db03>d:\\bak.sqlDROPDATABASEhsp_db03;#恢复数据(注意:在DOS下进去mysql在执行)sourced:\\bak.sql#......
  • 数据库基础
    1.什么是数据库是存储数据的电子仓库2.数据库分类2.1关系型数据库定义:数据库中表与表存在某种关系,数据存储在不同的表中db2mysqloraclesqlserver2.2非关系型数据库定义:通常数据是以对象的形式存储在数据库中mongoredis3.linux添加卸载MySQLyuminstallmysql删除......