首页 > 其他分享 >sparkstreaming行动算子

sparkstreaming行动算子

时间:2022-09-02 13:58:03浏览次数:51  
标签:String val 行动 sparkstreaming 算子 import spark DStream ssc

查看

// 默认是前10条
print(num)

保存数据

  • 一批次产生一个文件
package SparkStreaming.action

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object Action1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("action1").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Milliseconds(3000))

    val ds: DStream[String] = ssc.socketTextStream("node1", 44444)

    // 将a开头的数据 保存成为一个文本文件格式
    ds.filter((line: String) => {
      val first: Char = line.charAt(0)
      if (first == 'a') {
        true
      } else {
        false
      }
    }).saveAsTextFiles("hdfs://node1:9000/spark", ".txt")

    // 将b开头的数据 保存成为一个Object序列化对象数据
    ds.filter((line: String) => {
      val first:Char = line.charAt(0)
      if(first == 'b'){
        true
      }else{
        false
      }
    }).saveAsObjectFiles("hdfs://node1:9000/sparkobject",".sequence")

    ssc.start()
    ssc.awaitTermination()
  }
}

foreachRDD算子:常用的一种行动算子

  • 遍历DStream中每一个RDD,得到DStream中的每一个RDD,然后对每一个RDD数据保存到MySQL数据库,或者使用SparkSession进行SQL查询分析,因为这个算子是DStream的行动算子,会触发DStream依赖链的执行,但是得到的每一个RDD也必须调用行动算子触发
  • 我们可以在foreachRDD算子中,实现对DStream每一批次RDD数据保存操作,保存到数据库、保存到结构化文件中等等,通过还可以在foreachRDD算子中将RDD转换为DataFrame或者Dataset实现结构化数据SQL处理操作
  • 【注意问题】如果把数据转换DataFrame或者Dataset保存到数据库,那么我们要求DataFrame、Dataset的Structype中列名必须和数据表的列名字段一致
package SparkStreaming.action

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object Action2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("action2").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Milliseconds(3000))

    val ds: DStream[String] = ssc.socketTextStream("node1", 44444)

    ds.foreachRDD((rdd: RDD[String]) => {
      val rdd1 = rdd.map((line: String) => {
        val array = line.split(" ")
        (array(0), array(1).toInt)
      })
      val session = SparkSession.builder().config(sparkConf).getOrCreate()
      import session.implicits._
      val frame = rdd1.toDF("name", "age")

      // 借助Spark SQL把数据保存到MySQL中
      val properties = new Properties()
      properties.put("user","root")
      properties.put("password","Jsq123456...")
      frame.write.mode(SaveMode.Append).jdbc("jdbc:mysql://node1:3306/project?serverTimezone=UTC", "student_info", properties)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
一般情况下 Spark Streaming处理流式数据的时候,如果实时数据是结构化数据类型的话,可以在Spark Streaming中无缝衔接Spark SQL技术对实时的结构化进行统计分析。

标签:String,val,行动,sparkstreaming,算子,import,spark,DStream,ssc
From: https://www.cnblogs.com/jsqup/p/16649586.html

相关文章

  • sparkstreaming转换算子--窗口函数
    window画图理解说明countByWindow对每个滑动窗口的数据执行count操作reduceByWindow对每个滑动窗口的数据执行reduce操作reduceByKeyAndWindow对每个滑动窗口的......
  • SparkStreaming中的转换算子2--有状态的转换算子updateStateByKey
    将之前批次的状态保存,packageSparkStreaming.transimportorg.apache.spark.SparkConfimportorg.apache.spark.storage.StorageLevelimportorg.apache.spark.str......
  • SparkStreaming中的转换算子1
    转换算子1---map,flatMapRDD支持的转换算子DStream大部分都是支持的map、flatMap、filter、distinct、union、join、reduceByKey......RDD中部分行动算子DStream会当作......
  • sparkstreaming的创建方式及运行流程及注意事项
    sparkstreaming创建有两种方式1.借助SparkConf对象创建valconf=newSparkConf().setAppName("streamingContext").setMaster("local[4]")/***streamingcontex......
  • DataFrame中的行动算子操作2
    ##修改hdfs-site.xml<property><name>hive.metastore.warehouse.dir</name><value>hdfs://node1:9000/user/hive/warehouse</value><description>locationof......
  • DataFrame中的行动算子操作1
    valconf=newSparkConf().setAppName("action").setMaster("local[*]")valsession=SparkSession.builder().config(conf).getOrCreate()valseq:Seq[(String,In......
  • DataFrame中的转换算子操作1
    valsparkConf=newSparkConf().setMaster("local[2]").setAppName("tran")valsparkSession=SparkSession.builder().config(sparkConf).getOrCreate()valseq:Seq......
  • DataFrame中的转换算子2
    valsparkConf=newSparkConf().setMaster("local[2]").setAppName("tran")valsparkSession=SparkSession.builder().config(sparkConf).getOrCreate()valseq:Seq......
  • 分区器算子--转换算子
    1.HashPartitioner定义:HashPartitioner----按照key值的hashcode的不同分到不同分区里面弊端:可能会造成数据倾斜问题(每一个分区分配的数据可能差别很多)objectWordCo......
  • 键值对行动算子
    1.countByKey定义:countByKey():scala.collection.Map(K,Long)按照key值计算每一个key出现的总次数案例:valrdd:RDD[(String,Int)]=sc.makeRDD(Array(("zs",60),("zs",70)......