首页 > 数据库 >Streamsets读取binlog数据实时同步到MySQL

Streamsets读取binlog数据实时同步到MySQL

时间:2023-05-31 15:34:39浏览次数:48  
标签:binlog pipeline 同步 Data gender MySQL Streamsets 数据 节点

原文:https://blog.csdn.net/maomaosi2009/article/details/108293217

1、说明
实时同步binlog数据到MySQL我使用了2种方式,

2、方式一
第一种方式较为繁琐,数据从binlog流出,经过JS数据解析器将必要的字段解析出来,流入操作选择器,根据具体需要执行的增删改操作选择最后的JDBC Producer,pipeline如下:

 

binlog数据到JS数据解析器之前是这样

{
"BinLogFilename": "mysql-bin.000001",
"Type": "INSERT",
"Table": "test",
"ServerId": 1,
"BinLogPosition": 3795464,
"Database": "test",
"Data": {
"address": "安徽蚌埠",
"name": "张张",
"update_at": 1585817413000,
"weight": 134.0,
"id": 14,
"create_at": 1585817413000,
"height": 177.0
},
"Timestamp": 1585788613000,
"Offset": "mysql-bin.000001:3795464"
} {
"BinLogFilename": "mysql-bin.000001",
"Type": "UPDATE",
"Table": "test",
"ServerId": 1,
"BinLogPosition": 3795837,
"Database": "test",
"OldData": {
"address": "美国",
"name": "守望",
"update_at": 1585763177000,
"weight": 166.0,
"id": 13,
"create_at": 1585763177000,
"height": 180.0
},
"Data": {
"address": "美国暴雪",
"name": "守望",
"update_at": 1585817417000,
"weight": 166.0,
"id": 13,
"create_at": 1585763177000,
"height": 180.0
},
"Timestamp": 1585788617000,
"Offset": "mysql-bin.000001:3795837"
}
binlog数据出JS数据解析器之后是这样的,Table、Database、Type、Offset这样的字段被解析到了record的attributes属性中,被留下的只有真实的数据,如果是INSERT或者UPDATE则取Data中的值,如果是DELETE则取OldData中的值

{
"address": "安徽蚌埠",
"name": "张张",
"update_at": 1585817413000,
"weight": 134.0,
"id": 14,
"create_at": 1585817413000,
"height": 177.0
}
{
"address": "美国",
"name": "守望",
"update_at": 1585763177000,
"weight": 166.0,
"id": 13,
"create_at": 1585763177000,
"height": 180.0
}
数据经过JS数据解析器解析之后流入操作选择器,根据record:attribute('Type')的具体值来执行具体的增删改操作,做到数据同步

 

 

 

但是这种方式较为繁琐,尤其是当数据想入到多个database的时候需要在操作选择器前面再加一个数据库选择器,根据不同的数据库流入不同的操作选择器,因为JDBC Producer中是没法动态声明数据库的,只能动态声明数据表。在请教同事之后他告诉我另一种简单的数据同步方式,见方式二。

3、方式二
方式二非常简单,数据从binlog流出,经过中间的Field Renamer进行字段名转换,然后直接就可以提供给JDBC Producer进行数据入库,不再需要人为判断操作类型,pipeline如下:

 

看一下Field Renamer1具体配置,General是基础配置

 

Rename配置栏才是我们需要配置的,INSERT操作只有Data节点,UPDATE节点Data和OldData节点都有,DELETE操作只有OldData节点。注意需要同时配置/OldData/(.*)和/Data/(.*),并且上下顺序不能错误

 

在最一开始的通过binlog数据同步到MySQL的时候我是希望只有2个stage,一个MySQL Binary Log产生数据,一个JDBC Producer输出数据到目的端,主要原因就是因为真正的MySQL数据字段是在record的Data节点里面,在JDBC Producer中我不想通过Field to Column Mapping将Data节点里面的属性一个个映射到目的端MySQL字段

{
"BinLogFilename": "mysql-bin.000001",
"Type": "INSERT",
"Table": "test",
"ServerId": 1,
"BinLogPosition": 3795464,
"Database": "test",
"Data": {
"address": "安徽蚌埠",
"name": "张张",
"update_at": 1585817413000,
"weight": 134.0,
"id": 14,
"create_at": 1585817413000,
"height": 177.0
},
"Timestamp": 1585788613000,
"Offset": "mysql-bin.000001:3795464"
}
如果将Data节点里面的属性都提取到根节点下,形似下面这种格式

