首页 > 其他分享 >使用PySpark计算AUC,KS与PSI

使用PySpark计算AUC,KS与PSI

时间:2023-08-20 16:22:08浏览次数:55  
标签:AUC PSI name PySpark sum withColumn quantitles date col

当特征数量或者模型数量很多的时候,使用PySpark去计算相关指标会节省很多的时间。网上关于使用PySpark计算相关指标的资料较少,这里抛砖引玉,写了三个风控常用的指标AUC,KS和PSI相关的计算方法,供参考。

AUC

AUC的相关概念网上已经有很多的很好的文章,这里不在赘述,AUC使用的到的计算公式如下:

\[AUC=\frac{\sum_{i\in positiveClass}rank_i-{\displaystyle\frac{M(1+M)}2}}{M\times N} \]

其中M为负类样本的数目,N为正类样本的数目

使用PySpark计算代码如下:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

true_y_col = 'y'
pred_y_col = 'pred_y'
date_col = 'day'


auc_df = df.filter(F.col(true_y_col)>=0).filter(F.col(pred_y_col)>=0)\
           .select(true_y_col, pred_y_col, date_col, 'model_name')\
           .withColumn('totalbad', F.sum(F.col(true_y_col)).over(Window.patitonBy(date_col, 'model_name').orderBy(F.lit(1))))\
           .withColumn('totalgood', F.sum(1-F.col(true_y_col)).over(Window.patitonBy(date_col, 'model_name').orderBy(F.lit(1))))\
           .withColumn('rnk2', F.row_number().over(Window.partitionBy(date_col, 'model_name').orderBy(F.col(pred_y_col).asc())))\
           .filter(F.col(true_y_col)==1)\
           .groupBy(date_col, 'model_name')\
           .agg(((F.sum(F.col('rnk2'))-0.5*(F.max(F.col('totalbad')))*(1+F.max(F.col('totalbad'))))/(F.max(F.col('totalbad'))*F.max(F.col('totalgood')))).alias('AUC'))\
           .orderBy('model_name', date_col)

KS

KS统计量是基于经验累积分布函数(Empirical Cumulative Distribution Function,ECDF)
建立的,一般定义为:

\[KS=\max\left\{\left|cum\left(bad\_rate\right)-cum\left(good\_rate\right)\right|\right\} \]

即为TPRFPR差值绝对值的最大值。

\[KS=max\left(\left|TPR-FPR\right|\right) \]

KS计算方法有很多种,这里使用的是分箱法分别计算TPRFPR,然后得到KS。
使用PySpark计算代码如下:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

true_y_col = 'y'
pred_y_col = 'pred_y'
date_col = 'day'
nBins = 10

ks_df = df.filter(F.col(true_y_col)>=0).filter(F.col(pred_y_col)>=0)\
          .select(true_y_col, pred_y_col, date_col, 'model_name')\
          .withColumn('Bin', F.ntile(nBins).over(Window.partitionBy(date_col, 'model_name').orderBy(pred_y_col)))\
          .groupBy(date_col, 'model_name', 'Bin').agg(F.sum(true_y_col).alias('N_1'), F.sum(1-F.col(true_y_col)).alias('N-0'))\
          .withColumn('ALL_1', F.sum('N_1').over(Window.partitionBy(date_col, 'model_name')))\
          .withColumn('ALL_0', F.sum('N_0').over(Window.partitionBy(date_col, 'model_name')))\
          .withColumn('SUM_1', F.sum('N_1').over(Window.partitionBy(date_col, 'model_name').orderBy('Bin')))\
          .withColumn('ALL_0', F.sum('N_0').over(Window.partitionBy(date_col, 'model_name').orderBy('Bin')))\
          .withColumn('KSn', F.expr('round(abs(SUM_1/ALL_1-SUM_0/ALL_0),6)'))\
          .withColumn('KS', F.round(F.max('KSn').over(Window.partitionBy(date_col, 'model_name')),6))

ks_df = ks_df.select(date_col, 'model_name', 'KS').filter(col('KS').isNotNull()).dropDuplicates()

PSI

群体稳定性指标(Population Stability Index,PSI)是风控场景常用的验证样本在各分数段的分布与建模样本分布的稳定性。在建模中,常用来筛选特征变量、评估模型稳定性

计算公式如下:

\[psi=\sum_{i=1}^n\left(A_i-E_i\right)\ast\ln\left(A_i/E_i\right) \]

其中\(A_i\)代表的是第i个分箱中实际分布(actual)样本占比,同理\(E_i\)代表的是第i个分箱中预期分布(excepted)样本占比

使用PySpark计算代码如下:

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import when


date_col = 'day'
nBins = 10
feature_list = ['fea_1', 'fea_2', 'fea_3']

df = df.withColumn('flag', when(F.col(date_col) == 'actual_date'), 0).when(F.col(date_col) == 'excepted_date').otherwise(None)

quantitles = df.filter(F.col('flag') == 0)\
               .approxQuantile(feature_list, [i/nBins for i in range(1, nBins)], 0.001) # 基准样本分箱

quantitles_dict = {col: quantitles[idx] for idx, col in enumerate(feature_list)}
f_quantitles_dict = F.create_map([F.lit(x) if isinstance(x, str) else F.array(*[F.lit(xx) for xx in x]) for i in quantitles_dict.items() for x in i])

unpivotExpr = "stack(3, 'fea_1', fea_1, 'fea_2', fea_2, 'fea_3', fea_3)"

