首页 > 编程语言 >python学习之路 - PySpark快速入门

python学习之路 - PySpark快速入门

时间:2024-08-31 22:51:35浏览次数:11  
标签:SparkContext 入门 PySpark python os rdd conf sc SparkConf

目录

一、PySpark实战

1、前言介绍

Spark:Spark是用于大规模数据处理的统一分析引擎,是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB、EB等海量数据

pySpark:Spark对python的支持,就体现在python的第三方库pySpark上

2、基础准备

a、pySpark库的安装

命令:pip install pyspark
不知道操作步骤的可以看此文章 第六节 安装第三方python包

b、构建pySpark执行环境入口对象
from pyspark import SparkConf,SparkContext
#创建SparkConf类对象   setMaster:设置运行模式   setAppName:当前spark类的名称
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")       
#基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
#打印pySpark的运行版本
print(sc.version)
#停止Sparkcontext对象的运行
sc.stop()
c、pySpark编程模型
  • 数据输入:通过SparkContext类对象的成员方法完成数据的读取操作,读取后得到RDD类对象
  • 数据处理:通过RDD类对象的成员方法完成各种数据计算的需求
  • 数据输出:将处理完成后的RDD对象调用各种成员方法完成写出文件等操作

3、数据输入

a、python数据容器转RDD对象
  • 支持的数据容器有:list,tuple,set,dict,str
  • str容器会输出单个字符,字典容器会输出所有key,其他容器会输出原本内容
from pyspark import SparkConf,SparkContext

#定义数据容器
list = ['1', '2', '3']
tuple = ('1', '2', '3')
set = {'1', '2', '3'}
dict = {'1': 'abc', '2': 'def', '3': 'ghi'}
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
#将数据容器转换为RDD
rdd = sc.parallelize(dict)
print(rdd.collect())
sc.stop()
b、读取文件内容转RDD对象
  • 文件的每一行会变为一个元素

如创建一个文件,内容如图。
用下面代码取文件内容转换为RDD对象并输出

在这里插入图片描述

from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.textFile(文件地址)
print(rdd.collect())
sc.stop()

输出结果为:
['这是一个文件内容。', '但是', '这才是第三行内容', '你猜这是第几行', '对了,这是第五行']

4、数据计算

RDD中内置了丰富的成员方法,也叫“算子”

a、map算子
  • 功能:将RDD的数据一条一条处理(处理的逻辑是基于map算子中接收的处理函数),返回新的RDD
  • 多个map方法之间可以链式调用

案例1:将list中的每个元素都乘以10

from pyspark import SparkConf,SparkContext
#如果报错Python worker failed to connect back,需要引入os设置python安装位置
import os
os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list = ['1', '2', '3']
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(list)
#写法一
# rdd2 = rdd.map(lambda x:int(x) * 10)

#写法二
def func(x):
    return int(x) * 10
rdd2 = rdd.map(func)
rdd2 = rdd.map(func)
print(rdd2.collect())
sc.stop()

结果为:
[10, 20, 30]

案例2:将list中的每个元素都先乘以10,再加上5,分为两个map写

from pyspark import SparkConf,SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list = ['1', '2', '3']
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(list)
rdd2 = rdd.map(lambda x:int(x) * 10).map(lambda x: int(x) + 5)		#这里支持链式调用
print(rdd2.collect())
sc.stop()
b、flatMap算子
  • 功能:对RDD执行map操作,然后解除嵌套操作
  • 解除嵌套:假如输入的list的多层嵌套的,那么最后的结果全部元素都为list的一层

案例:将多层嵌套的 list 取出所有元素放到一层中

from pyspark import SparkConf,SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list = [[1,2,3],[4,5,6],[7,8]]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(list)
#写法一
rdd2 = rdd.flatMap(lambda x: x)
print(rdd2.collect())
sc.stop()

结果为:
[1, 2, 3, 4, 5, 6, 7, 8]
c、reduceByKey算子
  • 功能:针对KV型的RDD,自动按照key分组,然后根据提供的聚合逻辑,完成组内数据(value)的聚合操作
  • KV型的RDD其实就是二元元组,比如:[(‘a’,1) , (‘b’,1) , (‘c’,1)],每个元组中第一个值为key,第二个值为value

案例:将男女分组,并且计算两组的分数总和

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list = [('男', 99), ('男', 88), ('女', 77), ('女', 66), ('男', 55)]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(list)
# 写法一
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd2.collect())
sc.stop()

结果为:
[('女', 143), ('男', 242)]
d、综合案例

