首页 > 其他分享 >Doris同步多库多表

Doris同步多库多表

时间:2022-11-18 16:01:08浏览次数:49  
标签:同步 多表 val batch kafka topic Doris 多库


官方的东西抽象到技术层面,跟具体的业务有点脱节,我们需要下沉封装,而不是削足适履。

引言

  Doris用多了,把一些坑都免疫了,遇到就知道不该跳,就像spark/flink的算子调优一样,还用后期调优吗?不应该在写的时候,就肌肉记忆的使用reduceByKey来代替groupByKey吗?
  与其叫“Doris同步多库多表”不如叫“Doris同步binlog踩坑指南”,基于当前大众化的实时架构,来将业务库的数据同步到Doris,做到数据的一致性,后期也希望palo团队做一下库表的过滤。
  这里我说的是业务库,业务库,业务库,一般来说业务库格式更加标准和统一,并且不存在删除操作,也不会用mysql来存太大的字段,利用这几点特性来实现数据的统一,我们目前有二十几个mysql实例,3000多张表,只需要维护二十几个flink任务即可,不懂doris原理的同学照样可以进行数据同步。

routineLoad不是个好选择

有一些文章推荐使用RoutineLoad来做业务库的数据同步,这是个坏的选择

  1. routineLoad依靠的kafka的topic,并且是基于表纬度的数据同步
      如果你的上游用的maxwell,那么你需要读kafka,过滤库表,再写入kafka,最后用routineLoad去接,增加了一步数据etl,并且将链路又加长了,增加了业务不稳定性;
      如果你用的canal来同步binlog,那么你要对每一个表创建一条链路,会导致canal压力增大,cup飙升,影响整个实时链路
  2. 使用复杂
      编写任务时,要指明字段和过滤格式,不好同一管理,当上游表结构发生变更时,必须进行重写任务,而不是重启,有点蠢
  3. 管理缺陷,有多少表就有多少routineLoad任务
      Doris没有提供可视化界面,几千个呢,即使有问题出现了也不知道是那个出问题,集群出问题,所有任务都要重启,重启也不能指定时间重启,出问题的时候就知道有多蠢了。
  4. routineLoad的本质是streamLoad
      官方文档中说的很明白,所有千万不要说routineLoad是实时同步,是微批同步而已,要额外指定batch_rows和batch_interval的,与其把压力给BE,不如给flink做这个etl
  5. routineLoad同步业务库数据时要注意的几个点
      1)不要指定error_row,既然是业务库数据,就要保证百分百的正确,有容错条数是几个意思?
      2)kafka上游要严格指定messageKey,建议是(库名+表名+主键),这样才能保证同一个主键下的数据永远在同一个partition中,不会出现乱序的问题。
      3)下游表设计时,varchar的长度尽量大点,Doris不允许超长度,mysql却可以,如果上游表字段长度太长,下游一定报错。
      4)下游表设计时,default字段为null,既然是binlog的数据,就不会有缺少字段,就不要硬给数值,设置成null的好处还有就是streamLoad同步json时,表结构发生变更不会报错
    上边这几条是通用的

准备

  • topic中的数据是以库为单位,或者是实例为单位
  • binlog写入kafka要有严格的messageKey
  • 稳定的kafka和flink集群

开发

可以提前看一下我的这篇博客​​Flink写入Doris的实时应用​​

确定我们的参数

  • batch_rows和batch_interval:批次提交行数和时间间隔
  • 重启方式:时间、offset,既然是业务库,不怕重复消费kafka,建议是时间,比如2h前或者2021/04/26/18(精确到小时就行)
  • 指定topic:读那个topic,写入那些表可以做映射,也可以读Doris上对应的库表名
  • 提交curl的线程数:既然是streamLoad,那肯定是基于表的,同时允许几个curl提交,单库不得超过100个线程

解析conf

  传参也好,读xml也好,读yaml也好,根据自己的喜好来。

source

  想维护offset,可以看我的文章Flink手动维护​​kafka的offset​​   关键是用时间戳来读取kafka数据

etl

  传统的步骤,过滤出binlog里的(data、database、table)就够了,重新组合成一个结构,准备给sink使用

sink

切记:streamLoad 不要带 -H “max_filter_ratio:0.01”,业务库数据不允许丢数据

  1. 根据topic获取要写入的表,存到一个set中。比如我的topic是以库为单位的,那么doris的库和mysql中的也是对应的,用jdbc去读一下这个库里有那些表就可以了;或者是我将要同步的表维护到redis,间隔一段时间去redis里同步一下要写入的表。
  2. 定义一个缓存数据的list来积累数据,并开始计数和计时
  3. 到达batch_rows和batch_interval阈值时进行提交
  4. 创建一个HashMap[String, ArrayList[String]],对set和list进行过滤,即put({databses}.{tablename}, ArrayList[{binlog中的data}])
  5. 遍历HashMap,多线程写入doris
  6. 重置batch_rows、batch_interval、list、HashMap

第5步的代码(简化版),结合​​Flink写入Doris的实时应用​​去看会更清晰,是我略的部分,CurlCallableThread是个执行curl的多线程。
curlThread是我之前提到的一个参数:“提交curl的线程数”,另外对任务返回结果的一个报错。

