首页 > 其他分享 >spark (五) RDD的创建 & 分区

spark (五) RDD的创建 & 分区

时间:2022-11-20 16:46:45浏览次数:45  
标签:sparkContext val 分区 RDD spark SparkConf

目录

1. RDD的创建方式

1.1 从内存创建RDD

主要依赖如下两个方法

  • parallelize
  • makeRDD
    • 底层调用的还是parallelize
def main(args: Array[String]): Unit = {
    val sparkConfig: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("WordCount")
    
    val sparkContext: SparkContext = new SparkContext(sparkConfig)
    
    val rdd1: RDD[Int] = sparkContext.parallelize(
      List(1, 2, 3, 4)
    )
    
    // makeRDD 底层调用的还是parallelize
    val rdd2: RDD[Int] = sparkContext.makeRDD(
      List(1, 2, 3, 4)
    )
}

1.2 从外部存储(文件)创建RDD

由外部存储系统的数据集创建RDD包括

  • 本地文件系统
  • 所有Hadoop支持的数据集,比如HDFS、HBase等
def main(args: Array[String]): Unit = {
    val sparkConfig: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("WordCount")
    
    val sparkContext: SparkContext = new SparkContext(sparkConfig)
    
    val rdd1: RDD[String] = sparkContext.textFile("data") // 或者 hdfs://master:7077/input
    rdd1.collect().foreach(println)
    
    sparkContext.stop()
}

1.3 从其他的RDD创建

下述的flatMap map reduceByKey 每个操作都是以上一个RDD为基础创建另一个RDD

def main(args: Array[String]): Unit = {
    val sparkConfig: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("WordCount")
    
    val sparkContext: SparkContext = new SparkContext(sparkConfig)
    
    val rdd1: RDD[String] = sparkContext.textFile("data") // 或者 hdfs://master:7077/input
    rdd1
      .flatMap(word => word.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)
    
    sparkContext.stop()
}

1.4 直接 new RDD

spark框架中会这么操作

2. 分区(partition)

2.1 makeRDD的分区

可以通过以下优先级方式指定分区

  • 优先使用makeRDD的第二个参数指定的分区数量
  • 使用默认的配置的分区数量
    • SparkConf 若指定了 spark.default.parallelism, 则用这个
    • 否则使用CPU的核数(这里的CPU的核数在本地模式下,如local[3]则为3,local[*]等于物理机真实的CPU核数)
// 第二个参数指定numSlices, 即分区的数量
val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4), 2)

完整的表现分区的例子

package com.lzw.bigdata.spark.core.rdd_basic_usage_1

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Rdd_4_Partition_From_Mem {
  def main(args: Array[String]): Unit = {
    val sparkConfig: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("WordCount")
    //      .set("spark.default.parallelism", "5")
    val sparkContext: SparkContext = new SparkContext(sparkConfig)

    val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4), 2)

    // 保存成分区文件,每个分区会生成一个文件,可以借此查看真实生成了几个分区
    rdd.saveAsTextFile("output")

    sparkContext.stop()
  }
}

2.2 读取文件的分区例子

spark读取文件借助的是hadoop的方法, 所以以下的读取规则是hadoop的规则

// 可以指定 minPartitions, 不指定默认是 math.min(defaultParallelism, 2)
sparkContext.textFile("data/word.txt", 2)

以以下一个文件word.txt为例, 该文件算上换行符\n一共13个字节

1234
567
8900

2.2.1 读取文件分区规则

所以分区数计算方式为

分区数: 13 bytes / 2(这里2是minPartitions) = 6
       13 bytes / 6 bytes = 2分区 + 剩余的1byte => 1/6 > 0.1 => 3分区

2.2.2 每个分区的数据

每个分区里面的数据, (hadoop按偏移量之后按行读取):
      分区1预期 [0, 6] => 1234\n
                         567\n
      分区2预期[6, 12] => 但是上一个分区因为是偏移量到了某一行,某行就都被读走了,所以真实的
                     => 8900
      分区3预计[12, 13] => 已无数据可读

