首页 > 其他分享 >idea构建spark streaming环境

idea构建spark streaming环境

时间:2022-10-28 21:45:34浏览次数:42  
标签:val idea 5s streaming spark DStream ssc

package com.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Durations, StreamingContext}


object Demo01WordCountOnStreaming {
  def main(args: Array[String]): Unit = {
    //先构建SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("Demo01WordCountOnStreaming")
      .master("local[2]") // 在SparkStreaming中接受数据需要一直占用一个线程,所以这里需要至少2个及以上的线程数
      .getOrCreate()

    // 构建Spark Streaming 环境
    /**
     * duration:指定批次的大小
     * 在这里相当于每隔5s中会将这5s中所接受到的数据封装成一个小的批次,并启动一个任务完成该小批次数据的计算,得到结果
     * 这5s并不是按照任务启动之后每隔5s计算一次,而是以0点0时0分0毫秒开始计算,每隔5s相当于会有一个触发点,可以触发任务的执行
     * 任务刚启动时,第一个批次可以没有达到5s的数据但也会触发任务计算
     */
    val ssc: StreamingContext = new StreamingContext(spark.sparkContext, Durations.seconds(5))

    // 使用Socket模拟消息队列
    /**
     * nc -lk 8888 建立Socket连接在xshell中
     */

    /**
     * Spark Core基于RDD
     * Spark SQL基于DataFrame
     * Spark Streaming 中的编程模型为 DStream
     */

    val ds: DStream[String] = ssc.socketTextStream("master", 8888)

    ds.print()//DStream的打印方式

    //将每一行数据进行切分 并将每一个单词展开
    val wordDS: DStream[String] = ds.flatMap(_.split(","))

    //将每一个单词变成一个二元组,二元组的第二个元素为1
    val wordKVDS: DStream[(String, Int)] = wordDS.map(word => (word, 1))

    // 统计每个单词的数量
    // 只能对当前批次接受到的数据进行统计 并不会考虑历史状态(某一时刻某一批次计算的到的结果)
    // 如果需要考虑历史状态,则可以使用有状态算子
    val wordCnt: DStream[(String, Int)] = wordKVDS.reduceByKey(_ + _)

    wordCnt.print()

    //启动任务
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
    
  }

}

 

标签:val,idea,5s,streaming,spark,DStream,ssc
From: https://www.cnblogs.com/wqy1027/p/16837604.html

相关文章

  • Spark中RDD对DF的转换
    SparkRDDToDFpackagecom.sqlimportorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{DataFrame,Row,SparkSession}objectDemo06RDDtoDF{defmain(ar......
  • Spark整合hive
    Spark-SQL写代码方式1、在IDEA中将代码编写好打包上传到集群中运行(上线使用)使用spark-submit提交2、spark-shell(repl)里面使用sqlContext测试使用,简单任务使......
  • idea - mac idea 无法在 project 导入或添加 module
    idea-macidea无法在project导入或添加module解决方法:本地hosts映射导致,注释掉一下localhost的映射,重试0.0.0.0localhost127.0.0.1localhost255.255.255.255lo......
  • PostgreSQL warm standby hot standby 以及 Streaming Replicatio
    ​​http://blog.sina.com.cn/s/blog_79c0eb870102ww38.html​​​​WarmStandby​​​​25.5.HotStandby​​......
  • idea - plantuml
    brewinstallGraphvizorsudoapt-getinstallgraphviz​​download1​​ gyjw​​download2 ​​​​plantuml​​......
  • git提交指定文件,如果配合IDEA操作,改动文件自动add。使用命令行 git status 查看仓库状
    git提交指定文件如果配合IDEA操作,改动文件自动add。使用命令行gitstatus查看仓库状态,gitcommitsrc/main/java/com/test01.javasrc/main/java/com/test01.java......
  • Spark SQL概述、函数用法
    SparkSQL  底层还是基于RDD的,常用的语言DSL底层架构    在idea中的操作引入pom依赖<dependency><groupId>org.apache.spark</gr......
  • SparkSQL(二)
    【理解】SparkSQL执行流程接收到查询,既可以是SQL语句,也可以是DSL语法,以一个SQL语句为例:1、Parser,第三方类库Antlr实现。将sql字符串切分成Token,根据语义规则......
  • SparkCore(四)
    【理解】Spark内核原理RDD依赖RDD的5大特性中,第三个是【与父RDD的依赖关系】依赖关系可以按照是否有shuffle进一步分类窄依赖:【没有】shuffle,父RDD的一个分......
  • SparkSQL
    DataFrame创建DataFrame1.转换为DataFrame方式1将RDD[元组或列表]转换为DataFrame定义RDD,每个元素是Row类型将上面的RDD[Row]转换成DataFrame,df=spark.createDat......