首页 > 其他分享 >pyspark小demo2

pyspark小demo2

时间:2023-08-13 11:44:22浏览次数:32  
标签:category 04 district pyspark price demo2 rdd json

#
#   py_pyspark_demo2.py
#   py_learn
#
#   Created by Z. Steve on 2023/8/13 10:55.
#

import json

# 1. 导入库
from pyspark import SparkConf, SparkContext

# 2. 创建 SparkConf 和 SparkContext 对象
conf = SparkConf().setMaster("local[*]").setAppName("demo2")
sc = SparkContext(conf=conf)

# 3. 读取文件获得文件 rdd. textFile() 会按行读取文本文件, 将每行数据作为一个字符串, 最终将所有行的数据放到一个 list 中
rdd_file = sc.textFile("/Users/stevexhz/PycharmProjects/py_learn/json_data.txt")

# 4. 进行计算操作
# 4.1 取出独立的每个 json 数据. 对每行数据进行 split("|"), 通过 split("|") 获取每个 json 字符串
rdd_json_str = rdd_file.flatMap(lambda x: x.split("|"))
# print(rdd_json_str.collect())                         # 数据格式为 list, list 中的每个元素为一个 json 字符串

# 4.2 将 json 字符串转为 dict 对象
rdd_json_dict = rdd_json_str.map(lambda x: json.loads(x))
# print(rdd_json_dict.collect())                        # list 中的每个元素都是一个 dict 对象

# 4.3 取出字典中的城市 和 销售额组成一个二元元组
rdd_tuple = rdd_json_dict.map(lambda x: (x["district"], int(x["price"])))
# print(rdd_tuple.collect())

# 4.4 按城市分组, 按销售额聚合
rdd_result = rdd_tuple.reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print(rdd_result.collect())


# TODO 2. 所有在售卖的商品类别
rdd_category = rdd_json_dict.map(lambda x: x["category"]).distinct()
print(rdd_category.collect())

# TODO 3. 查看北京地区的所有类别
rdd_bj_category = rdd_json_dict.filter(lambda x: x["district"] == "北京").map(lambda x: x["category"])
print(rdd_bj_category.collect())



'''
json_data.txt 文件内容


{"id":1, "timestamp": "2023年08月13日11:04:34", "category": "平板电脑", "district": "北京", "price": "5099"}|{"id":2, "timestamp": "2022年08月14日11:04:34", "category": "智能手表", "district": "上海", "price": "25999"}
{"id":3, "timestamp": "2020年09月13日12:04:34", "category": "移动硬盘", "district": "天津", "price": "4099"}|{"id":4, "timestamp": "2021年04月14日11:04:34", "category": "电脑", "district": "上海", "price": "25999"}
{"id":5, "timestamp": "2023年08月13日11:04:34", "category": "智能音箱", "district": "北京", "price": "5099"}|{"id":6, "timestamp": "2022年08月14日11:04:34", "category": "充电宝", "district": "上海", "price": "27999"}|{"id":7, "timestamp": "2023年08月13日11:04:34", "category": "平板电脑", "district": "北京", "price": "5099"}|{"id":8, "timestamp": "2022年08月14日11:04:34", "category": "电脑", "district": "上海", "price": "27999"}
{"id":9, "timestamp": "2023年08月13日11:04:34", "category": "手机", "district": "天津", "price": "195099"}|{"id":10, "timestamp": "2022年08月14日11:04:34", "category": "电脑", "district": "天津", "price": "49999"}|{"id":11, "timestamp": "2023年08月13日11:04:34", "category": "平板电脑", "district": "天津", "price": "15099"}|{"id":12, "timestamp": "2022年08月14日11:04:34", "category": "电脑", "district": "天津", "price": "217999"}

'''

标签:category,04,district,pyspark,price,demo2,rdd,json
From: https://www.cnblogs.com/zxhoo/p/17626339.html

相关文章

  • pyspark的filter()、distinct()、sortBy() 函数
    ##py_pyspark_test.py#py_learn##CreatedbyZ.Steveon2023/8/1217:38.#frompysparkimportSparkConf,SparkContextconf=SparkConf().setMaster("local[*]").setAppName("rdd_test")sc=SparkContext(conf=conf)#rdd=......
  • pyspark小案例
    ##py_pyspark_demo.py#py_learn##CreatedbyZ.Steveon2023/8/1215:33.##统计文件中各个单词出现的次数#1.导入库frompysparkimportSparkConf,SparkContext#2.创建SparkConf对象和SparkContext对象conf=SparkConf().setMaster("local......
  • pyspark使用
    ##py_pyspark.py#py_learn##CreatedbyZ.Steveon2023/8/1017:51.##pyspark编程主要分三步:1.数据输入。2.数据处理。3.数据输出。#RDD:ResilientDistributedDatasets弹性分布式数据集#1.安装pyspark库#pip3installpyspark#2.导入p......
  • 【Python】PySpark 数据计算 ② ( RDD#flatMap 方法 | RDD#flatMap 语法 | 代码示例 )
    文章目录一、RDD#flatMap方法1、RDD#flatMap方法引入2、解除嵌套3、RDD#flatMap语法说明二、代码示例-RDD#flatMap方法一、RDD#flatMap方法1、RDD#flatMap方法引入RDD#map方法可以将RDD中的数据元素逐个进行处理,处理的逻辑需要用外部通过参数传入map函数......
  • 【Python】PySpark 数据计算 ④ ( RDD#filter 方法 - 过滤 RDD 中的元素 | RDD#distin
    文章目录一、RDD#filter方法1、RDD#filter方法简介2、RDD#filter函数语法3、代码示例-RDD#filter方法示例二、RDD#distinct方法1、RDD#distinct方法简介2、代码示例-RDD#distinct方法示例一、RDD#filter方法1、RDD#filter方法简介RDD#filter方法可以根据指定......
  • 【错误记录】PySpark 运行报错 ( Did not find winutils.exe | HADOOP_HOME and hadoo
    文章目录一、报错信息二、解决方案(安装Hadoop运行环境)一、报错信息核心报错信息:WARNShell:Didnotfindwinutils.exe:java.io.FileNotFoundException:java.io.FileNotFoundException:HADOOP_HOMEandhadoop.home.dirareunset.在PyCharm中,调用PySpark执......
  • pyspark 环境搭建和相关操作redis ,es
    一.环境搭建1.创建虚拟环境,指定python包2.切换到虚拟环境,安装你所需要的python相关模块包3.把整个虚拟环境打成.zip4.将zip上传的hadfs5.spark-submit指定python包的路径可以参考 https://dandelioncloud.cn/article/details/1589470996832964609二.pyspark数据r......
  • PysparkNote006---pycharm加载spark环境
    pycharm配置pyspark环境,本地执行pyspark代码spark安装、添加环境变量不提了File-Settings-Project-ProjectStructure-addcontentroot添加如下两个路径D:\code\spark\python\lib\py4j-0.10.7-src.zipD:\code\spark\python\lib\pyspark.zip                ......
  • 算法岗必读中文-0天吃掉pyspark实战
    pyspark......
  • 异步爬虫demo2
    importreimportaiohttpimportasyncioclassAsyn:def__init__(self):self.__headers={'user-agent':'Mozilla/5.0(WindowsNT10.0;Win64;x64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/112.0.0.0Safari/537......