首页 > 其他分享 >大数据经典论文解读 - S4 + Storm

大数据经典论文解读 - S4 + Storm

时间:2023-04-09 12:22:06浏览次数:48  
标签:发送 Tuple S4 解读 Bolt Storm PE

S4

  • 流式计算的模型是什么样?要解决哪些问题?
  • S4是如何设计,如何进行分布式计算的?
  • S4有哪些缺陷?

在分析海量用户搜索、广告点击行为时,这个处理数据的的需求和MapReduce生成报表类似,但是要求数据统计的反馈时间尽可能短。如果频繁使用MapReduce将不得不面对:

  1. 大量“额外开销”
    MapReduce的额外开销不小
  2. 不得不让输入文件“碎片化”
    GFS将文件变为64MB大小的Block,但如果每分钟都处理数据,那么输入数据就要按照分钟分割成多个小文件,分不到GFS不同节点。文件变得很小,也丧失了顺序读取大文件的性能优势。

高频执行MapReduce还有很多问题,根本原因还是:

  1. MapReduce 是为“高吞吐量”设计的系统,没考虑低时延的要求
  2. MapReduce 是一份“边界明确(bounded)”的数据,进行处理前数据已存放在存储系统上。而实时数据计算要处理的是“无边界(unbounded)”数据

流式计算逻辑模型

S4把所有计算过程变成了一个个处理元素(Processing Element)对象(PE 对象)。具体实现时,PE就是一个编程语言中的对象,其包含4个要素:

  • 功能(functionality)PE类里实现的业务逻辑函数及配置
  • 能够处理的事件类型(types of events)
  • 能够处理的键(keyed attribute)
  • 处理事件的键对应的值(value)

流式处理就是由一个个PE组成的有向无环图(DAG),起点是无键PE(Keyless PE)对象,用于接收外部发送来的事件流。外部发送来的事件流就是一条条的消息。无键PE解析消息转换为事件,然后打上三个信息:

  1. 事件类型(Event Type)
  2. 事件的 Key
  3. 事件的 Value

事务发出后,下游PE根据自己定义的事件类型和能处理的键接收对应消息并处理。没有对应的键的PE,系统会创建一个新PE。PE对象处理完数据后,可选择立即发送新事件出去;也可在满足一定数量或事件要求后再发送消息。在DAG的终点也有一系列PE对象,把最终计算结果发布(Publish),发布频率与其他PE发送消息类似。

图中是TopK单词的例子:

  1. 起始节点是 QuoteSplitterPE,也是无键PE
    1. 接收外部传来的句子,分割成单词并统计单词出现次数
    2. 将每个单词出现次数作为 WordEvent 发送出去,对应 Event 的 Key 就是(Word,具体单词)的组合(Tuple),对应 Event 的值就是(Count,出现次数)的组合(Tuple)
  2. 一系列 WordCountPE 对象,声明只接受 WordEvent,每个不同单词对应一个PE对象,系统中有海量PE对象
    1. 上游PE将相同单词 WordEvent 发送到同一个PE,这个PE就可以统计单词出现的总次数
    2. 收到一个事件单词的出现次数就会更新,向下游发送一个 UpdatedCountEvent,也就是更新单词计数的事件
    3. 这个事件中对应的Key是(Sort,N)这样的组合,每个PE对象里的N都是随机的,但是固定不变的。
      这个组合为了一层的负载均衡,可设定N,越大下游的PE对象越多,数据就会到更多不同对象中计算。对应的值包括对应的单词是什么及对应的单词出现次数。也就是((Word,具体单词),(Count,出现次数))这么一个组合。
  3. 一系列 SortPE 对象,用于接收上游不同单词出现次数,在内部排序。最后输出TopK给下游。它相当于所有单词的某个分区的数据,这个分区包含了一部分单词的所有数据。前面设定了N是几,就会有几个SortPE对象
    1. 给下游的事件为 PartialTopkEvent,是一个部分数据的TopK
    2. 所有 SortPE 的对象输出消息的 Key 都是相同的。为了获得全局排序,要发送到同一个PE对象。这个Key被固定为(topk,1234).而Value则是K个(单词,出现次数)的集合
  4. 整个 DAG 的终点,是唯一一个 MergePE,用于接收 PartialTopkEvent,然后内部归并,选出全局 TopK。最终把对应数据写入到其他存储系统供其他应用读取

