
Learning Record 23

Posted: 2024-02-04 20:25:11

This session covered DStream stateless transformations.

A DStream is a continuous sequence of RDDs, one RDD per batch interval.

map(func): transforms each element of the source DStream with the function func, producing a new DStream

flatMap(func): similar to map, but each input item can be mapped to 0 or more output items

filter(func): returns a new DStream containing only the items of the source DStream for which func returns true

repartition(numPartitions): changes the degree of parallelism of the DStream by creating more or fewer partitions

reduce(func): aggregates the elements of each RDD in the source DStream with the function func, returning a new DStream of single-element RDDs

count(): counts the number of elements in each RDD of the source DStream

union(otherStream): returns a new DStream containing the elements of the source DStream and the other DStream

countByValue(): when applied to a DStream of elements of type K, returns a new DStream of (K, Long) pairs in which the value of each key is its number of occurrences in each RDD of the source DStream

Example of a stateless transformation:

The word count introduced earlier in the "socket stream" section is a stateless transformation: each count covers only the words that arrived in the current batch, is independent of previous batches, and does not accumulate. A minimal sketch is shown below.
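
A minimal sketch of such a stateless word count, assuming a text server such as nc -lk 9999 is running on localhost (the object name, host, port, and 5-second batch interval are illustrative):

import org.apache.spark._
import org.apache.spark.streaming._

object StatelessWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StatelessWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))    // one RDD per 5-second batch
    val lines = ssc.socketTextStream("localhost", 9999)
    // Every transformation below is stateless: it only sees the current batch
    val words = lines.flatMap(_.split(" ")).filter(_.nonEmpty)
    val wordCounts = words.map(w => (w, 1)).reduceByKey(_ + _)
    wordCounts.print()    // per-batch counts, no accumulation across batches
    ssc.start()
    ssc.awaitTermination()
  }
}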

DStream stateful transformations

Stateful transformations incorporate historical information across batches.

Sliding-window transformations

First set the window length (the duration of the window) and the sliding interval (how often a computation is triggered). The window then slides over the source DStream at the specified interval; at each position, the portion of the DStream (i.e., a set of RDDs) that falls inside the window forms a small DStream, and a computation can be launched on this small DStream.

 

window(windowLength, slideInterval): returns a new DStream computed from windowed batches of the source DStream

countByWindow(windowLength, slideInterval): returns a sliding-window count of the number of elements in the stream

reduceByWindow(func, windowLength, slideInterval): returns a single-element stream created by aggregating the elements of the stream over the sliding interval with the function func; func must be associative so that the computation can run in parallel

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): when applied to a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs in which the value of each key is aggregated over the window with the given reduce function func. Note: by default this operator uses Spark's default number of parallel tasks for the grouping; a different number can be set with the numTasks parameter (a sketch of this operator follows the list)

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): a more efficient version of reduceByKeyAndWindow in which the reduce value of each window is computed incrementally from the reduce value of the previous window: the new data entering the window is reduced, and the old data leaving the window is "inverse-reduced". It can only be used with invertible reduce functions, i.e. reduce functions that have a corresponding inverse reduce function (passed in as the invFunc parameter)

countByValueAndWindow(windowLength, slideInterval, [numTasks]): when applied to a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs in which the value of each key is its frequency within the sliding window
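
As a concrete illustration of reduceByKeyAndWindow, here is a minimal sketch of a windowed word count. The 30-second window length, 10-second sliding interval, host, port, and checkpoint path are illustrative assumptions; the batch interval must evenly divide both window parameters:

import org.apache.spark._
import org.apache.spark.streaming._

object WindowedWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowedWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    // A checkpoint directory is only required for the incremental (func, invFunc) variant,
    // but setting one does no harm here
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/windowcheckpoint/")
    val pairs = ssc.socketTextStream("localhost", 9999)
                   .flatMap(_.split(" "))
                   .map(w => (w, 1))
    // Count each word over the last 30 seconds, recomputed every 10 seconds
    val windowedCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    windowedCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}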

The updateStateByKey operation

The input parameter updateFunc of the updateStateByKey(updateFunc) method is a function of the following type: (Seq[V], Option[S]) => Option[S], where V and S are data types such as Int. The first parameter, of type Seq[V], holds all the values for the current key in this batch; the second parameter, of type Option[S], holds the key's historical state; the return value, of type Option[S], is the key's new state.

Whenever state must be maintained across batches, the updateStateByKey operation must be used.

Word-count example:

With a stateful transformation, the word count of the current batch is continuously accumulated on top of the results of previous batches, so the final counts are the total word frequencies over all batches.

The procedure is similar to the socket-stream example.

First, create the directory structure and the source file:

cd /usr/local/spark/mycode/streaming
mkdir stateful
cd stateful/
mkdir -p src/main/scala
cd src/main/scala
vim NetworkWordCountStateful.scala

The contents of NetworkWordCountStateful.scala are:
import org.apache.spark._
import org.apache.spark.streaming._
object NetworkWordCountStateful {
  def main(args: Array[String]) {
    // Define the state-update function: add this batch's count to the previous count
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
      
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")    
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/")    // Set a checkpoint directory; updateStateByKey requires checkpointing, which also provides fault tolerance
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
cd /usr/local/spark/mycode/streaming/stateful
vim simple.sbt

The contents of simple.sbt are:
name := "Simple Project"
version := "1.0"                    
scalaVersion := "2.12.15"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.2.0" % "provided"
Then package and submit the application:

/usr/local/sbt/sbt package
/usr/local/spark/bin/spark-submit --class "NetworkWordCountStateful" ./target/scala-2.12/simple-project_2.12-1.0.jar

Since no nc server is listening on localhost:9999 yet, the following connection errors appear:

2024-01-25 23:23:09,506 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: 拒绝连接 (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at java.net.Socket.<init>(Socket.java:434)
    at java.net.Socket.<init>(Socket.java:211)
    at org.apache.spark.streaming.dstream.SocketReceiver.onStart(SocketInputDStream.scala:61)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.$anonfun$restartReceiver$1(ReceiverSupervisor.scala:198)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

-------------------------------------------
Time: 1706196190000 ms
-------------------------------------------

(The same connection-refused error is repeated every 2 seconds, each followed by an empty batch report, until the nc server is started.)

-------------------------------------------
Time: 1706196195000 ms
-------------------------------------------

Now open another terminal and start the nc server:

[atguigu@hadoop102 ~]$ nc -lk 9999
Hdao
Hdoop
a
Hadoop
Spark
Hbase
Hadoop

Result:

-------------------------------------------
Time: 1706196250000 ms
-------------------------------------------
(H,1)

-------------------------------------------
Time: 1706196255000 ms
-------------------------------------------
(Hdoop,1)
(H,1)
(,1)

-------------------------------------------
Time: 1706196260000 ms
-------------------------------------------
(Hdoop,1)
(H,1)
(Spark,1)
(,1)
(Hadoop,1)

-------------------------------------------
Time: 1706196265000 ms

Output operations

Only the source file needs to be modified:

import org.apache.spark._
import org.apache.spark.streaming._
object NetworkWordCountStatefulTxt {
  def main(args: Array[String]) {
    // Define the state-update function
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
      
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/")    // Set a checkpoint directory; updateStateByKey requires checkpointing, which also provides fault tolerance
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    // The following is the newly added statement, which saves the DStream to text files
    stateDstream.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/dstreamoutput/output")
    ssc.start()
    ssc.awaitTermination()
  }
}

That is, just add this statement to the program: stateDstream.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/dstreamoutput/output")

This generates multiple output directories, one per batch interval, each named with the prefix followed by the batch timestamp in milliseconds:

output-1479951955000
output-1479951960000
output-1479951965000
output-1479951970000
output-1479951975000
output-1479951980000
output-1479951985000
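
If a file suffix is also wanted, saveAsTextFiles accepts a second argument; this is a hedged sketch with an illustrative path, and each batch still produces its own directory, now named with the prefix, the batch timestamp, and the suffix (e.g. output-1706196250000.txt):

// Each batch is written to a separate directory such as output-<timestampMs>.txt
stateDstream.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/dstreamoutput/output", "txt")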

Writing to MySQL

Start the MySQL database and create the database and table:

[atguigu@hadoop102 stateful]$ sudo service mysqld start
Redirecting to /bin/systemctl start mysqld.service
[atguigu@hadoop102 stateful]$ mysql -u root -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
...
mysql> use spark
mysql> create table wordcount (word char(20), count int(4));

This creates a table named "wordcount" in the "spark" database (the database must already exist; if not, create it first with create database spark;).

[atguigu@hadoop102 streaming]$ mkdir statefulmysql
[atguigu@hadoop102 streaming]$ cd statefulmysql/
[atguigu@hadoop102 statefulmysql]$ mkdir -p src/main/scala
[atguigu@hadoop102 statefulmysql]$ cd src/main/scala
[atguigu@hadoop102 scala]$ vim NetworkWordCountStatefulMySQL.scala
[atguigu@hadoop102 scala]$ cd ..
[atguigu@hadoop102 main]$ cd ..
[atguigu@hadoop102 src]$ cd ..
[atguigu@hadoop102 statefulmysql]$ vim simple.sbt

The contents of NetworkWordCountStatefulMySQL.scala are:
import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}


object NetworkWordCountStatefulMySQL {
  def main(args: Array[String]) {
    // Define the state-update function
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStatefulMySQL")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")    // Set a checkpoint directory; updateStateByKey requires checkpointing, which also provides fault tolerance
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()

    // The following is the newly added code, which saves the DStream to the MySQL database
    stateDstream.foreachRDD(rdd => {
      // Inner function: writes the records of one partition over a single JDBC connection
      def func(records: Iterator[(String, Int)]) {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://localhost:3306/spark"
          val user = "root"
          val password = "123456" // the database password (assumed here to be 123456)
          conn = DriverManager.getConnection(url, user, password)
          records.foreach(p => {
            val sql = "insert into wordcount(word,count) values (?,?)"
            stmt = conn.prepareStatement(sql);
            stmt.setString(1, p._1.trim)
            stmt.setInt(2, p._2.toInt)
            stmt.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }

      // Write partition by partition so that only one JDBC connection is opened per partition rather than per record
      val repartitionedRDD = rdd.repartition(3)
      repartitionedRDD.foreachPartition(func)
    })


    ssc.start()
    ssc.awaitTermination()
  }
}
The contents of simple.sbt are:

name := "Simple Project"
version := "1.0"                    
scalaVersion := "2.12.15"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.2.0" % "provided"

Package the project:

/usr/local/sbt/sbt package

Run it:

/usr/local/spark/bin/spark-submit --jars /usr/local/spark/jars/mysql-connector-java-5.1.37.jar --class "NetworkWordCountStatefulMySQL" /usr/local/spark/mycode/streaming/statefulmysql/target/scala-2.12/simple-project_2.12-1.0.jar

Then open another terminal and type some input:

[atguigu@hadoop102 scala]$ nc -lk 9999
woyaosuijiao
suijio^Ha
suijioa
woyaosuijiao
woyaosuijiao

The screen shows:

-------------------------------------------
Time: 1706199380000 ms
-------------------------------------------
(suijioa,1)
(suijia,1)
(woyaosuijiao,1)

-------------------------------------------
Time: 1706199385000 ms
-------------------------------------------
(suijioa,1)
(suijia,1)
(woyaosuijiao,3)

MySQL query result (note that the full current state is written on every batch, so the same word appears once per batch):

[atguigu@hadoop102 ~]$ sudo service mysqld start
Redirecting to /bin/systemctl start mysqld.service
[atguigu@hadoop102 ~]$ mysql -u root -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 30
Server version: 5.7.28 MySQL Community Server (GPL)

Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> use spark
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> select * from wordcount;
+--------------+-------+
| word         | count |
+--------------+-------+
| suijia       |     1 |
| woyaosuijiao |     1 |
| suijioa      |     1 |
| suijia       |     1 |
| woyaosuijiao |     1 |
| suijioa      |     1 |
| suijia       |     1 |
| woyaosuijiao |     1 |
| suijioa      |     1 |
| suijia       |     1 |
| woyaosuijiao |     1 |
| suijioa      |     1 |
| suijia       |     1 |
| woyaosuijiao |     3 |
+--------------+-------+

 

From: https://www.cnblogs.com/JIANGzihao0222/p/18006921
