首页 > 其他分享 >flink 的安装以及fink-cdc 基于多数据源导入的es 的简单使用

flink 的安装以及fink-cdc 基于多数据源导入的es 的简单使用

时间:2023-12-27 12:11:35浏览次数:48  
标签:name cdc orders 数据源 flink id default order

此文档是参照flink-cdc 文档( https://ververica.github.io/flink-cdc-connectors/master/content/快速上手/mysql-postgres-tutorial-zh.html) 案例

 的最佳实践

1.下载flink release 最新版本1.18.0 并解压,

 https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar 下载es flink-cdc 驱动包

 

git clone github上面  flink-cdc master 分支并编译

 

mvn clean install -DskipTests 执行命令进行编译会生成jar 包如下

 

 

 复制jar 包到flink lib 目录

 

启动flink 主程序  bin/start-cluster.sh 

 

访问 http://localhost:8081/#/job-manager/logs  flink webui 界面 

0

需要注意的一点是 确保加入jar包到lib 或者修改配置时,确保stop-cluster 停止成功, 之前碰到执行了stop-cluster.sh  web ui 8080 还能访问的情况 

按照文档编写docker-compose 文件

version: '2.1'
services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"

  启动 docker 容器 

    docker-compose  up  -d

   查看启动情况

 说明启动成功

准备mysql 数据

进入MySQL 容器

docker-compose exec mysql mysql -uroot -p123456

 

  1. 创建数据库和表 productsorders,并插入数据(参照原文档案例)

    -- MySQL
    CREATE DATABASE mydb;
    USE mydb;
    CREATE TABLE products (
      id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      name VARCHAR(255) NOT NULL,
      description VARCHAR(512)
    );
    ALTER TABLE products AUTO_INCREMENT = 101;
    
    INSERT INTO products
    VALUES (default,"scooter","Small 2-wheel scooter"),
           (default,"car battery","12V car battery"),
           (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
           (default,"hammer","12oz carpenter's hammer"),
           (default,"hammer","14oz carpenter's hammer"),
           (default,"hammer","16oz carpenter's hammer"),
           (default,"rocks","box of assorted rocks"),
           (default,"jacket","water resistent black wind breaker"),
           (default,"spare tire","24 inch spare tire");
    
    CREATE TABLE orders (
      order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      order_date DATETIME NOT NULL,
      customer_name VARCHAR(255) NOT NULL,
      price DECIMAL(10, 5) NOT NULL,
      product_id INTEGER NOT NULL,
      order_status BOOLEAN NOT NULL -- Whether order has been placed
    ) AUTO_INCREMENT = 10001;
    
    INSERT INTO orders
    VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
           (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
           (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

    在 Postgres 数据库中准备数据

    1. 进入 Postgres 容器

      docker-compose exec postgres psql -h localhost -U postgres

       

      1. 创建表 shipments,并插入数据

        -- PG
        CREATE TABLE shipments (
          shipment_id SERIAL NOT NULL PRIMARY KEY,
          order_id SERIAL NOT NULL,
          origin VARCHAR(255) NOT NULL,
          destination VARCHAR(255) NOT NULL,
          is_arrived BOOLEAN NOT NULL
        );
        ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
        ALTER TABLE public.shipments REPLICA IDENTITY FULL;
        INSERT INTO shipments
        VALUES (default,10001,'Beijing','Shanghai',false),
               (default,10002,'Hangzhou','Shanghai',false),
               (default,10003,'Shanghai','Hangzhou',false);
       


    cd 到flink 主目录

     启动flink-sql-client

     

     

    在 Flink SQL CLI 中使用 Flink DDL 创建表

    首先,开启 checkpoint,每隔3秒做一次 checkpoint

    -- Flink SQL                   
    Flink SQL> SET execution.checkpointing.interval = 3s;
    

    然后, 对于数据库中的表 productsordersshipments, 使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据

    -- Flink SQL
    Flink SQL> CREATE TABLE products (
        id INT,
        name STRING,
        description STRING,
        PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'localhost',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'mydb',
        'table-name' = 'products'
      );
    
    Flink SQL> CREATE TABLE orders (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       PRIMARY KEY (order_id) NOT ENFORCED
     ) WITH (
       'connector' = 'mysql-cdc',
       'hostname' = 'localhost',
       'port' = '3306',
       'username' = 'root',
       'password' = '123456',
       'database-name' = 'mydb',
       'table-name' = 'orders'
     );
    
    Flink SQL> CREATE TABLE shipments (
       shipment_id INT,
       order_id INT,
       origin STRING,
       destination STRING,
       is_arrived BOOLEAN,
       PRIMARY KEY (shipment_id) NOT ENFORCED
     ) WITH (
       'connector' = 'postgres-cdc',
       'hostname' = 'localhost',
       'port' = '5432',
       'username' = 'postgres',
       'password' = 'postgres',
       'database-name' = 'postgres',
       'schema-name' = 'public',
       'table-name' = 'shipments',
       'slot.name' = 'flink'
     );
    

    最后,创建 enriched_orders 表, 用来将关联后的订单数据写入 Elasticsearch 中

    -- Flink SQL
    Flink SQL> CREATE TABLE enriched_orders (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       product_name STRING,
       product_description STRING,
       shipment_id INT,
       origin STRING,
       destination STRING,
       is_arrived BOOLEAN,
       PRIMARY KEY (order_id) NOT ENFORCED
     ) WITH (
         'connector' = 'elasticsearch-7',
         'hosts' = 'http://localhost:9200',
         'index' = 'enriched_orders'
     );
    

    关联订单数据并且将其写入 Elasticsearch 中

    使用 Flink SQL 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中

    -- Flink SQL
    Flink SQL> INSERT INTO enriched_orders
     SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
     FROM orders AS o
     LEFT JOIN products AS p ON o.product_id = p.id
     LEFT JOIN shipments AS s ON o.order_id = s.order_id;



    这里执行
    INSERT INTO enriched_orders
     SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
     FROM orders AS o
     LEFT JOIN products AS p ON o.product_id = p.id
     LEFT JOIN shipments AS s ON o.order_id = s.order_id; 时碰到的问题,

     

    任务提交成功, web ui 查询报错信息, 一直报时区不正确, 进入到mysql SET GLOBAL time_zone ='Asia/Shanghai';设置时区

     再次执行 上面insert sql 报什么资源不可用

    https://www.cnblogs.com/javasl/p/16861356.html

     

     重启flink  bin/start-cluster.sh

    启动 bin/sql-client.sh

    再次执行 

    INSERT INTO enriched_orders
     SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
     FROM orders AS o
     LEFT JOIN products AS p ON o.product_id = p.id
     LEFT JOIN shipments AS s ON o.order_id = s.order_id;



     

    再次看web ui  界面 没有报错信息了,说明job run 成功  此job 会处于一直run 状态

     

     

    在mysql 和 psql 客户端 分别执行sql 看到数据同时同步到了es(实现原理是基于监听数据库binglog日志改动,执行日志重放 )

     

     

     

     



标签:name,cdc,orders,数据源,flink,id,default,order
From: https://www.cnblogs.com/New-beginning/p/17930285.html

相关文章

  • Flink计算TopN
    在ApacheFlink中实现高效的TopN数据处理,尤其是涉及时间窗口和多条件排序时,需要精细地控制数据流和状态管理。普通计算TopN:1.定义数据源(Source)首先,我们需要定义数据源。这可能是Kafka流、文件、数据库或任何其他支持的数据源。valstream:DataStream[YourType]=en......
  • 【Flink从入门到精通 05】Source&Sink
    【Flink从入门到精通05】Source&SinkFlink用于处理有状态的流式计算,需要对Source端的数据进行加工处理,然后写入到Sink端,下图展示了在Flink中数据所经历的过程,今天就根据这张图分别给大家分享下。01EnvironmentFlink所有的程序都从这一步开始,只有创建了执行环境,才能开......
  • Java版Flink(一)概述和入门案例
    一、概述1、Flink是什么ApacheFlinkisaframeworkanddistributedprocessingengineforstatefulcomputationsoverunboundedandboundeddatastreams.ApacheFlink是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。官网地址2、Flink特点......
  • 【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延
    Flink系列文章一、Flink专栏Flink专栏系统介绍某一知识点,并辅以具体的示例进行说明。1、Flink部署系列本部分介绍Flink的部署、配置相关基础内容。2、Flink基础系列本部分介绍Flink的基础部分,比如术语、架构、编程模型、编程指南、基本的datastreamapi用法、四大基......
  • 【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口
    Flink系列文章一、Flink专栏Flink专栏系统介绍某一知识点,并辅以具体的示例进行说明。1、Flink部署系列本部分介绍Flink的部署、配置相关基础内容。2、Flink基础系列本部分介绍Flink的基础部分,比如术语、架构、编程模型、编程指南、基本的datastreamapi用法、四大基......
  • SpringBoot 这么实现动态数据源切换,就很丝滑!
    大家好,我是小富~简介项目开发中经常会遇到多数据源同时使用的场景,比如冷热数据的查询等情况,我们可以使用类似现成的工具包来解决问题,但在多数据源的使用中通常伴随着定制化的业务,所以一般的公司还是会自行实现多数据源切换的功能,接下来一起使用实现自定义注解的形式来实现一下。......
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、
    文章目录Flink系列文章一、maven依赖二、Jdbc/mysql示例1、maven依赖2、实现1)、userbean2)、内部匿名类实现3)、lambda实现4)、普通继承RichSinkFunction实现5)、完整代码3、验证本文介绍了Flink将数据sink到mysql中,其实是通过jdbc来将数据sink到rmdb中,mysql是一个常见的数据库,故......
  • Linux下,安装单机版Flink
    安装前准备jdk环境开始安装下载安装包地址1:https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz官方:https://dlcdn.apache.org/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz清华镜像:https://mirrors.tuna.tsinghua.edu.cn/apac......
  • 【资源汇总】TiDB-TiCDC 源码解读系列最全资源!!!
    作者:Billmay表妹TiCDC是什么?TiCDC(TiDBChangeDataCapture)是用来捕捉和输出TiDB/TiKV集群上数据变更的一个工具。它既可以作为TiDB增量数据同步工具,将TiDB集群的增量数据同步至下游数据库,也提供开放数据协议,支持把数据发布到第三方系统。还记得在社区的唠嗑茶话会中询问......
  • Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架
    一、FlinkCDC概述FlinkCDC是基于数据库日志CDC(ChangeDataCapture)技术的实时数据集成框架,支持了全增量一体化、无锁读取、并行读取、表结构变更自动同步、分布式架构等高级特性。配合Flink优秀的管道能力和丰富的上下游生态,FlinkCDC可以高效实现海量数据的实时集成。Flin......