概述
SparkStreaming 是用于流式数据的处理。数据输入后可以用高级抽象原语(就是 SparkCore 中的算子,这里只是为了区分),如 map、reduce、window 等进行计算。
SparkStreaming 使用离散化流(discretized stream)作为抽象表示(DStream)。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,是对 RDD 在实时数据处理场景的一种封装。
为了更好的协同数据接收速率和资源处理能力,SparkStreaming 引入了背压机制(Spark Streaming Backpressuere):根据 JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。
WordCount 案例
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
// 获取端口数据
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split(" "))
val wordToOne: DStream[(String, Int)] = words.map((_, 1))
val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)
wordToCount.print()
// 由于 SparkStreaming 采集器是长期执行的任务,所以不能直接关
// 如果 main 方法执行完毕,应用也会自动结束,所以不能让 main 方法关闭
// ssc.stop()
// 1. 启动采集器
ssc.start()
// 2. 等待采集器关闭
ssc.awaitTermination()
运行程序,然后在命令行中打开 9999 端口 nc -l 9999
,并输入数据。
Stream 创建
RDD 队列
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
// 创建 RDD 队列
val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()
// 创建 QueueInputDStream
val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue, oneAtATime = false)
// 处理队列中的 RDD 数据
val mappedStream: DStream[(Int, Int)] = inputStream.map((_, 1))
val reducedStream = mappedStream.reduceByKey(_ + _)
reducedStream.print()
ssc.start()
for (i <- 1 to 5) {
rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
Thread.sleep(2000)
}
ssc.awaitTermination()
自定义数据源
需要继承 Receiver,并实现 onStart
、onStop
方法来自定义数据源采集。
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
messageDS.print()
ssc.start()
ssc.awaitTermination()
}
/*
自定义数据采集器
1. 继承Receiver,定义泛型, 传递参数
2. 重写方法
*/
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
private var flag = true
override def onStart(): Unit = {
new Thread(new Runnable {
override def run(): Unit = {
while (flag) {
val message: String = "采集的数据为:" + new Random().nextInt(10).toString
// 存储数据,底层自动封装为指定的StorageLevel.MEMORY_ONLY
store(message)
Thread.sleep(500)
}
}
}).start()
}
override def onStop(): Unit = {
flag = false
}
}
Kafka 数据源
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.1</version>
</dependency>
// 定义 Kafka 参数
val kafkaPara: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka1:9001,kafka2:9002,kafka3:9003",
ConsumerConfig.GROUP_ID_CONFIG -> "groupId",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
// 读取 Kafka 数据创建 DStream
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
// 由哪个 Executor 负责采集数据,由框架自己选择
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("topicName"), kafkaPara))
// 将kafka每条消息的 value 取出
val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
// 计算 WordCount
valueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
DStream 转换
DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operation(输出)两种,此外转换操作还有一些比较特殊的原语,如 updateStateByKey()、transform() 和各种 window 原语。
与 SparkCore 转化操作不同的是 DStream 转换操作有状态的概念。看是否保存了某个周期的计算结果,如果保存了就是有状态,如果不保存就是无状态。
无状态转换
无状态转换操作就是把简单的 RDD 转换操作应用到每个批次上,也就是转换 DStream 中的每个 RDD。
// 使用有状态操作,需要设定检查点路径
ssc.checkpoint("checkpoint")
val datas = ssc.socketTextStream("localhost", 9999)
val wordToOne = datas.map((_, 1))
// 无状态数据操作,只对当前的采集周期内的数据进行处理
// 在某些场合下,需要保留数据统计结果(状态),实现数据的汇总
val wordToCount = wordToOne.reduceByKey(_ + _)
// updateStateByKey:根据 Key 对数据的状态进行更新
// 传递的参数中含有两个值
// 第一个值:相同 Key 的 Value 数据
// 第二个值:缓冲区相同 Key 的 Value 数据
val state = wordToOne.updateStateByKey(
(seq: Seq[Int], buff: Option[Int]) => {
val newCount = buff.getOrElse(0) + seq.sum
Option(newCount)
}
)
state.print()
// wordToCount.print()
Transform
Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream 的 API 中暴露出来,通过这个函数可以扩展 Spark API,这个函数每一批次调度一次。
// transform方法可以将底层RDD获取到后进行操作
// transform使用场景
// 1. DStream功能不完善
// 2. 需要代码周期性的执行
// Code : Driver端
val newDS: DStream[String] = lines.transform(
rdd => {
// Code : Driver端,(周期性执行)
rdd.map(
str => {
// Code : Executor端
str
}
)
}
)
// Code : Driver端
val newDS1: DStream[String] = lines.map(
data => {
// Code : Executor端
data
}
)
Join
两个流之间的 Join 需要两个流的批次大小一致,这样才能做到同时触发。计算过程就是对当前批次的两个流中各自的 RDD 进行 Join,与两个 RDD 的 Join 效果相同。
val data9999 = ssc.socketTextStream("localhost", 9999)
val data8888 = ssc.socketTextStream("localhost", 8888)
val map9999: DStream[(String, Int)] = data9999.map((_,9))
val map8888: DStream[(String, Int)] = data8888.map((_,8))
// 所谓的DStream的Join操作,其实就是两个RDD的join
val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)
joinDS.print()
有状态转换
Window Operations
可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Streaming 的允许状态。
- 窗口时长:计算内容的时间范围
- 滑动步长:隔多久触发一次
这两个参数都必须是采集周期的整数倍。
val lines = ssc.socketTextStream("localhost", 9999)
val wordToOne = lines.map((_, 1))
// 窗口的范围应该是采集周期的整数倍
// 窗口是可以滑动的,但是默认情况下,以一个采集周期进行滑动
// 为了避免重复数据的计算,可以改变滑动的幅度(第二个参数)
val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6), Seconds(6))
val wordToCount = windowDS.reduceByKey(_ + _)
wordToCount.print()
// 设置检查点
ssc.checkpoint("checkpoint")
val lines = ssc.socketTextStream("localhost", 9999)
val wordToOne = lines.map((_,1))
// reduceByKeyAndWindow : 当窗口范围比较大,但是滑动幅度比较小,那么可以采用增加数据和删除数据的方式
// 无需重复计算,提升性能。
val windowDS: DStream[(String, Int)] =
wordToOne.reduceByKeyAndWindow(
(x:Int, y:Int) => { x + y}, // 窗口中增加的数据
(x:Int, y:Int) => {x - y}, // 窗口中减少的数据
Seconds(9), Seconds(3))
windowDS.print()
标签:String,val,Int,SparkStreaming,RDD,DStream,ssc
From: https://www.cnblogs.com/fireonfire/p/16819874.html