读取文件内容,统计各个元素出现次数,文件内容如下:

在这里插入图片描述

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list1 = []
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
#读取文件内容
rdd = sc.textFile("C://Users//HLY//Desktop//test.txt")
#先将内容切割成单个元素并一层展示,再将元素设置成二元元组,最后将元素分组统计
rdd2 = rdd.flatMap(lambda x : x.strip().split(" ")).map(lambda x : (x,1)).reduceByKey(lambda a, b: a + b)
print(rdd2.collect())
sc.stop()

结果为:
[('test2', 3), ('test3', 4), ('test', 3), ('test1', 3), ('test4', 4), ('test5', 4)]
e、filter算子
  • 功能:过滤想要的数据进行保留

案例:过滤出所有偶数

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list1 = [1,2,3,4,5,6,7,8,9,10]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
#读取文件内容
rdd = sc.parallelize(list1)
rdd2 = rdd.filter(lambda x : x % 2 == 0)
print(rdd2.collect())
sc.stop()

结果为:
[2, 4, 6, 8, 10]
f、distinct算子
  • 功能:对RDD中的数据进行去重,返回新的RDD

案例:对已有的列表进行去重

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list1 = [1,1,2,2,2,2,3,3,3,4,4,4]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
#读取文件内容
rdd = sc.parallelize(list1)
rdd2 = rdd.distinct()
print(rdd2.collect())
sc.stop()

结果为:
[1, 2, 3, 4]
g、sortBy算子
  • 功能:对RDD数据进行排序,基于指定的排序依据
  • 参数:
    • func:告知RDD是对那个数据进行排序,比如lambda x:x[1] 表示对rdd中第二列元素进行排序
    • ascending:True升序,False降序
    • numPartitions:用多少分区排序,单个分区时传1

案例:对给出的二元集合根据第二个元素进行降序排列

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list1 = [('test', 1), ('test1', 5), ('test3', 2), ('test4', 4), ('test5', 8), ('test6', 7), ('test7', 6)]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
rdd2 = rdd.sortBy(lambda x: x[1], ascending=False)
print(rdd2.collect())
sc.stop()

结果为:
[('test5', 8), ('test6', 7), ('test7', 6), ('test1', 5), ('test4', 4), ('test3', 2), ('test', 1)]

5、数据输出

a、collect算子
  • 功能:将RDD各个分区内的数据统一收集到Driver当中,形成一个List对象

案例:输出RDD的内容

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list1 = [1,2,3,4,5]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
print(rdd.collect())
sc.stop()

结果为:
[1,2,3,4,5]
b、reduce算子
  • 对RDD的全部数据按照传入的逻辑进行聚合,返回一个数字

案例:计算列表中的所有元素和

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list1 = [1, 2, 3, 4, 5]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
num = rdd.reduce(lambda a, b: a + b)
print(num)
sc.stop()

结果为:
15
c、take算子
  • 功能:取RDD的前N个元素,组成 List 返回

案例:取出列表前3个元素

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list1 = [1, 2, 3, 4, 5]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
list = rdd.take(3)
print(list)
sc.stop()

结果为:
[1, 2, 3]
e、count算子
  • 功能:计算RDD有多少条数据,返回一个数字

案例:获取列表中的元素个数

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"

list1 = [1, 2, 3, 4, 5]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
num = rdd.count()
print(num)
sc.stop()

结果为:
5
f、saveAsTextFile算子
  • 功能:将RDD的数据写入文本文件中
  • 执行此方法需要安装hadoop环境,具体配置过程可以看 这篇文章
  • 其输出内容是根据区分决定的,有多少分区就会输出多少个文件。内容会均匀分摊到各个文件中。分区数默认与电脑的CPU内核一致

案例1:输出列表内容到各个文件中

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"
os.environ["HADOOP_HOME"] = "D:/hadoop/hadoop-3.0.0"

list1 = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
rdd.saveAsTextFile("C:/Users/HLY/Desktop/test")
sc.stop()

在这里插入图片描述
结果会生成16个内容文件和2个状态文件,并且16个内容文件中每个文件中都有一个数字

案例2:将列表内容输出到一个文件中

#方法一:配置全局并行度为1

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"
os.environ["HADOOP_HOME"] = "D:/hadoop/hadoop-3.0.0"

list1 = [1, 2, 3, 4, 5]
#设置全局并行度为1
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app").set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
rdd.saveAsTextFile("C:/Users/HLY/Desktop/test")
sc.stop()
#方法二:设置分区数为1