{
"address": "安徽蚌埠",
"name": "张张",
"update_at": 1585817413000,
"weight": 134.0,
"id": 14,
"create_at": 1585817413000,
"height": 177.0
}
那这个record流入JDBC Producer的时候因为这些属性字段和MySQL中是一一对应的,因为不再需要我们手动配置Field to Column Mapping就可以自动适配上,这也是我在方式一中需要使用JavaScript数据解析器的原因。但是在这里我们使用了Field Renamer将Data节点和OldData节点下面的所有属性都提取到了根节点下,变成了这样

{
"Table": "test",
"address": "上海",
"OldData": {},
"weight": 110.0,
"Data": {},
"Timestamp": 1585873767000,
"Offset": "mysql-bin.000001:9166277",
"BinLogFilename": "mysql-bin.000001",
"Type": "UPDATE",
"ServerId": 1,
"BinLogPosition": 9166277,
"Database": "test",
"name": "囡囡",
"update_at": 1585873767000,
"id": 27,
"create_at": 1585820937000,
"height": 174.0
}
这样的话根节点里面的字段就可以和目的端的数据库字段对应上了,但是这样有个问题,JDBC Producer怎么知道该执行UPDATE还是INSERT、DELETE操作呢?我们看一下MySQL Binary Log的输出,除了必要的数据值外还有Record Header,里面包含了sdc.operation.type,这个就对应了具体数据库的增删改操作,这个sdc.operation.type我们只需要一直往后传递下去即可

 

在JDBC Producer中有个Default Operation,以前我们都是配置这里来执行具体的增删改操作,但是实际上这个操作是在record header中没有设置sdc.operation.type时才会执行的操作,也就是说如果我们的record header自带了sdc.operation.type值的话就会直接根据已有的来执行

 

配置好这个pipeline后运行测试了一下,发现数据可以正确的进行同步过来了。我们接着测试一下数据结构发生改变的情况

3.1、源端schema改变
我们在源表添加一个新的字段gender,观察pipeline运行情况,发现pipeline不报错,正常运行

 

 

我们现在插入一条新的数据,并且给gender一个值,注意这时候目标端还没有这个gender字段,发现整个pipeline运行正常,目标端新增的数据同步过来了,只是没有gender字段而且,其他已经有的字段值都被正确同步到了目标端。

 

我们修改源端一条数据记录的gender值,pipeline的dashboard上可以看到有一条数据input了,并且正确的output了,但是因为目标端这边还没有gender这个字段,所以实际上目标端这边数据并没有变化。我们修改源端一条数据记录的address值,这个字段在目标端这边是有的,发现数据被正确的同步更新了。

我们修改目标端schema,也增加一个gender字段,让两边的schema保持一致,测试修改源端记录中的gender值,发现pipeline虽然可以正常运行,但是目标端的gender值没有被同步过来,而修改其他字段值就可以正常同步。这时候往源端插入数据的时候即使两边schema已经一致了,gender值依旧无法同步过来。因为同时把File Renamer的输出数据也落地到本地了一份,所以可以看到File Renamer输出的数据中实际上是有gender值的,猜测是不是因为这个pipeline在启动的时候就把目标端的schema进行了缓存

 

 

重启一下该pipeline,并重置offset为修改schema之前的值,发现所有update操作都同步过来了,目标端的gender值被正确更新,但是insert操作无法同步,因为主键重复而报错了。这时在源库插入数据的时候发现可以正确同步到目标端了。

针对删除字段而言的话如果源端先删除,目标端后删除,则pipeline同步数据没问题。

3.2、目标端schema改变
pipeline运行过程中在目标端添加一个字段department,pipeline运行无异常。往源端插入数据,修改数据,删除数据,都可以正确的同步。

修改源端schema,同样增加一个字段department,两边schema保持一直,pipeline运行无异常。但是新增的字段值无法同步。

针对删除字段而言的话如果目标端先删除,源端后删除,则pipeline可以继续运行,但是会出现error记录,因为源端字段在目标端找不到,即使不给这个字段赋值也会报错。
————————————————
版权声明:本文为CSDN博主「芦苇_」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/maomaosi2009/article/details/108293217

