首页 > 编程问答 >优化 PySpark 代码:在保持功能的同时避免 For 循环并减少

优化 PySpark 代码:在保持功能的同时避免 For 循环并减少

时间:2024-08-04 15:01:41浏览次数:8  
标签:python python-3.x apache-spark pyspark

from pyspark.sql import Window
from pyspark.sql import functions as F
import functools
from datetime import datetime

def generate_new_rating_data(w_df, count_a, distinct_a, flag_a, suffix):
    if flag_a:
        w_df = w_df.where(
            (w_df[f"NR_Count{suffix}"] > 0) & (w_df[f"NR_Count{suffix}"] == w_df[f"Rate_Count{suffix}"])
        )
        window_spec = Window.partitionBy("ID").orderBy("rating_order")
        return {
            w_df.where(F.col(f"Rating_Rank{suffix}") >= 250)
                .withColumn("rank", F.row_number().over(window_spec))
                .where(F.col("rank") == 1)
                .select(
                    F.col("ID"),
                    F.col("Source").alias(f"Source{suffix}"),
                    F.col(f"Rating_Rank{suffix}"),
                    F.col(f"NormCode{suffix}").alias(f"Rating{suffix}")
                )
        }
    elif count_a == 3 and distinct_a == 2:
        temp_df = w_df.where(
            (w_df[f"NormCode{suffix}"] != "NR")
            & (w_df[f"NonNR_Count{suffix}"] == count_a)
            & (w_df[f"NonNR_Distinct{suffix}"] == distinct_a)
        )
        count_df = temp_df.groupby("ID", f"Rating_Rank{suffix}") \
            .agg(F.count("*").alias("total_count")) \
            .where(F.col("total_count") == 2)
        
        return {
            temp_df.join(count_df, on=["ID", f"Rating_Rank{suffix}"], how="inner")
                .withColumn("rank", F.row_number().over(window_spec))
                .where(F.col("rank") == 1)
                .select(
                    F.col("ID"),
                    F.col("Source").alias(f"Source{suffix}"),
                    F.col(f"Rating_Rank{suffix}"),
                    F.col(f"NormCode{suffix}").alias(f"Rating{suffix}")
                )
        }
    else:
        w_df = w_df.where(F.col(f"NormCode{suffix}") != "NR")
        temp_df = w_df.where(
            (w_df[f"NonNR_Count{suffix}"] == count_a)
            & (w_df[f"NonNR_Distinct{suffix}"] == distinct_a)
        )
        window_spec = Window.partitionBy("ID").orderBy("rating_order")
        return {
            temp_df.where(F.col(f"Rating_Rank{suffix}") == F.col(f"lr_rating"))
                .withColumn("rank", F.row_number().over(window_spec))
                .where(F.col("rank") == 1)
                .select(
                    F.col("ID"),
                    F.col("Source").alias(f"Source{suffix}"),
                    F.col(f"Rating_Rank{suffix}"),
                    F.col(f"NormCode{suffix}").alias(f"Rating{suffix}")
                )
        }

def loop_new_ratings(df, suffix=""):
    window_params = [
        [0, 0, f"Low{suffix}", F.asc(f"Rank{suffix}"), True],
        [3, 3, f"Mid{suffix}", F.asc(f"Rank{suffix}"), False],
        [3, 2, f"Low{suffix}", F.asc(f"Rank{suffix}"), False],
        [3, 1, f"Low{suffix}", F.asc(f"Rank{suffix}"), False],
        [2, 2, f"High{suffix}", F.desc(f"Rank{suffix}"), False],
        [2, 1, f"Low{suffix}", F.asc(f"Rank{suffix}"), False],
        [1, 1, f"Low{suffix}", F.asc(f"Rank{suffix}"), False],
    ]
    
    shortened_df = (
        df
        .select(
            "ID", "Source", f"NormCode{suffix}",
            f"Rank{suffix}", f"Rating_Rank{suffix}",
            f"Mid{suffix}", f"Distinct{suffix}", f"Rate_Count{suffix}", f"NR_Count{suffix}",
            f"NonNR_Distinct{suffix}"
        )
    )
    
    return functools.reduce(DataFrame.union, [generate_new_rating_data(shortened_df, *params, suffix) for params in window_params])

final_df = loop_new_ratings(rating_data)
final_df_short = loop_new_ratings(rating_data, "Short")
final_df_long = loop_new_ratings(rating_data, "Long")

