These notes cover how Spark Streaming ingests data from external input sources, using file streams and socket streams as examples.
File Streams
First create the directory that the file stream will monitor:
[atguigu@hadoop102 ~]$ cd /usr/local/spark/mycode/
[atguigu@hadoop102 mycode]$ mkdir streaming
[atguigu@hadoop102 mycode]$ cd streaming
[atguigu@hadoop102 streaming]$ mkdir logfile
[atguigu@hadoop102 streaming]$ cd logfile
Create the file stream in spark-shell. Open another terminal window and start spark-shell:
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> val ssc = new StreamingContext(sc, Seconds(20))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@162bc9e8

scala> val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@78ad7d17

scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@1f32fb77

scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@17dee9f9

scala> wordCounts.print()

scala> ssc.start()

scala> ssc.awaitTermination()
-------------------------------------------
Time: 1706013700000 ms
-------------------------------------------

-------------------------------------------
Time: 1706013720000 ms
-------------------------------------------
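One practical note on the spark-shell session: ssc.awaitTermination() blocks the REPL, so the usual way out is Ctrl+C. If you skip that call, the stream still keeps running in the background after ssc.start(), and you can later stop it without killing the shell's SparkContext. A minimal sketch using the standard StreamingContext API (not part of the original notes):

scala> ssc.start()   // the stream keeps running in the background of the shell

scala> ssc.stop(stopSparkContext = false, stopGracefully = true)
// Stops the streaming computation but keeps sc alive,
// so a new StreamingContext can be created in the same spark-shell session.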
Once a new file is added under the monitored directory, its word counts show up in the streaming output.
Add an a.txt:
[atguigu@hadoop102 logfile]$ vim a.txt
[atguigu@hadoop102 logfile]$ cat a.txt
Hadoop Spark Flink
The output changes as follows; a batch is computed every 20 seconds:
-------------------------------------------
Time: 1706013700000 ms
-------------------------------------------

-------------------------------------------
Time: 1706013720000 ms
-------------------------------------------

-------------------------------------------
Time: 1706013740000 ms
-------------------------------------------
(Flink,1)
(Spark,1)
(Hadoop,1)

-------------------------------------------
Time: 1706013760000 ms
-------------------------------------------

-------------------------------------------
Time: 1706013780000 ms
-------------------------------------------

-------------------------------------------
Time: 1706013800000 ms
-------------------------------------------

-------------------------------------------
Time: 1706013820000 ms
-------------------------------------------
Writing it as a standalone application:
cd /usr/local/spark/mycode
mkdir streaming
cd streaming
mkdir -p src/main/scala
cd src/main/scala
vim TestStreaming.scala
Use vim to create a TestStreaming.scala source file and enter the following code:
import org.apache.spark._
import org.apache.spark.streaming._

object WordCountStreaming {
  def main(args: Array[String]) {
    // Local mode with 2 threads: one receives data, the other processes it
    val sparkConf = new SparkConf().setAppName("WordCountStreaming").setMaster("local[2]")
    // Batch interval of 2 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // A local directory is used here; an HDFS directory works as well
    val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
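print() only shows the first few records of each batch on the console. If you want to keep the per-batch counts, the DStream output operation saveAsTextFiles can write every batch to files. A minimal sketch (the output path is just an illustrative placeholder, not from the original notes); it would go next to, or instead of, wordCounts.print() before ssc.start():

// Write each batch's counts under an output directory;
// Spark creates one directory per batch named <prefix>-<batch time>.<suffix>
wordCounts.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/output/wordcount", "txt")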
Create simple.sbt:
cd /usr/local/spark/mycode/streaming
vim simple.sbt
name := "Simple Project" version := "1.0" scalaVersion := "2.12.15" libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.0" libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.2.0" % "provided"
Run sbt to compile and package:
cd /usr/local/spark/mycode/streaming
/usr/local/sbt/sbt package
Run it with spark-submit:
/usr/local/spark/bin/spark-submit --class "WordCountStreaming" /usr/local/spark/mycode/streaming/target/scala-2.12/simple-project_2.12-1.0.jar
Add a new file under file:///usr/local/spark/mycode/streaming/logfile and the console will show its word counts. For example, a log2.txt:
[atguigu@hadoop102 logfile]$ cat log2.txt
HADOOP SPARK HBASE HIVE
Output:
....
-------------------------------------------
Time: 1706014916000 ms
-------------------------------------------
(HIVE,1)
(SPARK,1)
(HADOOP,1)
(HBASE,1)
.....
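As the code comment above notes, textFileStream can monitor an HDFS directory instead of a local one. A hedged sketch of that variant follows; the NameNode address and directory are placeholders, not paths from the original notes:

// Monitor an HDFS directory instead of the local filesystem;
// the host, port and path below are illustrative placeholders
val lines = ssc.textFileStream("hdfs://hadoop102:9000/user/atguigu/streaming/logfile")

Everything downstream (flatMap, map, reduceByKey, print) stays the same; in either case only files created in the monitored directory after the stream starts are picked up.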
Socket Streams
Write the program:
[atguigu@hadoop102 spark]$ cd /usr/local/spark/mycode
[atguigu@hadoop102 mycode]$ cd streaming/
[atguigu@hadoop102 streaming]$ mkdir socket
[atguigu@hadoop102 streaming]$ cd socket/
[atguigu@hadoop102 socket]$ mkdir -p src/main/scala
[atguigu@hadoop102 socket]$ cd /usr/local/spark/mycode/streaming/socket/src/main/scala
[atguigu@hadoop102 scala]$ vim NetworkWordCount.scala
import org.apache.spark._
import org.apache.spark.streaming._

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc, Seconds(1))
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
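socketTextStream also accepts an explicit storage level for the received blocks; by default Spark Streaming stores them replicated (MEMORY_AND_DISK_SER_2). For a single-machine exercise like this, a non-replicated level is a reasonable tweak. A minimal sketch of that variant (same API, just passing the optional third argument):

import org.apache.spark.storage.StorageLevel

// Keep received blocks on this machine only (no replication),
// which is enough for a local[2] test run
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)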
Create simple.sbt in the same directory as src:
[atguigu@hadoop102 socket]$ vim simple.sbt
name := "Simple Project" version := "1.0" scalaVersion := "2.12.15" libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.0" libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.2.0" % "provided
Package it and submit the application:
[atguigu@hadoop102 streaming]$ cd socket/
[atguigu@hadoop102 socket]$ /usr/local/sbt/sbt package
[atguigu@hadoop102 socket]$ /usr/local/spark/bin/spark-submit --class "NetworkWordCount" /usr/local/spark/mycode/streaming/socket/target/scala-2.12/simple-project_2.12-1.0.jar localhost 9999
In another terminal, start nc as the data source (-l listens on the port, -k keeps listening after a client disconnects) and type some words:
[atguigu@hadoop102 ~]$ nc -lk 9999
hadoop hive habse
The streaming terminal then shows the word counts in real time.
Implementing a Custom Data Source with Socket Programming
Let's go one step further and change how the source data is generated: instead of using nc, we write our own program that produces the socket data.
[atguigu@hadoop102 ~]$ cd /usr/local/spark/mycode/streaming/socket/src/main/scala
[atguigu@hadoop102 scala]$ vim DataSourceSocket.scala
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source

object DataSourceSocket {
  // Return a random number between 0 and length - 1
  def index(length: Int) = {
    val rdm = new java.util.Random
    rdm.nextInt(length)
  }

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println("Usage: <filename> <port> <millisecond>")
      System.exit(1)
    }
    val fileName = args(0)                                 // input file path
    val lines = Source.fromFile(fileName).getLines.toList  // read all lines of the file
    val rowCount = lines.length                            // number of lines in the file
    val listener = new ServerSocket(args(1).toInt)         // ServerSocket listening on the given port
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          while (true) {
            Thread.sleep(args(2).toLong)            // interval between two sends
            val content = lines(index(rowCount))    // pick a random line from lines
            println(content)
            out.write(content + '\n')               // data to send to the client
            out.flush()                             // push it to the client
          }
          socket.close()
        }
      }.start()
    }
  }
}
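The server handles each accepted client in its own thread and pushes a random line from the file every <millisecond> milliseconds, so several streaming jobs could connect at once. Before wiring it to Spark, it can be handy to check that the server really emits lines. A throwaway client sketch using plain JDK sockets (not part of the original notes; host and port match the spark-submit arguments used below):

import java.net.Socket
import scala.io.Source

object SocketPeek {
  def main(args: Array[String]) {
    // Connect to the DataSourceSocket server and print the first 5 lines it sends
    val socket = new Socket("localhost", 9999)
    Source.fromInputStream(socket.getInputStream).getLines.take(5).foreach(println)
    socket.close()
  }
}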
Compile and package with sbt:
[atguigu@hadoop102 scala]$ cd /usr/local/spark/mycode/streaming/socket
[atguigu@hadoop102 socket]$ /usr/local/sbt/sbt package
The DataSourceSocket program takes a text file as an input parameter, so before starting it, create a text file word.txt with a few lines of content at /usr/local/spark/mycode/streaming/socket/word.txt:
[atguigu@hadoop102 socket]$ vim word.txt
[atguigu@hadoop102 socket]$ cat word.txt
HADOOP HBASE HIVE FLINK
Start the DataSourceSocket program:
/usr/local/spark/bin/spark-submit --class "DataSourceSocket" ./target/scala-2.12/simple-project_2.12-1.0.jar ./word.txt 9999 1000
Now start the client, i.e. the NetworkWordCount program. Open a new terminal (call it the streaming terminal) and run:
[atguigu@hadoop102 socket]$ /usr/local/spark/bin/spark-submit --class "NetworkWordCount" /usr/local/spark/mycode/streaming/socket/target/scala-2.12/simple-project_2.12-1.0.jar localhost 9999