首页 > 其他分享 >累加器的高级使用--实现wordcount

累加器的高级使用--实现wordcount

时间:2022-08-24 20:12:36浏览次数:69  
标签:String val -- 累加器 wordcount wordCountMap mutable def

  • HighWordCountAccumulator.scala
package accumulator

import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

/*
继承AccumulatorV2类,
传递两个泛型,第一个泛型代表的是累加器add的时候传递数据类型
第二泛型代表的是累加器最终value给你返回的数据类型
 */
class HighWordCountAccumulator extends AccumulatorV2[Array[String], collection.mutable.Map[String, Long]] {
  // 累加器,累加单词出现的总次数
  var wordCountMap = collection.mutable.Map[String, Long]()

  // 判断集合是否为空
  override def isZero: Boolean = {
    wordCountMap.isEmpty
  }

  override def copy(): AccumulatorV2[Array[String], mutable.Map[String, Long]] = {
    val wordCountAccumulator = new HighWordCountAccumulator()
    wordCountAccumulator.wordCountMap = wordCountMap
    wordCountAccumulator
  }

  override def reset(): Unit = {
    wordCountMap = collection.mutable.Map[String, Long]()
  }

  override def add(v: Array[String]): Unit = {
    for (word <- v) {
      val flag = wordCountMap.contains(word)
      if (flag) {
        wordCountMap.update(word, wordCountMap.getOrElse(word, 0L) + 1L)
      } else {
        wordCountMap.put(word, 1L)
      }
    }
  }

  override def merge(other: AccumulatorV2[Array[String], mutable.Map[String, Long]]): Unit = {
    val res = other.value
    for (elem <- res) {
      val word = elem._1
      val count = elem._2
      val flag = wordCountMap.contains(word)
      if (flag) {
        wordCountMap.update(word, wordCountMap.getOrElse(word, 0L)+count)
      } else {
        wordCountMap.put(word, count)
      }
    }
  }

  override def value: mutable.Map[String, Long] = {
    wordCountMap
  }
}
  • HighAccCode.scala
package accumulator

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object HighAccCode {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("accumulator")
    val sc = new SparkContext(sparkConf)
    val rdd: RDD[String] = sc.textFile("hdfs://node1:9000/wc.txt")

    val hwca = new HighWordCountAccumulator()
    sc.register(hwca)

    val value = rdd.flatMap((line: String) => {
      val wordArrays = line.split(" ")
      hwca.add(wordArrays)
      wordArrays
    })
    value.collect()

    println(hwca.wordCountMap)
    sc.stop()
  }
}

标签:String,val,--,累加器,wordcount,wordCountMap,mutable,def
From: https://www.cnblogs.com/jsqup/p/16621404.html

相关文章

  • [转]WIFI智能配网 - SmartConfig - Ray Liang - 博客园
    要开始IoT项目的第一步是什么?当然不是硬件,而是硬件与硬件的连接!即使有各种各样的通信协议没有好的连接方式绝对不行。那外设上没有的屏幕,没有键盘怎末输入密码怎末选择网络......
  • 基本数据类型之列表
    1.列表的定义1.采用变量名=[]的方式定义2.采用变量名=list()的方式定义2.列表的作用列表是用来存多个数据,并且这些数据是需要按位置存放的,后面我们可以通过索引取出列表......
  • maven标准目录结构、 Maven生命周期
    maven标准目录结构图解:  Maven生命周期图解: ......
  • MapReduce-day1
    MapReducehadoop-ha问题dfs.ha.fencing.methods表示:alistofscriptsorJavaclasseswhichwillbeusedtofencetheActiveNameNodeduringafailover而配置......
  • 前端解决跨域问题的方法:jsonp
    同源策略同源策略/SOP(Sameoriginpolicy)是一种约定,是浏览器最核心也最基本的安全功能,现在所有支持JavaScript的浏览器都会使用这个策略。如果缺少了同源策略,浏览器很容......
  • 【pycharm】社区版给函数生成注释
    1、打开要提前加注释的函数    2、双击选中函数名,右键   3、点击【showContextActions】   4、点击【insertadocumentationstringsub】......
  • correct
    correct[fromcom-(COM-)+regere'toleadstraight']Correct-definitionofcorrectbyTheFreeDictionarySynonyms:correct,rectify,remedy,redress,rev......
  • Mysql--计算方法
    四舍五入:round()select100/6as四舍五入前结果:16.6667selectround(100/6)as四舍五入后结果:17进一法:ceiling()select100/6as进一前结果:16.6667selectc......
  • vscode中使用prettier和typescript
    参考博文文章标题:HowtousePrettierwithESLintandTypeScriptinVSCode链接:https://khalilstemmler.com/blogs/tooling/prettier/总结使用npm安装prettiernpm......
  • 二分查找法
    使用二分查找的条件:有序数组需求在数组{1,2,3,4,5,6,7,8,9,10}中,查找某个元素的位置实现步骤定义两个变量,表示要查找的范围。默认min=0,max=最大索引......