Reading data from a socket port
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
// Address, port number, and storage level (received data is stored at the given level; here MEMORY_ONLY keeps it in Spark's memory)
val ds: DStream[String] = ssc.socketTextStream("node1", 44444, StorageLevel.MEMORY_ONLY)
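For context, here is a minimal runnable sketch of the full application around this call, mirroring the ByHDFS example below. The object name BySocket and the word-count body are illustrative, and it assumes a plain-text server is listening on node1:44444 (for example one started with nc -lk 44444):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BySocket {
  def main(args: Array[String]): Unit = {
    // local[3] leaves threads free beyond the one the socket receiver occupies
    val conf = new SparkConf().setMaster("local[3]").setAppName("socket")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Receive lines of text from node1:44444, keeping received blocks in memory only
    val ds = ssc.socketTextStream("node1", 44444, StorageLevel.MEMORY_ONLY)

    // Illustrative word count over each 10-second batch
    ds.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}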
Reading data from HDFS
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ByHDFS {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("hdfs")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))

    /*
      Define an HDFS directory as the streaming data source.
      Each batch interval, Spark picks up the new files that have appeared under the directory.
     */
    val ds = ssc.textFileStream("hdfs://node1:9000/stream")
    ds.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
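Note that textFileStream is a file-monitoring source rather than a receiver: each batch it only picks up files that newly appear under the directory, so files should be written elsewhere and then moved or renamed into it atomically; appending to an already-processed file is not picked up. Assuming the path above, a quick test is to push a file in from another shell, e.g. hdfs dfs -put words.txt hdfs://node1:9000/stream (where words.txt is any local text file).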
From: https://www.cnblogs.com/jsqup/p/16643480.html