首页 > 其他分享 >[实时计算flink]数据摄入YAML作业快速入门

[实时计算flink]数据摄入YAML作业快速入门

时间:2024-10-21 10:20:08浏览次数:8  
标签:02 56 15 入门 flink YAML 2023 MySQL order

实时计算Flink版基于Flink CDC,通过开发YAML作业的方式有效地实现了将数据从源端同步到目标端的数据摄入工作。本文介绍如何快速构建一个YAML作业将MySQL库中的所有数据同步到StarRocks中。

前提条件

背景信息

假设MySQL实例中有一个order_dw_mysql库,里面有名称为orders、orders_pay和product_catalog的3张业务表。此时,如果您希望开发一个数据摄入YAML作业,将这些表和数据都同步到StarRocks的order_dw_sr数据库中,则可以按照以下步骤进行:

  1. 步骤一:准备RDS MySQL测试数据

  2. 步骤二:开发数据摄入YAML作业

  3. 步骤三:启动数据摄入YAML作业

  4. 步骤四:在StarRocks上查看同步结果

步骤一:准备RDS MySQL测试数据

  1. 创建数据库和账号。

    为目标实例创建名称为order_dw_mysql数据库和具有对应数据库读写权限的普通账号。具体操作请参见创建数据库和账号管理数据库

  2. 通过DMS登录RDS MySQL。

    详情请参见通过DMS登录RDS MySQL

  3. 在已登录的SQL Console窗口,输入如下命令后单击执行,创建数据库和三张业务表,并插入数据。

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee numeric(20,2) not null,   
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null 
    );
    
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null, 
      create_time timestamp not null
    );
    
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- 准备数据
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');

步骤二:开发数据摄入YAML作业

  1. 登录实时计算管理控制台

  2. 在左侧导航栏选择数据开发 > 数据摄入

  3. 单击新建,选择MySQL到Starrocks数据同步,单击下一步

  4. 填写作业名称存储位置和选择引擎版本后,单击确定

  5. 配置YAML作业代码信息。

    将MySQL中order_dw_mysql数据库下的所有表同步到starrocks的order_dw_sr数据库中,代码示例如下。

    source:
      type: mysql
      hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: order_dw_mysql.\.*
      server-id: 5405-5415
    
    sink:
      type: starrocks
      name: StarRocks Sink
      jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
      load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
      username: ${secret_values.starrocksusername}
      password: ${secret_values.starrockspassword}
      table.create.properties.replication_num: 1
      
    route:
      - source-table: order_dw_mysql.\.*
        sink-table: order_dw_sr.<>
        replace-symbol: <>
        description: route all tables in source_db to sink_db
    
    pipeline:
      name: Sync MySQL Database to StarRocks

    关于MySQL和Starrocks的本示例需要的配置信息说明如下表所示,数据摄入更多参数详情请参见MySQLStarRocks

    类别

    参数

    说明

    示例值

    source

    hostname

    MySQL数据库的IP地址或者Hostname。

    建议填写专有网络VPC地址。

    rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com

    port

    MySQL数据库服务的端口号。

    3306

    username

    MySQL数据库服务的用户名和密码。填写您步骤一:准备RDS MySQL测试数据中创建的账号和密码信息。

    说明

    本示例使用变量,可以避免明文展示密码等信息,详情请参见变量管理

    ${secret_values.mysqlusername}

    password

    ${secret_values.mysqlpassword}

    tables

    MySQL表名。支持正则表达式以读取多个表的数据。

    本文将同步order_dw_mysql数据库所有表及数据。

    order_dw_mysql.\.*

    server-id

    数据库客户端的一个数字ID。

    5405-5415

    sink

    jdbc-url

    JDBC连接的URL。

    指定FE(Front End)的IP和查询端口,格式为jdbc:mysql://ip:port

    您可以在E-MapReduce控制台实例详情页签,查看目标实例的FE内网地址查询端口

     jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030

    load-url

    连接到FE节点的HTTP服务URL。

    您可以在E-MapReduce控制台实例详情页签,查看目标实例的FE内网地址HTTP端口

    fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030

    username

    StarRocks连接用户名和密码。

    此处需要填写为您开通StarRocks时填写的用户名和密码信息。

    说明

    本示例使用变量,可以避免明文展示密码等信息,详情请参见变量管理

    ${secret_values.starrocksusername}

    password

    ${secret_values.starrockspassword}

    route

    source-table

    指定生效上游表。

    order_dw_mysql.\.*

    sink-table

    指定数据路由的目标位置。

    order_dw_sr.<>

    replace-symbol

    在使用模式匹配功能时,用于指代上游表名的字符串。

    <>

  6. 单击部署

步骤三:启动数据摄入YAML作业

  1. 数据摄入页面,单击部署后,在弹出的对话框中,单击确定

  2. 运维中心 > 作业运维页面,单击目标YAML作业操作中的启动

  3. 单击启动

    本示例选择为无状态启动,参数配置详情请参见作业启动。作业启动后,您可以在作业运维页面观察作业的运行信息和状态。

