首页 > 数据库 >三端一体计算方案:Unify SQL Engine

三端一体计算方案:Unify SQL Engine

时间:2022-08-15 18:14:13浏览次数:81  
标签:Engine Java ODPS Flink 引擎 计算 SQL Unify

简介: 本文将介绍数仓建设过程中面对三种计算模式,较低的研发效率、不可控的数据质量,以及臃肿数据接口服务的困境的解决方案。

背景

在漫长的数仓建设过程中,实时数仓与离线数仓分别由不同的团队进行独立建设,有大而广的离线数仓体系,也有需要追求业务时效,需要建设实时数仓。然而,业务数据需求和数据产品需求中,往往需要把实时数据与离线数据结合在一起进行比对和分析,但是这两个天然不一样的数据存储和计算结构,需要同时开发两套数据模型。在数据处理过程中,实时数仓需要使用Blink/Flink 处理,离线需要写ODPS SQL处理,还有在线计算模型,需要开发java代码处理。

image.png

如上图所示,实时数据与离线数据在存储层、计算层、服务层,都是割裂分离、独立建设的。实时数据,有着增量式计算的特性,需要快速的流转与计算,它主要以DataHub、Flink、Hbase等异构系统做为支撑,串联形成一个完整实时计算全链路。而离线数据,是定时、批量计算的特性,由存储计算统一的ODPS系统做为支撑。它们除了计算链路的差异,还有着数据处理的逻辑差异:

  1. 流批处理不能复用,ODPS和Blink/Flink的SQL标准不一致,他们的底层调度和数据处理逻辑也有根本的差别,一个是以MR作为核心的批处理方式,一个是以Flink/Blink为核心的流处理。
  2. 有些流批处理场景需要调用HSF接口,调用HSF接口,在Java Spring环境里,是信手拈来的事情,但到了ODPS/Flink环境里,就变得额外的一个大挑战,甚至不可实现,因为在ODPS/Flink是无法加载或者极难使用Spring容器的,这就会让开发在面对复杂流处理场景,更倾向使用自己熟悉的Java环境,但同时也意味失去ODPS/Flink那种贴近业务表达的SQL表达。
  3. 计算处理除了流批处理,还更广泛存在在线交互计算,这个和流处理异步处理还不太一样,是需要同步计算并返回结果,这通常是在Java环境开发HSF接口,但如果你要对外同时提供三种能力:流计算、批计算和在线计算的时候,就面临需要三端开发,流计算和批计算尚且还有SQL,虽然不太一致,但至少大同小异,但在线交互计算就是需要纯Java开发了,将SQL翻译成Java代码可是个不小的挑战。

 

Unify SQL VS 流批一体

面对三种计算模式,较低的研发效率、不可控的数据质量,以及臃肿数据接口服务的困境,三端(流计算、批计算、在线计算)一体计算的想法自然油然而生,在20年做价格力业务时候,我就一直思考有什么解法,其实,这个也是大数据架构经常面临的问题,业界达成共识,可以归纳两种方案:

▐  流批一体计算

 

同一个引擎承载流、批两种计算模式,在流计算模式下进行实时数据计算,在批计算模式下进行离线数据计算。

image.png

流批一体计算的典型架构是:Flink + Kappa架构。Flink可以实现基于SQL的流批一体的计算表达,复杂计算通过Java应用承接,价格力计算架构就是典型这种架构,但是这种架构存在以下问题:

  • 没有解决在线交互计算


Flink解决了流计算、批计算一体的能力,这两种都是异步处理,只是时效不同而已,但没有解决在线计算的能力,如果要提供在线计算能力,不得不在以下两个方案选择:

  1. 通过Java应用提供同步计算接口,这样就存在两套逻辑:一个是Flink实现的流批处理,另一个是Java实现的在线处理。
  2. 提供两个接口,一个接口是发起计算请求,将计算请求交到Flink处理后,再提供一个轮训查询接口,查询计算好后的数据,这个方案至少在计算上做到一套代码,但这种同步转异步处理的方案势必会影响产品的设计。

 

  • Flink的批处理吞吐量

 

