首页 > 其他分享 >Spark-Scala语言实战(13)

Spark-Scala语言实战(13)

时间:2024-04-06 23:59:05浏览次数:18  
标签:13 p1 val Scala 累加器 RDD 键值 Spark 方法

在之前的文章中,我们学习了如何在spark中使用键值对中的keys和values,reduceByKey,groupByKey三种方法。想了解的朋友可以查看这篇文章。同时,希望我的文章能帮助到你,如果觉得我的文章写的不错,请留下你宝贵的点赞,谢谢。

Spark-Scala语言实战(12)-CSDN博客文章浏览阅读722次,点赞19次,收藏15次。今天开始的文章,我会带给大家如何在spark的中使用我们的键值对方法,今天学习键值对方法中的keys和values,reduceByKey,groupByKey三种方法。希望我的文章能帮助到大家,也欢迎大家来我的文章下交流讨论,共同进步。https://blog.csdn.net/qq_49513817/article/details/137385224今天的文章开始,我会继续带着大家如何在spark的中使用我们的键值对里的方法。今天学习键值对方法中的fullOuterJoin,zip,combineByKey三种方法。

目录

一、知识回顾

二、键值对方法

1.fullOuterJoin

2.zip

3.combineByKey

拓展-方法参数设置


一、知识回顾

 上一篇文章中我们学习了键值对的三种方法,分别是keys和values,reduceByKey,groupByKey。

keys和values分别对应了我们的键与值。

我们可以用它们来创建我们的RDD

 reduceByKey可以进行统计,将有相同键的值进行相加,统一输出。

而 groupByKey方法就是对我们的键值对RDD进行分组了

它可以将我们的相同的键,不同的值组合成一个组。

那么,开始今天的学习吧~ 

二、键值对方法

1.fullOuterJoin

  •  fullOuterJoin()方法用于对两个RDD进行全外连接,保留两个RDD中所有键的连接结果。
import org.apache.spark.{SparkConf, SparkContext}
object p1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("p2")
    val sc = new SparkContext(conf)
    // 创建两个RDD(弹性分布式数据集)
    val p1 = sc.parallelize(Seq(("a1", "1"), ("a2", "2"), ("a3", "3")))
    val p2 = sc.parallelize(Seq(("a2", "A"), ("a3", "B"), ("a4", "C")))
    // 将RDD转换为键值对
    val pp1 = p1.map { case (key, value) => (key, value) }
    val pp2 = p2.map { case (key, value) => (key, value) }
    // 执行fullOuterJoin操作
    val ppp = pp1.fullOuterJoin(pp2)
    // 收集结果并打印
    ppp.collect().foreach(println)
    }
}

我们的代码创建了两个键值对RDD,那么使用 fullOuterJoin方法全外连接那么两个键值对都会连接。

可以看到两个键值对里的键与值都连接上了,互相没有的值即显示None值。 

2.zip

  • zip()方法用于将两个RDD组合成键值对RDD,要求两个RDD的分区数量以及元素数量相同,否则会抛出异常。
  • 将两个RDD组合成Key/Value形式的RDD,这里要求两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
import org.apache.spark.{SparkConf, SparkContext}
object p1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("p2")
    val sc = new SparkContext(conf)
    // 创建两个RDD
    val p1 = sc.parallelize(Seq(1, 2, 3))
    val p2 = sc.parallelize(Seq("a", "b", "c"))
    // 使用zip方法将两个RDD组合在一起
    val pp1 = p1.zip(p2)
    val pp2 = p2.zip(p1)
    // 收集结果并打印
    pp1.collect().foreach(println)
    pp2.collect().foreach(println)
    }
}

 代码创建了两个不同的RDD键值对,分别使用p1zip方法p2与p2zip方法p1,那么它们输出的结果会是一样的吗?

可以看到是不一样的,谁在前面谁就是键,反之是值。 

3.combineByKey

  • combineByKey()方法是Spark中一个比较核心的高级方法,键值对的其他一些高级方法底层均是使用combineByKey()方法实现的,如groupByKey()方法、reduceByKey()方法等。
  • combineByKey()方法用于将键相同的数据聚合,并且允许返回类型与输入数据的类型不同的返回值。
  • combineByKey()方法的使用方式如下。
    • combineByKey(createCombiner,mergeValue,mergeCombiners,numPartitions=None)
import org.apache.spark.{SparkConf, SparkContext}
object p1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("p2")
    val sc = new SparkContext(conf)
    val p1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)))
    val p2 = p1.combineByKey(
      // createCombiner: 将第一个值转换为累加器
      (v: Int) => v,
      // mergeValue: 将新的值加到累加器上
      (c: Int, v: Int) => c + v,
      // mergeCombiners: 合并两个累加器
      (c1: Int, c2: Int) => c1 + c2
    )
    p2.collect().foreach { case (key, value) =>
      println(s"Key: $key, Value: $value")
    }
  }
}

我的代码中: 

createCombiner: 这个函数定义了如何将每个键的第一个值转换为初始的累加器值。 

代表着每个键,第一个出现的值将作为累加器的初始值。

mergeValue: 这个函数定义了如何将新值与当前的累加器值合并。在我的代码中,我将新值与累加器相加。

代表着每个键的后续值,它们都会被加到当前的累加器值上。