from pyspark import SparkConf, SparkContext
import os

os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"
os.environ["HADOOP_HOME"] = "D:/hadoop/hadoop-3.0.0"

list1 = [1, 2, 3, 4, 5]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
#设置分区数为1
rdd = sc.parallelize(list1,numSlices= 1)
rdd.saveAsTextFile("C:/Users/HLY/Desktop/test")
sc.stop()

在这里插入图片描述

最后会生成1个结果文件,3个其他文件,并且内容都会在 part-00000 文件中显示

标签:SparkContext,入门,PySpark,python,os,rdd,conf,sc,SparkConf
From: https://blog.csdn.net/qq_41444892/article/details/141719080

相关文章

  • Nautilus-NRP入门教程-ChatGPT翻译版
    原文链接快速开始指南这个快速开始指南将:指导你获取与NRP(NautilusResearchPlatform)进行交互所需的工具。配置你的Kubernetes客户端与NRP集群进行通信。显示查询NRP集群以查看正在运行的进程的命令。提供进一步提交作业和学习Kubernetes的资源。请注意容器......
  • Python入门
    #1.输出:Python是一种跨平台、开源、免费的高级编程语言。print("Python是一种跨平台、开源、免费的高级编程语言。")foriinrange(1,10):print(str(i)*i)foriinrange(1,10):j=1whilej<=i:print("*",end='')j+=1print()......
  • opencv/c++的一些简单的操作(入门)
    目录读取图片读取视频读取摄像头图像处理腐蚀膨胀调整图像大小裁剪和缩放 绘制绘制矩形绘制圆形绘制线条透视变换颜色检测轮廓查找人脸检测检测人脸检测嘴巴可适当调整参数读取图片读取路径widows使用vissto一定是\斜杠#include<opencv2/imgcodec......
  • 基于Python的顾客购物数据可视化分析
    数据可视化分析实验数据集简介        本文在实验中考虑到实验使用设备的性能和环境的局限性,采用了kaggle官网上的的消费者购物数据集,数据地址:https://www.kaggle.com/datasets/iamsouravbanerjee/customer-shopping-trends-dataset。此数据包含了3900条记录,每条......
  • # yyds干货盘点 # 盘点一个Python正则表达式问题
    大家好,我是皮皮。一、前言前几天在Python最强王者交流群【大锤子】问了一个Python正则表达式处理的问题,这里拿出来给大家分享下。下图是代码:二、实现过程这个问题确看上去是正则表达式的问题,这里【杯酒】提出问题并给出建议:使用+号,就能匹配所有符合条件的文字,而不是第一段。不过后......
  • python和c语言有什么不同
    1、语言类型Python是一种基于解释器的语言,解释器会逐行读取代码;首先将Python编译为字节码,然后由大型C程序解释。C是一种编译语言,完整的源代码将直接编译为机器代码,由CPU直接执行。2、内存管理Python使用自动垃圾收集器进行内存管理。在C语言中,程序员必须自己进行内存管......
  • python浮点数怎么写
    python提供了三种浮点值:内置的float与complex类型,以及标准库的decimal.Decimal类型。float类型存放双精度的浮点数,具体取值范围依赖于构建python的c编译器,由于精度受限,进行相等性比较不可靠。如果需要高精度,可使用decimal模块的decimal.Decimal,这种类型可以准确的表示循环......
  • 硬件工程师入门笔记---LDO原理和应用(来源--Trent带你学硬件)
    LDO原理LDO参数LDO手册解读 LDO设计要点及案例分析......
  • 【数模资料包】最新数模国赛word+latex模版|数模常用的算法python+matlab代码
     【2024最全国赛研赛数模资料包】C君珍贵国一数模资料|最新数模国赛word+latex模版|数模常用的算法python+matlab代码国赛指:高教社杯全国大学生数学建模竞赛,研赛指:华为杯研究生数学建模竞赛。资料内容具体看文末卡片以下是三个相关的资料内容:1C君珍贵国一数模资料2最......
  • Datawhale X 李宏毅苹果书 AI夏令营-深度学习入门班-task2-分段线性曲线
    引入上一篇文章中我们了解了机器学习中最基本的模型线性模型(Linearmodels),由于其过于简单(只能调整其斜率w与截距b)无法反映真实数据中多数折线或曲线情况这种限制称为模型偏差(modelbias)。下文介绍:如何构建更复杂,误差更小的函数解决问题。注:此处的bias与线性模型中的b不同。......