首页 > 编程问答 >在pyspark(python)中将json字符串扩展到多列

在pyspark(python)中将json字符串扩展到多列

时间:2024-07-20 17:40:49浏览次数:19  
标签:python json apache-spark transform

我需要将 Json 对象(b 列)扩展到多列。

从此表中,

A 列 B 列
id1 [{a:1,b:'letter1' }]
id2 [{a:1,b:'letter2',c:3,d:4}]

对于此表,

A 列 a b c d
id1 1 2
id2 1 2 3 4

我尝试过转换数据帧| ||来自本地和spark ,但都不起作用。 从本地

,我通过多个循环提取了B列中的kv(这一步成功了)。 但是

当我尝试将提取的kv(字典结构中)转换为数据帧,出现此错误:“ValueError:所有数组必须具有相同的长度”。 因为来自

Json对象 的键c和键d是空的。所以未能这样做。 在这种情况下引用了以下答案。

将包含 JSON 对象的数据帧扩展为更大的数据帧

从 Spark


,我得到 类型错误 (类似于当将 pandas 数据帧转换为 Spark 数据帧时,longtype 和 stringtype 无法被重新识别。 因此,我将 pandas 数据帧转换为字符串类型

,并且我可以将其转换为 Spark 数据帧。 df.astype(str) 现在,当我试图扩展它...

def func(df):    
    spark = (
        SparkSession.builder.appName("data")
        .enableHiveSupport()
        .getOrCreate()
    )
    

    df1 = spark.createDataFrame(df)

在这种情况下引用了以下答案。 我无法对模式进行硬编码,因为有很多不同的 JSON 列。

for i in df.columns:
 if i == 'a_column':
    # Since the rows became string instead of list.
    # I need to remove the first and last characters which are the [ and ].
    # But I get error here: failed due to error Column is not iterable
    df.withColumn(i,substring(i,2,length(i)))
    df.withColumn(i,substring(i,1,length(i)-1))
    
    # transform each row (json string) to json object
    # But I get error here: ValueError: 'json' is not in list ; AttributeError: json
    # I assume the x.json means convert row to json obejct?
    df = df.map(lambda x:x.json)
    print(df.take(10))

Pyspark:将列中的 json 分解为多个列

Pyspark:解析一列 json 字符串

有人请帮忙。你能告诉我如何从本地和火花中做到这一点吗?

每一个欣赏。

Every appreciate.


使用 Pandas 和 Pyspark 展开 JSON 对象

遇到的问题是由于 JSON 对象中的键不一致导致的。下面是如何使用 Pandas 和 Pyspark 解决这个问题的步骤:

1. 使用 Pandas 读取数据

import pandas as pd

data = {'A 列': ['id1', 'id2'],
        'B 列': [[{'a': 1, 'b': 'letter1'}], [{'a': 1, 'b': 'letter2', 'c': 3, 'd': 4}]]}
df = pd.DataFrame(data)

2. 使用 Pandas 处理 JSON 对象

# 使用 json_normalize 处理 JSON 对象
df_expanded = df['B 列'].apply(lambda x: pd.json_normalize(x)).explode().reset_index(drop=True)

# 合并数据
df = df.drop('B 列', axis=1).merge(df_expanded, left_index=True, right_index=True)

3. 使用 Pyspark 处理数据

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, schema_of_json

spark = SparkSession.builder.appName("data").getOrCreate()

# 创建 Spark DataFrame
sdf = spark.createDataFrame(df)

# 获取 JSON schema
json_schema = spark.sql(f"SELECT schema_of_json('{sdf.select('a').first()[0]}')")

# 使用 from_json 函数解析 JSON 字符串
sdf = sdf.withColumn("B 列", from_json(col("B 列").cast("string"), json_schema.first()[0]))

# 展开 JSON 对象
sdf = sdf.select("A 列", "B 列.*")

# 显示结果
sdf.show()

最终结果:

+------+---+---+----+----+
|   A 列|  a|  b|   c|   d|
+------+---+---+----+----+
|   id1|  1|letter1|null|null|
|   id2|  1|letter2|   3|   4|
+------+---+---+----+----+

代码解释:

Pandas:

  • json_normalize : 用于将 JSON 对象转换为表格格式。
  • explode : 用于将列表中的每一项展开为一行。
  • merge : 用于合并两个 DataFrame。

Pyspark:

  • schema_of_json : 用于获取 JSON 字符串的 Schema。
  • from_json : 用于将 JSON 字符串解析为指定 Schema 的列。
  • select : 用于选择需要的列。

注意:

  • 这段代码假设的 JSON 数据在 "B 列" 中。
  • 如果的 JSON 数据结构不同,则需要修改代码以适应的数据结构。

希望这段代码可以帮助解决问题!

标签:python,json,apache-spark,transform
From: 78769601

相关文章

  • python实现文件的读写
    python实现文件的读写路径的书写:open("E:\\ai_03\\code\\ai_03_python\\day07\\data.txt")#两个斜杠open(r"E:\ai_03\code\ai_03_python\day07\data.txt","w",encoding="utf8")#建议使用读文件读文件的格式要以读文件的模式打开一个文件对象,使用Python......
  • python模块化设计
    在Python中,模块化是将代码分解为独立的功能块,并通过导入和使用这些功能块来实现代码复用和组织的一种方式。模块化的编程风格使得代码更易于维护、扩展和测试。以下是Python实现模块化的一些常用方法:使用import语句导入模块:可以使用import语句导入其他Python文件(.py文件)作为......
  • 5分钟解锁python多线程
    以下是一个使用Python多线程的简单示例代码:importthreadingdefprint_numbers():foriinrange(1,6):print(i)defprint_letters():forletterin['A','B','C','D','E']:print(letter)if__nam......
  • 浅谈Open.Json.pickle.Os
    一、Open函数使用open函数是Python中用于打开文件的内置函数,它返回一个文件对象,该文件对象提供了对文件进行读写操作的方法。使用 open 函数时,通常需要指定至少两个参数:文件名(file)和模式(mode)。模式决定了文件是以只读、只写、追加、读写等哪种方式被打开的。file_object......
  • 看过来!看过来!python九大数据类型大整合!
    目录一、Int(整型)二、Float(浮点型)三、Bool(布尔类型)四、Str(字符串)(1)拼接:(2)格式化:(3)查找和替换:(4)分割和连接:(5)大小写转换:(6)去除空白字符:五、None(空值)初始化变量作为函数的返回值:在条件语句中检查:六、List(列表)创建List访问List元素修改ListList的遍历七......
  • springboot系列十: 自定义转换器,处理JSON,内容协商
    文章目录自定义转换器基本介绍应用实例查看源码注意事项和细节处理JSON需求说明应用实例内容协商基本介绍应用实例debug源码优先返回xml注意事项和细节⬅️上一篇:springboot系列九:接收参数相关注解......
  • win系统 python 安装 osgeo库安装(最简单)
    Python osgeo库安装用法介绍安装使用osgeo库,本质是安装gdal一、下载对应python版本压缩包下载地址在结尾二、解压压缩包在解压之后的文件夹当中,找到这两个文件夹三、复制文件夹到python安装目录当中如python环境文件夹路径为D:\Local\Programs\miniconda3\envs\py31......
  • 【Python】使用库 -- 详解
    库就是别人已经写好了的代码,可以让我们直接拿来用。一个编程语言能不能流行起来,一方面取决于语法是否简单方便容易学习,一方面取决于生态是否完备。所谓的“生态” 指的就是语言是否有足够丰富的库,来应对各种各样的场景。在实际开发中,也并非所有的代码都自己手写,而是要充分利......
  • python函数基础
    1.函数目的函数是组织好的,可重复使用的,用来实现单一,或相关联功能的代码段。函数能提高应用的模块性,和代码的重复利用率。函数可以封装一定的功能2.函数的定义函数代码块以 def 关键词开头,后接函数标识符名称和圆括号 ()。任何传入参数和自变量必须放在圆括号中间,圆括......