- sparkstreaming创建有两种方式
1. 借助SparkConf对象创建
val conf = new SparkConf().setAppName("streamingContext").setMaster("local[4]")
/**
* streamingcontext第一种创建方式 需要传递两个值
* 1、sparkConf
* 2、是一个Durations时间 代表的是每隔多长时间形成一个批次的数据
* Seconds(Long)
* Milliseconds(Long)
* Minutes(Long)
* 第一种创建方式底层也会给我们创建一个SparkContext对象
*/
val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))
2. 借助已有的SparkContext对象来创建
/*
streamingcontext创建方法二:
*/
val sc = new SparkContext(conf)
val ssc1: StreamingContext = new StreamingContext(sc, Seconds(2))
/*
具体使用
*/
val ds: DStream[String] = ssc.socketTextStream("node1", 44444, StorageLevel.MEMORY_ONLY)
ds.print()
ssc.start()
ssc.awaitTermination()
- 作用
1. Spark Streaming严格意义来说不算纯实时计算框架,因为Spark Streaming处理数据的时候每隔一段时间将一段数据封装为一个批次进行计算的
2. Spark Streaming读取不断数据源的数据都需要使用到StreamingContext
- sparkstreaming的运行流程
- 创建输入DStream来创建输入不同的数据源。
- 对DStream定义transformation和output等各种算子操作,来定义我们需要的各种实时计算逻辑。
- 调用StreamingContext的start()方法,进行启动我们的实时处理数据。
- 调用StreamingContext的awaitTermination()方法,来等待应用程序的终止。可以使用CTRL+C手动停止,或者就是让它持续不断的运行进行计算。
- 也可以通过调用StreamingContext的stop()方法,来停止应用程序。
- 注意
【注意】:
(1)基础数据源:HDFS、TCP网络端口
高级数据源:Kafka、Flume。。。。
(2)local 本地使用一个CPU内核运行的(一个CPU内核运行一个线程)---本地启动一个线程运行
创建StreamingContext时候必须指定运行模式的CPU核数(启动的线程数)大于等于2,Spark Streaming在做实时计算的时候,最小需要两个线程,一次线程用来接受数据源的数据,另外一个线程处理封装好的一个批次数据
(3)只要我们一个StreamingContext启动之后,我们就不能再往这个Application其中添加任何计算逻辑了。比如执行start()方法之后,还给某个DStream执行一个算子。
(4)一个StreamingContext停止之后,是肯定不能够重启的。调用stop()之后,不能再调用start()
(5)必须保证一个JVM同时只能有一个StreamingContext启动。在你的应用程序中,不能创建两个StreamingContext。
(6)调用stop()方法时,会同时停止内部的SparkContext,如果不希望如此,还希望后面继续使用SparkContext创建其他类型的Context,比如SQLContext,那么就用stop(false)。
(7)一个SparkContext可以创建多个StreamingContext,只要上一个先用stop(false)停止,再创建下一个即可。
标签:SparkContext,创建,流程,stop,sparkstreaming,数据源,线程,注意事项,StreamingContext
From: https://www.cnblogs.com/jsqup/p/16643474.html