S4将整个数据处理流程变成一个有向无环图的设计,后续流式系统都采用的方法。所有数据变成事件流,开发人员只需要:

  1. 设计整个 DAG 应该是什么样子
  2. 实现其中每个节点的业务逻辑代码

来自 MaprReduce 的设计理念

S4 和 MP 都是抽象的概念,但是S4选择了一个无中心的、完全对称的架构。S4没有Master节点,但是依赖ZooKeeper。所有服务器作为一个处理节点(ProcessingNode),简称PN,注册在zk上。集体的分配负载由各个节点协商决定,而不是由一个中心化的Master统一分配。

每个处理节点都是由上下两部分组成的:

上面,实际的业务处理逻辑模块

  • 通过 Event Listener,监听外部发送过来的消息,转发给对应PE对象
  • PE对象所有输出结果都发送给 Dispatcher,让 Dispatcher 确定应该发送给哪些 PE 里
  • 实际消息发送由 Dispatcher 交给 Emitter,对外发送出去

业务处理模块只确定对应的消息发送,应该发送给哪个逻辑上的PE。具体发送由下层的通信层模块决定

下面,通信层模块

  • 具体的路由,就是Event要去的某个逻辑PE到底在哪台物理服务器
  • 负载均衡,不同单词更新频率可能不同,所以根据节点的负载判断新PE该放到哪个节点上
  • 容错恢复机制,有特定节点挂掉时,要在其他节点上恢复原先这个节点被分配的PE
  • 传输协议,S4是一个“插件式”架构,底层传输协议可以切换的。可使用TCP确保消息能发送成功;也可用UDP支持更大吞吐量

这与 MapReduce 类似,开发者只要关注业务逻辑,具体通信无需关心

“过时”的伸缩和容错能力

看起来S4的DAG上要有海量的对象,可能是数万级别。S4的设计很粗糙,有很多问题:

  • 海量对象。每个Key都是一个对象,即使只出现一次也占用内存。S4的解决办法是设定TTL,定期清理
  • 没有时间窗口。实时处理时常常要统计“过去一分钟热搜”或“过去一小时热搜”这种有时间范围的数据
  • 容错处理很简单。S4中某个节点挂了就重启一个,但原先PE维护的信息都丢失了。不知道目前统计信息是什么,也不知目前处理到哪些事件
  • 不支持动态扩容。负载上升过快时,S4随机丢弃一些数据,本质是对数据进行采样,而不能通过简单的堆硬件解决问题

小结

S4内部将业务逻辑层、网络协议、数据路由、负载均衡拆分开,做成了可插拔(Pluggable)的系统架构。整个流式处理框架里,S4采用了典型的Actor模式。DAG中每个点都是一个处理元素,每条边都是一条消息传递的路径,每个处理元素的会被托管在某个处理节点里。计算结果保持在内存,并定期发布到外部存储。

S4很粗糙,但是设计了一个完全对称、没中心节点的分布式架构。没有单点故障问题,但也放弃了动态扩容,在突发流量时选择服务降级。

业务层,也只考虑了“计算节点”层面的容错。只是重启挂掉的节点,但历史数据都已丢失。节点间的数据传输也没有全链路的传输保障。

Storm

  • Storm 架构什么样?对比S4有哪些优点?
  • 如何对消息容错处理?为什么Storm可追踪消息的整个生命周期,但又不过度影响集群性能

S4中使用了无中心的PE逻辑单元,整个系统没有master,是完全对称架构。Storm采用了DAG逻辑模型,但是在系统架构上采用了经典的 Master + Worker 的分布式架构,并将传送的消息和对应消息的处理逻辑做了分离。

基于 Topology 的逻辑模型

