- 题目描述:
/**
* 清洗完成的数据中包含一个用户的响应状态码,获取每一种状态码对应的访问量
* 1、读取清洗完成的数据成为RDD[String]
* 2、可以把上一步得到的RDD通过map算子转换成一个键值对类型的RDD,以状态码为key 以不同用户的访问日志为value的数据
* 3、键值对类型的RDD通过countByKey行动算子计算每种状态码的访问量(不灵活)。也可以使用aggregateByKey或者combineByKey实现访问量的统计
*/
- 代码:方法一:
object A3StatusCount {
def main(args: Array[String]): Unit = {
val sparkConf:SparkConf = new SparkConf().setAppName("demo02").setMaster("local[3]")
val sc:SparkContext = new SparkContext(sparkConf)
val value: RDD[String] = sc.textFile("hdfs://node1:9000/dc")
val map = value.map((line: String) => {
val str: String = line.split(" ")(8)
(str, 1)
})
val count = map.reduceByKey(_ + _)
count.foreach(println(_))
sc.stop()
}
}
- 代码:方法二:
object A3StatusCount {
def main(args: Array[String]): Unit = {
val sparkConf:SparkConf = new SparkConf().setAppName("demo02").setMaster("local[3]")
val sc:SparkContext = new SparkContext(sparkConf)
val value: RDD[String] = sc.textFile("hdfs://node1:9000/dc")
val map: RDD[(String, String)] = value.map((line: String) => {
val strs = line.split(" ")
(strs(8), line)
})
val agg = map.aggregateByKey(0L)(
(a: Long, b: String) => {
a + 1L
},
(a: Long, b: Long) => {
a + b
}
)
agg.foreach(println(_))
sc.stop()
}
}
标签:map,String,val,RDD,sparkcore,案例,访问量,sc,line
From: https://www.cnblogs.com/jsqup/p/16630182.html