var isStop = false
var lastRes = ""
if(insertDataMap.nonEmpty){
val latch = new CountDownLatch(1)
val pool = Executors.newFixedThreadPool(curlThread)

val reslist = new java.util.ArrayList[(Future[String], String)]()
try{

for(elem <- insertDataMap){
val data = elem._3.mkString("[",",\n","]")//组合成jsonArray
val database = elem._1 //得到database
val table = elem._2 //得到table
val path = s"/tmp/flink_doris/$topic/$getCurrentThreadId/$database.$table"
val c1 = new CurlCallableThread(data, path, database, table, latch)
val f1 = pool.submit(c1)
reslist.add((f1, table))
}
latch.await()
for (i <- 0 until reslist.length){
val res = reslist(i)._1.get().toString
if(!res.startsWith("accessed")){
Logger.warn(res)
lastRes = res
if(res.contains("ErrorURL") || res.contains("unknown table")){
//打印错误
Logger.error(content)
Logger.error(lastRes)
//删除
val table = reslist(i)._2
tableErr(table)
}else{
isStop = true
}
}
}
}catch {
case e: InterruptedException => e.printStackTrace()
} finally {
Logger.warn(s"${timestampToDate(System.currentTimeMillis())} upsert $topic data $listLength t")
pool.shutdown()
insertDataMap.clear()
if(isStop){
val content = s"topic $topic stop in "+ timestampToDate(System.currentTimeMillis())
//打印错误
Logger.error(content)
Logger.error(lastRes)
//停止
println(1/0)
}
}

  自己做好报错监控,邮件、钉钉、短信等等

使用

  存量数据补完(后期会出一篇如何快速补存量数据的文章),topic准备好就可以开始启动了
  指定batch_rows和batch_interval,topic,重启方式,线程数就可以了。
当业务表字段发生变更,提前到Doris执行alter命令就可以了,字段default设置为null就不会报错
  新建表时,在Doris提前建好,重启识别一下要写入的表就行了
  如果没有及时的进行变更,那也没事,把重启方式的时间往前推到变更业务表之前,重新补数据即可。
  如果发现报字段长度问题,超了65533,结合业务看看是不是可以删掉这个字段,不影响数据同步。
  根据业务库的重要程度来调整batch_interval,建议10s以上,准实时即可

感谢百度同学热心帮助和老师的指导,以及数仓同学的case


标签:同步,多表,val,batch,kafka,topic,Doris,多库
From: https://blog.51cto.com/u_15879559/5868559

相关文章

  • MySQL中的多表操作
    MySQL多表操作1、联合查询联合查询:union,是指将多个查询结果合并成一个结果显示,联合查询是针对查询结果的合并(多条select语句合并)基本语法select查询[决定字段......
  • DorisDB在某二梯队互联网公司的实践
    PART1.实时数据分析场景为了解决实时数据分析问题,我们先后调研了TiDB、ClickHouse和DorisDB,总结如下:1.1TiDBTiDB是一款同时支持OLTP与OLAP的融合型分布式数据库产品,具备......
  • 巨蟒python全栈开发django8:基于对象和基于双下划线的多表查询
    1.编辑删除&&多对多关系的其他方法提交,数据,得到结果  查看运行给编辑和删除,添加样式我们点击删除,可以成功删除 打印sql语句的,在settings.py里边的配置 LOGGING={......
  • 022.OneToMany对象关联查询(多表级联查询)
    1.案例分析1.班级(1)--->学生(n)学生必须持有班级的外键2.1对1通过主键关联3.多对多必须单独抽象出一张中间表  2.商品和详情对象关联查询(一对多)2.1创建商品......
  • MapReduce实战之 MapReduce中多表合并案例
     MapReduce中多表合并案例1)需求:订单数据表t_order:idpidamount1001011100202210030331001   01   11002   02   21003   03   31004   01 ......
  • SQL数据分析,多表拼接
    在日常数据查询时,绝大多数情况是将表格关联起来进行查询的,而不仅仅是对一张表格的数据进行查询,在之前的例子中,学生表用于存储学生信息、课程表用于存储课程信息、成绩表用于......
  • SpringBootJPA多表多条件查询(参数可能为空)语句
    @Query(value="SELECTc.bynameasbyname,c.cartascart,c.phoneasphone,c.surnameassurname,c.idasid,c.update_timeasupdateTime,c.head_imgasheadImg,c.i......
  • mysql多表连查的性能问题
    存在以下问题嵌套循环连接(nestedloopjoin--NLJ)类似于我们平时写的多层嵌套循环,这个性能受第一层循环的次数影响。一般是小表驱动大表的方式,当小表筛选后的数量很大,则......
  • 5分钟搞定 PostgreSQL 到 Doris 数据迁移和同步
    简述ApacheDoris是一个现代化的MPP分析型数据库产品,仅需亚秒级响应时间即可获得查询结果,能有效地支持实时数据分析。本文主要介绍如何使用CloudCanal快速构建一......
  • NHibernate1.2 执行多表查询
    个人写法可能有点傻···在改进先贴出来!让大家找点思路····目的:将两个表的内容综合到一个集合里,然后邦定到GridView上解决方法:1.创建一个综合了两个实体类的一个新的......