我需要将 Json 对象(b 列)扩展到多列。
从此表中,
A 列 | B 列 |
---|---|
id1 | [{a:1,b:'letter1' }] |
id2 | [{a:1,b:'letter2',c:3,d:4}] |
对于此表,
A 列 | a | b | c | d |
---|---|---|---|---|
id1 | 1 | 2 | ||
id2 | 1 | 2 | 3 | 4 |
我尝试过转换数据帧| ||来自本地和spark ,但都不起作用。 从本地
,我通过多个循环提取了B列中的kv(这一步成功了)。 但是
当我尝试将提取的kv(字典结构中)转换为数据帧,出现此错误:“ValueError:所有数组必须具有相同的长度”。 因为来自
Json对象 的键c和键d是空的。所以未能这样做。 在这种情况下引用了以下答案。
将包含 JSON 对象的数据帧扩展为更大的数据帧
,我得到 类型错误 (类似于当将 pandas 数据帧转换为 Spark 数据帧时,longtype 和 stringtype 无法被重新识别。 因此,我将 pandas 数据帧转换为字符串类型
,并且我可以将其转换为 Spark 数据帧。
df.astype(str)
现在,当我试图扩展它...
def func(df):
spark = (
SparkSession.builder.appName("data")
.enableHiveSupport()
.getOrCreate()
)
df1 = spark.createDataFrame(df)
在这种情况下引用了以下答案。 我无法对模式进行硬编码,因为有很多不同的 JSON 列。
for i in df.columns:
if i == 'a_column':
# Since the rows became string instead of list.
# I need to remove the first and last characters which are the [ and ].
# But I get error here: failed due to error Column is not iterable
df.withColumn(i,substring(i,2,length(i)))
df.withColumn(i,substring(i,1,length(i)-1))
# transform each row (json string) to json object
# But I get error here: ValueError: 'json' is not in list ; AttributeError: json
# I assume the x.json means convert row to json obejct?
df = df.map(lambda x:x.json)
print(df.take(10))
Pyspark:将列中的 json 分解为多个列
每一个欣赏。
Every appreciate.
使用 Pandas 和 Pyspark 展开 JSON 对象
遇到的问题是由于 JSON 对象中的键不一致导致的。下面是如何使用 Pandas 和 Pyspark 解决这个问题的步骤:
1. 使用 Pandas 读取数据
import pandas as pd
data = {'A 列': ['id1', 'id2'],
'B 列': [[{'a': 1, 'b': 'letter1'}], [{'a': 1, 'b': 'letter2', 'c': 3, 'd': 4}]]}
df = pd.DataFrame(data)
2. 使用 Pandas 处理 JSON 对象
# 使用 json_normalize 处理 JSON 对象
df_expanded = df['B 列'].apply(lambda x: pd.json_normalize(x)).explode().reset_index(drop=True)
# 合并数据
df = df.drop('B 列', axis=1).merge(df_expanded, left_index=True, right_index=True)
3. 使用 Pyspark 处理数据
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, schema_of_json
spark = SparkSession.builder.appName("data").getOrCreate()
# 创建 Spark DataFrame
sdf = spark.createDataFrame(df)
# 获取 JSON schema
json_schema = spark.sql(f"SELECT schema_of_json('{sdf.select('a').first()[0]}')")
# 使用 from_json 函数解析 JSON 字符串
sdf = sdf.withColumn("B 列", from_json(col("B 列").cast("string"), json_schema.first()[0]))
# 展开 JSON 对象
sdf = sdf.select("A 列", "B 列.*")
# 显示结果
sdf.show()
最终结果:
+------+---+---+----+----+
| A 列| a| b| c| d|
+------+---+---+----+----+
| id1| 1|letter1|null|null|
| id2| 1|letter2| 3| 4|
+------+---+---+----+----+
代码解释:
Pandas:
-
json_normalize
: 用于将 JSON 对象转换为表格格式。 -
explode
: 用于将列表中的每一项展开为一行。 -
merge
: 用于合并两个 DataFrame。
Pyspark:
-
schema_of_json
: 用于获取 JSON 字符串的 Schema。 -
from_json
: 用于将 JSON 字符串解析为指定 Schema 的列。 -
select
: 用于选择需要的列。
注意:
- 这段代码假设的 JSON 数据在 "B 列" 中。
- 如果的 JSON 数据结构不同,则需要修改代码以适应的数据结构。
希望这段代码可以帮助解决问题!
标签:python,json,apache-spark,transform From: 78769601