combined_final_df = (
    join_dfs(
        [
            final_df,
            final_df_short,
            final_df_long
        ],
        select_list=[
            "ID", "Source", "SourceShort", "SourceLong",
            "Rating_Rank", "Rating_RankShort", "Rating_RankLong",
            "Rating", "RatingShort", "RatingLong"
        ]
    )
)

print(datetime.now(), "final combined df:")
combined_final_df.show()

我有一个 PySpark 脚本,它使用窗口函数和聚合处理评级数据。该代码工作正常,但没有优化,因为 它使用 for 循环和 functools.reduce 来组合 DataFrame,我相信这可以改进。我希望通过避免 for 循环和减少 来优化此脚本,同时保持相同的功能。

我尝试使用同一 DataFrame 中的窗口函数和条件将 DataFrame 转换合并为单个操作。但是,我找不到完全消除循环和 functools.reduce 的方法。


from pyspark.sql import Window
from pyspark.sql import functions as F
from datetime import datetime

def generate_new_rating_data(w_df, suffix):
    window_spec = Window.partitionBy("ID").orderBy("rating_order")

    return (
        w_df
        .withColumn(
            "flag_a", 
            (F.col(f"NR_Count{suffix}") > 0) & (F.col(f"NR_Count{suffix}") == F.col(f"Rate_Count{suffix}"))
        )
        .withColumn(
            "count_a",
            F.when((F.col(f"NormCode{suffix}") != "NR") & (F.col(f"NonNR_Distinct{suffix}") == 2), 3).otherwise(
                F.when(F.col(f"NormCode{suffix}") != "NR", F.col(f"NonNR_Count{suffix}")).otherwise(None)
            )
        )
        .withColumn(
            "distinct_a",
            F.when(F.col(f"NormCode{suffix}") != "NR", F.col(f"NonNR_Distinct{suffix}")).otherwise(None)
        )
        .withColumn("rank", F.row_number().over(window_spec))
        .filter(
            (F.col("flag_a") & (F.col(f"Rating_Rank{suffix}") >= 250)) |
            ((F.col("count_a") == 3) & (F.col("distinct_a") == 2) & (F.col("total_count") == 2)) |
            ((F.col("count_a").isin(3, 2, 1)) & (F.col(f"Rating_Rank{suffix}") == F.col(f"lr_rating")))
        )
        .groupBy("ID", "Source", f"Rating_Rank{suffix}", f"NormCode{suffix}", "count_a", "distinct_a")
        .agg(F.min("rank").alias("rank"))
        .filter(F.col("rank") == 1)
        .select(
            F.col("ID"),
            F.col("Source").alias(f"Source{suffix}"),
            F.col(f"Rating_Rank{suffix}"),
            F.col(f"NormCode{suffix}").alias(f"Rating{suffix}")
        )
    )

def process_ratings(df, suffix=""):
    shortened_df = (
        df
        .select(
            "ID", "Source", f"NormCode{suffix}",
            f"Rank{suffix}", f"Rating_Rank{suffix}",
            f"Mid{suffix}", f"Distinct{suffix}", f"Rate_Count{suffix}", f"NR_Count{suffix}",
            f"NonNR_Distinct{suffix}", f"lr_rating", "rating_order"
        )
        .withColumn("total_count", F.count("*").over(Window.partitionBy("ID", f"Rating_Rank{suffix}")))
    )

    return generate_new_rating_data(shortened_df, suffix)

final_df = process_ratings(rating_data)
final_df_short = process_ratings(rating_data, "Short")
final_df_long = process_ratings(rating_data, "Long")

combined_final_df = (
    final_df
    .join(final_df_short, on="ID", how="inner")
    .join(final_df_long, on="ID", how="inner")
    .select(
        "ID", "Source", "SourceShort", "SourceLong",
        "Rating_Rank", "Rating_RankShort", "Rating_RankLong",
        "Rating", "RatingShort", "RatingLong"
    )
)

print(datetime.now(), "final combined df:")
combined_final_df.show()