Storm的DAG叫Topology即拓扑图,包含元素如下:

  • Spouts 数据源。
    Storm没有将一切定义为PE,Spout负责去读取或收集数据,对应S4的 Keyless PE
  • Tuple 元组
    就是Topology中传输的最小粒度的数据单元,是一个带命名的值的列表,可看作一个个KV对。但Key只在定义Tuple时出现,数据传输时只要传输对应的值。类似Thrift中字段名称定义在外部,传输时只要序号、类型、值。对应S4中事件(Event)
  • Streams 数据流
    一个流包含了无限多个 Tuple 序列,会被系统分布式地并行处理
  • Bolts 进行计算逻辑处理的地方
    处理任意多输入流,产生任意多输出流。也负责将结果写入到外部DB。看起来像S4中PE,但Bolts与PE完全不同

Storm 的 Bolt 只是逻辑处理单元,而不是每个Key就有一个Bolt对象,对于数据分发则通过Tuple的Grouping进行的,所以系统并行度可以设置,而不是与PE对象数量相关。

S4的PE不仅仅是功能逻辑单元,也是一个KV对的数据,同样类型事件下所有相同Key的数据会聚合到同一个PE下。这使得系统有大量PE对象,也导致S4的整个系统有几个显著设计问题:

  1. 内存占用和GC开销
    开发者只能控制单个PE对象,而PE的数量是又S4框架控制的。在内存不足时可通过应用层更灵活的操作,如更频繁地把数据输出到外部KV数据库,释放掉内存,但S4框架下做不到
  2. 业务逻辑代码里混入控制分布式数据分发的逻辑
    S4中将输出地Key变成(SortID,N)这种组合,也就是靠PE里逻辑代码设置拓扑图地并行度看,这使得分布式地分发逻辑和数据处理逻辑混合。如果数据量增加想增加并行度,不能仅仅修改参数,还要修改代码重新部署。而且历史上处理了(SortID,N)组合的PE在重新部署后可能要处理完全不同的数据,因为N已经变了

Storm中Bolt更类似MapReduce中的Map或Reduce。在Topology中可设置Bolt的并行度、数据流如何分组。但每个Bolt输出的Tuple本身无需通过生成一个类似(SortID,N)的特殊Key来定义下一层Bolt的并行度。Storm中数据流的分组:

  • 随机分组(Shuffle Grouping)
    每个Bolt随机发送,下层每个Bolt接收的数量都接近
  • 字段分组(Fields Grouping)
    选定Tuple中某个字段进行分组。如对于词频TopK排序,有Tuple (word,count),可指定对word字段分组,相应单词的Tuple分发到相同的Bolt去
  • 全部分组(All Grouping)
    类似数据广播,每个Bolt输出的Tuple要想下游每个Bolt都发一份
  • 全局分组(Global Grouping)
    所有上游Bolt都发送到某个唯一的Bolt,其拥有全局信息
  • 无分组(None Grouping)
    不关心分组,即随机分组
  • 指向分组(Direct Grouping)
    上游Bolt指定下游Bolt接收对应Tuple
  • 本地或随机分组(Local or Shuffle Grouping)
    当下游Bolt有“任务”(Tasks)和上游Bolt在同一个worker进程,那么Tuple分发到这个进程的任务里,若没有则使用随机分组的方式发送Tuple。主要为了性能考虑。

S4中WordCountPE的输出只会给一个SortPE
Storm里 WordCountBolt 的输出会发送给多个不同的 SortCountBolt,因为同一个Bolt会包含多个不同单词

Master + Worker 系统架构

Storm 架构和 MapReduce 相似。Storm 集群中由 Nimbus + Supervisor + Worker 三类进程组成,容错能力也增加

  1. Nimbus 进程
    Storm的Master,类似Hadoop的 JobTracker 或 MapReduce 的 Scheduler+Master,负责资源分配和任务调度。
    开发者提交一个Topology给它。这个DAG实际上是一个编译好的程序和配置
  2. Supervisor 进程
    类似Hadoop的TaskTracker,也就是MapReduce的Worker。每个服务器上都有一个。不负责执行任务,但负责接收Nimbus分配的任务,然后管理本地的Worker进程,让Worker进程执行
  3. Worker 进程
    一台服务器有多个Worker进程,通过JVM的Executor维护一个线程池。实际线程池里有多个 Spout/Bolt 任务。线程池中Spout和Bolt会复用同样的线程

