首页 > 其他分享 >StarRocks flink 同步工具 smt 使用

StarRocks flink 同步工具 smt 使用

时间:2022-09-21 18:56:00浏览次数:165  
标签:StarRocks smt flink xx starrocks mysql test table

功能简介

StarRocks 提供 Flink CDC connector、flink-connector-starrocks 和 StarRocks-migrate-tools(简称smt),实现 MySQL 数据实时同步至 StarRocks,满足业务实时场景的数据分析。

  • smt 实际上是个读 mysql 生成 flink cdc 脚本、starrocks 表、starrocks mysql 外表的工具

基本原理

通过 Flink CDC connector、flink-connector-starrocks 和 smt 可以实现 MySQL 数据的秒级同步至StarRocks。

76.png

如图所示,Smt 可以根据 MySQL 和 StarRocks 的集群信息和表结构自动生成 source table 和 sink table 的建表语句。
通过 Flink-cdc-connector 消费 MySQL 的 binlog,然后通过 Flink-connector-starrocks 写入 StarRocks。

操作步骤

  • 忽略 mysql binlog 和 flink 相关配置
  1. 下载并解压 smt.tar.gz https://www.starrocks.com/zh-CN/download/community
venn@venn smt % ls
README.md conf result starrocks-migrate-tool

  1. 修改 conf/config_prod.conf

venn@venn smt % cat conf/config_prod.conf
[db]
host = ip # mysql 配置
port = 3306
user = user
password = pass
# currently available types: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse`, `sqlserver`, `tidb`
type = mysql
# # only takes effect on `type == hive`.
# # Available values: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
# authentication = kerberos

[other]
# number of backends in StarRocks
be_num = 3
# `decimal_v3` is supported since StarRocks-1.8.1
use_decimal_v3 = true
# directory to save the converted DDL SQL
output_dir = ./result # 数据文件路径


# !!!`database` `table` `schema` are case sensitive in `oracle`!!!
[table-rule.1]
# pattern to match databases for setting properties
# !!! database should be a `whole instance(or pdb) name` but not a regex when it comes with an `oracle db` !!!
database = db_name # 数据库名字,完整名字
# pattern to match tables for setting properties
table = ^table$ # 表名正则
# `schema` only takes effect on `postgresql` and `oracle` and `sqlserver`
schema = ^public$

############################################
### starrocks table configurations
############################################
# # set a column as the partition_key
partition_key = id # 指定 starrocks 表的主键
# # override the auto-generated partitions
# partitions = START ("2021-01-02") END ("2021-01-04") EVERY (INTERVAL 1 day)
# # only take effect on tables without primary keys or unique indexes
duplicate_keys=id # 指定 starrocks 表的重复键
# # override the auto-generated distributed keys
distributed_by=id # 指定 starrocks 表的分桶键
# # override the auto-generated distributed buckets
bucket_num=8 # 桶数量
# # properties.xxxxx: properties used to create tables
properties.in_memory = false

############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################ flink-connector-starrocks sink 表配置
flink.starrocks.jdbc-url=jdbc:mysql://ip:19030
flink.starrocks.load-url=ip:18030
flink.starrocks.username=root
flink.starrocks.password=123456
flink.starrocks.sink.max-retries=10
flink.starrocks.sink.buffer-flush.interval-ms=1500
flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=false
# # used to set the server-id for mysql-cdc jobs instead of using a random server-id
# flink.cdc.server-id = 5000

############################################
### flink-cdc configuration for `tidb`
############################################
# # Only takes effect on TiDB before v4.0.0.
# # TiKV cluster's PD address.
# flink.cdc.pd-addresses = 127.0.0.1:2379

############################################
### flink-cdc plugin configuration for `postgresql`
############################################
# # for `9.*` decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming
# # refer to https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html
# # and https://debezium.io/documentation/reference/postgres-plugins.html
# flink.cdc.decoding.plugin.name = decoderbufs%

执行

[starrocks@dcmp12 smt]$ ./starrocks-migrate-tool

2022/08/15 19:36:46 source/mysql.go:108 SLOW SQL >= 200ms
[3785.337ms] [rows:2294] SELECT * FROM `information_schema`.`tables` WHERE TABLE_TYPE='BASE TABLE' ORDER BY TABLE_SCHEMA asc, TABLE_NAME asc

2022/08/15 19:36:48 source/mysql.go:116 SLOW SQL >= 200ms
[1853.744ms] [rows:35344] SELECT * FROM `information_schema`.`columns` ORDER BY ORDINAL_POSITION asc

2022/08/15 19:36:48 source/mysql.go:124 SLOW SQL >= 200ms
[216.231ms] [rows:2763] SELECT * FROM `information_schema`.`key_column_usage`
Successfully got tables from the source database. Converting them to StarRocks DDL...
Writing starrocks ddl reults...
Done writing to: /opt/smt/result
Writing starrocks ddl reults...
Done writing to: /opt/smt/result
Writing starrocks ddl reults...
Done writing to: /opt/smt/result
[starrocks@dcmp12 smt]$ ls
conf/ README.md result/ starrocks-migrate-tool
[starrocks@dcmp12 smt]$ ls result/
flink-create.1.sql flink-create.all.sql starrocks-create.1.sql starrocks-create.all.sql starrocks-external-create.1.sql starrocks-external-create.all.sql