以下是代码更改和优化的说明:

  1. 消除 loop_new_ratings 函数和 for 循环:
    • 我们不需要迭代 window_params 列表,因为我们可以使用条件聚合和窗口函数在单个 DataFrame 操作中实现相同的结果。
  2. 使用条件聚合简化 generate_new_rating_data 函数:
    • 我们使用 F.when 创建新的条件列 flag_a count_a distinct_a ,以根据条件应用不同的逻辑。
    • 我们使用 F.count("*").over(Window.partitionBy("ID", f"Rating_Rank{suffix}")) 计算 total_count ,用于特定条件。
  3. 合并 DataFrame 操作:
    • 我们将所有筛选、分组和聚合操作合并到单个 DataFrame 转换链中,以避免创建不必要的中间 DataFrame。
  4. 使用 join 代替 functools.reduce
    • 我们使用 join 操作有效地组合最终的 DataFrame,而不是使用 functools.reduce

这些更改消除了 for 循环和 functools.reduce ,从而使代码更简洁、更高效。通过利用 PySpark DataFrame API 的强大功能,我们可以在单个转换链中表达复杂的逻辑,从而提高性能并降低代码复杂性。

标签:python,python-3.x,apache-spark,pyspark
From: 78830410

相关文章

  • 如何在python中使用xarray打开grib2文件?
    将xarray导入为xr导入cfgrib导入生态码将pandas导入为pddata=xr.open_dataset(r"C:\Users\new\forecast_data.grib2",engine="cfgrib")这是我的代码。我只想使用xarray读取这个文件。错误是:无法识别的引擎cfgrib必须是以下之一:['netcdf4'、'scipy'、'......
  • 如何在 java 或 python 中使用 HTTP(S) 解决无法解析的主机名或无法识别的名称错误?
    我尝试以编程方式访问网站的信息,但在Java和Python上都无法解析主机名。如果我指定IP地址,则会将错误更改为TLSV1_UNRECOGNIZED_NAME。不过,这个网站无需任何额外的工作就可以通过任何浏览器解决。我在这里浏览了很多潜在的解决方案,但对于Python,它说这个问题应该在2.7......
  • Python 请求 POST 请求与 websockets 库一起使用时挂起
    我使用Python中的requests库发送POST请求,同时维护与websockets库的WebSocket连接:importasyncioimportrequestsimportwebsocketsasyncdefwebsocket_handler(uri):asyncwithwebsockets.connect(uri)aswebsocket:whileTrue:me......
  • 在Python中,list1[::] = list2的空间复杂度是多少?
    此代码首先迭代列表nums,更新整数0、1、2(也分别称为红色、白色和蓝色)的计数。nums保证只有整数0、1和/或2。找到计数后,代码使用[::],这是一种就地修改列表的技巧,以排序numsdefsortColors(self,nums:List[int])->None:re......
  • [附开题]flask框架高校资产管理系统d8y3s(源码+论文+python)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着高等教育事业的快速发展,高校资产规模日益庞大,种类繁多,管理难度显著增加。传统的资产管理方式往往依赖于手工记录和纸质档案,不仅效率低......
  • [附开题]flask框架贺州图特产管理系统uuy79(源码+论文+python)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景贺州,这座历史悠久、文化底蕴深厚的城市,以其丰富的自然资源和独特的地理位置孕育了众多令人瞩目的特产。然而,在信息化快速发展的今天,贺州特......
  • [附开题]flask框架红枫超市会员管理系统ew5iq(源码+论文+python)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着零售行业的快速发展与消费者需求的日益多样化,超市作为人们日常生活中不可或缺的一部分,其管理效率和服务质量直接影响着顾客的购物体验......
  • PYTHON专题-(4)python叫你搞对象
    什么是面向过程编程?面向过程的程序设计把计算机程序视为一系列的命令集合,即一组函数的顺序执行。为了简化程序设计,面向过程把函数继续切分为子函数,即把大块函数通过切割成小块函数来降低系统的复杂度。什么是面向对象编程?面向对象编程——ObjectOrientedProgramming,简......
  • Python 基础教学:中文编码处理
    《Python基础教学:中文编码处理》在编程中,处理中文字符时经常会遇到编码问题。Python3默认使用UTF-8编码,但在处理文件、网络数据或与旧系统交互时,可能需要处理GBK、GB2312等其他编码。1.字符串的编码和解码在Python中,字符串(str)默认是Unicode编码。当你需要将......
  • Python 基础教学:深入了解 continue、break 和 pass 语句
    《Python基础教学:深入了解continue、break和pass语句》Python中的控制流语句不仅仅包括条件语句和循环,还包括continue、break和pass这三个特殊的关键字,它们在特定情况下可以控制程序的流程。1.continue语句continue用于跳过当前循环的剩余代码,在循环控制结......