Flink实现批处理,其实是有点一厢情愿,为啥这么说,因为其吞吐规模,跟MR批计算(ODPS)完全不是一个量级,如果Flink真能实现和ODPS完全对等的吞吐规模和资源成本,那完全不需要ODPS什么事了,但现实是,对于一些只有批量处理场景的(比如特征预处理、统计计算),ODPS仍然是第一优先选择,只有当面临流批同时存在的场景时候,并且对批处理规模要求不大时候,Flink的确提供非常不错的一体化解决方案。

▐  Unify SQL

 

同一SQL代码通过自动化转义,翻译到流计算引擎和批计算引擎上进行流、批计算,也包括翻译到HSF接口代码,提供在线交互计算能力。

image.png

Flink的流批一体架构非常优秀,能解决90%的流批一体问题,但不幸的是,我们有些业务场景(典型的价格计算场景),远不是Flink写写SQL可以解决的:

  1. 整个电商是个非常复杂的业务体系,就以我所处在的营销域里,就要面对招商模型、投放模型、活动模型、权益等等,这些都远不是一个单一系统可以承接的,也不是一个单一团队可以承接的,阿里针对这样复杂的业务,设计了HSF这样微服务电商架构,但是Flink和电商这样的Java技术栈明显是割裂的,怎么结合两种体系架构,一方面发挥电商的微服务架构的红利,一方面又能利用到Flink的SQL流批处理能力,考虑到Flink本身的局限性,与其让Flink支持HSF,不如让Java环境支持Flink SQL,换句话说,设计一个SQL引擎,它能通过sql的流处理方式,处理Java对象,将流引擎嵌入到Java里,随调随用。
  2. Flink的批引擎,在面对T级离线数据批量处理,是非常耗资源,几乎不可用,正如上面所讲的,Flink批处理的吞吐量远不及ODPS的MR批处理,那么,我们为何不让这样计算仍然交接给ODPS处理,但是,ODPS和Flink的SQL标准不一致,需要两端开发,现在问题变成:怎么统一ODPS和Flink的开发,说的再通俗点,我们可不可以在ODPS和Flink上面架设一层统一Unify SQL,这个SQL引擎可以翻译成ODPS或者Flink能理解的处理(ODPS翻译成MR程序,Flink翻译成Stream Operator)方式,抹平ODPS和Flink的SQL语义差别。
  3. 如果仅仅是抹平ODPS和Flink的SQL差异,带来的收益其实并不大,但是其统一SQL表达计算的设计,是可以进一步扩宽其应用范围,比如在线交互计算,或者说,我们可以进一步打造统一计算引擎,包括编排不同模式的计算能力,比如:有些场景对时效要求比较高,我们可以调度Flink计算,对时效没有要求,但数据量巨大,可以只调度离线计算,有些需要提供HSF接口,就调度应用启动spring接口。


Unify SQL Engine

 

淘系价格计算引擎,以Flink + Kappa为核心的数据架构,关于这种数据架构演进,可以参考我其他文章,三种计算模式的叠加是价格服务计算引擎的常态模式,他们都在各自核心计算发挥自己最大的优势:

  1. ODPS:离线批量计算引擎,核心优势,非常高的计算吞吐量,但时效差,有面向MR和SQL编程模式,业务和BI友好,主要用在数据预处理、离线特征加工、常见维表ETL等。
  2. Flink:流式处理引擎,核心优势,低延迟计算,时效好,极高的容错和高可靠性,但吞吐量相比ODPS一般,有面向Stream API和SQL API的编程模式,业务和BI友好,主要用在实时数据加工(优惠、订单等)、消息预处理等
  3. Java计算:核心优势,丰富的电商Java HSF接口,复杂的领域模型,面向对象设计,开发友好,但是业务和BI不友好,容错和可靠性依赖开发设计,延迟和吞吐量也高度依赖开发设计。

 