生成 sql


CREATE TABLE IF NOT EXISTS `default_catalog`.`test_db`.`test_table_src` (
`id` STRING NULL,
`memcardno` STRING NULL,
.....

) with (
'connector' = 'mysql-cdc',
'hostname' = 'ip',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'db_name',
'table-name' = 'table_name'
);

CREATE TABLE IF NOT EXISTS `default_catalog`.`test_db`.`test_table_sink` (
`id` STRING NULL,
`memcardno` STRING NULL,
....

) with (
'sink.buffer-flush.interval-ms' = '1500',
'username' = 'root',
'load-url' = 'xx.xx.xx.228:18030;xx.xx.xx.229:18030;xx.xx.xx.230:18030',
'table-name' = 'test_table',
'connector' = 'starrocks',
'database-name' = 'test_db',
'sink.properties.strip_outer_array' = 'false',
'sink.max-retries' = '10',
'jdbc-url' = 'jdbc:mysql://xx.xx.xx.228:19030,xx.xx.xx.229:19030,xx.xx.xx.230:19030',
'password' = '123456',
'sink.properties.format' = 'json'
);

INSERT INTO `default_catalog`.`test_db`.`test_table_sink` SELECT * FROM `default_catalog`.`test_db`.`test_table_src`;

starrocks 表

CREATE TABLE IF NOT EXISTS `test_db`.`test_table` (
`id` STRING NULL COMMENT "自增主键",
`memcardno` STRING NULL COMMENT "会员ID",
.....

) ENGINE=olap
DUPLICATE KEY(id)
COMMENT "test_table"
DISTRIBUTED BY HASH(id) BUCKETS 8
PROPERTIES (
"in_memory" = "false",
"replication_num" = "3"
);

starrocks mysql 外表

CREATE EXTERNAL TABLE `mysql_external_test_db`.`test_table` (
`id` STRING NULL COMMENT "自增主键",
`memcardno` STRING NULL COMMENT "会员ID",
.....
) ENGINE=mysql
COMMENT "test_table.xlsx"
PROPERTIES (
"host" = "xx.xx.xx.xx",
"port" = "3306",
"user" = "root",
"password" = "123456",
"database" = "test_db",
"table" = "test_table"
);

已知局限

  1. smt 只能生成明细表
  2. 不能识别表的主键或者唯一键,只能指定固定字段名,对懒人来说,勉强凑合用

标签:StarRocks,smt,flink,xx,starrocks,mysql,test,table
From: https://www.cnblogs.com/Springmoon-venn/p/16624328.html

相关文章

  • Flink系列--Flink catalog
    Catalog提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。数据处理最关键的方面之一是管理元数据。元数据可以是临时的,例如临......
  • Flink-状态一致性(如何保证exactly-once、flink+kafka端到端保证exactly-once)
    当在分布式系统中引入状态时,自然也引入了一致性问题。一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结......
  • Flink-checkpoint配置及重启策略
    Flink-checkpoint配置及重启策略valenv=StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//---------------checkpoint配置-......
  • springboot+Flink 接收、处理数据20220919
     1、pom.xml<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot......
  • 【Azure 事件中心】Flink消费Event Hub中事件, 使用Azure默认示例代码,始终获取新产生
    问题描述根据AzureEventHub示例文档,[将ApacheFlink与适用于ApacheKafka的Azure事件中心配合使用],配置好 consumer.config文件后,为什么不能自动消费EventHub......
  • Flink基础概念入门
    Flink概述什么是Flink    ApacheApacheFlink是一个开源的流处理框架,应用于分布式、高性能、高可用的数据流应用程序。可以处理有限数据流和无限数据,即能够处理......
  • Flink的时间和窗口
    Flink中的时间及时流处理是有状态流处理的扩展,实现及时流处理的时间起到了很大的作用。在Flink的时间概念中主要分为下面两种:事件时间:事件时间是每个单独事件在其生......
  • StarRocks 万能数据库:查快,写快
    StarRocks是新一代极速全场景MPP数据库。StarRocks的愿景是能够让用户的数据分析变得更加简单和敏捷。用户无需经过复杂的预处理,就可以用StarRocks来支持多种数据分析......
  • Flink学习
    一、Flink部署1.集群角色:hadoop102:JobManager;hadoop103:TaskManager;hadoop104:TaskManager2.集群启动$bin/start-cluster.sh3.查看flink状态:jps4.停止集群$b......
  • Flink入门
    Flink快速上手1-创建一个Maven项目2-引入依赖版本根据自己的情况和需求进行更改<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/P......