标签:binlog,pipeline,同步,Data,gender,MySQL,Streamsets,数据,节点
From: https://www.cnblogs.com/wjs2019/p/17446241.html

相关文章

  • MySQL:一文弄懂时区&time_zone
    https://zhuanlan.zhihu.com/p/448999520你还在被以下问题困扰吗:MySQL的安装规范中应该设置什么时区?JAVA应用读取到的时间和北京时间差了14个小时,为什么?怎么解决?已经运行一段时间的业务,修改MySQL的时区会影响已经存储的时间类型数据吗?迁移数据时会有导致时间类型数据时区......
  • 各类数据库适配mysql
    1.clickhouse:Array(Date)=TEXTArray(DateTime=TEXTArray(FLoat32)=TEXTArray(FLoat64)=TEXTArray(Int16)=TEXTArray(Int32)=TExTArray(Int64)=TEXTArray(Int8)TExTDateTime=DATETIMEAnray(stringD=TExTArray(UInt16)=INTArav(uInt32)=INTArray(UInt64)=INTArr......
  • Mysql 逗号分隔的字段查询
    二、find_in_set为了解决like存在的问题,还可以使用mysql提供的find_in_set(str,strlist),sql可以这样写:select*fromuserwherefind_in_set('1',hobby_ids);这样mysql就会把hobby_ids的值,按照逗号分隔的一个个元素去匹配。如果我们要匹配多个值要怎么办呢,比如查找喜欢1-篮球......
  • MySQL——json类型的应用
    在制作动态报表的时候,如果需要字段灵活配置,用json存储数据,可以让查询变得非常简单。 业务场景:绩效系统中,需要从10个系统中抓取不同数据,并且性能不会太高,但是用于计算的只有其中3个,用哪3个看领导心情,设计一张表存储抓取到的数据。1、傻瓜式做法,设计一张表,从字段1列到......
  • mysql数据库自动删除
    关于早上发现数据库丢失,留下了一份文档 检查了下发现是命令的问题 由于使用了这条命令,导致后门被开了,真是血泪的教训。修改远程登录权限的话还是使用updateuser表来修改。......
  • java同步mysql的数据到PostgreSQL时报错ERROR: invalid byte sequence for encoding "
    最近,同事在做一个功能,通过java程序将mysql中的一张表的数据同步到pgsql中,在同步过程中,插入到pgsql中出现了如下错误:`###Errorupdatingdatabase.Cause:org.postgresql.util.PSQLException:ERROR:invalidbytesequenceforencoding"UTF8":0x00在位置:unnamedportalpa......
  • 关于mysql 创建索引报错 1071 specified key was too long ;max key length is 3027
    另一种张表也是相同的字段创建索引却能成功,在网上查了一些资料。后来发现是两张表字段都用的varchar类型,不过能成功建索引的表设置的长度是50,而不能成功的表里设置的255,修改字符长度就能成功建索引了。关于varchar(50)和varchar(255)的区别:https://dba.stackexchange.com/questio......
  • 关于MySQL的一些优化(单表访问)
    以此表为例CREATETABLE`single_table`(`id`intNOTNULLAUTO_INCREMENT,`key1`varchar(100)CHARACTERSETutf8mb4COLLATEutf8mb4_0900_ai_ciNULLDEFAULTNULL,`key2`intNULLDEFAULTNULL,`key3`varchar(100)CHARACTERSETutf8mb4COLLATEutf8mb......
  • SQLite与MySQL与PostgreSQL:关系数据库的比较
    概述和功能SQLite是一个基于文件的嵌入式RDBMS,不需要任何安装或设置。反过来,这意味着应用程序不在需要启动,停止或配置的单独服务器进程下运行。这种无服务器架构使数据库能够跨平台兼容。完整的SQL数据库包含在单个磁盘文件中,所有读取和写入都直接在此磁盘文件上进行。由于数据直接......
  • MySQL数据库,货币格式化
    MySQL数据库,货币格式化如何将数字表示为美元格式例$10,000.00查询语句SELECTCONCAT('$',FORMAT(price*1000,2,','))ASPriceFROMitem;千位分隔FORMAT(number,decimal,places),即FORMAT(数字,保留位数,分隔符)添加美元符号$CONCAT(str1,str2,...),str1、s......