词频数统计
问题描述
统计一个文本文件中的每个单词的出现次数,数据格式:
首先通过textFile()函数将文件读入JavaRDD,然后通过flatMap算子将每一行的数据进行分割,得到多个String,一行数据分割得到的多个String以Iterator的迭代器格式返回,返回之后的Iterator中的每一个String都会作为一个RDD。接着通过mapToPair算子给每一个word添加计数标记1(代表出现1次),该算子返回一个键值对RDD。最后通过reduceByKey算子根据相同的key对RDD进行reduce聚合操作,进行统计计数。
————————————————
版权声明:本文为CSDN博主「一手阳光」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/FFXNFFXN/article/details/114665182
public class SparkWordCount { public static void main(String[] args){ SparkConf conf = new SparkConf(); //添加这一行则在本地运行,不添加这一行则默认在集群执行 conf.setMaster("local"); conf.setAppName("WordCount"); //基本的初始化 JavaSparkContext sc=new JavaSparkContext(conf); //创建String类型的RDD,并从本地文件中读取数据 JavaRDD<String> fileRDD = sc.textFile("src/main/files/words.txt");//通过文件读入创建RDD //flatMap()算子用来分割操作,将原RDD中的数据分成一个个片段 //new FlatMapFunction<String, String>中的两个String分别表示输入和输出类型 JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String line) throws Exception { //通过Iterator迭代器可以将分割后的多个数据元素全部返回输出 return Arrays.asList(line.split("\\s+")).iterator(); } }); //mapToPair()算子是用来对分割后的一个个片段结果添加计数标志的,如出现次数1,该函数用来创建并返回pair类型的RDD. new PairFunction<String, String, Integer>中分别是输入类型String和输出类型<String, Integer>. JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<>(word, 1); //Tuple2是spark的二元数组类型,Java中没有 } }); //reduceByKey()算子是根据key来聚合,reduce阶段.new Function2<Integer, Integer, Integer>中分别是用来聚合的两个输入类型Integer,Integer和聚合后的输出类型Integer. JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) throws Exception { return i1 + i2; } }); wordCountRDD.saveAsTextFile("E:\\result7"); } }
标签:总结,String,Integer,new,RDD,算子,今日,public From: https://www.cnblogs.com/zhaoyueheng/p/17978361