一、概览
Structured Streaming是一个基于SparkSQL引擎构建的可扩展且容错的流处理引擎。可以像在静态数据上表达批量计算一样表达流计算。SparkSQL引擎将负责以增量方式连续运行它,并在流数据继续到达时更新最终结果。可以使用Scala、Java、Python或R中的Dataset/DataFrame API来进行流聚合、事件时间窗口、流到批处理连接等的开发。它基于SparkSQL引擎优化而执行。最后,系统通过 checkpoint 和预写日志确保端到端的精确一次容错保证。简而言之,Structured Streaming提供快速、可扩展、容错、端到端的精确一次流处理,而无需用户对流进行推理。
默认, Structured Streaming使用微批次处理作业引擎进行处理,该引擎将数据流作为一系列小批次作业进行处理,从而实现低至100毫秒的端到端延迟和精确一次的容错保证。自Spark 2.3以来,又引入了一种新的低延迟处理模式,称为连续处理,它可以实现低至1毫秒的端到端延迟,并提供至少保证一次的语义。
二、官方示例
我们还是以从socket接收文本数据进行word count统计来了解它的工作原理。代码如下:
object StructuredNetworkWordCount {
def main(args: Array[String]): Unit = {
if (args.length < 2) {
System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>")
System.exit(1)
}
val host = args(0)
val port = args(1).toInt
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
// 创建表示从连接到主机端口的输入行流的DataFrame
val lines = spark.readStream
.format("socket")
.option("host", host)
.option("port", port)
.load()
// 将一行文本按空格分割
val words = lines.as[String].flatMap(_.split(" "))
// 分组统计
val wordCounts = words.groupBy("value").count()
// 启动运行并将结果输出到控制台
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
}
}
可以看到里面用到了spark sql 来进行流式处理,我们看下该官方示例在源码中的位置也就明白了
三、编程模型
Structured Streaming 的关键思想是将实时数据流视为不断追加的表。它是一种类似Spark Streaming(微批处理模型)的新流式处理模型。这使我们可以像查询表一样来处理流式数据。
1、基本概念
将输入数据流视为“输入表”。到达流中的每个数据项就像对输入表做追加操作。每个微小的间隔都会最加到输入表中。
在输入表中进行查询会得到结果表。随着输入表的增加,结果表也会增加或改变。最终将其写入外部系统或输出。
输出模式有以下三种:
- Complete模式:将结果表整体写入外部系统
- Append模式:只将处理完的当下微批次结果写入外部系统
- Update模式:只将处理完的当下微批次且有更新的数据结果写入外部系统
我们用第二部分的示例代码来说明下:
lines
DataFrame是输入表
wordCounts
DataFrame是结果表
对流式lines
DataFrame生成wordCounts
的查询与静态DataFrame完全相同。但是,当这个查询启动时,Spark将不断检查来自套接字连接的新数据。如果有新数据,Spark将运行一个“增量”查询,将以前的运行计数与新数据结合起来计算更新的计数,如下所示:
请注意,Structured Streaming 不会物化整个表。它从流数据源中读取最新的可用数据,增量处理以更新结果,然后丢弃源数据。它只保留更新结果所需的最小中间状态数据(例如前面示例中的中间计数)。
2、处理事件时间和延迟数据
从示例中可以发现Structured Streaming 并没有指出每个批次的间隔时间,
因为Structured Streaming 使用的是嵌入在数据本身中的时间。对于许多应用程序,您可能希望对这个事件时间进行操作。例如,如果想获取IoT设备每分钟生成的事件数量,那么可能希望使用数据生成的时间(即数据中的事件时间),而不是Spark接收它们的时间。这个事件时间在这个模型中表达得非常自然,因为把它作为了数据的一部分,也就是表中的一列。
这使得基于窗口的聚合(例如每分钟的事件数量)只是事件时间列上的一种特殊类型的分组和聚合——每个时间窗口是一个组,每行可以属于多个窗口/组。因此,这种event-time-window-based聚合查询可以在静态数据集和数据流上一致定义。
因此,该模型可以处理预期晚到达的数据。由于Spark正在更新结果表,它可以完全控制在有延迟数据时更新旧聚合,以及清理旧聚合以限制中间状态数据的大小。从Spark 2.1开始,我们支持水印,允许用户指定延迟数据的阈值,并允许引擎相应地清理旧状态。
四、运行官方示例
运行Netcat
nc -lk 9999
新建一个窗口运行官方示例
cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example sql.streaming.StructuredNetworkWordCount cdh1 9999
他和sparksql一样默认的分区为200个,如果数据量很小,速度非常慢。需要根据数据量来设置自己的分区数。
大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下: