
Custom Partitioning in Spark



Contents

I. Requirements

II. Code

III. Sample Data

IV. Output

V. The Three Partitioning Approaches



I. Requirements

       To guard against heavy data skew, we define a custom Partitioner. In the map stage each record is a tuple (Int, String); the Int is hashed by taking it modulo the number of partitions, so records are spread evenly across the partitions. A later refinement: build the map key yourself as a random number drawn from a fixed range (commonly called key salting), as sketched below.
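A hedged sketch of that key-salting idea follows. It assumes a word-count style RDD[(String, Long)]; the helper name saltKeys and the bucket count are illustrative and not part of the original code.

import scala.util.Random

import org.apache.spark.rdd.RDD

// Spread a hot key over saltBuckets different keys, aggregate per salted key first,
// then strip the salt and aggregate again to get the final per-key result.
def saltKeys(rdd: RDD[(String, Long)], saltBuckets: Int): RDD[(String, Long)] = {
  rdd
    .map { case (k, v) => (s"${Random.nextInt(saltBuckets)}#$k", v) } // add a random salt prefix
    .reduceByKey(_ + _)                                               // partial aggregation: the hot key is diluted
    .map { case (saltedKey, v) => (saltedKey.split("#", 2)(1), v) }   // remove the salt
    .reduceByKey(_ + _)                                               // final aggregation per original key
}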

II. Code

Two classes are involved: defineSparkPartition.scala and UsedefineSparkPartition.scala (both are listed below, after a short illustration of note 2).
Notes:
(1) Do not use flatMap() here; the records must remain (key, value) pairs for partitionBy to apply.
(2) Only key-value RDDs have a partitioner; for non key-value RDDs the partitioner is None.
(3) Partition IDs of an RDD range from 0 to numPartitions - 1; the value returned by getPartition determines which partition a record belongs to.
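A minimal illustration of note (2), assuming a SparkContext named sc is already available (this snippet and its partition count are illustrative, not part of the original post):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
pairs.partitioner                                            // None until a partitioner is applied
val hashed = pairs.partitionBy(new org.apache.spark.HashPartitioner(4))
hashed.partitioner                                           // Some(HashPartitioner), with partition IDs 0 to 3
sc.parallelize(Seq(1, 2, 3)).partitioner                     // non key-value RDD: always None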

import org.apache.spark.Partitioner

/**
 * Created by yuhui
 */
class defineSparkPartition(numParts: Int) extends Partitioner {

  /**
   * Returns the number of partitions to create.
   */
  override def numPartitions: Int = numParts

  /**
   * Computes the partition ID for the given key; the result must lie between 0 and numPartitions - 1.
   *
   * @param key
   * @return
   */
  override def getPartition(key: Any): Int = {
    val domain = new java.net.URL(key.toString).getHost()
    domain match {
      case "blog.csdn.net"  => 1 % numPartitions
      case "news.cctv.com"  => 2 % numPartitions
      case "news.china.com" => 3 % numPartitions
      case _                => 4 % numPartitions
    }
  }

  /**
   * Standard Java equality. Users are asked to implement it because Spark internally
   * compares whether two RDDs use the same partitioner.
   *
   * @param other
   * @return
   */
  override def equals(other: Any): Boolean = other match {
    case mypartition: defineSparkPartition =>
      mypartition.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions

}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by yuhui
 */
object UsedefineSparkPartition {

  def main(args: Array[String]) {

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("UsedefineSparkPartition")

    val sc = new SparkContext(conf)

    // Read a local file
    val lines = sc.textFile("D:/word.txt")

    // Note: the RDD must be key-value for partitionBy to apply
    val splitMap = lines.map(line => (line.split(",")(0), line.split(",")(1)))

    // Partition by the custom partitioner and save to local files
    splitMap.partitionBy(new defineSparkPartition(4)).saveAsTextFile("D:/partrion/test")

    sc.stop()

  }

}
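As a quick sanity check (a hedged addition, not part of the original program), one can tag each record with the ID of the partition it landed in instead of writing files:

// Collect (partitionId, record) pairs to the driver and print them.
splitMap
  .partitionBy(new defineSparkPartition(4))
  .mapPartitionsWithIndex { (idx, iter) => iter.map(rec => (idx, rec)) }
  .collect()
  .foreach(println)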

III. Sample Data

http://blog.csdn.net/silentwolfyh/article/details/76993419,blog.csdn.net
http://blog.csdn.net/silentwolfyh/article/details/76860369,blog.csdn.net
http://blog.csdn.net/silentwolfyh/article/details/77571596,blog.csdn.net
http://blog.csdn.net/silentwolfyh/article/details/77188905,blog.csdn.net
http://news.cctv.com/2017/09/18/ARTIEX7bcZI2cYUqrsEC2DLf170918.shtml,news.cctv.com
http://news.cctv.com/2017/09/18/ARTI4McIqsaFV6115br9eiRJ170918.shtml,news.cctv.com
http://news.cctv.com/2017/09/18/ARTfdabrnntvV6115br9eiRJ170918.shtml,news.cctv.com
http://news.china.com/domestic/945/20170919/31463894.html,news.china.com
http://news.china.com/domestic/945/20170919/31464711.html,news.china.com
http://news.china.com/domestic/945/20170919/31464711.html,news.china.com
https://www.baidu.com/,www.baidu.com
http://news.163.com/17/0918/22/CULBLQUT0001899N.html,news.163.com
http://news.163.com/17/0919/06/CUM7EVQI0001899N.html,news.163.com
http://news.163.com/17/0919/03/CULRN5180001875P.html,news.163.com

IV. Output

part-00000

(https://www.baidu.com/,www.baidu.com)
(http://news.163.com/17/0918/22/CULBLQUT0001899N.html,news.163.com)
(http://news.163.com/17/0919/06/CUM7EVQI0001899N.html,news.163.com)
(http://news.163.com/17/0919/03/CULRN5180001875P.html,news.163.com)

part-00001

(http://blog.csdn.net/silentwolfyh/article/details/76993419,blog.csdn.net)
(http://blog.csdn.net/silentwolfyh/article/details/76860369,blog.csdn.net)
(http://blog.csdn.net/silentwolfyh/article/details/77571596,blog.csdn.net)
(http://blog.csdn.net/silentwolfyh/article/details/77188905,blog.csdn.net)

part-00002

(http://news.cctv.com/2017/09/18/ARTIEX7bcZI2cYUqrsEC2DLf170918.shtml,news.cctv.com)
(http://news.cctv.com/2017/09/18/ARTI4McIqsaFV6115br9eiRJ170918.shtml,news.cctv.com)
(http://news.cctv.com/2017/09/18/ARTfdabrnntvV6115br9eiRJ170918.shtml,news.cctv.com)

part-00003

(http://news.china.com/domestic/945/20170919/31463894.html,news.china.com)
(http://news.china.com/domestic/945/20170919/31464711.html,news.china.com)
(http://news.china.com/domestic/945/20170919/31464711.html,news.china.com)

V. The Three Partitioning Approaches

defaultPartitioner (from Spark's Partitioner.scala):

/**
 * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
 *
 * If any of the RDDs already has a partitioner, choose that one.
 *
 * Otherwise, we use a default HashPartitioner. For the number of partitions, if
 * spark.default.parallelism is set, then we'll use the value from SparkContext
 * defaultParallelism, otherwise we'll use the max number of upstream partitions.
 *
 * Unless spark.default.parallelism is set, the number of partitions will be the
 * same as the number of partitions in the largest upstream RDD, as this should
 * be least likely to cause out-of-memory errors.
 *
 * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
 */
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
  for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
    return r.partitioner.get
  }
  if (rdd.context.conf.contains("spark.default.parallelism")) {
    new HashPartitioner(rdd.context.defaultParallelism)
  } else {
    new HashPartitioner(bySize.head.partitions.size)
  }
}
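A small sketch of the rule described in the comment above (the RDD names and partition count are illustrative assumptions, and sc is the SparkContext from the driver example earlier): when one input already has a partitioner, defaultPartitioner picks it up, so operations such as join can reuse it instead of hashing everything again.

import org.apache.spark.HashPartitioner

val rddA = sc.parallelize(Seq(("a", 1), ("b", 2))).partitionBy(new HashPartitioner(8))
val rddB = sc.parallelize(Seq(("a", "x"), ("c", "y")))

// join() asks defaultPartitioner(rddA, rddB) for a partitioner; rddA already has a
// HashPartitioner(8), so that one is chosen and rddA does not need to be re-shuffled.
val joined = rddA.join(rddB)
joined.partitioner   // Some(HashPartitioner) with 8 partitions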

How HashPartitioner works: for a given key, compute its hashCode and take it modulo the number of partitions; if the remainder is negative, add the number of partitions to it. The resulting value is the key's partition ID (a worked example follows the class below). The implementation:

/**
 * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
 * Java's `Object.hashCode`.
 *
 * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
 * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
 * produce an unexpected or incorrect result.
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}
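A worked example of the modulo adjustment described above. Utils.nonNegativeMod is Spark-internal, so the helper below is a stand-alone re-implementation for illustration only:

// Equivalent of Utils.nonNegativeMod: map a possibly negative remainder into [0, mod).
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

nonNegativeMod("spark".hashCode, 4)   // some partition ID in 0..3
nonNegativeMod(-7, 4)                 // -7 % 4 == -3, so the result is -3 + 4 = 1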

Drawback of HashPartitioner: the data can end up unevenly distributed across partitions; in the extreme case a few partitions hold all of the RDD's data.
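For example (a hedged illustration, not from the original post, again assuming a SparkContext sc): if almost every record carries the same key, HashPartitioner puts all of them into one partition no matter how many partitions are requested.

import org.apache.spark.HashPartitioner

val skewed = sc.parallelize(Seq.fill(1000)(("hot-key", 1)) ++ Seq(("rare", 1)))
val sizesPerPartition = skewed
  .partitionBy(new HashPartitioner(4))
  .mapPartitionsWithIndex { (idx, iter) => Iterator((idx, iter.size)) }
  .collect()
// One partition holds roughly 1000 records while the other three are nearly empty.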

Advantages of RangePartitioner: it tries to keep the amount of data in each partition balanced, and the partitions themselves are ordered relative to each other; every element in one partition is guaranteed to be smaller (or larger) than every element in another partition.

The order of elements within a partition, however, is not guaranteed. Put simply, keys that fall within a given range are mapped to a given partition.

What RangePartitioner does: it maps keys within a certain range to a particular partition. In the implementation, the algorithm that computes the range boundaries is the crucial part; it is implemented by rangeBounds.

The code is as follows:

/**
 * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
 * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
 *
 * Note that the actual number of partitions created by the RangePartitioner might not be the same
 * as the `partitions` parameter, in the case where the number of sampled records is less than
 * the value of `partitions`.
 */
class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.size).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }

  def numPartitions: Int = rangeBounds.length + 1

  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition - 1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

  override def equals(other: Any): Boolean = other match {
    case r: RangePartitioner[_, _] =>
      r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
    case _ =>
      false
  }

  override def hashCode(): Int = {
    val prime = 31
    var result = 1
    var i = 0
    while (i < rangeBounds.length) {
      result = prime * result + rangeBounds(i).hashCode
      i += 1
    }
    result = prime * result + ascending.hashCode
    result
  }

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
    val sfactory = SparkEnv.get.serializer
    sfactory match {
      case js: JavaSerializer => out.defaultWriteObject()
      case _ =>
        out.writeBoolean(ascending)
        out.writeObject(ordering)
        out.writeObject(binarySearch)

        val ser = sfactory.newInstance()
        Utils.serializeViaNestedStream(out, ser) { stream =>
          stream.writeObject(scala.reflect.classTag[Array[K]])
          stream.writeObject(rangeBounds)
        }
    }
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    val sfactory = SparkEnv.get.serializer
    sfactory match {
      case js: JavaSerializer => in.defaultReadObject()
      case _ =>
        ascending = in.readBoolean()
        ordering = in.readObject().asInstanceOf[Ordering[K]]
        binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int]

        val ser = sfactory.newInstance()
        Utils.deserializeViaNestedStream(in, ser) { ds =>
          implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
          rangeBounds = ds.readObject[Array[K]]()
        }
    }
  }
}
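As a usage sketch (hedged; the sample data is illustrative and not from the original post, and sc is again the SparkContext from the driver example): sortByKey builds a RangePartitioner internally by sampling the keys, and the partitioner can also be applied explicitly with partitionBy.

import org.apache.spark.RangePartitioner

val kv = sc.parallelize(Seq(("d", 4), ("a", 1), ("c", 3), ("b", 2)))

// sortByKey samples the keys and range-partitions them before sorting within each partition.
val sorted = kv.sortByKey(ascending = true, numPartitions = 2)

// Explicit use: partition 0 receives the "smaller" keys, partition 1 the "larger" ones.
val ranged = kv.partitionBy(new RangePartitioner(2, kv))
ranged.partitioner   // Some(RangePartitioner)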



From: https://blog.51cto.com/silentwolfyh/5975552
