首页 > 编程语言 >Python大数据处理利器,PySpark的入门实战

Python大数据处理利器,PySpark的入门实战

时间:2023-02-04 11:33:28浏览次数:40  
标签:Salary Name PySpark Python age df 20000 数据处理 spark


PySpark极速入门

一:Pyspark简介与安装

什么是Pyspark?

PySpark是Spark的Python语言接口,通过它,可以使用Python API编写Spark应用程序,目前支持绝大多数Spark功能。目前Spark官方在其支持的所有语言中,将Python置于首位。

Python大数据处理利器,PySpark的入门实战_spark

如何安装?

在终端输入

pip intsall pyspark

Python大数据处理利器,PySpark的入门实战_pandas_02

或者使用pycharm,在GUI界面安装

Python大数据处理利器,PySpark的入门实战_pandas_03

二:编程实践

加载、转换数据

# 导入pyspark
# 导入pandas, 稍后与pyspark中的数据结构做对比
import pyspark
import pandas as pd

在编写spark程序前,我们要创建一个SparkSession对象

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark极速入门").getOrCreate()

可以看到会话的一些信息:使用的Spark版本、运行模式、应用程序名字

演示环境用的是local本地模式, * 代表的是使用全部线程 如果想用集群模式的话,可以去查看集群搭建的相关教程 届时pyspark程序作为spark的客户端,设置连接集群,就是真正的分布式计算了 目前只是本地模式,用多线程去模拟分布式计算。

spark

Python大数据处理利器,PySpark的入门实战_pandas_04

看看我们将用到的test1数据吧

Python大数据处理利器,PySpark的入门实战_开发语言_05

使用read方法,用option设置是否读取csv的头,再指定路径就可以读取数据了

df_spark = spark.read.option("header", "true").csv("./data/test1.csv")

看看是什么类型

type(df_spark)
pyspark.sql.dataframe.DataFrame

再看看用pandas读取是什么类型

type(pd.read_csv("./data/test1.csv"))
pandas.core.frame.DataFrame

可以发现Spark读取这种结构化数据时,用的也是和pandas类似的dataframe结构 这也是Spark应用最广泛的数据结构

使用show方法打印数据

df_spark.show()
+---------+---+----------+------+
| Name|age|Experience|Salary|
+---------+---+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
+---------+---+----------+------+

使用printSchema方法打印元数据信息,发现明明是数值类型的,它却读取为了字符串类型

df_spark.printSchema()
root
|-- Name: string (nullable = true)
|-- age: string (nullable = true)
|-- Experience: string (nullable = true)
|-- Salary: string (nullable = true)

在读取时,加上类型推断,发现此时已经能正确读取了

df_spark = spark.read.option("header", "true").csv("./data/test1.csv",inferSchema=True)
df_spark.printSchema()
root
|-- Name: string (nullable = true)
|-- age: integer (nullable = true)
|-- Experience: integer (nullable = true)
|-- Salary: integer (nullable = true)

选择某些列, 可以发现不管选多列还是选单列,返回的都是dataframe 返回的也同样可以printSchema、show等dataframe使用的方法,做到了结构的统一

df_spark.select(["Name", "age"])
DataFrame[Name: string, age: int]
df_spark.select("Name")
DataFrame[Name: string]
df_spark.select(["Name", "age", "Salary"]).printSchema()
root
|-- Name: string (nullable = true)
|-- age: integer (nullable = true)
|-- Salary: integer (nullable = true)

不用select,而用[]直接选取,就有点类似与pandas的series了

df_spark["Name"]
Column<'Name'>

column就不能直接show了

df_spark["age"].show()
---------------------------------------------------------------------------

TypeError Traceback (most recent call last)

Input In [15], in <cell line: 1>()
----> 1 df_spark["age"].show()


TypeError: 'Column' object is not callable

用describe方法可以对dataframe做一些简单的统计

df_spark.describe().show()
+-------+------+------------------+-----------------+------------------+
|summary| Name| age| Experience| Salary|
+-------+------+------------------+-----------------+------------------+
| count| 6| 6| 6| 6|
| mean| null|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev| null| 4.179314138308661|3.559026084010437| 5354.126134736337|
| min|Harsha| 21| 1| 15000|
| max| Sunny| 31| 10| 30000|
+-------+------+------------------+-----------------+------------------+

用withColumn方法给dataframe加上一列

df_spark = df_spark.withColumn("Experience After 3 year", df_spark["Experience"] + 3)
df_spark.show()
+---------+---+----------+------+-----------------------+
| Name|age|Experience|Salary|Experience After 3 year|
+---------+---+----------+------+-----------------------+
| Krish| 31| 10| 30000| 13|
|Sudhanshu| 30| 8| 25000| 11|
| Sunny| 29| 4| 20000| 7|
| Paul| 24| 3| 20000| 6|
| Harsha| 21| 1| 15000| 4|
| Shubham| 23| 2| 18000| 5|
+---------+---+----------+------+-----------------------+

