首页 > 其他分享 >分区器算子--转换算子

分区器算子--转换算子

时间:2022-08-24 18:13:37浏览次数:62  
标签:map String val -- 分区 RDD 算子 new

1. HashPartitioner

  • 定义:HashPartitioner----按照key值的hashcode的不同 分到不同分区里面
  • 弊端:可能会造成数据倾斜问题(每一个分区分配的数据可能差别很多)
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WC").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val line: RDD[String] = sc.textFile("hdfs://node1:9000/wc.txt")
    val words = line.flatMap(_.split(" "))
    val map = words.map((_, 1))


    val partition: RDD[(String, Int)] = map.partitionBy(new HashPartitioner(4))

    val value = partition.mapPartitionsWithIndex((index, tuple) => {
      println(s"分区为: $index , 里面的数据有: ${tuple.mkString(",")}")
      tuple
    })
    val result = value.reduceByKey((_ + _))
    result.foreach(println(_))
    sc.stop()
    /*
    运行结果:
    分区为: 1 , 里面的数据有: (Hadoop,1),(hbase,1),(hadoop,1),(hbase,1),(spark,1),(hadoop,1),(Chinese,1)
    分区为: 2 , 里面的数据有: (mapreduce,1),(mapreduce,1),(kafka,1),(kafka,1),(mapreduce,1)
    分区为: 3 , 里面的数据有: 
    分区为: 0 , 里面的数据有: (hive,1),(hive,1),(flink,1),(flink,1),(hive,1),(Azkaban,1),(English,1),(Math,1)
    */
  }
}

2. RangePartitioner

  • 定义:RangePartitioner---按照数据元素的范围划分分区数据,尽可能保证每一个分区的数据是均匀(抽样算法)
  • 好处:解决HashPartitioner的分区数据的倾斜问题,无法控制什么数据到什么分区
package wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WC").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val line: RDD[String] = sc.textFile("hdfs://node1:9000/wc.txt")
    val words = line.flatMap(_.split(" "))
    val map = words.map((_, 1))


    // val partition: RDD[(String, Int)] = map.partitionBy(new HashPartitioner(4))
    val partition: RDD[(String, Int)] = map.partitionBy(new RangePartitioner(3, map))

    val value = partition.mapPartitionsWithIndex((index, tuple) => {
      println(s"分区为: $index , 里面的数据有: ${tuple.mkString(",")}")
      tuple
    })
    val result = value.reduceByKey((_ + _))
    result.foreach(println(_))
    /*
        分区为: 0 , 里面的数据有: (Hadoop,1),(flink,1),(flink,1),(Azkaban,1),(English,1),(Chinese,1),(Math,1)
        分区为: 1 , 里面的数据有: (hive,1),(hbase,1),(hadoop,1),(hive,1),(hbase,1),(hadoop,1),(hive,1)
        分区为: 2 , 里面的数据有: (mapreduce,1),(mapreduce,1),(kafka,1),(spark,1),(kafka,1),(mapreduce,1)
    */
    sc.stop()
  }
}

3. 自定义分区

  • WCPartitioner.scala
package wc

import org.apache.spark.Partitioner

class WCPartitioner() extends Partitioner{
  override def numPartitions: Int = 3

  override def getPartition(key: Any): Int = {
    val word = key.toString
    val first:Char = word.charAt(0)
    if (first == 'h'){
      0
    }else if(first == 's'){
      1
    }else{
      2
    }
  }
}
  • WordCount.scala
package wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WC").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val line: RDD[String] = sc.textFile("hdfs://node1:9000/wc.txt")
    val words = line.flatMap(_.split(" "))
    val map = words.map((_, 1))


    // val partition: RDD[(String, Int)] = map.partitionBy(new HashPartitioner(4))
    // val partition: RDD[(String, Int)] = map.partitionBy(new RangePartitioner(3, map))
    val partition = map.partitionBy(new WCPartitioner())

    val value = partition.mapPartitionsWithIndex((index, tuple) => {
      println(s"分区为: $index , 里面的数据有: ${tuple.mkString(",")}")
      tuple
    })
    val result = value.reduceByKey((_ + _))
    result.foreach(println(_))
    sc.stop()
  }
}

标签:map,String,val,--,分区,RDD,算子,new
From: https://www.cnblogs.com/jsqup/p/16621084.html

相关文章

  • HTML编辑器粘贴图片自动上传到服务器(Java版)
    ​如何做到ueditor批量上传word图片?1、前端引用代码<!DOCTYPE html PUBLIC "-//W3C//DTDXHTML1.0Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-......
  • StringBuilder类
    StringBuilder是字符串对象的缓冲区对象,缓冲区(出现目的,为了高效),提高String类的效率StringBuilder类的实现原理一个可变的字符序列,字符序列就是字符数组String......
  • YApi-v1.9.2部署失败(Accessing non-existent property 'count' of module exports ins
    部署YApi时,出现报错信息:Accessingnon-existentproperty'count'ofmoduleexportsinsidecirculardependencyGitHub上未找到解决方案,网上发现其他同学也遇到了类似的......
  • VMware扩展磁盘
    以下操作不会破坏原有的数据,但还是有风险的,建议先备份数据。 1.关闭虚拟机,扩展磁盘 2.查看当前分区大小和分配情况df-hlsblkfdisk-l 3.扩展sda3fdisk/dev......
  • Shell 第二章《流控》
    前言无论什么编程语言都离不开条件判断(流控)。SHELL也不例外。例如,用户输入的密码不够长时提示用户,你太短了例如,用户输入了备份的目录,如果有目录继续备份,如果没有目录创建......
  • java泛型详解
    java泛型详解1.泛型​ Java泛型是J2SE1.5中引入的一个新特性,其本质是参数化类型,也就是说所操作的数据类型被指定为一个参数(typeparameter)这种参数类型可以用在类、接......
  • 17.upload-lab
    一.前端代码进行防护,浏览器禁用js,或者使用burp修改文件后缀  二.后端代码进行防护,使用burp修改文件content-type  三.利用前提:需要apache的配置文件里面......
  • 【力扣算法题】寻找树中最左下结点的值
    题目:给定一个二叉树的根节点root,请找出该二叉树的 最底层 最左边 节点的值。假设二叉树中至少有一个节点。样例示例1:输入:root=[2,1,3]输出:1示例2:  ......
  • DES加密解密:android、java、js
    需求:登录的时候WEB或APP将数据加密后传给JAVA后端,后端接收到数据解密后得到数据进行处理。eg:明文:12345678密文:PofrPuMcG5CiXuyR5B5ysQ==一、java端importjavax.cr......
  • Elasticsearch篇:Elasticsearch增、删、改、查
    目录一、Elasticsearch的文档增删查改(CURD)二、Elasticsearch之查询的两种方式三、term与match查询四、Elasticsearch之排序查询五、Elasticsearch之分页查询六、Elastics......