首页 > 其他分享 >pyspark 结构化数据开发实例

pyspark 结构化数据开发实例

时间:2023-02-27 18:45:49浏览次数:45  
标签:结构化 pyspark df get cityCode rdd 实例 print lambda

本文是一个基于pyspark 的进行海量数据ETL 和统计分析的代码示例,仅供参考

要点:

1, 使用pyspark 读取 mysql 表数据。

2,使用rdd api 对 结构化数据做简单ETL,设置了简单的清洗规则。

  1,cityCode 字段非空,全部为数字, 位数为9位, 前3位必须为”001“ 。 

3, 使用3种抽象层级的API (RDD API , Dataframe api, SQL api )对数据进行分析计算 ,比较3种API的使用区别

4,包括了一些 rdd, Datafram 相互转换, ROW类型的使用

 

# Imports
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
    .appName('SparkByExamples.com') \
    .config("spark.jars", "mysql-connector-java-5.1.28.jar") \
    .getOrCreate()

# Read from MySQL Table
table_df = spark.read \
    .format("jdbc") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("url", "jdbc:mysql://134.**.**.**:9200/hesc_stm_xhm") \
    .option("dbtable", "temp_user_grid") \
    .option("user", "root") \
    .option("password", "****") \
    .load()

# check  read accessable
# print( table_df.count())  # 总行数

# etl 使用rdd 算子
rdd = table_df.rdd
# print(rdd.first())

#  cityCode
# print(rdd.filter(lambda r: r(5) == None).count())  # gridCode为空的行数

rdd1 = rdd.filter(lambda r: Row.asDict(r).get("cityCode") != None).filter(
    lambda r: len(Row.asDict(r).get("cityCode")) == 9)


# print(rdd.map(lambda r: Row.asDict(r).get("cityCode")).take(5))   # ROW类型的元素读取 使用 r(19)读取列有问题

def checkCityCode(str):
    # 判断字符串的格式,前3位为001,而且全为数字
    if (str[:3] == '001') and str.isnumeric():
        return True
    else:
        return False


rdd2 = rdd1.filter(lambda r: checkCityCode(Row.asDict(r).get("cityCode")))  
print(rdd2.first())


#  数据分析 使用 rdd  df算子 sql 三种算子 ; 统计不同网格的人员数量。
# rdd operator

map = rdd2.map(lambda r: (Row.asDict(r).get("gridCode"), Row.asDict(r).get("id"))).countByKey()
print(map)   #  查询python  rdd api


# df/ds operator   dataset 1.6之后加入, 整合了RDD 的强类型便于使用lambda函数以及 sqpark sql 优化引擎
# python 没有dataset 类型。java scala 可以。 dataframe是 dataset 的 一种。 dataframe 适用python .

df = rdd2.toDF()
df1 = df.groupBy('gridCode').count()  # dataframe  特定编程语言 对结构化数据操作, 也称 无类型dataset算子
df1.show(4)

# sql  operator
df.createOrReplaceTempView('temp_user_grip')
df2 = spark.sql("select gridCode, count(id)  from temp_user_grip group by gridCode")
df2.show(2)

spark.stop()

 运行输出:

 

 

 

 

标签:结构化,pyspark,df,get,cityCode,rdd,实例,print,lambda
From: https://www.cnblogs.com/gao1261828/p/17152582.html

相关文章