那么如何整合这3个不同计算架构,Flink提出一个引擎承接所有计算模式,也就是Flink的流批一体引擎,但这带来的问题就是,不同计算模式,底层的引擎本身就很难完全周全到,与其去统一计算引擎,为何不统一表达和调度,而把真正的计算下放到各自计算引擎,这就是Unify Engine的核心思想。

 

▐  SQL引擎技术

 

在实现三端一体化时候,有个核心技术难点,就是SQL引擎,很多数据产品都自带自己的SQL引擎,Flink内部有SQL引擎,ODPS内部有C++实现的SQL引擎,Hive也有,Mysql内部也有SQL解析引擎,这些SQL引擎都高度集成到各自的存储和计算里,如果你说要找个独立的可用在Java环境的SQL引擎,市面上有是有,不过要么是非常复杂的calcite sql引擎,要么是非常简单的select * 简易sql引擎,能做的事情非常少,开箱即用的几乎没有。但Unify SQL引擎又是实现三端一体化的核心组件,没有它,其他什么事情都无从谈起。
从无设计一个SQL引擎成本是非常高的,其中不说复杂的语法解析,生成AST语法树,就单单SQL逻辑计划优化,就是非常复杂,幸运的是,业界是存在一个可以二次开发的SQL引擎,就是calcite SQL引擎,其实,很多SQL引擎都是基于calcite二次开发的,比如Flink、Spark内部的SQL解析引擎就是基于calcite二次开发的,我们设计的SQL引擎也是基于calcite的。
Calcite 使用了基于关系代数的查询引擎,聚焦在关系代数的语法分析和查询逻辑的规划,通过calcite提供的SQL API(解析、验证等)将它们转换成关系代数的抽象语法树,并根据一定的规则或成本估计对AST关系进行优化,最后进一步生成ODPS/Flink/Java环境可以理解的执行代码。
calcite的主要功能:

  1. SQL解析:Calcite的SQL解析是通过JavaCC实现,使用JavaCC变成SQL语法描述文件,将SQL解析成未经校验(unvalided AST)的AST语法树。
  2. SQL校验:无状态校验,即验证SQL语句是否符合规范;有状态校验,通过和元数据验证SQL的schema,字段,UDF是否存在,以及类型是否匹配等。这一步生成的是未经优化的RelNode(逻辑计划树)
  3. SQL查询优化:对上面步骤的输出(RelNode),进行优化,这一过程会循环使用优化器(RBO规则优化器和CBO成本优化器),在保持语义等价的基础上,生成执行成本最低的SQL逻辑树(Lo)


