I am trying to dynamically build a filter condition from a dict structure in Python. It is a very simple condition, yet it produces the following error:
Final constructed filter condition: Column<'(CompanyCode IN (1930, 1931))'>
Type of final_condition: <class 'pyspark.sql.column.Column'>
PySparkValueError: [CANNOT_CONVERT_COLUMN_INTO_BOOL] Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
My data and columns are as follows:
data = [("John", 1930), ("Doe", 1931), ("Jane", 1940)]
columns = ["Name", "Code"]
I am trying to mimic the following filter with dynamic code:
df = spark.createDataFrame(data, schema=columns)
df.filter(F.col("Code").isin(1930, 1931)).show()
My condition structure looks like this:
{
  "conditions": [
    {"column": "Code", "operator": "IN", "values": [1930, 1931]}
  ]
}
And here is my code:
def construct_filter_condition(conditions):
    """
    Construct a filter condition from a list of conditions.
    :param conditions: List of condition dictionaries
    :return: PySpark Column expression
    """
    combined_conditions = []
    for condition in conditions:
        column = condition["column"]
        operator = condition["operator"].lower()
        values = condition.get("values")
        value = condition.get("value")
        if operator == "in":
            new_condition = F.col(column).isin(values)
        elif operator == "eq":
            new_condition = F.col(column) == value
        elif operator == "ne":
            new_condition = F.col(column) != value
        elif operator == "lt":
            new_condition = F.col(column) < value
        elif operator == "le":
            new_condition = F.col(column) <= value
        elif operator == "gt":
            new_condition = F.col(column) > value
        elif operator == "ge":
            new_condition = F.col(column) >= value
        elif operator == "not_in":
            new_condition = ~F.col(column).isin(values)
        else:
            continue  # Skip invalid operators
        combined_conditions.append(new_condition)
    # Combine all conditions using 'AND' logic
    if combined_conditions:
        final_condition = combined_conditions[0]
        for cond in combined_conditions[1:]:
            final_condition = final_condition & cond
        return final_condition
    else:
        return None
Example conditions:
flt_conditions = [
    {
        "conditions": [
            {"column": "CompanyCode", "operator": "IN", "values": [1930, 1931]}
        ]
    }
]
Construct and apply the filter:
final_condition = construct_filter_condition(flt_conditions[0]["conditions"])
if final_condition:
    print(f"Constructed filter condition: {final_condition}")
    filtered_df = df.filter(final_condition)
    print(f"DataFrame filter syntax: df.filter({final_condition})")
    filtered_df.show()
else:
    print("No valid filter conditions provided.")
The error is not raised while the condition is being built; your debug print shows that construct_filter_condition returns a valid Column. It is raised by the line if final_condition:, which evaluates that Column in a boolean context. Python implicitly calls bool() on the Column, and PySpark forbids that, which is exactly what CANNOT_CONVERT_COLUMN_INTO_BOOL means. Test against None explicitly, i.e. if final_condition is not None:.
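A minimal sketch of the failure mode, assuming an active SparkSession (no DataFrame is needed; truth-testing the bare Column is enough to trigger it):

from pyspark.sql import functions as F

cond = F.col("Code").isin(1930, 1931)  # this is a Column, not a bool

# The next line would raise PySparkValueError
# [CANNOT_CONVERT_COLUMN_INTO_BOOL] in recent PySpark versions,
# because Python implicitly calls bool(cond):
# if cond:
#     ...

# The safe pattern is an explicit None check:
if cond is not None:
    print("Condition built:", cond)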
Here is a corrected and improved version of your code:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def construct_filter_condition(conditions):
    """
    Construct a filter condition from a list of conditions.
    :param conditions: List of condition dictionaries
    :return: PySpark Column expression, or None if no valid conditions
    """
    combined_conditions = []
    for condition in conditions:
        column = condition["column"]
        operator = condition["operator"].lower()
        values = condition.get("values")
        value = condition.get("value")
        if operator == "in":
            new_condition = F.col(column).isin(values)
        elif operator == "eq":
            new_condition = F.col(column) == value
        elif operator == "ne":
            new_condition = F.col(column) != value
        elif operator == "lt":
            new_condition = F.col(column) < value
        elif operator == "le":
            new_condition = F.col(column) <= value
        elif operator == "gt":
            new_condition = F.col(column) > value
        elif operator == "ge":
            new_condition = F.col(column) >= value
        elif operator == "not_in":
            new_condition = ~F.col(column).isin(values)
        else:
            continue  # Skip invalid operators
        combined_conditions.append(new_condition)
    # Combine all conditions using 'AND' logic
    if combined_conditions:
        final_condition = combined_conditions[0]
        for cond in combined_conditions[1:]:
            final_condition = final_condition & cond
        return final_condition
    else:
        return None

# Initialize SparkSession
spark = SparkSession.builder.appName("FilterExample").getOrCreate()

# Sample data and columns
data = [("John", 1930), ("Doe", 1931), ("Jane", 1940)]
columns = ["Name", "Code"]
df = spark.createDataFrame(data, schema=columns)

# Example conditions
flt_conditions = [
    {"column": "Code", "operator": "IN", "values": [1930, 1931]}
]

# Construct and apply the filter
final_condition = construct_filter_condition(flt_conditions)
# Use an explicit None check: `if final_condition:` would call bool()
# on a Column and raise CANNOT_CONVERT_COLUMN_INTO_BOOL
if final_condition is not None:
    print(f"Constructed filter condition: {final_condition}")
    filtered_df = df.filter(final_condition)
    print(f"DataFrame filter syntax: df.filter({final_condition})")
    filtered_df.show()
else:
    print("No valid filter conditions provided.")

# Stop the SparkSession
spark.stop()
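With the sample data above the filter keeps the first two rows, so filtered_df.show() should print something along these lines:

+----+----+
|Name|Code|
+----+----+
|John|1930|
| Doe|1931|
+----+----+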
Changes:

- Explicit None check: Replaced if final_condition: with if final_condition is not None:. Truth-testing a Column is what raised the error, so the guard must compare against None instead.
- Pass the conditions directly: Flattened flt_conditions to a plain list of condition dictionaries and passed it straight to construct_filter_condition, instead of digging out flt_conditions[0]["conditions"].
- Simplified code: Removed some unnecessary nesting to make the code more concise.
With these changes the code should work as expected and filter the DataFrame according to the supplied conditions.
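Since the function combines every entry with AND, the same spec format extends to multiple conditions. A quick illustration against the sample DataFrame above (the "ne" entry here is hypothetical, added only to show the combination):

# Code IN (1930, 1931) AND Name != "Doe"
conditions = [
    {"column": "Code", "operator": "IN", "values": [1930, 1931]},
    {"column": "Name", "operator": "ne", "value": "Doe"},
]
cond = construct_filter_condition(conditions)
df.filter(cond).show()  # keeps only ("John", 1930)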
Tags: python, pyspark, conditional-statements, boolean, databricks
From: 78809223