用drop方法删除列

df_spark = df_spark.drop("Experience After 3 year")
df_spark.show()
+---------+---+----------+------+
| Name|age|Experience|Salary|
+---------+---+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
+---------+---+----------+------+

用withColumnRename方法重命名列

df_spark.withColumnRenamed("Name", "New Name").show()
+---------+---+----------+------+
| New Name|age|Experience|Salary|
+---------+---+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
+---------+---+----------+------+

处理缺失值

看看接下来要带缺失值的test2数据吧

Python大数据处理利器,PySpark的入门实战_spark_06

CSeoe.png

df_spark = spark.read.csv("./data/test2.csv", header=True, inferSchema=True)
df_spark.show()
+---------+----+----------+------+
| Name| age|Experience|Salary|
+---------+----+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
| Mahesh|null| null| 40000|
| null| 34| 10| 38000|
| null| 36| null| null|
+---------+----+----------+------+

用na.drop删除缺失值 how参数设置策略,any意思是只要一行里有缺失值,那就删了 any也是how的默认参数

df_spark.na.drop(how="any").show()
+---------+---+----------+------+
| Name|age|Experience|Salary|
+---------+---+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
+---------+---+----------+------+

可以通过thresh参数设置阈值,代表超过一行中缺失值的数量超过这个值,才会被删除

df_spark.na.drop(how="any", thresh=2).show()
+---------+----+----------+------+
| Name| age|Experience|Salary|
+---------+----+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
| Mahesh|null| null| 40000|
| null| 34| 10| 38000|
+---------+----+----------+------+

也可以用subset参数设置关注的列 下面代码意思是,在Experience列中,只要有缺失值就删掉

df_spark.na.drop(how="any", subset=["Experience"]).show()
+---------+---+----------+------+
| Name|age|Experience|Salary|
+---------+---+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
| null| 34| 10| 38000|
+---------+---+----------+------+

用fillna填充缺失值, 可以用字典对各列的填充值进行设置

df_spark.fillna({'Name': 'unknown', 'age': 18, 'Experience': 0, 'Salary': 0}).show()
+---------+---+----------+------+
| Name|age|Experience|Salary|
+---------+---+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
| Mahesh| 18| 0| 40000|
| unknown| 34| 10| 38000|
| unknown| 36| 0| 0|
+---------+---+----------+------+

还可以调用机器学习模块的相关方法, 通过设置策略,可以用平均数、众数等方式填充

from pyspark.ml.feature import Imputer

imputer = Imputer(
inputCols = ['age', 'Experience', 'Salary'],
outputCols = [f"{c}_imputed" for c in ['age', 'Experience', 'Salary']]
).setStrategy("mean")
imputer.fit(df_spark).transform(df_spark).show()
+---------+----+----------+------+-----------+------------------+--------------+
| Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
| Krish| 31| 10| 30000| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000| 30| 8| 25000|
| Sunny| 29| 4| 20000| 29| 4| 20000|
| Paul| 24| 3| 20000| 24| 3| 20000|
| Harsha| 21| 1| 15000| 21| 1| 15000|
| Shubham| 23| 2| 18000| 23| 2| 18000|
| Mahesh|null| null| 40000| 28| 5| 40000|
| null| 34| 10| 38000| 34| 10| 38000|
| null| 36| null| null| 36| 5| 25750|
+---------+----+----------+------+-----------+------------------+--------------+

过滤操作

还是切换到test1数据

df_spark = spark.read.csv("./data/test1.csv", header=True, inferSchema=True)
df_spark.show()
+---------+---+----------+------+
| Name|age|Experience|Salary|
+---------+---+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
+---------+---+----------+------+

可以使用filter方法对数据进行过滤操作,类似于SQL中的where 可以使用字符串的方式,也可以利用column方式去传递条件

df_spark.filter("Salary <= 20000").show()
+-------+---+----------+------+
| Name|age|Experience|Salary|
+-------+---+----------+------+
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
|Shubham| 23| 2| 18000|
+-------+---+----------+------+
df_spark.filter(df_spark["Salary"]<=20000).show()
+-------+---+----------+------+
| Name|age|Experience|Salary|
+-------+---+----------+------+
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
|Shubham| 23| 2| 18000|
+-------+---+----------+------+

如果是字符串,用 and 表示同时满足多个条件 如果是用column,用( & ) 连接多个条件

