首页 > 其他分享 >[Spark streaming举例]-- 统计一段时间内的热搜词

[Spark streaming举例]-- 统计一段时间内的热搜词

时间:2022-11-03 14:32:38浏览次数:40  
标签:val RDD -- -------------- streaming window println Spark ssc


如下

package com.my.scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
*
* 使用开窗函数实现spark streaming,版本统计一段时间内前三的热搜词汇
*
* 测试结果:测试成功
* 步骤: 先开启hadoop集群,start-all.sh
* 再在h15上启动端口:nc -lk 8888
* 再输入数据:如---》"ds sdf sdfa wfasd sdf",一定要以空格分开
* 启动本程序
* 查看控制台是否正常
*
*/
object WindowBasedTopWord {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("WindowBasedTopWord").setMaster("local[2]")
val ssc = new StreamingContext(conf,Durations.seconds(5)) //这里的5秒是指切分RDD的间隔
ssc.checkpoint("hdfs://h15:8020/wordcount_checkpoint") //设置docheckpoint目录,没有会自动创建


val words = ssc.socketTextStream("h15",8888) //可以从kafka集群中获取信息
val pairs = words.flatMap(_.split(" ")).map(x => (x,1))
pairs.foreachRDD(rdd => {
println("--------------split RDD begin--------------")
rdd.foreach(println)
println("--------------split RDD end--------------")
})
/*
reduceByKeyAndWindow(reduceFunc,invReduceFunc,windowDuration,slideDuration)
reduceFunc:用于计算window框住的RDDS
invReduceFunc:用于优化的函数,减少window滑动中去计算重复的数据,通过“_-_”即可优化
windowDuration:表示window框住的时间长度,如本例5秒切分一次RDD,框20秒,就会保留最近4次切分的RDD
slideDuration:表示window滑动的时间长度,即每隔多久执行本计算


本例5秒切分一次RDD,每次滑动10秒,window框住20秒的RDDS,即:每10秒计算最近20秒切分的RDDS,中间有10秒重复,
通过invReduceFunc参数进行去重优化
*/
val pairsWindow = pairs.reduceByKeyAndWindow(_+_,_-_,Durations.seconds(20),Durations.seconds(10))
val sortDstream = pairsWindow.transform(rdd => {
val sortRdd = rdd.map(t => (t._2,t._1)).sortByKey(false).map(t => (t._2,t._1)) //降序排序
val more = sortRdd.take(3) //取前3个输出
println("--------------print top 3 begin--------------")
more.foreach(println)
println("--------------print top 3 end--------------")
sortRdd
})
sortDstream.print()


ssc.start()
ssc.awaitTermination()
}
}

 

标签:val,RDD,--,--------------,streaming,window,println,Spark,ssc
From: https://blog.51cto.com/u_13966077/5819859

相关文章

  • [Java基础]-- servlet调用Url传输文件或者字符串
    实例:A和B两台服务器之间传输log文件或者其他文件,      如果成功就返回字符串"1";如果失败则返回"0"1、发送log文件和字符串参数的servletmportjava.io.Buffere......
  • [Java web]-- jquery设置元素成为disabled
    对元素应用disabled和readonly属性的方法,如下:  1.readonly属性举例  $('input').attr("readonly","readonly");     //将input元素设置为readonly ......
  • [Spark streaming举例]-- 实时统计并且存储到mysql数据库中
    举例packagecom.scala.myimportorg.apache.spark.SparkConfimportorg.apache.spark.streaming.Durationsimportorg.apache.spark.streaming.StreamingContext/****@......
  • [scala基础]--拆分List操作
    运行环境:Jdk1.7、scala-2.10.4packagestudy/***Document:本类作用---->拆分List*User:yangjf*Date:2016/8/148:57*/objectTestArrays{defmain(args:A......
  • [ElasticSearch基础]-- elasticsearch安装
    基本架构#h15  kibala +marvel+elasticsearch#h16  elasticsearch+marvel#h17  elasticsearch+marvel 1. 准备文件:elasticsearch-2.2.0.tar.gz、kibana-4.4.1-linu......
  • app直播源代码,Extjs Grid自动换行
    app直播源代码,ExtjsGrid自动换行定义列的时候加个renderer,例 Js代码      {      header:'序号',      dataIndex:'ind',    ......
  • 496 下一个更大的元素 |
    题目496下一个更大的元素|nums1中数字x的下一个更大元素是指x在nums2中对应位置右侧的第一个比x大的元素。给你两个没有重复元素的数组nums1和nu......
  • 并发编程之ThreadLocal
    并发编程之ThreadLocal前言当多线程访问共享可变数据时,涉及到线程间同步的问题,并不是所有时候,都要用到共享数据,所以就需要线程封闭出场了。数据都被封闭在各自的线程之......
  • 452.minimum-number-of-arrows-to-burst-balloons 用最少数量的箭引爆气球
    问题描述452.用最少数量的箭引爆气球解题思路首先,按照\(x_start\)从小到大的顺序排序,然后开始分析需要的弓箭数。if(points[i][0]>points[i-1]),说明两个气球不存......
  • Segmentation 2 -- usage
    ThesegmentationmechanismsupportedbytheIA-32architecturecanbeusedtoimplementawidevarietyof systemdesigns.Thesedesignsrangefromflatmodel......