首页 > 其他分享 >pyspark

pyspark

时间:2023-05-30 19:02:16浏览次数:78  
标签:None PySpark Python pyspark hive Spark spark

一、pyspark

为了让Spark支持Python,Apache Spark社区发布了一个工具PySpark。使用PySpark,我们可以使用Python编程语言处理RDD。这一切是由一个名为Py4j的库达到的。其架构如下所示。

pyspark_spark

PySpark的优势之一是在开发中允许你直接调用Python的内置库和第三方库如果Spark是本地模式,可以直接调用Python的第三方库。但如果是集群模式的话,则会发生错误.原因是PySpark 需要在各个执行节点的机器上执行操作,而与操作相关的文件存在本地.


二、PySpark程序开发

PySpark的一大优势是允许使用Python调用Spark的,因此允许在开发程序是同时调用Python的众多第三方库。因为Python和Scala语言都支持函数式的编程,也都支持匿名函数。所以大部分的Spark接口在PySpark中都有支持,并且方法名也一致。


1.获得SparkContext对象

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

2.创建RDD

PySpark没有makeRDD()方法,但常用的textFile()、parallelize()、textFile()、wholeTextFiles()。同时在PySpark中针对Python的使用习惯也增加了一些新的接口。PySpark支持的创建RDD的方法如下表所示。

表 PySpark API接口表-1

API

示例

parallelize(c, numSlices=None)

sc.parallelize([0, 2, 3, 4, 6], 5).collect()

range(start, end=None, step=1, numSlices=None)

sc.range(5).collect()

textFile(name, minPartitions=None, use_unicode=True)

sc.textFile(path).collect()

wholeTextFiles(path, minPartitions=None, use_unicode=True)

sc.wholeTextFiles(dirPath).collect()