psi_df = df.filter(F.col('flag').isNotNull()).select('flag', F.expr(unpivotExpr))\
           .withColumn('Bin', when(F.col('value').isNull(), 'Missing').otherwise(
            when(F.col('value') < f_quantitles_dict[F.col('varname')][0], 'bin_0')
            .when(F.col('value') < f_quantitles_dict[F.col('varname')][1], 'bin_1')
            .when(F.col('value') < f_quantitles_dict[F.col('varname')][2], 'bin_2')
            .when(F.col('value') < f_quantitles_dict[F.col('varname')][3], 'bin_3')
            .when(F.col('value') < f_quantitles_dict[F.col('varname')][4], 'bin_4')
            .when(F.col('value') < f_quantitles_dict[F.col('varname')][5], 'bin_5')
            .when(F.col('value') < f_quantitles_dict[F.col('varname')][6], 'bin_6')
            .when(F.col('value') < f_quantitles_dict[F.col('varname')][7], 'bin_7')
            .when(F.col('value') < f_quantitles_dict[F.col('varname')][8], 'bin_8')
            .when(F.col('value') < f_quantitles_dict[F.col('varname')][8], 'bin_9')))\
           .groupBy('varname', 'Bin').agg(F.sum('flag').alias('N_1'), F.sum(1-F.col('flag')).alias('N_0'))\
           .withColumn('ALL_1', F.sum('N_1').over(Window.partitionBy('varname')))\
           .withColumn('ALL_0', F.sum('N_0').over(Window.partitionBy('varname')))\
           .withColumn('actual', F.expr('round(N_0/ALL_0, 6)'))\
           .withColumn('excepted', F.expr('round(N_1/ALL_1, 6)'))\
           .withColumn('PSIn', F.expr('round((actual-excepted)*ln(actual/excepted), 6'))\
           .withColumn('PSI', F.round(F.sum('PSIn').over(Window.partitionBy('varname')), 6))

Reference

标签:AUC,PSI,name,PySpark,sum,withColumn,quantitles,date,col
From: https://www.cnblogs.com/harrylyx/p/17644162.html

相关文章

  • AUC计算及为何不受样例不均衡的影响
    在很多排序场景下,尤其是当前许多数据集正负样例都不太均衡;或者说因训练集过大,可能会对数据进行负采样等操作。这擦操作的前提是建立在AUC值不会受到正负样本比例的影响。看过很多博客也都在讨论:为什么AUC不会受正负样例不平衡的影响?为什么排序喜欢选择AUC作为评判指标。一方面,从A......
  • win10 搭建pyspark环境
    1、环境配置教程:地址 2、测试代码示例importosimportfindsparkos.environ['JAVA_HOME']='D:\Java\jdk1.8.0_311'#这里的路径为java的bin目录所在路径spark_home="D:\spark-3.0.3-bin-hadoop2.7"python_path="D:\Anaconda3\bin\python"f......
  • Distributions: Uniform | Cauchy |
    UniformDistribution:U(a,b):F(x)=x·1/(b-a)p(x)=1/(b-a)ifq<x<b;p(x)=0else.E(x)=(a+b)/2CauchyDistribution:F(x)=[arctan(x)+pi/2]·1/pip(x)=[1/(1+x^2)]·1/piE(x):non-exist......
  • pyspark写入文件
    ##py_spark_rdd2py.py#py_learn##CreatedbyZ.Steveon2023/8/1311:39.##1.rdd.collect()将rdd变为list#2.rdd.reduce()不分组,对rdd数据做两两聚合frompysparkimportSparkConf,SparkContextconf=SparkConf().setMaster("local[*]")......
  • pyspark小demo2
    ##py_pyspark_demo2.py#py_learn##CreatedbyZ.Steveon2023/8/1310:55.#importjson#1.导入库frompysparkimportSparkConf,SparkContext#2.创建SparkConf和SparkContext对象conf=SparkConf().setMaster("local[*]").setAppName(&q......
  • pyspark的filter()、distinct()、sortBy() 函数
    ##py_pyspark_test.py#py_learn##CreatedbyZ.Steveon2023/8/1217:38.#frompysparkimportSparkConf,SparkContextconf=SparkConf().setMaster("local[*]").setAppName("rdd_test")sc=SparkContext(conf=conf)#rdd=......
  • pyspark小案例
    ##py_pyspark_demo.py#py_learn##CreatedbyZ.Steveon2023/8/1215:33.##统计文件中各个单词出现的次数#1.导入库frompysparkimportSparkConf,SparkContext#2.创建SparkConf对象和SparkContext对象conf=SparkConf().setMaster("local......
  • pyspark使用
    ##py_pyspark.py#py_learn##CreatedbyZ.Steveon2023/8/1017:51.##pyspark编程主要分三步:1.数据输入。2.数据处理。3.数据输出。#RDD:ResilientDistributedDatasets弹性分布式数据集#1.安装pyspark库#pip3installpyspark#2.导入p......
  • 调和级数发散率证明|欧拉常数|ln n+gamma+varepsilon_k证明|sigma(1/i)
    最近在做一个练习,然后看到了调和级数这个东西,说实话这东西谁能在考场上想到,平日还是要多积累。开门见山但是我们今天只证这个东西:\[\sum^{n}_{i=1}\frac{1}{n}=\lnn+\gamma+\varepsilon_n\]其中\(\gamma\)gamma是欧拉常数(约等于0.57721566490153286060651209,关于欧......
  • 【Python】PySpark 数据计算 ② ( RDD#flatMap 方法 | RDD#flatMap 语法 | 代码示例 )
    文章目录一、RDD#flatMap方法1、RDD#flatMap方法引入2、解除嵌套3、RDD#flatMap语法说明二、代码示例-RDD#flatMap方法一、RDD#flatMap方法1、RDD#flatMap方法引入RDD#map方法可以将RDD中的数据元素逐个进行处理,处理的逻辑需要用外部通过参数传入map函数......