1. HashPartitioner
- Definition: HashPartitioner assigns each record to a partition according to the hashCode of its key (a sketch of the mapping rule follows below).
- Drawback: it can lead to data skew, because the amount of data assigned to each partition may differ greatly.
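The mapping rule is essentially the key's hashCode taken modulo the number of partitions, shifted to be non-negative. A minimal sketch of that rule (illustrative only, not Spark's actual source):

// Illustrative only: how a hash-based partitioner assigns a key to a partition.
def hashPartition(key: Any, numPartitions: Int): Int = {
  val rawMod = key.hashCode % numPartitions
  // hashCode can be negative, so shift negative remainders into [0, numPartitions)
  if (rawMod < 0) rawMod + numPartitions else rawMod
}

println(hashPartition("hadoop", 4))
println(hashPartition("spark", 4))

The full word-count example below applies new HashPartitioner(4) to a text file and prints the contents of each partition: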
package wc
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WC").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val line: RDD[String] = sc.textFile("hdfs://node1:9000/wc.txt")
    val words = line.flatMap(_.split(" "))
    val map = words.map((_, 1))
    // Repartition by the hash of each key into 4 partitions
    val partition: RDD[(String, Int)] = map.partitionBy(new HashPartitioner(4))
    val value = partition.mapPartitionsWithIndex((index, tuple) => {
      println(s"Partition: $index , data: ${tuple.mkString(",")}")
      tuple
    })
    val result = value.reduceByKey(_ + _)
    result.foreach(println(_))
    sc.stop()
    /*
    Output:
    Partition: 1 , data: (Hadoop,1),(hbase,1),(hadoop,1),(hbase,1),(spark,1),(hadoop,1),(Chinese,1)
    Partition: 2 , data: (mapreduce,1),(mapreduce,1),(kafka,1),(kafka,1),(mapreduce,1)
    Partition: 3 , data:
    Partition: 0 , data: (hive,1),(hive,1),(flink,1),(flink,1),(hive,1),(Azkaban,1),(English,1),(Math,1)
    */
  }
}
2. RangePartitioner
- Definition: RangePartitioner splits the data into partitions by key ranges, using a sampling algorithm to keep the partitions as evenly sized as possible.
- Benefit: it alleviates the data skew problem of HashPartitioner; the trade-off is that you cannot control which keys go to which partition (a sketch of the range-based mapping follows below).
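Since the sampled range bounds are not exposed publicly, the easiest way to see the range-based mapping is to call getPartition directly. A minimal sketch, assuming a live SparkContext named sc as in the examples in this post; the key list here is hypothetical:

import org.apache.spark.RangePartitioner

// Illustrative only: see which range each key falls into.
val pairs = sc.parallelize(Seq(("apple", 1), ("hive", 1), ("spark", 1), ("zookeeper", 1)))
val rp = new RangePartitioner(3, pairs)
// Keys are compared against sampled, sorted bounds, so lexicographically
// close keys tend to land in the same partition.
Seq("apple", "hive", "spark", "zookeeper").foreach { k =>
  println(s"$k -> partition ${rp.getPartition(k)}")
}

The full example below swaps in new RangePartitioner(3, map) for the same word-count job: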
package wc
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WC").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val line: RDD[String] = sc.textFile("hdfs://node1:9000/wc.txt")
    val words = line.flatMap(_.split(" "))
    val map = words.map((_, 1))
    // val partition: RDD[(String, Int)] = map.partitionBy(new HashPartitioner(4))
    // Repartition into 3 partitions by key range, with bounds determined by sampling
    val partition: RDD[(String, Int)] = map.partitionBy(new RangePartitioner(3, map))
    val value = partition.mapPartitionsWithIndex((index, tuple) => {
      println(s"Partition: $index , data: ${tuple.mkString(",")}")
      tuple
    })
    val result = value.reduceByKey(_ + _)
    result.foreach(println(_))
    /*
    Output:
    Partition: 0 , data: (Hadoop,1),(flink,1),(flink,1),(Azkaban,1),(English,1),(Chinese,1),(Math,1)
    Partition: 1 , data: (hive,1),(hbase,1),(hadoop,1),(hive,1),(hbase,1),(hadoop,1),(hive,1)
    Partition: 2 , data: (mapreduce,1),(mapreduce,1),(kafka,1),(spark,1),(kafka,1),(mapreduce,1)
    */
    sc.stop()
  }
}
3. Custom partitioner
- WCPartitioner.scala
package wc
import org.apache.spark.Partitioner

// Custom partitioner: route words to partitions by their first letter.
class WCPartitioner() extends Partitioner {
  override def numPartitions: Int = 3

  override def getPartition(key: Any): Int = {
    val word = key.toString
    val first: Char = word.charAt(0)
    if (first == 'h') {
      0   // words starting with 'h' go to partition 0
    } else if (first == 's') {
      1   // words starting with 's' go to partition 1
    } else {
      2   // everything else goes to partition 2
    }
  }
}
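Because getPartition is an ordinary method, the routing logic can be sanity-checked without a SparkContext; a minimal sketch:

// Illustrative check of the first-letter routing rule.
val p = new WCPartitioner()
println(p.getPartition("hadoop")) // 0: starts with 'h'
println(p.getPartition("spark"))  // 1: starts with 's'
println(p.getPartition("kafka"))  // 2: anything else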
- WordCount.scala
package wc
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WC").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val line: RDD[String] = sc.textFile("hdfs://node1:9000/wc.txt")
    val words = line.flatMap(_.split(" "))
    val map = words.map((_, 1))
    // val partition: RDD[(String, Int)] = map.partitionBy(new HashPartitioner(4))
    // val partition: RDD[(String, Int)] = map.partitionBy(new RangePartitioner(3, map))
    // Repartition with the custom first-letter partitioner defined above
    val partition = map.partitionBy(new WCPartitioner())
    val value = partition.mapPartitionsWithIndex((index, tuple) => {
      println(s"Partition: $index , data: ${tuple.mkString(",")}")
      tuple
    })
    val result = value.reduceByKey(_ + _)
    result.foreach(println(_))
    sc.stop()
  }
}
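As a side note, reduceByKey can also take a Partitioner directly, which folds the repartitioning into the aggregation when the intermediate partitioned RDD is not needed on its own. A minimal sketch, reusing the words RDD from the example above:

// Illustrative only: aggregate with the custom partitioner in one step.
val counted = words.map((_, 1)).reduceByKey(new WCPartitioner(), _ + _)
counted.foreach(println(_))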
From: https://www.cnblogs.com/jsqup/p/16621084.html