一、pyspark
为了让Spark支持Python,Apache Spark社区发布了一个工具PySpark。使用PySpark,我们可以使用Python编程语言处理RDD。这一切是由一个名为Py4j的库达到的。其架构如下所示。
PySpark的优势之一是在开发中允许你直接调用Python的内置库和第三方库如果Spark是本地模式,可以直接调用Python的第三方库。但如果是集群模式的话,则会发生错误.原因是PySpark 需要在各个执行节点的机器上执行操作,而与操作相关的文件存在本地.
二、PySpark程序开发
PySpark的一大优势是允许使用Python调用Spark的,因此允许在开发程序是同时调用Python的众多第三方库。因为Python和Scala语言都支持函数式的编程,也都支持匿名函数。所以大部分的Spark接口在PySpark中都有支持,并且方法名也一致。
1.获得SparkContext对象
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
2.创建RDD
PySpark没有makeRDD()方法,但常用的textFile()、parallelize()、textFile()、wholeTextFiles()。同时在PySpark中针对Python的使用习惯也增加了一些新的接口。PySpark支持的创建RDD的方法如下表所示。
表 PySpark API接口表-1
API | 示例 |
parallelize(c, numSlices=None) | sc.parallelize([0, 2, 3, 4, 6], 5).collect() |
range(start, end=None, step=1, numSlices=None) | sc.range(5).collect() |
textFile(name, minPartitions=None, use_unicode=True) | sc.textFile(path).collect() |
wholeTextFiles(path, minPartitions=None, use_unicode=True) | sc.wholeTextFiles(dirPath).collect() |
更详细的关于PySpark RDD创建的相关接口,可以查看官方API文档(http://spark.apache.org/docs/2.0.0/api/python/pyspark.html#module-pyspark)。
3.加载文件
我们在使用Spark的时候有时候需要将一些数据分发到计算节点中,可以使用addFile函数来分发这些文件。addFile方法可以接收本地文件(或者HDFS上的文件),甚至是文件夹(如果是文件夹,必须是HDFS路径),然后Spark的Driver和Exector可以通过SparkFiles.get()方法来获取文件的绝对路径。
调用SparkContext的addFile()方法加载文件。
sc.addFile(path)
调用SparkFiles的get()方法获取文件的绝对路径。
SparkFiles.get(filename) |
在PySpark中,为了能够让各个worker节点能加载并执行Python文件中的方法,SparkContext还还添加了一个addPyFile()方法。调用此方法加载Python文件后,可以直接import该Python文件,并调用该文件中的方法。
定义Python文件。
#sci.py
def sqrt(num):
return num * num
def circle_area(r):
return 3.14 * sqrt(r)
调用addPyFile()方法,并import该Python文件调用其中的方法。
sc.addPyFile("file:///root/sci.py") from sci import circle_area sc.parallelize([5, 9, 21]).map(lambda x : circle_area(x)).collect() |
这种使用方式可以令PySpark编程更加灵活,复用之前由Python开发代码,提高代码复用率。
4.匿名函数
在Scala中使用map()、filter()、flatmap()等算子时,经常会使用匿名函数作为这个高阶函数的参数。这里以列表的形式罗列一些PySpark中支持的常用算子及其函数原型。
表 PySpark常用算子及函数原型
算子类型 | 算子函数原型 |
转换算子Transformation | map(f, preservesPartitioning=False) |
filter(f) | |
mapValues(f) | |
distinct(numPartitions=None) | |
reduceByKey(func, numPartitions=None, partitionFunc=) | |
groupByKey(numPartitions=None, partitionFunc=) | |
sortByKey(ascending=True, numPartitions=None, keyfunc= at 0x7f839c2cf758>) | |
union(other) | |
join(other, numPartitions=None) | |
动作算子Actions | count() |
collect() | |
take(num) | |
first() | |
reduce(f) | |
foreach(f) | |
lookup(key) | |
max(key=None) | |
min(key=None) | |
saveAsTextFile(path, compressionCodecClass=None) |
在Python中也可以使用匿名函数,使用的方式是lambda函数。下面示例中,用Scala开发的Spark代码在PySpark中转换成如下代码。
Scala代码:
val a=sc.parallelize(List("dog","tiger","lion","cat","panther","eagle"))
val b=a.map(x=>(x,1))
b.collect
Python代码:
a=sc.parallelize(("dog","tiger","lion","cat","panther","eagle"))
b=a.map(lambda x:(x,1))
b.collect()
注意,在Python中lambda 函数只能有一个表达式。但是结合Python中的生成式和if...else三元表达式,lambda表达式可以覆盖绝大部分的使用场景。
5.SparkSQL
在PySpark中使用SparkSQL时,获得SparkSession对象的方法如下:
from pyspark.sql import SparkSession
# create the spark session
ss = SparkSession.builder.getOrCreate()
在PySpark中使用SparkSQL时,大部分使用方法与Spark中区别不大。但是需要重点掌握的是Spark DataFrame和Pandas DataFrame之间的转换。Pandas是Python数据处理中非常重要的第三方库,很多Python的第三方机器学习库或人工智能库的数据处理阶段都会使用到Pandas。
Pandas DataFrame 转 Spark DataFrame的方法
spark.createDataFrame(pandas_df)
Spark DataFrame转Pandas DataFrame
spark_df.toPandas()
spark读取hive数据
from pyspark.sql import SparkSession,HiveContext
app_name = 'feaHitStatis'
spark = SparkSession.builder.appName(app_name).getOrCreate()
hive_context= HiveContext(spark)
# 生成查询的SQL语句,这个跟hive的查询语句一样,所以也可以加where等条件语句
hive_database = "database1"
hive_table = "test"
hive_read = "select * from {}.{}".format(hive_database, hive_table)
# 通过SQL语句在hive中查询的数据直接是dataframe的形式
read_df = hive_context.sql(hive_read)
read_df.show()
写hive数据
方式1:
df.registerTempTable('test_hive')
hive_context.sql("create table default.write_test select * from test_hive")
方式二:write.mode方式
resB.repartition(1).sortWithinPartitions(resB.fea_cmple).write.mode('overwrite').saveAsTable("dm_mms_lhq.feature_statis_b_%s"%(task_id) )
方式一:配置环境变量 自动读取
spark = SparkSession.builder.master("local[*]")
.appName("test").enableHiveSupport().getOrCreate()
read_df=spark.sql("select * from dm_events.dm_usereventfinal limit 1")
read_df.show()
方式二:不需配置环境变量
spark = SparkSession.builder.master("spark://192.168.142.197:7077")
.config("hive.metastore.uris","thrift://192.168.142.197:9083")
.appName("test").enableHiveSupport().getOrCreate()
read_df = spark.sql("select * from dm_events.dm_usereventfinal limit 1")
read_df.show()
标签:None,PySpark,Python,pyspark,hive,Spark,spark
From: https://blog.51cto.com/u_12760266/6381246