Nimbus 和 Supervisor 不直接通信,防止单点故障,而是使用Zookeeper进行任务分配持久化,由Paxos协议保障容错能力。Supervisor 从 Zookeeper 里读取对应的任务分配。

Nimbus 和 Supervisor 职责非常简单,Nimbus 只要进行 Topology 的解析和任务调度,而 Supervisor 只要接收任务并监控 Worker 进程是否存活。它们本身不处理数据也不在内存保持数据,挂掉后只要重启进程即可。与Megastore中协同服务器类似,只要让特定节点足够简单,即使称为单点,它们的稳定性也为处理复杂逻辑的进程要高。

各类分布式系统设计思路都是类似的,尤其是 Master + Worker 组合,Master负责调度,Worker负责执行。为了高可用引入分布式锁,确保分布式分配的数据不依赖Master。为了让整个系统更稳定,拆分调度任务的进程和直接执行任务的进程,让每个进程都只有单一的职责。

Storm 容错机制

S4没详细说明如何通信,在Storm中通过ZeroMQ完成Worker间通信。MQ的优势是异步、高性能。但是上游无法知道下游是否正确接收并处理了这条消息。分布式系统中错误是难免的。且流式计算中不可能只有一层链路。

论文中例子:统计Tweet里单词数量,先从一个TweetSpout中读取数据流里的Tweet,随机发送给一个ParseTweetBolt,这个Bolt解析Tweet成一个个单词,再发送给下游WordCountBolt。不同单词发给不同WordCountBolt,任何一个 WordCountBolt 没被成功处理就意味着面临“错误”。

Storm解决方案:从Spout发起的第一个Tuple作为树的根,下游所有衍生出来发送的Tuple都是树的一部分,任何Tuple处理失败或超时,就从Spout重发消息。

为此,Storm要引入AckerBolt,Spout发送出的消息要同时告知AckerBolt。Bolt完成根Tuple相关消息也通知Acker两个消息:告知已处理完Tuple,向下游发送了哪些衍生的Tuple。等最后一个Bolt告知后面没有新的Tuple时,AckerBolt就有了从Spout开始的整棵树的信息。

AckerBolt的开销岂不是非常大?需要将所有的还在处理中的Tuple存储下来。

Storm采用了位运算中的异或(XOR),Storm给每个发送的Tupe分配一个64位 message id。消息从Spout发送时AckerBolt接收到一个message-id,开始追踪此Tuple树。Acker会维护 message-id 到校验码(checksum)的映射关系。初始状态,校验码是用0和message-id异或一下。下游每个新发送的Tuple都要带上根Tuple的message-id,新Tuple发送后Bolt通知AckerBolt,内容就是根message-id到校验码的映射。

这个校验码就是当前对外发送的所有消息的message-id和已处理完的消息的message-id做异或,AckerBolt收到这个消息会把收到的校验码和本地校验码做异或,生成新的。我们发送和接受一条消息各一次,最后结果一定是0。只要Tuple没被acking,校验码就不是0;所有Tuple都被acking后校验码一定是0。

所有Bolt通知Acker最新执行内容只要发送 16byte 的 message-id 和校验码。即不必发送Tuple原始内容,也不必为下游每个Tuple单独发送消息。但是,这个机制只能保障Spout发送的Tuple至少处理一次(At Least Once),避免不了 Tuple 被重复处理。AckerBolt不适用所有场景,需要按需启用。关闭acking机制就实现了 At Most Once。Storm没有实现 Exactly Once。

小结

Storm 使用 Nimbus 这个主节点进行任务调度,通过 Zookeeper 存储所有元数据,通过把工作节点拆分为 Supervisor 和 Worker 提升系统稳定性。

相比S4仅仅在高压下实现降级,Storm 利用异或操作实现了追踪Tuple的整个生命周期,实现“At Least Once”保障。

标签:发送,Tuple,S4,解读,Bolt,Storm,PE
From: https://www.cnblogs.com/zhh567/p/17300028.html

