A PySpark example: cleaning data with a multi-character delimiter
Raw data
The raw records below are separated by the multi-character delimiter "|#$":
POD9_6ec8794bd3297048d6ef7b6dff7b8be1|#$2023-10-24|#$0833|#$#|#$#|#$99999999999|#$#|#$12345678912
POD9_352858578708f144bb166a77bad743f4|#$2023-10-24|#$0391|#$#|#$#|#$99999999999|#$#|#$12345678912
POD9_c8c996cff241529b2427a29e9d6c68d7|#$2023-10-24|#$0834|#$#|#$#|#$99999999999|#$P00000016970|#$12345678912
Requirement
Use Spark to turn the text file into a DataFrame and then operate on the data with spark.sql.
1. Generating the DF directly with spark.read.format
Our first thought might be to read the file with spark.read.format, specify the delimiter, and produce the DF directly. However, this approach raises an error when the delimiter is more than one character (the CSV reader does accept a custom separator, but only a single-character one):
df = spark.read.format("csv") \
    .option("sep", "|#$") \
    .load("BUS_RECOMMEND_STAFF.log")
## The error:
pyspark.sql.utils.IllegalArgumentException: 'Delimiter cannot be more than one character: |#$'
Some searching confirms that this method can only be used when the delimiter is a single character.
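For contrast, the single-character case does work. A minimal sketch, assuming a hypothetical copy of the data named BUS_RECOMMEND_STAFF_pipe.log that uses a plain "|" as its separator:
df = spark.read.format("csv") \
    .option("sep", "|") \
    .load("BUS_RECOMMEND_STAFF_pipe.log")  # loads fine: the separator is a single character
df.show()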
2. Changing approach: build an RDD first, handle the multi-character delimiter, then convert to a DF
For a log file with a multi-character delimiter, read it into an RDD with sc.textFile, split each line on the delimiter, and then convert the RDD into a DF.
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("log ETL").getOrCreate()
sc = spark.sparkContext
# Difference between spark.read.format and sc.textFile:
# * spark.read.format produces a DataFrame or Dataset
# * sc.textFile produces an RDD
columns = ["SERIALNUM_BG", "BUS_DATE", "CITY_CODE", "STAFF_NUM",
"STAFF_NAME", "SERIAL_NUMBER", "CHANNEL_CODE", "BIND_USER_NUM"]
rdd = sc.textFile("BUS_RECOMMEND_STAFF.log")  # read the file into an RDD of lines
# print(rdd.collect())
# ['POD9_6ec8794bd3297048d6ef7b6dff7b8be1|#$2023-10-24|#$0833|#$#|#$#|#$99999999999|#$#|#$12345678912', 'POD9_352858578708f144bb166a77bad743f4|#$2023-10-24|#$0391|#$#|#$#|#$99999999999|#$#|#$12345678912', 'POD9_c8c996cff241529b2427a29e9d6c68d7|#$2023-10-24|#$0834|#$#|#$#|#$99999999999|#$P00000016970|#$12345678912']
rdd1 = rdd.map(lambda x: x.split("|#$"))  # split each line on the multi-character delimiter
# print(rdd1.collect())
# [['POD9_6ec8794bd3297048d6ef7b6dff7b8be1', '2023-10-24', '0833', '#', '#', '99999999999', '#', '12345678912'], ['POD9_352858578708f144bb166a77bad743f4', '2023-10-24', '0391', '#', '#', '99999999999', '#', '12345678912'], ['POD9_c8c996cff241529b2427a29e9d6c68d7', '2023-10-24', '0834', '#', '#', '99999999999', 'P00000016970', '12345678912']]
df = rdd1.toDF(columns)  # convert the RDD to a DF, supplying the column names as the schema
# df.show()
# +--------------------+----------+---------+---------+----------+-------------+------------+-------------+
# | SERIALNUM_BG| BUS_DATE|CITY_CODE|STAFF_NUM|STAFF_NAME|SERIAL_NUMBER|CHANNEL_CODE|BIND_USER_NUM|
# +--------------------+----------+---------+---------+----------+-------------+------------+-------------+
# |POD9_6ec8794bd329...|2023-10-24| 0833| #| #| 99999999999| #| 12345678912|
# |POD9_352858578708...|2023-10-24| 0391| #| #| 99999999999| #| 12345678912|
# |POD9_c8c996cff241...|2023-10-24| 0834| #| #| 99999999999|P00000016970| 12345678912|
# +--------------------+----------+---------+---------+----------+-------------+------------+-------------+
df.createOrReplaceTempView("people")  # register the DF as a temporary view
spark.sql("select BUS_DATE from people where CITY_CODE='0834'").show()  # query the temp view with SQL (a DataFrame API equivalent is sketched after the output)
# +----------+
# | BUS_DATE|
# +----------+
# |2023-10-24|
# +----------+
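For comparison (not part of the original script), the same lookup can be written against the DF with the DataFrame API instead of spark.sql:
from pyspark.sql.functions import col
df.filter(col("CITY_CODE") == "0834").select("BUS_DATE").show()  # same single-row result as the SQL query above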
Summary
- Reading a file with spark.read.format and a custom separator to produce a DF directly only works when the separator is a single character.
- Difference between spark.read.format and sc.textFile: spark.read.format produces a DataFrame or Dataset, while sc.textFile produces an RDD.
- Converting the RDD to a DF is a convenient way to attach a schema: df = rdd1.toDF(columns) (an explicit StructType variant is sketched after this list).
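Note that toDF(columns) leaves every column typed as a string, since the split produced plain strings. If an explicit schema is preferred, a StructType can be supplied instead; a minimal sketch reusing rdd1 and columns from the script above (all fields kept as StringType here, but individual fields can be given other types once the values are converted):
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField(name, StringType(), True) for name in columns])  # explicit, all-string schema
df2 = spark.createDataFrame(rdd1, schema)  # same data as rdd1.toDF(columns), schema stated explicitly
df2.printSchema()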