
Cannot convert column with a simple filter condition into boolean using any operator

Tags: python pyspark conditional-statements boolean databricks

I'm trying to build a filter condition dynamically from a dict structure in Python. The condition is very simple, yet it fails with the following error:

Final constructed filter condition: Column<'(CompanyCode IN (1930, 1931))'>
Type of final_condition: <class 'pyspark.sql.column.Column'>
PySparkValueError: [CANNOT_CONVERT_COLUMN_INTO_BOOL] Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

My data and columns are as follows:

data = [("John", 1930), ("Doe", 1931), ("Jane", 1940)]
columns = ["Name", "Code"]

I'm trying to mimic the following filter with dynamic code:

df = spark.createDataFrame(data, schema=columns)
df.filter(F.col("Code").isin(1930, 1931)).show()

My filter conditions come in the following dict structure:

    {
        "conditions": [
            {"column": "Code", "operator": "IN", "values": [1930, 1931]}
        ]
    }
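
If the conditions arrive as JSON text, which is common when filters are defined dynamically, json.loads produces exactly this dict shape. A small sketch (the raw string below is illustrative, not from the original post):

import json

raw = '{"conditions": [{"column": "Code", "operator": "IN", "values": [1930, 1931]}]}'
flt = json.loads(raw)
print(flt["conditions"][0]["column"])  # -> Code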

And here is my function:

def construct_filter_condition(conditions):
    """
    Construct filter condition from a list of conditions.
    :param conditions: List of conditions
    :return: PySpark Column expression
    """
    combined_conditions = []
    for condition in conditions:
        column = condition["column"]
        operator = condition["operator"].lower()
        values = condition.get("values")
        value = condition.get("value")

        if operator == "in":
            new_condition = F.col(column).isin(values)
        elif operator == "eq":
            new_condition = F.col(column) == value
        elif operator == "ne":
            new_condition = F.col(column) != value
        elif operator == "lt":
            new_condition = F.col(column) < value
        elif operator == "le":
            new_condition = F.col(column) <= value
        elif operator == "gt":
            new_condition = F.col(column) > value
        elif operator == "ge":
            new_condition = F.col(column) >= value
        elif operator == "not_in":
            new_condition = ~F.col(column).isin(values)
        else:
            continue  # Skip invalid operators

        combined_conditions.append(new_condition)

        # Combine all conditions using 'AND' logic
        if combined_conditions:
            final_condition = combined_conditions[0]
            for cond in combined_conditions[1:]:
                final_condition = final_condition & cond
            return final_condition
        else:
            return None

Sample conditions:

    flt_conditions = [
        {
            "conditions": [
                {"column": "CompanyCode", "operator": "IN", "values": [1930, 1931]}
            ]
        }
    ]

Construct and apply the filter:

final_condition = construct_filter_condition(flt_conditions[0]["conditions"])
if final_condition:  # truthiness check calls bool() on the Column
    print(f"Constructed filter condition: {final_condition}")
    filtered_df = df.filter(final_condition)
    print(f"DataFrame filter syntax: df.filter({final_condition})")
    filtered_df.show()
else:
    print("No valid filter conditions provided.")

The error is not caused by how the condition is built: the Column in your debug output is perfectly valid. It is raised by the truthiness check "if final_condition:", because Python evaluates bool(final_condition) there, and PySpark Columns deliberately refuse that conversion with CANNOT_CONVERT_COLUMN_INTO_BOOL; compare against None instead. Separately, in the function as posted, the combine-and-return block is indented inside the for loop, so it returns after processing the first condition.
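
A minimal sketch of the failure mode and the fix, assuming an active SparkSession (the appName is arbitrary):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("BoolRepro").getOrCreate()
cond = F.col("Code").isin(1930, 1931)

# "if cond:" implicitly calls bool(cond); PySpark raises
# [CANNOT_CONVERT_COLUMN_INTO_BOOL], a ValueError subclass, for Columns.
try:
    if cond:
        pass
except ValueError as e:
    print(e)

# Comparing against None avoids the implicit bool() call entirely:
if cond is not None:
    print("safe to pass to df.filter(...)")

The same None check is applied in the corrected version below.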

Here is a corrected and improved version of your code:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def construct_filter_condition(conditions):
    """
    从条件列表构建过滤器条件。
    :param conditions: 条件列表
    :return: PySpark 列表达式
    """
    combined_conditions = []
    for condition in conditions:
        column = condition["column"]
        operator = condition["operator"].lower()
        values = condition.get("values")
        value = condition.get("value")

        if operator == "in":
            new_condition = F.col(column).isin(values)
        elif operator == "eq":
            new_condition = F.col(column) == value
        elif operator == "ne":
            new_condition = F.col(column) != value
        elif operator == "lt":
            new_condition = F.col(column) < value
        elif operator == "le":
            new_condition = F.col(column) <= value
        elif operator == "gt":
            new_condition = F.col(column) > value
        elif operator == "ge":
            new_condition = F.col(column) >= value
        elif operator == "not_in":
            new_condition = ~F.col(column).isin(values)
        else:
            continue  # skip invalid operators

        combined_conditions.append(new_condition)

    # Combine all conditions with AND logic
    if combined_conditions:
        final_condition = combined_conditions[0]
        for cond in combined_conditions[1:]:
            final_condition = final_condition & cond
        return final_condition
    else:
        return None

# Initialize the SparkSession
spark = SparkSession.builder.appName("FilterExample").getOrCreate()

# Sample data and columns
data = [("John", 1930), ("Doe", 1931), ("Jane", 1940)]
columns = ["Name", "Code"]
df = spark.createDataFrame(data, schema=columns)

# Sample conditions
flt_conditions = [
    {"column": "Code", "operator": "IN", "values": [1930, 1931]}
]

# Construct and apply the filter
final_condition = construct_filter_condition(flt_conditions)
if final_condition is not None:  # avoid calling bool() on a Column
    print(f"Constructed filter condition: {final_condition}")
    filtered_df = df.filter(final_condition)
    print(f"DataFrame filter syntax: df.filter({final_condition})")
    filtered_df.show()
else:
    print("No valid filter conditions provided.")

# Stop the SparkSession
spark.stop()
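
For the sample data, only the first two rows satisfy the condition, so show() should print something close to:

+----+----+
|Name|Code|
+----+----+
|John|1930|
| Doe|1931|
+----+----+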

Changes:

  1. Pass the conditions directly: the sample now hands a flat list of condition dicts (flt_conditions) to construct_filter_condition, instead of nesting it as flt_conditions[0]["conditions"].
  2. Combine outside the loop: the AND-combination and the return are moved out of the for loop, so all conditions are processed before the function returns.
  3. Check against None: "if final_condition is not None:" replaces "if final_condition:", because the truthiness test calls bool() on the Column and triggers CANNOT_CONVERT_COLUMN_INTO_BOOL.

With these changes, the code should run cleanly and filter the DataFrame according to the supplied conditions.
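
As a design note, the if/elif ladder can be collapsed into an operator table and the AND-combination into functools.reduce. A minimal sketch under the same condition schema (the OPERATORS table is illustrative, not part of the original code):

import operator
from functools import reduce
from pyspark.sql import functions as F

# Maps each schema operator name to a function building the Column predicate.
OPERATORS = {
    "in":     lambda col, c: col.isin(c["values"]),
    "not_in": lambda col, c: ~col.isin(c["values"]),
    "eq":     lambda col, c: col == c["value"],
    "ne":     lambda col, c: col != c["value"],
    "lt":     lambda col, c: col < c["value"],
    "le":     lambda col, c: col <= c["value"],
    "gt":     lambda col, c: col > c["value"],
    "ge":     lambda col, c: col >= c["value"],
}

def construct_filter_condition(conditions):
    cols = [
        OPERATORS[c["operator"].lower()](F.col(c["column"]), c)
        for c in conditions
        if c["operator"].lower() in OPERATORS  # unknown operators are skipped
    ]
    # reduce with "&" mirrors the manual AND loop; None when nothing matched.
    return reduce(operator.and_, cols) if cols else None

Unknown operators are skipped silently here, mirroring the continue branch in the version above.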

From: 78809223