mergeCombiners: 这个函数定义了当两个累加器(对应于同一个键但可能来自不同的分区)需要合并时应该执行的操作。在我的代码中,也是将两个累加器值相加

这确保了无论数据如何在分区之间分布,最终每个键都会得到正确的累加结果。

看看输出效果

可以看到我们的键值对成功累加。

快去试试吧~ 

拓展-方法参数设置

方法参数描述例子
fullOuterJoinotherRDD另一个要与之进行全外连接的RDDrdd1.fullOuterJoin(rdd2)
fullOuterJoinnumPartitions结果RDD的分区数(可选)rdd1.fullOuterJoin(rdd2, numPartitions=10)
zipotherRDD要与之进行zip操作的另一个RDDrdd1.zip(rdd2)
combineByKeycreateCombiner处理第一个出现的每个键的值的函数lambda v: (v, 1)
combineByKeymergeValue合并具有相同键的值的函数lambda acc, v: (acc[0] + v, acc[1] + 1)
combineByKeymergeCombiners合并两个累积器的函数lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
combineByKeynumPartitions结果RDD的分区数(可选)rdd.combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=5)

标签:13,p1,val,Scala,累加器,RDD,键值,Spark,方法
From: https://blog.csdn.net/qq_49513817/article/details/137412258

相关文章

  • Protobuf - Scalar Types
    InProtobuf,typesthatareconsideredsimpleandthatareprovidedbyProtobufoutoftheboxare calledscalartypes.Wecanuse15ofsuchtypes,aslistedhere:•int32•int64•uint32•uint64•sint32•sint64•fixed32 (unsigned,cannotstorenega......
  • CF613D Kingdom and its Cities
    CF613DKingdomanditsCities虚树优化dp考虑无解的情况,若有两个重要城市相邻,那么无解。对于有解的情况,朴素的如何求解最少占领的城市数?考虑从叶子节点开始向上贪心,假如当前\(u\)节点为关键点,那么对于它的子树\(v\),若它的关键点能到\(v\),就要和他断开。如果\(u\)节点不......
  • 20211325高进涛加密API研究
    密码引擎-加密API研究 Content任务详情0.研究学习原始文档CryptoAPIPKCS#11GM/T0016-2012智能密码钥匙密码应用接口规范GM/T0018-2012密码设备应用接口规范1.总结这些API在编程中的使用方式CryptoAPIPKCS#11SKF2.列出这些API包含的函数,进行分类,并总结它......
  • 1311. 分跳绳
    题目:分跳绳问题描述:学校新买来m根跳绳,每个班分n根,最多可以分给几个班的同学?还剩多少根?输入&输出:接下来,直接献上代码:(C++)#include<iostream>usingnamespacestd;intmain(){intm,n;cin>>m>>n;cout<<m/n<<""<<m%n;return0;} 记得点赞+关......
  • C113 带修莫队 P1903 [国家集训队] 数颜色/维护队列
    视频链接:   LuoguP1903[国家集训队]数颜色/维护队列//带修莫队O(n^(5/3))#include<iostream>#include<cstring>#include<algorithm>#include<cmath>usingnamespacestd;constintN=1000005;intn,m,B,mq,mr,a[N];intsum,cnt[N],ans[N];st......
  • 代码随想录算法训练营DAY18|C++二叉树Part.5|513.找树左下角的值、112. 路径总和、113
    文章目录513.找树左下角的值层序-迭代遍历前中后序-递归遍历思路伪代码CPP代码112.路径总和、113.路径总和II112.路径总和思路伪代码实现CPP代码113.路径总和II思路伪代码实现CPP代码实现106\105.从中(前)序与后(中)序遍历序列构造二叉树106.从中序与后序遍历序列......
  • 最新AI创作系统ChatGPT网站系统源码+Ai绘画网站源码+Suno-v3-AI音乐生成大模型(sparkAi
    一、前言SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统,支持OpenAI-GPT全模型+国内AI全模型。本期针对源码系统整体测试下来非常完美,那么如何搭建部署AI创作ChatGPT?小编这里写一个详细图文教程吧。已支持GPT语音对话、GPT-4模型、DALL-E3文生图、......
  • 【LeetCode刷题记录】简单篇-13-罗马数字转整数
    【题目描述】 罗马数字包含以下七种字符: I, V, X, L,C,D 和 M。字符数值I1V5X10L50C100D500M1000例如,罗马数字 2 写做 II ,即为两个并列的1。12 ......
  • Spark面试整理-解释Spark中的内存管理和持久化机制
    在Apache Spark中,内存管理和持久化机制是核心特性,它们对于提高大规模数据处理的效率和性能至关重要。内存管理统一的内存管理:Spark使用统一的内存管理模型,将执行内存(用于计算如shuffle、join等)和存储内存(用于缓存数据如RDDs)合并在一起。这种模型提供了更高的灵活性和效......
  • mysql 报错 ERROR 1396 (HY000): Operation ALTER USER failed for root@localhost 解
    mysql修改密码ALTERUSER‘root’@‘localhost’IDENTIFIEDBY‘123’;时,报错ERROR1396(HY000):OperationALTERUSERfailedforroot@localhost解决方案:2024-4-3段子手1681、首先连接权限数据库:mysql>usemysql;2、查看user主机名:mysql>selectuse......