
Learning Record 21

Date: 2024-02-04 20:13:11

This session covered how Spark Streaming consumes data from several external input sources.

File Streams

Create the directory that will be monitored:

[atguigu@hadoop102 ~]$ cd /usr/local/spark/mycode/
[atguigu@hadoop102 mycode]$ mkdir streaming
[atguigu@hadoop102 mycode]$ cd streaming
[atguigu@hadoop102 streaming]$ mkdir logfile
[atguigu@hadoop102 streaming]$ cd logfile

Create the file stream in spark-shell. Open another terminal window and start spark-shell:

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> val ssc = new StreamingContext(sc, Seconds(20))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@162bc9e8

scala> val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@78ad7d17

scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@1f32fb77

scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@17dee9f9

scala> wordCounts.print()

scala> ssc.start()

scala> ssc.awaitTermination()
-------------------------------------------
Time: 1706013700000 ms
-------------------------------------------

-------------------------------------------
Time: 1706013720000 ms
-------------------------------------------

Now add a file under the monitored directory, and its word counts will be displayed in the output.

Add a file a.txt:

[atguigu@hadoop102 logfile]$ vim a.txt
[atguigu@hadoop102 logfile]$ cat a.txt
Hadoop
Spark
Flink

The output then changes as follows; a batch computation runs every 20 seconds (textFileStream only picks up files newly placed in the directory after the stream has started):

-------------------------------------------
Time: 1706013700000 ms
-------------------------------------------

-------------------------------------------
Time: 1706013720000 ms
-------------------------------------------

-------------------------------------------
Time: 1706013740000 ms
-------------------------------------------
(Flink,1)
(Spark,1)
(Hadoop,1)

-------------------------------------------
Time: 1706013760000 ms
-------------------------------------------

-------------------------------------------
Time: 1706013780000 ms
-------------------------------------------

-------------------------------------------
Time: 1706013800000 ms
-------------------------------------------

-------------------------------------------
Time: 1706013820000 ms
-------------------------------------------
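
Note that ssc.awaitTermination() blocks the spark-shell session, so the stream keeps running until it is interrupted with Ctrl+C. As an alternative, here is a minimal hedged sketch (not part of the original session) that bounds the run and then stops the streaming computation while keeping sc alive for further use:

scala> ssc.awaitTerminationOrTimeout(120000)   // wait at most 120 seconds instead of blocking forever
scala> ssc.stop(stopSparkContext = false)      // stop the streaming computation but keep the SparkContext
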
Writing it as a standalone application (reusing the streaming directory already created above):

cd /usr/local/spark/mycode/streaming
mkdir -p src/main/scala
cd src/main/scala
vim TestStreaming.scala

Use the vim editor to create a new code file TestStreaming.scala and enter the following code:

import org.apache.spark._
import org.apache.spark.streaming._
object WordCountStreaming {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("WordCountStreaming").setMaster("local[2]")  // local mode with 2 threads: one to monitor the directory, one to process the data
    val ssc = new StreamingContext(sparkConf, Seconds(2))  // batch interval of 2 seconds
    val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")  // a local directory is used here; an HDFS path works as well
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
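
When the application is run with spark-submit, Spark's INFO logging can easily drown out the wordCounts.print() output. A minimal hedged tweak (not in the original code), mirroring the sc.setLogLevel("ERROR") call used in the socket example later in this post, is to lower the log level through the context's underlying SparkContext:

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.sparkContext.setLogLevel("ERROR")  // keep only errors so the printed batch results stay readable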

Create simple.sbt:

cd /usr/local/spark/mycode/streaming
vim simple.sbt
name := "Simple Project"
version := "1.0"                    
scalaVersion := "2.12.15"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.2.0" % "provided"

Run the sbt packaging command:

cd /usr/local/spark/mycode/streaming
/usr/local/sbt/sbt package

Run it:

/usr/local/spark/bin/spark-submit --class "WordCountStreaming" /usr/local/spark/mycode/streaming/target/scala-2.12/simple-project_2.12-1.0.jar

While the application is running, add a new file under file:///usr/local/spark/mycode/streaming/logfile and the results will be displayed on the console:

[atguigu@hadoop102 logfile]$ cat log2.txt 
HADOOP
SPARK
HBASE
HIVE

The output:

....
-------------------------------------------
Time: 1706014916000 ms
-------------------------------------------
(HIVE,1)
(SPARK,1)
(HADOOP,1)
(HBASE,1)
.....
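
As the comment in TestStreaming.scala notes, the monitored directory could also live on HDFS instead of the local filesystem. A hedged sketch, assuming HDFS is running on hadoop102 at port 9000 and that the directory below has been created beforehand (both the address and the path are assumptions, not part of the original setup):

val lines = ssc.textFileStream("hdfs://hadoop102:9000/user/atguigu/streaming/logfile")  // hypothetical HDFS directory; adjust host, port and path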

Socket Streams

Write the program:

[atguigu@hadoop102 spark]$ cd /usr/local/spark/mycode
[atguigu@hadoop102 mycode]$ cd streaming/
[atguigu@hadoop102 streaming]$ mkdir  socket
[atguigu@hadoop102 streaming]$ cd socket/
[atguigu@hadoop102 socket]$ mkdir  -p  src/main/scala
[atguigu@hadoop102 socket]$ cd  /usr/local/spark/mycode/streaming/socket/src/main/scala
[atguigu@hadoop102 scala]$ vim  NetworkWordCount.scala
import org.apache.spark._
import org.apache.spark.streaming._
object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc, Seconds(1))
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
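
A receiver-based source such as socketTextStream permanently occupies one thread, which is why setMaster("local[2]") reserves a second thread for processing. The method also accepts an optional storage level for the received blocks; the variant below is a hedged sketch of my own (not used in the original code) that keeps blocks serialized without the default replication:

import org.apache.spark.storage.StorageLevel

val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)  // same stream, received blocks stored serialized and unreplicated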

Create simple.sbt at the same level as the src directory:

[atguigu@hadoop102 socket]$ vim simple.sbt
name := "Simple Project"
version := "1.0"                    
scalaVersion := "2.12.15"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.2.0" % "provided

Package it and submit:

[atguigu@hadoop102 streaming]$ cd socket/
[atguigu@hadoop102 socket]$ /usr/local/sbt/sbt package
[atguigu@hadoop102 socket]$ /usr/local/spark/bin/spark-submit --class "NetworkWordCount" /usr/local/spark/mycode/streaming/socket/target/scala-2.12/simple-project_2.12-1.0.jar localhost 9999

In another terminal, start nc listening on port 9999 and type in some words:

[atguigu@hadoop102 ~]$ nc -lk 9999
hadoop
hive
habse

Real-time monitoring: the stream computation terminal prints the word counts for each 1-second batch as the words arrive.

Implementing a Custom Data Source with Socket Programming

Now we go one step further and change how the source data is produced: instead of using the nc program, we write our own program to generate the Socket data source.

[atguigu@hadoop102 ~]$ cd  /usr/local/spark/mycode/streaming/socket/src/main/scala
[atguigu@hadoop102 scala]$ vim  DataSourceSocket.scala
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source
object DataSourceSocket {
  def index(length: Int) = { // return a random integer between 0 and length-1
    val rdm = new java.util.Random
    rdm.nextInt(length)
  }
  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println("Usage: <filename> <port> <millisecond>")
      System.exit(1)
    }
    val fileName = args(0)  // path of the input file
    val lines = Source.fromFile(fileName).getLines.toList  // read every line of the file into memory
    val rowCount = lines.length  // number of lines in the file
    val listener = new ServerSocket(args(1).toInt)  // ServerSocket listening on the given port
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          while (true) {
            Thread.sleep(args(2).toLong)  // interval between two sends
            val content = lines(index(rowCount))  // pick a random line from the list
            println(content)
            out.write(content + '\n')  // write the line to be sent to the client
            out.flush()  // push the data to the client
          }
          socket.close()
        }
      }.start()
    }
  }
}
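
One small refinement, offered as a hedged sketch rather than part of the original code: Source.fromFile keeps a file handle open, and since all lines are materialized into a list up front, the source can be closed right after reading:

val source = Source.fromFile(fileName)
val lines = try source.getLines().toList finally source.close()  // all lines are now in memory, so the file handle can be released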

Run sbt to compile and package it:

[atguigu@hadoop102 scala]$ cd  /usr/local/spark/mycode/streaming/socket
[atguigu@hadoop102 socket]$ /usr/local/sbt/sbt  package

The DataSourceSocket program takes a text file as an input parameter, so before starting it, first create a text file word.txt at /usr/local/spark/mycode/streaming/socket/word.txt and type a few lines of content into it:

[atguigu@hadoop102 socket]$ vim word.txt
[atguigu@hadoop102 socket]$ cat word.txt 
HADOOP
HBASE
HIVE
FLINK

Start the DataSourceSocket program (arguments: input file, port, and send interval in milliseconds):

/usr/local/spark/bin/spark-submit --class "DataSourceSocket" ./target/scala-2.12/simple-project_2.12-1.0.jar ./word.txt 9999 1000

Now start the client, i.e. the NetworkWordCount program. Open a new terminal (the "stream computation terminal") and run the following command to start NetworkWordCount:

[atguigu@hadoop102 socket]$ /usr/local/spark/bin/spark-submit --class "NetworkWordCount" /usr/local/spark/mycode/streaming/socket/target/scala-2.12/simple-project_2.12-1.0.jar localhost 9999

From: https://www.cnblogs.com/JIANGzihao0222/p/18006909