相关文章

  • 大数据经典论文解读 - Spanner
    SpannerMegastore存在各种缺点:跨实体组事务需要昂贵的两阶段事务,所有跨数据中心的数据写入都通过Paxos算法,使得单个实体组只能支持每秒几次的事务。Spanner是一个全新设计的新系统,而不是Megastore或Bigtable上的修修补补。两个主题:解决了Megastore中哪些不足数据库事务,特别......
  • etcd/raft选举源码解读
    ETCD-raft笔记0.引言该篇博客基于etcdv3.5.7版本,首先会简单介绍etcd/raft对Raft选举部分的算法优化,然后通过源码分析etcd/raft的选举实现。1.etcd对于raft选举算法优化措施该优化措施均在raft博士论文中有讲解etcd/raft实现的与选举有关的优化有Pre-Vote、CheckQuorum、......
  • 【论文解读】不和谐区域定位
    【论文解读】不和谐区域定位一、简要介绍 不和谐区域定位的目的是在与周围背景不兼容的合成图像中定位该区域。不和谐问题主要是由于图像编辑技术产生的颜色和照明不一致。在本研究中,作者倾向于将输入的图像转换到另一个颜色空间,以放大不和谐区域与背景之间的域差异,从而使模......
  • 基于RS485智能电表及PLC的智慧工厂电能信息化管理系统
    在工业4.0时代,智能工厂是自动化、智能化的重点标志,而工业大数据则是未来工业市场进展中发挥优势的关键,其中数据采集就是生产制造中最硬性的需求。结合我国节能降耗以及双碳降低等政策,物通博联推出智慧工厂电能信息化管理系统,实现用电数据采集、传输以及智能化管理。制造工厂中遍布......
  • 华普物联4G DTU HP-RS4G-T200
    HP-RS4G-T200是华普物联网推出的高可靠性、高速率、低延迟的4GDTU。支持三大运营商4G网络接入,产品采用工业级设计标准,内置独立硬件看门狗,采用RS232和RS485两种标准端子接口,RS485完全独立电器隔离,支持多路SOCKET,同时可连接四个服务器,支持自定义心跳包/SN心跳包/1CCID心跳包/1......
  • 大数据经典论文解读 - Metastore
    MetastoreMegastore:Providingscalable,highlyavailablestorageforinteractiveservices在Bigtable上支持SQL,实现分布式数据库:跨数据中心的多副本同步数据复制支持为多数据表的字段建立Schema,且通过SQL接口访问支持数据库的二级索引支持数据库的事务Megastore是......
  • DS4700/DS4800 存储巡检
    DS4800——(M02,连B控1口)192.168.128.100ping192.168.128.102DS4700——(O07,M04)port1B控192.168.128.102/241.安装点击0035.exe,简体中文(ok),next,next,accept(next),选择路径(next),Typical(next),Automaticallystartmonitor(next),install,Done2.连接3.查看......
  • PhpStorm、PyCharm、WebStorm恢复代码(附:git撤销commit、add操作)
    由于同时管理多个项目,多种开发语言同步开发,开了好多个Git窗口。今天在提交python某项目的时候不小心在vue的项目中执行了gitadd、gitcommit操作,在push的时候悬崖勒马,于是故事开始了:我先回滚了commit,接着想把add也回滚一下,结果直接回滚到了上次提交的那个节点上,哦豁,新写的代码........
  • 解读锂电池结构技术
    喜欢电动车的朋友们在看各种汽车测评的时候,会遇到很多专业名词,比如我们要说的,MTP、CTP、CTB、CTC这类的,名词,刚接触的朋友们会有点蒙圈,肯定都会想这都是啥意思呢,有啥关联呢,通过这期倒霉哥的讲解,我相信你们应该就不会蒙圈了!我们先说一下新能源动力电池结构技术发展过程,是从MTP(有模......
  • 大数据经典论文解读 - Spark
    SparkResilientDistributedDatasets:AFault-TolerantAbstractionforIn-MemoryClusterComputing可看作一个支持多轮迭代的MapReduce模型,但是引入了RDD这样的函数式对象的数据集概念。Spark的多轮迭代中无需反复读写磁盘,而是直接在内存中操作。RDD是什么,怎么优化......