步骤四:在StarRocks上查看同步结果

当YAML作业处于运行中后,您就可以在StarRocks查看数据同步情况。

  1. 通过EMR StarRocks Manager连接StarRocks实例

  2. 在左侧导航栏,单击SQL Editor,在数据库页签,单击

    image

    按钮。

    您会看到default_catalog下出现名称为order_dw_sr的数据库。

  3. 查询列表页签,单击+文件,新建查询脚本后,输入以下SQL语句,单击运行

    SELECT * FROM default_catalog.order_dw_sr.orders order by order_id;
    SELECT * FROM default_catalog.order_dw_sr.orders_pay order by pay_id;
    SELECT * FROM default_catalog.order_dw_sr.product_catalog order by product_id;
  4. 在命令下方查看同步结果。

    您会看到StarRocks中已存在和MySQL数据库中相同名称的表及数据。

    image

标签:02,56,15,入门,flink,YAML,2023,MySQL,order
From: https://blog.csdn.net/segwy/article/details/142655269

相关文章

  • Maven入门到进阶:构建、依赖与插件管理详解
    文章目录一、Maven介绍1、什么是Maven2、Maven的核心功能二、Maven核心概念1、坐标GAVP1.1、GroupId1.2、ArtifactId1.3、Version1.3.1、版本号的组成1.4、Packaging2、POM、父POM和超级POM2.1、POM(ProjectObjectModel)2.1、父POM(ParentPOM)2.3、超级POM(SuperPOM......
  • 如何用3个月零基础入门网络安全?_网络安全零基础怎么学习
    ......
  • 【学术论文投稿】单片机原理与应用详解:从入门到进阶
     【会后3-4个月检索|IEEE出版】第五届人工智能与计算机工程国际学术会议(ICAICE2024)_艾思科蓝_学术一站式服务平台更多学术会议论文投稿链接:https://ais.cn/u/nuyAF3目录引言一、单片机概述二、单片机原理三、单片机开发环境四、单片机应用实例五、单片机进阶应用......
  • stm32入门教程--TIM编码器接口
    TIM编码器接口是一种重要的硬件接口,主要用于接收增量(正交)编码器的信号。以下是对TIM编码器接口的详细介绍:一、功能与作用TIM编码器接口可以接收由编码器旋转产生的正交信号脉冲。这些信号脉冲被用来自动控制一个计数器(CNT)的自增或自减,从而能够指示编码器的位置、旋转方向和......
  • 【蓝桥杯】C++ 第20场 小白入门赛
    一、四个亲戚题目四个亲戚 题目分析字面意思:Daiyu+‘kind’代码#include<iostream>usingnamespacestd;intmain(){cout<<"Daiyu'kind'";return0;}二、黛玉泡茶题目黛玉泡茶 题目分析1.我们可以c2.然后c3.计算c,如果不能,整除后的答案还要加1 ......
  • 新手必看详细搭建网站全流程教程从零开始快速入门
    1.确定网站需求网站类型:静态网站、动态网站(如博客、电商网站)功能需求:基本展示、用户注册、支付功能等预计访问量:低流量、中流量、高流量2.购买云服务器选择云服务提供商:阿里云、腾讯云、AWS等选择服务器配置:CPU:1核或2核内存:1GB或2GB存储:20GB或50GB带宽:1Mbps或5Mbp......
  • Elasticsearch快速入门
    Elasticsearch是一个分布式、RESTful风格的搜索和数据分析引擎,同时是可扩展的数据存储和矢量数据库,能够应对日益增多的各种用例。作为ElasticStack的核心,Elasticsearch能够集中存储您的数据,实现闪电般的搜索速度、精细的相关性调整以及强大的分析能力,并且能够轻松地进行......
  • Prometheus快速入门
    什么是Prometheus?Prometheus是一个开源的系统监控和告警工具包,最初由SoundCloud构建。自2012年创建以来,许多公司和组织已经采用了Prometheus,该项目拥有非常活跃的开发人员和用户社区。现在它是一个独立的开源项目,独立于任何公司维护。为了强调这一点,并澄清项目的治理结构,Prome......
  • 分布式缓存的基本概念入门以及如何保证数据一致性
    一、分布式缓存基本概念和常见技术框架JavaWeb中的分布式缓存是指在多台服务器之间共享缓存数据的技术。在分布式系统中,单个应用实例通常不会运行在一个单一的服务器上,而是部署在多个节点上以实现负载均衡和高可用性。为了在这些节点之间共享数据,就需要使用分布式缓存技术......
  • Atcoder Library 配置入门
    配置首先,你需要在这个blog里面下载AtcoderLibrary的压缩包。可以发现里面有三堆东西,一个python程序,两种语言的document,还有一个库文件夹。把库文件夹直接拖到你的编译器库文件相同目录下。Mingw的路径应该都是\lib\gcc\x86_64-w64-mingw32\8.1.0\include\c++,如果不是......