本文是一个基于pyspark 的进行海量数据ETL 和统计分析的代码示例,仅供参考
要点:
1, 使用pyspark 读取 mysql 表数据。
2,使用rdd api 对 结构化数据做简单ETL,设置了简单的清洗规则。
1,cityCode 字段非空,全部为数字, 位数为9位, 前3位必须为”001“ 。
3, 使用3种抽象层级的API (RDD API , Dataframe api, SQL api )对数据进行分析计算 ,比较3种API的使用区别
4,包括了一些 rdd, Datafram 相互转换, ROW类型的使用
# Imports from pyspark.sql import SparkSession # Create SparkSession spark = SparkSession.builder \ .appName('SparkByExamples.com') \ .config("spark.jars", "mysql-connector-java-5.1.28.jar") \ .getOrCreate() # Read from MySQL Table table_df = spark.read \ .format("jdbc") \ .option("driver", "com.mysql.jdbc.Driver") \ .option("url", "jdbc:mysql://134.**.**.**:9200/hesc_stm_xhm") \ .option("dbtable", "temp_user_grid") \ .option("user", "root") \ .option("password", "****") \ .load() # check read accessable # print( table_df.count()) # 总行数 # etl 使用rdd 算子 rdd = table_df.rdd # print(rdd.first()) # cityCode # print(rdd.filter(lambda r: r(5) == None).count()) # gridCode为空的行数 rdd1 = rdd.filter(lambda r: Row.asDict(r).get("cityCode") != None).filter( lambda r: len(Row.asDict(r).get("cityCode")) == 9) # print(rdd.map(lambda r: Row.asDict(r).get("cityCode")).take(5)) # ROW类型的元素读取 使用 r(19)读取列有问题 def checkCityCode(str): # 判断字符串的格式,前3位为001,而且全为数字 if (str[:3] == '001') and str.isnumeric(): return True else: return False rdd2 = rdd1.filter(lambda r: checkCityCode(Row.asDict(r).get("cityCode"))) print(rdd2.first()) # 数据分析 使用 rdd df算子 sql 三种算子 ; 统计不同网格的人员数量。 # rdd operator map = rdd2.map(lambda r: (Row.asDict(r).get("gridCode"), Row.asDict(r).get("id"))).countByKey() print(map) # 查询python rdd api # df/ds operator dataset 1.6之后加入, 整合了RDD 的强类型便于使用lambda函数以及 sqpark sql 优化引擎 # python 没有dataset 类型。java scala 可以。 dataframe是 dataset 的 一种。 dataframe 适用python . df = rdd2.toDF() df1 = df.groupBy('gridCode').count() # dataframe 特定编程语言 对结构化数据操作, 也称 无类型dataset算子 df1.show(4) # sql operator df.createOrReplaceTempView('temp_user_grip') df2 = spark.sql("select gridCode, count(id) from temp_user_grip group by gridCode") df2.show(2) spark.stop()
运行输出:
标签:结构化,pyspark,df,get,cityCode,rdd,实例,print,lambda From: https://www.cnblogs.com/gao1261828/p/17152582.html