至于calcite的比较详细的原理,可以详解:Apache Calcite 处理流程详解(地址:https://xie.infoq.cn/article/1df5a39bb071817e8b4cb4b29),这里不详解了。
有了calcite,解决了SQL->逻辑树,但是真正执行SQL计算的,还需要进一步将逻辑数转换成物理执行树(Physical Exec DAG),在这个DAG,是包含可执行的Java代码(JavaCode)片段,最后下发到不同执行环境,会被进一步串联可被环境执行的链路,比如在ODPS环境,会生成MR代码,在Flink环境,会被转换成Stream Operator,在Java环境,会被转换成CollectorChain,在Spring环境,会被转换成Bean组件。

image.png

PS:如果你们看过Flink源码,对上面流程会非常眼熟,是的,Unify SQL Engine不是从头设计的,是基于Flink 1.12源码魔改的,其中Parse和下面要说的Codegen技术都是直接参考了Flink设计,当然说是魔改的,就是还有大量代码需要基于上面做二次开发,比如从执行DAG到各个环境真正可执行的MR/Bean/Stream。

▐  Codegen技术

 

在SQL解析后,经过逻辑优化器和物理优化器,产生的PhyscialRel物理计划树,包含大量的复杂数据逻辑处理,比如SQL常见的CASE WHEN语句,常见的做法是给所有符号运算定义个父类(比如ExecNode),实际运行时,委派给真实的子类运行,这涉及到大量虚拟函数表的搜寻,最终这种分支指令一定程度阻止指令的管道化和并行执行,导致这种搜寻成本比函数本身执行成本还高。

 

Codegen技术就是专门针对这样的场景孕育而生,行业做的比较出色的Codegen技术,有LLVM和Janino,LLVM主要针对编译器,而Java的代码codegen通常使用Janino,Janino做为一种小巧快速的Java编译器,不仅能像Javac将一组java文件编译成Class文件,也可以将Java表达式、语句块、类定义块或者Java文件进行编译,直接加载成ByteCode,并在同一个JVM里进行运行。

 

Unify SQL Engine也使用Janino用来做CodeGen技术,并有效地提升代码的执行效率。关于Janino更多内容,可以参考这篇文章:Java CodeGen编译器Janino(地址:https://zhuanlan.zhihu.com/p/407857568)。这里有采用Codegen和不采用Codegen的技术性能对比:

 

表达式

100*x+20/2

(x+y)(xx+y)/(x-y)100/(xy)

Node树遍历执行

10ms

88ms

Janino生成代码执行

6ms

9ms

 

可以看出当表达式越复杂时,使用Janino的效果就会体现越明显。

 

▐  有状态计算

 

通常计算分为无状态计算和有状态计算,无状态计算一般是过滤、project映射,其每次计算依赖当前数据上下文,相互独立的,不依赖前后数据,因此,不需要有额外的存储保存中间计算结果或者缓存数据,但还有一类是有状态计算,除了当前数据上下文,还需要依赖之前计算的中间态数据,典型的比如:

  1. sum求和:需要有存储保存当前求和的结果,当有新的数据过来,结合当前中间结果基础上累加
  2. 去重:去掉之前重复出现的数据,需要保存之前已经处理过哪些数据,然后有新的数据需要计算,要和保存的数据比较是否重复
  3. 排序:需要有存储保存之前排好的数据,当有新的数据过来,会变更之前的排序结果,并diff后,将重新排序后有变动的数据重新发到下游

image.png

可见,当需要进行有状态计算,需要有后背存储来承载中间状态结果,Unify SQL Engine是支持3种后背存储:内存、Redis和Hbase:

  1. 内存State是只保存到内存,一旦重新启动,就丢失历史数据,内存State通常用在单机有状态计算,并且容忍数据丢失。一般用在ODPS的MR程序里,因为一次MR调用状态计算,只需要当前执行上下文的累计结果,不需要放在全局缓存,不同批次之间的累计是通过MR API之间传输,内存State完全够用。
  2. Redis:对于需要跨多机状态计算,就会用到Redis作为后背存储,Unify SQL Engine在Java环境里默认是使用这个作为后背存储。Redis后备存储一般用在Java计算环境,数据会流经过不同生产机器,计算的中间结果需要全局可见。
  3. Hbase:如果状态数据超过100G,可以选择Hbase做为后背存储,性能虽然比不上Redis,但状态可以保存很长时间,对于长周期的状态计算非常有用。

 

▐  JOIN语义

 

 

Flink是可以支持双流Join,但是Flink的双流Join的语义完全照搬了SQL的JOIN语义,就是一边的数据会和另一边的所有数据JOIN,这个对于离线分析没有任何问题,但是对于实时计算是会存在重复计算,在有些场景还有损业务逻辑,比如:当订单流去双流JOIN优惠表的时候,就会出现这个问题,优惠表的数据是会不停变化的,但是我们希望以快照数据做为JOIN的依据,而不是把优惠变更的数据都复现一遍,Unify SQL Engine是做到后者语义的,也就是SNAPSHOT JOIN,也是业务场景常见的语义:

image.png


一些想法

 

▐  统一调度

 

 

Unify SQL Engine现在已经可以做到将SQL翻译成不同执行环境可运行的任务,通过Unify SQL统一表达了不同环境的逻辑计算,但是离最终我们期望的还很远,其中一点就是要做到统一调度和分配,现在不同环境的协调是需要开发者自己去分配和调度,比如哪些计算需要下发到ODPS MR计算,哪些是在Java环境运行,未来我们希望这些分配也是可以做到统一调度和运行,包括全量和增量计算的自动协同,离线和在线数据协同等

 

▐  资源成本

 

 

通过Unify SQL Engine,开发者可以自己选择底层的计算引擎,对于数据量较大但对时效要求不高的场景,可以选择在ODPS计算,对于时效有要求同时数据规模可接受内,可以选择在Flink调度,对于计算逻辑复杂,需要大量依赖HSF接口,可以选择在Java环境启动,选择自己最容易接受的资源和成本,承接其计算语义。

 

同时,也是希望通过Unify SQL Engine最大化的利用计算资源,比如Java应用,很多情况下是空闲状态的,CPU利用率是比较低下的,比如一些流计算可以下发到这些空闲的应用,并占用非常小的CPU(比如5%以内),整体的资源利用率就提升了,还比如,Flink计算资源是比较难申请,那么可以选择在Java环境里计算(Java相比Flink环境缺乏一些特性,比如Exactly once语义)等等。

原文链接:https://click.aliyun.com/m/1000353354/ 本文为阿里云原创内容,未经允许不得转载。

标签:Engine,Java,ODPS,Flink,引擎,计算,SQL,Unify
From: https://www.cnblogs.com/yunqishequ/p/16589199.html

相关文章

  • MySQL事务
    何为事务? 一言蔽之,事务是逻辑上的一组操作,要么都执行,要么都不执行。事务演示经典的转账问题mysql>starttransaction;QueryOK,0rowsaffected(0.00sec)mysql>......
  • mysql基础
    #SQL语句的分类:1、DQL(数据查询语言):所有的SELECT语句都是数据查询语句2、DDL(数据定义语言):CREATEDROPALTER等,对数据库、表(结构)进行增删改操作3、DML(数据操作语......
  • mybatis_3_使用xml构建SqlSeesionFactory
    使用xml构建SqlSeesionFactory,分为两步:第一步:创建一个XML配置文件;第二步:创建SqlSessionFactory实例;示例:第一步:创建mybatis-config2.xml文件<?xmlversion="1.0"enco......
  • 转 pstack 命令 分析mysql hang
    pstack主要分析mysqlhang的函数,分析不了锁的情况,比较高深 参考文档https://blog.csdn.net/n88Lpo/article/details/106484780https://www.cnblogs.com/nanxiang/......
  • MySQL字段类型
    一、字符编码与配置文件'''MySQL的服务端字符编码默认使用的是latinl所以我们在写入中文的时候会出现乱码情况我们可以临时在把这个表的字符编码给改成utf8但是这样......
  • sql语句进行统计所占百分比
    亲爱的宝子们好久不见,今天得闲给大家分享一下大屏设计时常用到得统计所占百分比;这里为了方便大家实操我直接给大家匆匆建立了两个表格sql语句如下:CREATEtablet_group(......
  • Linux client with AD authentication login windows SQL Server
    Pre-WorklinuxshouldjoinADsameasSQLServersetspnonwindowsSQLServerforlinuxWindowsSQLServerC:\Users\Administrator>setspn-LsqladminRegi......
  • Docker 安装mysql 5.6
    一、dockerhub上面查找mysql镜像dockersearchmysql二、hub上拉取mysql镜像到本地标签为5.6dockerpullmysql:5.6三、使用mysql5.6镜像创建容器(也叫运行镜像)dockerru......
  • mysql 容器内部初始化
    mysql容器内部初始化我推到了dockerhub上一个镜像dockerpullliwenchao1995/mysql8:empty这个镜像是官方提供的mysql8版本,在容器内部把init的命令和要导入的sql放到......
  • mysql-递归查询
    0.背景最近接触到的业务中需要通过mysql查询部门的组织架构层级关系,最一开始的思路是想通过自定义函数来完成,但是查询效率真的是“感人”。又另辟蹊径找到mysql的递归查......