第一部分 Spark入门
第二部分 SparkCore
2 RDD
2.1 转换算子-map
map是将RDD的数据一条条处理,返回新的RDD
# 定义方法
def add(data):
return data*10
print(rdd.map(add).collect)
# 定义lamabda表达式
rdd.map(lambda data:data*10)
2.2 转换算子-flatMap
flatMap对RDD执行map操作,然后执行解除嵌套操作
rdd = sc.parallelize([('a',1),('a',11)])
# 将二元元组的所有value都*10进行处理
rdd.mapValues(lambda x:x*10)
data.map { case (label, feature) => ((feature, label), 1)
}.reduceByKey(_ + _).map { case ((feature, label), num) =>
(feature, List((label, num))) //feature,label,cnt
}.reduceByKey(_ ::: _).mapValues { x =>
val size_entro = x.map(_._2).sum
val res = x.map(_._2.toDouble / size_entro).map { t =>
-t * (Math.log(t) / Math.log(2))
}.sum
size_entro * res
}.mapValues { x => x / size }.map(_._2).sum
2.3转换算子-reduceByKey
针对KV型RRDD自动按照key进行分组,然后按照提供的聚合逻辑,对组内数据value完成聚合操作
rdd.reduceByKey(func)
val clickStat = joinDf .where(F.col("active_type")==="click") .rdd .map(row => { val mapInfo = Option(row.getMap[String,Double](row.fieldIndex(feat))) mapInfo match { case Some(x) => x case _ => null } }) .filter(_!=null) .flatMap(x=>x) .reduceByKey(_+_)
2.4 转换算子-mapValues
针对二元元组RDD,对其内部的二元元组的value进行map操作
rdd = sc.parallelize([('a',1),('a',11)]) # 将二元元组的所有value都*10进行处理 rdd.mapValues(lambda x:x*10)
data.map { case (label, feature) => ((feature, label), 1) }.reduceByKey(_ + _).map { case ((feature, label), num) => (feature, List((label, num))) //feature,label,cnt }.reduceByKey(_ ::: _).mapValues { x => val size_entro = x.map(_._2).sum val res = x.map(_._2.toDouble / size_entro).map { t => -t * (Math.log(t) / Math.log(2)) }.sum size_entro * res }.mapValues { x => x / size }.map(_._2).sum
2.5 转换算子-groupBy
将RDD的数据进行分组
rdd.groupBy(func)
rdd = sc.parallelize([('a',1),('a',11),('b',1)]) # 通过这个函数确认按照谁来分组(返回谁即可) print(rdd.groupBy(lambda x:x[0]).collect()) print(rdd.groupBy(lambda x:x[0]).collect()) # 结果为:
val userContentListHis = spark.thriftSequenceFile(inpath_his, classOf[LongVideoUserContentStat]) .map(l=>{ (l.getUid,l.getContent_properties.get(0).getId) }).toDF("uid", "docid") .groupBy($"uid")
2.6 转换算子-filter
过滤想要的数据进行保存
rdd = sc.parallelize([1,2,3,4,5,6]) rdd.filter(lamdba x:x%2 == 1) # 只保留奇数
val treatmentUser = spark.read.option("header", false).option("sep", "\t").csv(inpath) .select("_c0") .withColumnRenamed("_c0", "userid") .withColumn("flow", getexpId($"userid")) .filter($"flow" >= start and $"flow" <= end) .select("userid") .dropDuplicates()
2.7 转换算子-其他算子
distinct算子 rdd.distinct() 一般不写去重分区 val userContentHis = hisPathList.map(path =>{ val hisData = spark.thriftSequenceFile(path, classOf[LongVideoUserContentStat]) println(s"hisData ==>${hisData.count()}") hisData }).reduce(_ union _).distinct().repartition(partition)
union算子 2个rdd合并成一个rdd:rdd.union(other_rdd) 只合并不去重 rdd的类型不同也是可以合并的 rdd1 = sc.parallelize([1,2,3]) rdd2 = sc.parallelize([1,2,3,4]) rdd3 = rdd1.union(rdd2)
2.8 算子面试题
1.groupByKey和reduceByKey的区别: groupByKey仅仅有分组功能而已,reduceByKey除了分组还有聚合作用,是一个分组+聚合一体化的算子. 分组前先聚合再shuffle,预聚合,被shuffle的数据极大的减少,提升了性能.数据量越大,reduceByKey的性能优势也就越大. 2.rdd的分区数怎么查看? 通过getNumPartitions API查看,返回int 3.Transformation和Action的区别: 转换算子的返回值100%是rdd,而Action算子不一定.转换算子是懒加载的,只有遇到Action才会执行 4.哪两个算子不经过Driver直接输出? foreach 和 saveAsTextFile
3 RDD的持久化
3.1 RDD的持久化
rdd是过程数据 rdd进行相互迭代计算,执行开启时,新的RDD生成,老的RDD消失
3.2 RDD的缓存
val rawLog = profilePushLogReader(spark, date, span).persist()
3.3 RDD的checkPoint
也是将RDD的数据保存起来,仅支持磁盘存储,被认为是安全的, 不保留血缘关系
3.4 缓存面试题
4 案例
4.1 搜素引擎日志分析案例
4.2
4.3 ....
4.4 计算资源面试题
1.如何尽量提升任务计算的资源? 计算cpu核心和内存量,通过--executor-memory指定executor内存,通过--executor-cores指定executor的核心数