df_spark.filter("Salary <= 20000 and age <= 24").show()
+-------+---+----------+------+
| Name|age|Experience|Salary|
+-------+---+----------+------+
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
|Shubham| 23| 2| 18000|
+-------+---+----------+------+
df_spark.filter(
(df_spark["Salary"]<=20000)
& (df_spark["age"]<=24)
).show()
+-------+---+----------+------+
| Name|age|Experience|Salary|
+-------+---+----------+------+
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
|Shubham| 23| 2| 18000|
+-------+---+----------+------+
column中,用|表示或, ~表示取反
df_spark.filter(
(df_spark["Salary"]<=20000)
| (df_spark["age"]<=24)
).show()
+-------+---+----------+------+
| Name|age|Experience|Salary|
+-------+---+----------+------+
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
|Shubham| 23| 2| 18000|
+-------+---+----------+------+
df_spark.filter(
(df_spark["Salary"]<=20000)
| ~(df_spark["age"]<=24)
).show()
+---------+---+----------+------+
| Name|age|Experience|Salary|
+---------+---+----------+------+
| Krish| 31| 10| 30000|
|Sudhanshu| 30| 8| 25000|
| Sunny| 29| 4| 20000|
| Paul| 24| 3| 20000|
| Harsha| 21| 1| 15000|
| Shubham| 23| 2| 18000|
+---------+---+----------+------+

分组聚合

换一个数据集test3

Python大数据处理利器,PySpark的入门实战_pandas_07

df_spark = spark.read.csv("./data/test3.csv", header=True, inferSchema=True)
df_spark.show()
+---------+------------+------+
| Name| Departments|salary|
+---------+------------+------+
| Krish|Data Science| 10000|
| Krish| IOT| 5000|
| Mahesh| Big Data| 4000|
| Krish| Big Data| 4000|
| Mahesh|Data Science| 3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu| IOT| 10000|
|Sudhanshu| Big Data| 5000|
| Sunny|Data Science| 10000|
| Sunny| Big Data| 2000|
+---------+------------+------+

使用groupby方法对dataframe某些列进行分组

df_spark.groupBy("Name")
<pyspark.sql.group.GroupedData at 0x227454d4be0>

可以看到分组的结果是GroupedData对象,它不能使用show等方法打印 GroupedData对象需要进行聚合操作,才能重新转换为dataframe 聚合函数有sum、count、avg、max、min等

df_spark.groupBy("Departments").sum().show()
+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
| IOT| 15000|
| Big Data| 15000|
|Data Science| 43000|
+------------+-----------+

三:总结

Pandas的dataframe与PySpark的dataframe有许多相似之处,熟悉Pandas的同学可以很快适应它的API。目前可以粗浅地把PySpark理解为”分布式的Pandas“,不过,PySpark还有分布式机器学习的功能——Spark MLlib(可以理解为分布式的Sklearn、TensorFlow等),后续会给大家介绍。在集群中,它的dataframe可以分布在不同的机器上,以此处理海量数据。有兴趣的小伙伴可以通过虚拟机搭建一个Spark集群,进一步学习Spark。

Apache Spark™ - 用于大规模数据分析的统一引擎

标签:Salary,Name,PySpark,Python,age,df,20000,数据处理,spark
From: https://blog.51cto.com/u_15687734/6037009

相关文章

  • 使用 Python 操作 Redis 数据库
    1.简介Redis是是一个高性能的key-value数据库。Redis支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用。Redis不仅仅支持简单的key-......
  • 使用 Python 操作 Mongo 数据库
    1.简介MongoDB是一个基于分布式文件存储的数据库,旨在为WEB应用提供可扩展的高性能数据存储解决方案。MongoDB是一个介于关系数据库和非关系数据库之间的产品,它支持的查......
  • python如何替换word文档里面的特定的字符
    pipinstallpython-docx#第一步先安装python-docx模块importdocxdefreplace_text_in_word_document(file_path,old_text,new_text):doc=docx.Document(......
  • Python练习记录
    挑选了学习过程中的一部分练习进行记录,有些很简单的或重复性强的没有进行展示希望我们都能共同进步~练习1一个小球从100m的高度落下,每次弹回原高度的一半,计算:总共弹起来......
  • Python列表中你所不知道的事
    1.引言目前,Python是世界上使用最广泛、最受欢迎的编程语言之一。Python丰富的功能性使它非常流行,因为我们可以使用它创建任何内容。我将在本博客中与大家分享关于Python列......
  • 在Python中,下划线代表着什么?
    前缀单下划线,如:_name某个方法或变量如果使用了此格式命名只是提醒开发者这并不是要组成公共接口的,对于普通的单文件内的变量或方法来说,并没有实际限制作用。但是用此格式......
  • python字符编码问题处理
    编码编码目的是让机器读懂语言在python中,Python接受的是str即使输入的数据是其它格式,在Python内部都会自动转为str编码集因为电脑是根据二进制工作的,所以将二进制......
  • Python基本数据类型
    一、Number(数字)整型(int):通常被称为是整型或整数,是正或负整数,不带小数点。Python3整型是没有限制大小的,可以当作Long类型使用,所以Python3没有Python2的Long类型。......
  • Python中的关键字的用法
    Python有哪些关键字Python常用的关键字and,del,from,not,while,as,elif,global,or,with,assert,else,if,pass,yield,break,except,import,print,clas......
  • python django项目创建
    目标本文为创建django项目的第一步操作、实现项目创建、模块创建、虚拟环境指定、并实现健康检查接口一、环境准备安装python参考官方文档:https://www.python.org/down......