SparkSQL数据清洗API
前面我们处理的数据实际上都是已经被处理好的规整数据,但是在大数据整个生产过程中,需要先对数据进行数据清洗,将杂乱无章的数据整理为符合后面处理要求的规整数据。
去重方法dropDuplicates
功能:对DF的数据进行去重,如果重复数据有多条,取第一条
缺失值处理dropna
fillna填充缺失值
if __name__ == '__main__':
spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext
df = spark.read.format("csv").option("header", "true").option("sep", ";").load("../data/sql/people.csv")
# 数据清洗:数据去重
# dropDuplicates是DataFrame 的API,可以完成数据去重
# 无参数使用,对全部的列联合起来进行比较,去除重复值,只保留一条
df.dropDuplicates().show()
df.dropDuplicates(['age', 'job']).show()
# 数据清洗:缺失值处理
# dropna api是可以对缺失值的数据进行删除
# 无参数使用,只要列中有null就删除这一行数据
df.dropna().show()
# thresh = 3表示,最少满足3个有效列,不满足就删除当前行数据
df.dropna(thresh=3).show()
# 对缺失值的处理,即缺失数据就填充
df.fillna("loss").show()
# 对指定列填充
df.fillna("N/A", subset=["job"]).show()
# 设定一个字典,对所有列进行充值填充
df.fillna({"name":"未知姓名","age":1, "job":"worker"}).show()
3.8 DataFrame数据写出
SparkSQL统一API写出DataFrame数据
统一—API语法:
1 df.write.mode( ) .format( ) .option(K,V).save(PATH)
2 # mode,传入模式字符串可选: append 追加,overwrite 覆盖,ignore忽略,error重复就报异常(默认的)
3# format,传入格式字符串,可选: text, csv,json,parquet, orc, avro, jdbc
4#注意text源只支持单列df写出
5# option设置属性,如:.option( "sep", " , " ) r
6# save 写出的路径,支持本地文件和HDFS
3.9 DataFrame通过JDBC读写数据库(MySQL示例)
注意:
· jdbc连接字符串中,建议使用useSSL=false确保连接可以正常连接( 不使用SSL安全协议进行连接)
. jdbc连接字符串中,建议使用useUnicode=true来确保传输中不出现乱码
. save()不要填参数,没有路径,是写出数据库
· dbtable属性:指定写出的表名
df.write.mode("overwrite").\
format("jdbc").\
option("url", "jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true").\
option("dbtable", "movies").\
option("user", "root").\
option("password", "1234").\
save()
df2 = spark.read.format("jdbc"). \
option("url", "jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true"). \
option("dbtable", "movies"). \
option("user", "root"). \
option("password", "1234").\
load()
df2.printSchema()
df2.show()
总结:
1. DataFrame在结构层面上由StructField组成列描述,由StructType构造表描述。在数据层面上,Column对象记录列数据,Row对象记录行数据
2. DataFrame可以从RDD转换、Pandas DF转换、读取文件、读取JDBC等方法构建
3. spark.read.format()和df.write.format()是DataFrame读取和写出的统一化标准API
4. SparkSQL默认在Shuffle阶段200个分区,可以修改参数获得最好性能
5. dropDuplicates可以去重、dropna可以删除缺失值、fillna可以填充缺失值
6. SparkSQL支持JDBC读写,可用标准API对数据库进行读写操作
标签:option,show,df,format,29,笔记,DataFrame,进度,数据 From: https://www.cnblogs.com/yuncannotjava/p/17994814