更详细的关于PySpark RDD创建的相关接口,可以查看官方API文档(http://spark.apache.org/docs/2.0.0/api/python/pyspark.html#module-pyspark)。

3.加载文件

我们在使用Spark的时候有时候需要将一些数据分发到计算节点中,可以使用addFile函数来分发这些文件。addFile方法可以接收本地文件(或者HDFS上的文件),甚至是文件夹(如果是文件夹,必须是HDFS路径),然后Spark的Driver和Exector可以通过SparkFiles.get()方法来获取文件的绝对路径。

调用SparkContext的addFile()方法加载文件。

sc.addFile(path)

调用SparkFiles的get()方法获取文件的绝对路径。

SparkFiles.get(filename)

在PySpark中,为了能够让各个worker节点能加载并执行Python文件中的方法,SparkContext还还添加了一个addPyFile()方法。调用此方法加载Python文件后,可以直接import该Python文件,并调用该文件中的方法。

定义Python文件。

#sci.py
def sqrt(num):
        return num * num
def circle_area(r):
        return 3.14 * sqrt(r)

调用addPyFile()方法,并import该Python文件调用其中的方法。

sc.addPyFile("file:///root/sci.py")

from sci import circle_area

sc.parallelize([5, 9, 21]).map(lambda x : circle_area(x)).collect()

这种使用方式可以令PySpark编程更加灵活,复用之前由Python开发代码,提高代码复用率。

4.匿名函数

在Scala中使用map()、filter()、flatmap()等算子时,经常会使用匿名函数作为这个高阶函数的参数。这里以列表的形式罗列一些PySpark中支持的常用算子及其函数原型。

表 PySpark常用算子及函数原型

算子类型

算子函数原型

转换算子Transformation

map(f, preservesPartitioning=False)

filter(f)

mapValues(f)

distinct(numPartitions=None)

reduceByKey(func, numPartitions=None, partitionFunc=)

groupByKey(numPartitions=None, partitionFunc=)

sortByKey(ascending=True, numPartitions=None, keyfunc= at 0x7f839c2cf758>)

union(other)

join(other, numPartitions=None)

动作算子Actions

count()

collect()

take(num)

first()

reduce(f)

foreach(f)

lookup(key)

max(key=None)

min(key=None)

saveAsTextFile(path, compressionCodecClass=None)

在Python中也可以使用匿名函数,使用的方式是lambda函数。下面示例中,用Scala开发的Spark代码在PySpark中转换成如下代码。

Scala代码:

val a=sc.parallelize(List("dog","tiger","lion","cat","panther","eagle"))
val b=a.map(x=>(x,1))
b.collect

Python代码:

a=sc.parallelize(("dog","tiger","lion","cat","panther","eagle"))
b=a.map(lambda x:(x,1))
b.collect()

注意,在Python中lambda  函数只能有一个表达式。但是结合Python中的生成式和if...else三元表达式,lambda表达式可以覆盖绝大部分的使用场景。

5.SparkSQL

在PySpark中使用SparkSQL时,获得SparkSession对象的方法如下:

from pyspark.sql import SparkSession
# create the spark session
ss = SparkSession.builder.getOrCreate()

在PySpark中使用SparkSQL时,大部分使用方法与Spark中区别不大。但是需要重点掌握的是Spark DataFrame和Pandas DataFrame之间的转换。Pandas是Python数据处理中非常重要的第三方库,很多Python的第三方机器学习库或人工智能库的数据处理阶段都会使用到Pandas。

Pandas DataFrame 转 Spark DataFrame的方法

spark.createDataFrame(pandas_df)

Spark DataFrame转Pandas DataFrame

spark_df.toPandas()




spark读取hive数据

from pyspark.sql import SparkSession,HiveContext

app_name = 'feaHitStatis'
spark = SparkSession.builder.appName(app_name).getOrCreate()
hive_context= HiveContext(spark)

# 生成查询的SQL语句,这个跟hive的查询语句一样,所以也可以加where等条件语句
hive_database = "database1"
hive_table = "test"
hive_read = "select * from {}.{}".format(hive_database, hive_table)
# 通过SQL语句在hive中查询的数据直接是dataframe的形式
read_df = hive_context.sql(hive_read)
read_df.show()

写hive数据

方式1:

df.registerTempTable('test_hive')
hive_context.sql("create table default.write_test select * from test_hive")

方式二:write.mode方式

resB.repartition(1).sortWithinPartitions(resB.fea_cmple).write.mode('overwrite').saveAsTable("dm_mms_lhq.feature_statis_b_%s"%(task_id) )




方式一:配置环境变量 自动读取

spark = SparkSession.builder.master("local[*]")
.appName("test").enableHiveSupport().getOrCreate()
read_df=spark.sql("select * from dm_events.dm_usereventfinal limit 1")
read_df.show()

方式二:不需配置环境变量

spark = SparkSession.builder.master("spark://192.168.142.197:7077") 
.config("hive.metastore.uris","thrift://192.168.142.197:9083")
.appName("test").enableHiveSupport().getOrCreate()
read_df = spark.sql("select * from dm_events.dm_usereventfinal limit 1")
read_df.show()

标签:None,PySpark,Python,pyspark,hive,Spark,spark
From: https://blog.51cto.com/u_12760266/6381246

相关文章

  • pyspark list[dict]转pyspark df
    数据处理把list嵌套字段转成pysparkdataframe #coding=utf-8frompyspark.sqlimportSparkSessionfrompyspark.sql.typesimport*importpandasaspdfrompyspark.sqlimportRowclassSparkContext:def__init__(self,name="cleaner"):self.s......
  • PySpark学习
    学习基于AmitNandi的SparkforPythonDevelopers 1.1 wordcountexample  Chapter5  StreamingLiveDatawithSpark 目的:“investigatevariousimplementationsusinglivesourcesofdatasuchasTCPsocketstotheTwitterfirehoseandputinpl......
  • pyspark 集成jupyter与pyspark on yarn
    标签(空格分隔):Spark的部分一:安装jupyterHadoop集群+spark集群安装忽略yuminstallepel-releaseyuminstallpython36pip3install--upgradepip#升级pip到最新版本pip3installjupyter#安装jupyterjupyternotebook--generate-config#安装后......
  • pyspark 结构化数据开发实例
    本文是一个基于pyspark的进行海量数据ETL和统计分析的代码示例,仅供参考要点:1,使用pyspark读取mysql表数据。2,使用rddapi对结构化数据做简单ETL,设置了简单的清洗......
  • Python大数据处理利器,PySpark的入门实战
    PySpark极速入门一:Pyspark简介与安装什么是Pyspark?PySpark是Spark的Python语言接口,通过它,可以使用PythonAPI编写Spark应用程序,目前支持绝大多数Spark功能。目前Spark官方在......
  • 基于pyspark的随机森林
    关于随机森林的定义就不赘叙importfindsparkfromnumpyimportfrompyfuncfrompyspark.mlimportclassificationfrompyspark.sql.functionsimportspark_partition_idf......
  • 【博学谷学习记录】超强总结,用心分享 | pyspark基础操作
    【博学谷IT技术支持】Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生,2010年开源,2013年成为Apache孵化项目,2014年成为Apache顶级项目。目前,Spark生态系统已经发......
  • pyspark
    spark用于大规模数据处理的统一(适用面广)分析引擎(数据处理)。RDD:弹性分布式数据集。rdd是一种分布式内存抽象,能够在大规模集群中做内存运算,并且有一定的容错方式。s......
  • jupyter notebook中运行pyspark代码
    前提是windows下安装pyspark​​设置连接​​用jupyternotebook编写pyspark代码frompyspark.sqlimportSparkSession#环境配置spark=SparkSession.builder.master("lo......
  • Python学习笔记--PySpark的基础学习(二)
    filter方法(过滤想要的数据进行保留)具体实现(保留奇数):具体实现(保留偶数):distinct方法(对RDD进行去重,返回新的RDD)且无需传参具体实现(去重):sortBy方法(排序,基于我们制定的......