2.2.3 完整示例

package com.lzw.bigdata.spark.core.rdd_basic_usage_1

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Rdd_5_Partition_From_File {
  def main(args: Array[String]): Unit = {
    val sparkConfig: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("WordCount")
    //      .set("spark.default.parallelism", "5")
    val sparkContext: SparkContext = new SparkContext(sparkConfig)

    /*
      1234\n
      567\n
      8900

    分区数: 13 bytes / 2(这里2是minPartitions) = 6
           13 bytes / 6 bytes = 2分区 + 剩余的1byte => 1/6 > 0.1 => 3分区

    每个分区里面的数据, (hadoop按偏移量之后按行读取):
          分区1预期 [0, 6] => 1234\n
                             567\n
          分区2预期[6, 12] => 但是上一个分区因为是偏移量到了某一行,某行就都被读走了,所以真实的
                         => 8900
          分区3预计[12, 13] => 已无数据可读
     */

    val rdd: RDD[String] = sparkContext.textFile("data/word.txt", 2)

    rdd.saveAsTextFile("output")

    sparkContext.stop()
  }
}



标签:sparkContext,val,分区,RDD,spark,SparkConf
From: https://www.cnblogs.com/baoshu/p/spark_5.html

相关文章

  • 分区磁盘空间不足时的简易处理
    背景某些操作系统进行安装时默认的"/"分区的空间较为狭小虽然可以通过增加磁盘扩展的方式进行处理.但是某些特殊机器,比如SMZYJ等系统,处理起来的成本很高.此时应该......
  • Ansible磁盘分区
    #main.yaml----hosts:localvars:disks:-disk:/dev/sdbnumber:1start:0%end:50%fstype:x......
  • 把PE写进U盘的一个分区
    我们通常情况下,在制作PE启动盘的时候,软件都会将U盘格式化,然后进行制作,但是如果我们的U盘有分区的话,制作完启动盘,就又被软件给只分成了一个区。如下图是用老毛桃制作启动盘之......
  • IDEA提交任务到spark standalone集群
    参考文章:在idea里面怎么远程提交spark任务到yarn集群代码注意setJars,提交的代码,要提前打好包。否则会报找不到类的错误个人理解就相当于运行的main方法是起了一个spar......
  • Flink/Spark中ETL的简单模版
    我们往往可以忽略外界的干扰因素,避免焦虑,专心做自己想做的事情,反正焦虑又解决不了问题引言使用flink或者spark的时候,写好固定的模版很重要,对于一下etl的实时任务,只需要执行......
  • 13_Kafka高级_生产者分区策略
    刚才主要讲的是存储的内容,主要的index和.log两个文件。kafka的生产者:有个分区策略:分区的原因:1、可以以partition为单位进行读写2、提高集群的负载能力。生产者分区的原......
  • CentOS 7 调整 XFS 格式的 LVM 分区大小
    XFS是centos7预装的操作系统,XFS只能扩大不能缩小,所以需要利用xfsdump/xfsrestore工具在必须缩小lvm的情况下,备份与还原资料需求:把/vg-home分区缩小至20G,把多出的容......
  • postgresql函数:定期删除模式下指定天数前的表数据及分区物理表
    一、现有函数--1、现有函数调用selectods.deletePartitionIfExists('fact_ship'||'_'||to_char(CURRENT_DATE-INTERVAL'2month','yyyymmdd'));--2、函数内容CREAT......
  • SparkSQL 核心编程
    DataFrame创建DataFrame从Spark数据源进行创建➢查看Spark支持创建文件的数据源格式scala>spark.read.csvformatjdbcjsonloadoptionoptionsorcparqu......
  • Flink 按键分区状态基本介绍
    在实际应用中,我们一般都需要将数据按照某个key进行分区,然后再进行计算处理;所以最为常见的状态类型就是KeyedState。之前介绍到keyBy之后的聚合、窗口计算,算子所持有的状态......