首页 > 编程语言 >【Python】PySpark 数据计算 ② ( RDD#flatMap 方法 | RDD#flatMap 语法 | 代码示例 )

【Python】PySpark 数据计算 ② ( RDD#flatMap 方法 | RDD#flatMap 语法 | 代码示例 )

时间:2023-08-07 12:38:52浏览次数:45  
标签:flatMap PySpark 对象 元素 示例 RDD 嵌套



文章目录

  • 一、RDD#flatMap 方法
  • 1、RDD#flatMap 方法引入
  • 2、解除嵌套
  • 3、RDD#flatMap 语法说明
  • 二、代码示例 - RDD#flatMap 方法







一、RDD#flatMap 方法




1、RDD#flatMap 方法引入



RDD#map 方法 可以 将 RDD 中的数据元素 逐个进行处理 , 处理的逻辑 需要用外部 通过 参数传入 map 函数 ;

RDD#flatMap 方法 是 在 RDD#map 方法 的基础上 , 增加了 " 解除嵌套 " 的作用 ;

RDD#flatMap 方法 也是 接收一个 函数 作为参数 , 该函数被应用于 RDD 中的每个元素及元素嵌套的子元素 , 并返回一个 新的 RDD 对象 ;



2、解除嵌套



解除嵌套 含义 : 下面的的 列表 中 , 每个元素 都是一个列表 ;

lst = [[1, 2], [3, 4, 5], [6, 7, 8]]

如果将上述 列表 解除嵌套

lst = [1, 2, 3, 4, 5, 6, 7, 8]



RDD#flatMap 方法 先对 RDD 中的 每个元素 进行处理 , 然后再 将 计算结果展平放到一个新的 RDD 对象中 , 也就是 解除嵌套 ;

这样 原始 RDD 对象 中的 每个元素 , 都对应 新 RDD 对象中的若干元素 ;



3、RDD#flatMap 语法说明



RDD#flatMap 语法说明 :

newRDD = oldRDD.flatMap(lambda x: [element1, element2, ...])

旧的 RDD 对象 oldRDD 中 , 每个元素应用一个 lambda 函数 , 该函数返回多个元素 , 返回的多个元素就会被展平放入新的 RDD 对象 newRDD 中 ;



代码示例 :

# 将 字符串列表 转为 RDD 对象
rdd = sparkContext.parallelize(["Tom 18", "Jerry 12", "Jack 21"])

# 应用 map 操作,将每个元素 按照空格 拆分
rdd2 = rdd.flatMap(lambda element: element.split(" "))






二、代码示例 - RDD#flatMap 方法



代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext
# 为 PySpark 配置 Python 解释器
import os
os.environ['PYSPARK_PYTHON'] = "Y:/002_WorkSpace/PycharmProjects/pythonProject/venv/Scripts/python.exe"

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)

# 将 字符串列表 转为 RDD 对象
rdd = sparkContext.parallelize(["Tom 18", "Jerry 12", "Jack 21"])

# 应用 map 操作,将每个元素 按照空格 拆分
rdd2 = rdd.flatMap(lambda element: element.split(" "))

# 打印新的 RDD 中的内容
print(rdd2.collect())

# 停止 PySpark 程序
sparkContext.stop()

执行结果 :

Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
23/07/31 23:02:58 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/31 23:02:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本号 :  3.4.1
['Tom', '18', 'Jerry', '12', 'Jack', '21']

Process finished with exit code 0

【Python】PySpark 数据计算 ② ( RDD#flatMap 方法 | RDD#flatMap 语法 | 代码示例 )_代码示例


标签:flatMap,PySpark,对象,元素,示例,RDD,嵌套
From: https://blog.51cto.com/u_14202100/6992394

相关文章

  • 【Python】PySpark 数据计算 ④ ( RDD#filter 方法 - 过滤 RDD 中的元素 | RDD#distin
    文章目录一、RDD#filter方法1、RDD#filter方法简介2、RDD#filter函数语法3、代码示例-RDD#filter方法示例二、RDD#distinct方法1、RDD#distinct方法简介2、代码示例-RDD#distinct方法示例一、RDD#filter方法1、RDD#filter方法简介RDD#filter方法可以根据指定......
  • container/ring 使用示例
    packagemainimport("container/ring""fmt")varsizeint=10funcmain(){myRing:=ring.New(size)fmt.Println("Emptyring:",*myRing)fori:=0;i<myRing.Len()-1;i++{myRing.Value......
  • 参考示例之“复制对象|拷贝对象|BeanUtils工具类学习”
    //设置需要拷贝的字段Set<String>targetSet=newHashSet<>();targetSet.addAll(Arrays.asList("totalRefund","actualAdvertisingCost","expensesOfTaxation"));//调用拷贝方法copyProperties(com......
  • 使用Locust进行接口性能测试:安装、命令参数解析与示例解读(一)
    “Locust是一款开源的Python性能测试工具,它可以模拟大量并发用户对网站或者其他接口进行压力测试”一、Locust简介与安装1.使用pip安装Locust:pip3installlocust2.通过GitHub克隆项目并安装(推荐Python3):gitclonehttps://github.com/locustio/locustcdlocustpython......
  • html滚动示例
    <!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><title>滚动测试</title><scriptsrc="https://code.jquery.com/jquery-3.7.0.min.js"crossorigin="anonymous"......
  • NET7下的WEB API示例
    NET7下的WEBAPI示例 [Route("api/[controller]")][ApiController]publicclassShopADController:ControllerBase{privatereadonlyIRepository<Model.ShopAD,int>_shopAD;publicShopADController(IRepository&l......
  • 【Azure K8S | AKS】在AKS集群中创建 PVC(PersistentVolumeClaim)和 PV(PersistentVol
    问题描述在AKS集群中创建PVC(PersistentVolumeClaim)和PV(PersistentVolume)示例 问题解答在AzureKubernetesService(AKS)的官方网站中,关于存储的选项介绍中,并没有具体的yaml实例来创建PV,PVC。特别是使用自定义的Disk的情况。本文将根据以上图片中的AzureManagedDisk+......
  • 网络工具示例
    如指定IP包长度大于100:tcpdump-ieth0-n'ip[2:2]>100'traceroute使用oot@zh-hz-hr-ygyradius~]#traceroute-I-p2000192.168.1.1tracerouteto192.168.1.1(192.168.1.1),30hopsmax,60bytepackets1gateway(192.168.3.250)3.987ms4.331......
  • Spark Core源码分析: RDD基础
    RDD RDD初始参数:上下文和一组依赖1.abstractclass2.@[email protected] 以下需要仔细理清:AlistofPartitionsFunctiontocomputesplit(subRDDimpl)AlistofDependenciesPartitionerforK-VRDDs(Optional)Preferredl......
  • 多语言API接口接入电商平台获得商品快递费用源代码演示示例
     商品快递费用API接口的作用是通过调用接口获取特定商品的快递费用信息。具体而言,该接口可以提供以下功能和作用:实时获取快递费用:通过API接口可以实时查询不同快递公司对于指定商品的运费费用。用户可以根据商品的重量、尺寸、寄送地址等信息,调用接口获取最准确的快递费用。便于物......