PySpark 机器学习教程(全)
一、数据的演变
在理解 Spark 之前,有必要理解我们今天所目睹的这种数据洪流背后的原因。在早期,数据是由工人生成或积累的,因此只有公司的员工将数据输入系统,数据点非常有限,只能捕获几个领域。然后出现了互联网,使用互联网的每个人都可以很容易地获取信息。现在,用户有权输入和生成自己的数据。这是一个巨大的转变,因为互联网用户的数量呈指数级增长,这些用户创建的数据甚至以更高的速度增长。例如:登录/注册表单允许用户填写自己的详细信息,在各种社交平台上上传照片和视频。这导致了巨大的数据生成,并且需要一个快速且可扩展的框架来处理如此大量的数据。
数据生成
如图 1-1 所示,随着机器生成和累积数据,这种数据生成现已进入下一个阶段。我们周围的每一个设备都在捕捉数据,比如汽车、建筑、手机、手表、飞机引擎。它们嵌入了多个监测传感器,每秒记录数据。该数据甚至比用户生成的数据更大。
图 1-1
数据 进化
早些时候,当数据仍处于企业级别时,关系数据库足以满足系统需求,但随着数据规模在过去几十年中呈指数级增长,处理大数据的结构发生了变化,这就是 Spark 的诞生。传统上,我们将数据带到处理器中进行处理,但现在数据太多,处理器不堪重负。现在,我们将多个处理器用于数据处理。这就是所谓的并行处理,因为数据是在许多地方同时处理的。
让我们看一个例子来理解并行处理。假设在一条特定的高速公路上,只有一个收费站,每辆车必须排成一排才能通过收费站,如图 1-2 所示。如果平均每辆车通过收费站需要 1 分钟,那么八辆车总共需要 8 分钟。对于 100 辆车,需要 100 分钟。
图 1-2
单线程处理
但是想象一下,如果不是一个收费站,而是在同一条高速公路上有八个收费站,车辆可以使用其中的任何一个通过。如图 1-3 所示,由于现在不存在依赖性,所有 8 辆车通过收费站总共只需要 1 分钟。我们已经将操作并行化。
图 1-3
并行处理
并行或分布式计算的工作原理类似,因为它将任务并行化,并在最后累积最终结果。Spark 是一个以高速并行处理来处理海量数据集的框架,是一个健壮的机制。
火花
Apache Spark 于 2009 年开始作为加州大学伯克利分校 AMPLab 的一个研究项目,并于 2010 年初开源,如图 1-4 所示。从此,再也没有回头。2016 年,Spark 发布深度学习 TensorFrames。
图 1-4
火花 进化
在引擎盖下,Spark 使用了一种不同的数据结构,称为 RDD(弹性分布式数据集)。从某种意义上说,它是有弹性的,因为它们有能力在执行过程中重新创建任何时间点。因此,RDD 用最后一个创建了一个新的 RDD,并且在出现任何错误的情况下都有能力进行重建。它们也是不可变的,因为原始 rdd 保持不变。由于 Spark 是一个分布式框架,它在主节点和工作节点设置上工作,如图 1-5 所示。执行任何活动的代码首先写在 Spark Driver 上,并在数据实际驻留的工作节点之间共享。每个 worker 节点都包含实际执行代码的执行器。集群管理器为下一个任务分配检查各种工作节点的可用性。
图 1-5
火花 运转正常
Spark 大受欢迎的主要原因是因为它非常容易用于数据处理、机器学习和数据流;相对来说,它非常快,因为它完成了所有内存中的计算。由于 Spark 是一个通用的数据处理引擎,它可以很容易地用于各种数据源,如 HBase ,Cassandra,亚马逊 S3,,等。Spark 为用户提供了四种语言选项:Java、Python、Scala 和 r。
火花核心
Spark 核心是 Spark 最基本的构建模块,如图 1-6 所示。它是 Spark 最高功能特性的支柱。Spark Core 支持驱动数据并行和分布式处理的内存计算。Spark 的所有功能都建立在 Spark Core 之上。Spark 核心负责管理任务、I/O 操作、容错和内存管理等。
图 1-6
星火 建筑
火花部件
让我们看看组件。
Spark SQL
该组件主要处理结构化数据处理。关键思想是获取更多关于数据结构的信息来执行额外的优化。它可以被认为是一个分布式 SQL 查询引擎。
火花流
该组件以可扩展和容错的方式处理实时流数据。它使用微批处理来读取和处理传入的数据流。它创建小批量的流数据,执行批处理,并将其传递给一些文件存储或实时仪表板。Spark Streaming 可以从 Kafka 和 Flume 等多个来源获取数据。
Spark MLlib(消歧义)
该组件用于以分布式方式在大数据上构建机器学习模型。当数据量巨大时,使用 Python 的 scikit learn 库构建 ML 模型的传统技术面临许多挑战,而 MLlib 的设计方式提供了大规模的特征工程和机器学习。MLlib 具有为分类、回归、聚类、推荐系统和自然语言处理实现的大多数算法。
火花图形 x/图形框架
该组件擅长图形分析和图形并行执行。图表框架可用于理解潜在的关系,并使数据的洞察力可视化。
设置环境
本章的这一节将介绍如何在系统上设置 Spark 环境。基于操作系统,我们可以选择在系统上安装 Spark 的选项。
Windows 操作系统
要下载的文件:
-
Anaconda (Python 3.x)
-
Java(如果没有安装)
-
Apache Spark 最新版本
-
Winutils.exe
蟒蛇装置
从链接 https://www.anaconda.com/download/#windows
下载 Anaconda 发行版并安装在您的系统上。在安装它的时候需要注意的一点是,启用将 Anaconda 添加到 path 环境变量的选项,以便 Windows 可以在启动 Python 时找到相关文件。
一旦安装了 Anaconda,我们就可以使用命令提示符检查 Python 在系统上是否工作正常。您可能还想通过尝试以下命令来检查 Jupyter 笔记本是否也打开了:
[In]: Jupyter notebook
Java 安装
访问 https://www.java.com/en/download/link
下载 Java(最新版本)并安装 Java。
火花装置
在您选择的位置创建一个名为 spark 的文件夹。假设我们决定在 D:/ drive 中创建一个名为 spark 的文件夹。转到 https://spark.apache.org/downloads.html
并选择您想要安装在机器上的 Spark 发布版本。选择“为 Apache Hadoop 2.7 和更高版本预构建”的包类型选项请继续下载。tgz 文件复制到我们之前创建的 spark 文件夹中,并提取所有文件。您还会发现在解压缩后的文件中有一个名为 bin 的文件夹。
下一步是下载 winutils.exe,为此你需要去链接 https://github.com/steveloughran/winutils/blob/master/hadoop-2.7.1/bin/winutils.exe
并下载。exe 文件并保存到解压后的 spark 文件夹的 bin 文件夹中(D:/spark/spark_unzipped/bin)。
现在我们已经下载了所有需要的文件,下一步是添加环境变量以便使用 pyspark。
转到 Windows 的开始按钮,搜索“为您的帐户编辑环境变量”让我们继续为 winutils 创建一个新的环境变量,并为其分配路径。单击 new,创建一个名为 HADOOP_HOME 的新变量,并在变量值占位符中传递文件夹的路径(D:/spark/spark_unzipped)。
我们对 spark 变量重复相同的过程,创建一个名为 SPARK_HOME 的新变量,并在变量值占位符中传递 SPARK 文件夹的路径(D:/spark/spark_unzipped)。
让我们添加几个变量来使用 Jupyter notebook。创建一个名为 PYSPARK_DRIVER_PYTHON 的新变量,并在变量值占位符中传递 Jupyter。创建另一个名为 PYSPARK_DRIVER_PYTHON_OPTS 的变量,并在值字段中传递笔记本。
在同一个窗口中,查找 PATH 或 Path 变量,点击 edit,向其中添加 D:/spark/spark_unzipped/bin。在 Windows 7 中,您需要用分号分隔 Path 中的值。
我们还需要将 Java 添加到环境变量中。因此,创建另一个变量 JAVA_HOME,并传递安装 JAVA 的文件夹的路径。
我们可以打开 cmd 窗口,运行 Jupyter notebook。
[In]: Import findspark
[In]: findspark.init()
[In]:import pyspark
[In]:from pyspark.sql import SparkSession
[In]: spark=SparkSession.builder.getOrCreate()
IOS
假设我们已经在 Mac 上安装了 Anaconda 和 Java,我们可以下载最新版本的 Spark 并保存到主目录。我们可以打开终端,使用
[In]: cd ~
将下载的 spark 压缩文件复制到主目录,并解压缩文件内容。
[In]: mv /users/username/Downloads/ spark-2.3.0-bin-hadoop2.7 /users/username
[In]: tar -zxvf spark-2.3.0-bin-hadoop2.7.tgz
验证您是否有一个. bash_profile。
[In]: ls -a
接下来,我们将编辑。bash_profile,这样我们就可以在任何目录下打开 Spark 笔记本。
[In]: nano .bash_profile
将下面的项目粘贴到 bash 配置文件中。
export SPARK_PATH=~/spark-2.3.0-bin-hadoop2.7
export PYSPARK_DRIVER_PYTHON="jupyter"
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
alias notebook='$SPARK_PATH/bin/pyspark --master local[2]'
[In]: source .bash_profile
现在尝试在终端中打开 Jupyter notebook,导入 Pyspark 使用。
码头工人
我们可以使用 Jupyter 存储库中的映像直接将 PySpark 与 Docker 一起使用,但这需要在您的系统上安装 Docker。
大数据
Databricks 还提供了一个免费的社区版帐户,并提供了 6 GB 的 PySpark 集群。
结论
在这一章中,我们研究了 Spark 体系结构、各种组件以及设置本地环境以使用 Spark 的不同方式。在接下来的章节中,我们将深入 Spark 的各个方面,并使用它建立一个机器学习模型。
二、机器学习导论
当我们出生时,我们没有能力做任何事情。那时候我们连头都抬不直,但最终我们开始学习。起初,我们都笨手笨脚,犯很多错误,摔倒,撞到头很多次,但慢慢地学会了坐、走、跑、写和说。作为一种内置的机制,我们不需要大量的例子来学习一些东西。例如,仅仅通过看到路边的两到三所房子,我们就可以很容易地学会识别一所房子。只要看到周围有几辆汽车和自行车,我们就能很容易地区分汽车和自行车。我们很容易区分猫和狗。尽管对我们人类来说这看起来非常简单和直观,但对机器来说这可能是一项艰巨的任务。
机器学习是一种机制,通过这种机制,我们试图让机器学习,而不用显式地对它们进行编程。简单来说,我们向机器展示了大量猫和狗的图片,只够机器学习两者之间的差异并正确识别新图片。这里的问题可能是这样的:学习像区分猫和狗这样简单的事情需要这么多的图片吗?机器面临的挑战是,它们能够仅从一些图像中学习整个模式或抽象特征;他们需要足够多的例子(在某些方面有所不同)来学习尽可能多的特征,以便能够做出正确的预测,而作为人类,我们有这种惊人的能力在不同的层次上进行抽象,并容易识别物体。这个例子可能是针对图像识别的,但是对于其他应用程序,机器也需要大量的数据来学习。
机器学习是过去几年谈论最多的话题之一。越来越多的企业希望采用信息技术来保持竞争优势;然而,很少有人真正拥有合适的资源和适当的数据来实现它。在本章中,我们将涵盖机器学习的基本类型,以及企业如何从使用机器学习中受益。
互联网上有大量关于机器学习的定义,尽管如果我可以尝试用简单的术语来描述,它看起来会像这样:
- 机器学习使用统计技术,有时使用高级算法来进行预测或学习数据中的隐藏模式,并从本质上取代基于规则的系统,使数据驱动的系统更加强大。
让我们详细地过一遍这个定义。顾名思义,机器学习就是让机器学习,尽管当我们谈论让机器学习时,会涉及到许多组件。
一个组成部分是数据,它是任何模型的支柱。机器学习在相关数据上蓬勃发展。数据中的信号越多,预测就越准确。机器学习可以应用于不同的领域,如金融、零售、医疗保健和社交媒体。另一部分是算法。基于我们试图解决的问题的性质,我们相应地选择算法。最后一部分由硬件和软件组成。Spark 和 Tensorflow 等开源分布式计算框架的出现使得机器学习对每个人来说都变得更加容易。当场景有限时,基于规则的系统就出现了,所有的规则都可以手动配置来处理这些情况。最近,这种情况有所改变,特别是场景数部分。例如,欺诈发生的方式在过去几年中发生了巨大的变化,因此为这种情况创建手动规则实际上是不可能的。因此,在从数据中学习并适应新数据并做出相应决策的场景中,机器学习正在发挥作用。事实证明,这对每个人都有巨大的商业价值。
让我们看看不同类型的机器学习及其应用。我们可以将机器学习分为四大类:
-
监督机器学习
-
无监督机器学习
-
半监督机器学习
-
强化学习
上述每个类别都有特定的用途,使用的数据也互不相同。归根结底,机器学习是从数据(历史或实时)中学习,并基于模型训练做出决策(离线或实时)。
监督机器学习
这是机器学习的主要类别,为企业带来了大量应用和价值。在监督学习中,我们在标记的数据上训练我们的模型。通过标记,它意味着数据有正确的答案或结果。让我们举一个例子来说明监督学习。如果有一家金融公司希望在接受他们的贷款请求之前根据他们的档案筛选客户,机器学习模型将根据历史数据进行训练,这些数据包含有关过去客户的档案和客户是否拖欠贷款的标签列的信息。样本数据如表 2-1 所示。
表 2-1
客户详细信息
|客户 ID
|
年龄
|
性别
|
薪水
|
贷款数量
|
作业类型
|
贷款违约
|
| --- | --- | --- | --- | --- | --- | --- |
| 到 23 号 | Thirty-two | M | 80K | one | 永久的 | 不 |
| AX43 | Forty-five | F | 10.5 万 | Two | 永久的 | 不 |
| BG76 | Fifty-one | M | 75K | three | 合同 | 是 |
在监督学习中,模型从也有标签/结果/目标列的训练数据中学习,并使用它对看不见的数据进行预测。在上面的示例中,年龄、性别和薪金等列被称为属性或功能,而最后一列(贷款违约)被称为目标或标签,模型试图对其进行预测以获取不可见的数据。一个包含所有这些值的完整记录称为观察值。该模型需要足够多的观察数据来进行训练,然后根据类似的数据进行预测。在监督学习中,模型至少需要一个输入要素/属性才能与输出列一起接受训练。机器能够从训练数据中学习的原因是因为潜在的假设,即这些输入特征中的一些单独地或组合地对输出列有影响(贷款违约)。
有许多应用程序使用监督学习设置,例如:
案例 1:是否有特定的客户会购买该产品?
案例 2:访问者是否会点击广告?
案例 3:这个人是否会拖欠贷款?
案例 4:给定房产的预期售价是多少?
案例五:如果这个人是不是恶性肿瘤?
以上是监督学习的一些应用,还有很多。使用的方法有时会因模型试图预测的输出类型而异。如果目标标签是分类类型,则它属于分类类别;如果目标特征是一个数值,它将属于回归类别。一些受监督的 ML 算法如下:
-
线性回归
-
逻辑回归
-
支持向量机
-
朴素贝叶斯分类器
-
决策树
-
组装方法
监督学习的另一个特性是可以评估模型的性能。基于模型的类型(分类/回归/时间序列),可以应用评估指标,并且可以测量性能结果。这主要通过将训练数据分成两组(训练集和验证集)并在训练集上训练模型并在验证集上测试其性能来实现,因为我们已经知道验证集的正确标签/结果。然后,我们可以更改超参数(在后面的章节中介绍)或使用特征工程引入新的特征来提高模型的性能。
无监督机器学习
在无监督学习中,我们在类似种类的数据上训练模型,除了这个数据集不包含任何标签或结果/目标列。本质上,我们在没有任何正确答案的情况下根据数据训练模型。在无监督学习中,机器试图在数据中找到隐藏的模式和有用的信号,这些数据可以在以后用于其他应用。其中一个用途是在客户数据中寻找模式,并将客户分组到代表某些属性的不同聚类中。例如,我们来看看表 2-2 中的一些客户数据。
表 2-2
客户详细信息
|客户 ID
|
歌曲类型
|
| --- | --- |
| AS12 | 浪漫的 |
| BX54 | 嘻哈音乐 |
| BX54 | 岩石 |
| AS12 | 岩石 |
| CH87 | 嘻哈音乐 |
| CH87 | 经典的 |
| AS12 | 岩石 |
在上面的数据中,我们有客户和他们喜欢的音乐类型,没有任何目标或输出列,只有客户和他们的音乐偏好数据。
我们可以使用无监督学习,将这些客户分组到有意义的集群中,以更多地了解他们的群体偏好,并采取相应的行动。我们可能必须将数据集调整为其他形式,以实际应用无监督学习。我们简单地计算每个客户的价值,看起来就像表 2-3 所示。
表 2-3
客户详细信息
|客户 ID
|
浪漫的
|
嘻哈音乐
|
岩石
|
经典的
|
| --- | --- | --- | --- | --- |
| AS12 | one | Zero | Two | Zero |
| BX54 | Zero | one | one | Zero |
| CH87 | Zero | one | Zero | one |
现在,我们可以形成一些有用的用户组,并应用这些信息来推荐和制定基于集群的策略。我们当然可以提取的信息是,哪些客户在偏好方面是相似的,并且可以从内容的角度作为目标。
图 2-1
聚类后无监督学习
如图 2-1 所示,聚类 A 可以属于只喜欢摇滚的客户,聚类 B 可以属于喜欢浪漫&古典音乐的人,最后一个聚类可能属于嘻哈和摇滚爱好者。无监督学习的另一个用途是发现是否有任何异常活动或异常检测。无监督学习有助于确定数据集中的例外。大多数时候,无监督学习可能非常棘手,因为没有清晰的组或多个组之间的重叠值,这不能给出聚类的清晰图像。例如,如图 2-2 所示,数据中没有清晰的分组,无监督学习无法帮助形成真正有意义的数据点聚类。
图 2-2
重叠集群
有许多应用程序使用无监督学习设置,例如
案例 1:总客户群中有哪些不同的群体?
案例二:这个交易是异常还是正常?
无监督学习中使用的算法有
-
聚类算法(K 均值,分层)
-
维度缩减技术
-
主题建模
-
关联规则挖掘
无监督学习的整个思想是发现和找出模式,而不是做出预测。因此,无监督学习主要在两个方面不同于有监督学习。
-
没有标注的训练数据,也没有预测。
-
模型在无监督学习中的性能无法评估,因为没有标签或正确答案。
半监督学习
顾名思义,半监督学习介于监督学习和非监督学习之间。事实上,它使用了这两种技术。这种类型的学习主要与我们处理混合类型的数据集的场景相关,这种数据集包含有标签和无标签的数据。有时它只是完全未标记的数据,但我们手动标记了其中的一部分。可以对这一小部分标记数据使用半监督学习来训练模型,然后使用它来标记数据的其他剩余部分,然后可以将其用于其他目的。这也称为伪标记,因为它标记了未标记的数据。举一个简单的例子,我们有很多来自社交媒体的不同品牌的图片,其中大部分都没有标签。现在使用半监督学习,我们可以手动标记这些图像中的一些,然后在标记的图像上训练我们的模型。然后,我们使用模型预测来标记剩余的图像,以将未标记的数据完全转换为标记的数据。
半监督学习的下一步是在整个标记数据集上重新训练模型。它提供的优势是模型在更大的数据集上进行训练,这在早期是不存在的,现在更健壮,更擅长预测。另一个优势是半监督学习节省了大量人工标记数据的精力和时间。这样做的另一面是,伪标记很难获得高性能,因为它使用一小部分标记数据来进行预测。然而,这仍然是比手动标记数据更好的选择,手动标记数据可能非常昂贵且耗时。
强化学习
是第四种也是最后一种学习,在数据使用及其预测方面略有不同。强化学习本身就是一个很大的研究领域,整本书都可以围绕它来写。我们不会对此进行太深入的探讨,因为这本书更侧重于使用 PySpark 构建机器学习模型。其他类型的学习和强化学习之间的主要区别是,我们需要数据,主要是历史数据来训练模型,而强化学习是在奖励系统上工作的。它主要是基于代理采取的改变其状态以最大化回报的某些行动的决策。让我们使用可视化将它分解为单个元素。
-
自主主体:这是整个学习过程中负责采取行动的主要角色。如果是一个游戏,代理采取行动来完成或达到最终目标。
-
操作:这些是代理为了在任务中前进可以采取的一组可能的步骤。每个动作都会对代理的状态产生一些影响,并且可能导致奖励或惩罚。例如,在网球比赛中,动作可能是发球、回球、向左或向右移动等。
-
奖励:这是强化学习取得进步的关键。奖励使代理能够根据积极的奖励或惩罚采取行动。它是一种反馈机制,区别于传统的监督和非监督学习技术
-
环境:这是代理可以参与的领域。环境决定了行动者采取的行动是奖励还是惩罚。
-
状态:代理在任何给定时间点所处的位置定义了代理的状态。为了前进或达到最终目标,代理人必须不断地朝积极的方向改变状态,以使回报最大化。
强化学习的独特之处在于,有一个反馈机制,基于总贴现报酬最大化来驱动代理人的下一个行为。一些使用强化学习的突出应用是自动驾驶汽车、优化能耗和游戏领域。然而,它也可以用来建立推荐系统。
结论
在这一章中,我们简要地看了不同类型的机器学习方法和一些应用。在接下来的章节中,我们将使用 PySpark 详细研究监督和非监督学习。
三、数据处理
本章试图涵盖使用 PySpark 处理和分析数据的所有主要步骤。尽管我们在本节中考虑的数据规模相对较小,但是使用 PySpark 处理大型数据集的步骤完全相同。数据处理是执行机器学习所需的关键步骤,因为我们需要清理、过滤、合并和转换我们的数据,使其成为所需的形式,以便我们能够训练机器学习模型。我们将利用多个 PySpark 函数来执行数据处理。
加载和读取数据
假设我们已经安装了 Spark 2.3 版,为了使用 Spark,我们首先从导入和创建SparkSession
对象开始。
[In]: from pyspark.sql import SparkSession
[In]: spark=SparkSession.builder.appName('data_processing').getOrCreate()
[In]: df=spark.read.csv('sample_data.csv',inferSchema=True,header=True)
我们需要确保数据文件位于我们打开 PySpark 的同一个文件夹中,或者我们可以指定数据所在文件夹的路径以及数据文件名。我们可以用 PySpark 读取多种数据文件格式。我们只需要根据文件格式(csv、JSON、parquet、table、text)更新读取格式参数。对于制表符分隔的文件,我们需要在读取文件时传递一个额外的参数来指定分隔符(sep='\t'
)。将参数inferSchema
设置为 true 表示 Spark 将在后台自己推断数据集中值的数据类型。
上面的命令使用示例数据文件中的值创建了一个 spark 数据帧。我们可以认为这是一个带有列和标题的表格格式的 Excel 电子表格。我们现在可以在这个 Spark 数据帧上执行多个操作。
[In]: df.columns
[Out]: ['ratings', 'age', 'experience', 'family', 'mobile']
我们可以使用“columns”方法打印数据帧中的列名列表。如我们所见,我们的数据框架中有五列。为了验证列数,我们可以简单地使用 Python 的length
函数。
[In]: len(df.columns)
[Out]: 5
我们可以使用count
方法来获得数据帧中的记录总数:
[In]: df.count
[Out] : 33
我们的数据框架中共有 33 条记录。在进行预处理之前,最好打印出数据帧的形状,因为它给出了行和列的总数。Spark 中没有任何检查数据形状的直接函数;相反,我们需要结合列的数量和长度来打印形状。
[In]: print((df.count),(len(df.columns))
[Out]: ( 33,5)
查看数据框中列的另一种方法是 spark 的printSchema
方法。它显示了列的数据类型以及列名。
[In]:df.printSchema()
[Out]: root
|-- ratings: integer (nullable = true)
|-- age: integer (nullable = true)
|-- experience: double (nullable = true)
|-- family: double (nullable = true)
|-- Mobile: string (nullable = true)
nullable
属性指示对应的列是否可以采用空值(true)或不采用空值(false)。我们还可以根据需要改变列的数据类型。
下一步是先睹为快,查看数据帧的内容。我们可以使用 Spark show
方法来查看数据帧的顶行。
[In]: df.show(3)
[Out]:
自从我们在show
方法中传递了值 5 之后,我们只能看到五条记录和所有的五列。为了只查看某些列,我们必须使用select
方法。让我们只查看两列(年龄和手机):
[In]: df.select('age','mobile').show(5)
[Out]:
Select
函数仅从数据帧中返回两列和五条记录。在本章中,我们将继续使用select
函数。下一个要使用的函数是用于分析数据帧的describe
。它返回数据帧中每一列的统计度量。我们将再次使用 show 和 describe,因为 describe 将结果作为另一个数据帧返回。
[In]: df.describe().show()
[Out]:
对于数字列,它返回中心的度量值,并随计数一起传播。对于非数字列,它显示计数以及最小值和最大值,这些值基于这些字段的字母顺序,并不表示任何实际意义。
添加新列
我们可以使用 spark 的withColumn
函数在 dataframe 中添加一个新列。让我们通过使用age
列向我们的数据框架添加一个新列(10 年后的年龄)。我们简单地给age
列中的每个值加上 10 年。
[In]: df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10,False)
[Out]:
正如我们所观察到的,我们在数据帧中有了一个新列。show
函数帮助我们查看新的列值,但是为了将新的列添加到数据帧中,我们需要将其分配给一个现有的或新的数据帧。
[In]: df= df.withColumn("age_after_10_yrs",(df["age"]+10))
这一行代码确保更改发生,并且 dataframe 现在包含新列(10 年后的年龄)。
要将age
列的数据类型从 integer 改为 double,我们可以使用 Spark 中的cast
方法。我们需要从pyspark.types:
导入DoubleType
[In]: from pyspark.sql.types import StringType,DoubleType
[In]: df.withColumn('age_double',df['age'].cast(DoubleType())).show(10,False)
[Out]:
因此,上面的命令创建了一个新列(age_double
),它将年龄值从整数转换为双精度类型。
过滤数据
根据条件筛选记录是处理数据时的常见要求。这有助于清理数据并仅保留相关记录。PySpark 中的过滤非常简单,可以使用filter
函数来完成。
条件 1
这是仅基于数据帧的一列的最基本的过滤类型。假设我们只想获取“Vivo”手机的记录:
[In]: df.filter(df['mobile']=='Vivo').show()
[Out]:
我们有所有Mobile
列有‘Vivo’值的记录。在筛选记录后,我们可以进一步只选择几列。例如,如果我们想查看使用“Vivo”手机的人的年龄和评级,我们可以在过滤记录后使用select
功能来完成。
[In]: df.filter(df['mobile']=='Vivo').select('age','ratings','mobile').show()
[Out]:
条件 2
这涉及基于多列的筛选,并且仅当满足所有条件时才返回记录。这可以通过多种方式实现。比方说,我们只想过滤“Vivo”用户和那些拥有 10 年以上经验的用户。
[In]: df.filter(df['mobile']=='Vivo').filter(df['experience'] >10).show()
[Out]:
为了将这些条件应用于各个列,我们使用了多个筛选函数。还有另一种方法可以达到同样的效果,如下所述。
[In]: df.filter((df['mobile']=='Vivo')&(df['experience'] >10)).show()
[Out]:
列中的不同值
如果我们想要查看任何 dataframe 列的不同值,我们可以使用distinct
方法。让我们查看数据帧中 m obile
列的不同值。
[In]: df.select('mobile').distinct().show()
[Out]:
为了获得列中不同值的计数,我们可以简单地使用count
和distinct
函数。
[In]: df.select('mobile').distinct().count()
[Out]: 5
分组数据
Grouping
is a
非常有用的理解数据集各个方面的方法。它有助于根据列值对数据进行分组,并提取洞察力。它还可以与其他多种功能一起使用。让我们看一个使用数据帧的groupBy
方法的例子。
[In]: df.groupBy('mobile').count().show(5,False)
[Out]:
这里,我们根据 m obile
列中的分类值对所有记录进行分组,并使用count
方法计算每个类别的记录数。我们可以通过使用orderBy
方法按照定义的顺序对这些结果进行排序,从而进一步细化这些结果。
[In]: df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)
[Out]:
现在,mobiles
的计数根据每个类别按降序排序。
我们还可以应用groupBy
方法来计算统计度量,例如每个类别的平均值、总和、最小值或最大值。让我们看看其余列的平均值是多少。
[In]: df.groupBy('mobile').mean().show(5,False)
[Out]:
mean
方法给出了每个手机品牌的平均年龄、评级、体验和家庭规模栏。我们也可以通过使用sum
方法和groupBy
来获得每个移动品牌的总和。
[In]: df.groupBy('mobile').sum().show(5,False)
[Out]:
现在让我们来看看每个手机品牌的用户数据的最小值和最大值。
[In]: df.groupBy('mobile').max().show(5,False)
[Out]:
[In]:df.groupBy('mobile').min().show(5,False)
[Out]:
聚集
我们也可以使用agg
函数来获得与上面类似的结果。让我们使用 PySpark 中的agg
函数来简单地计算每个手机品牌的总体验。
[In]: df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)
[Out]:
因此,这里我们只需使用agg
函数,并传递我们希望进行聚合的列名(experience)。
用户定义函数(UDF)
UDF 广泛用于数据处理中,对数据帧进行某些变换。PySpark 有两种 UDF:传统的 UDF 和熊猫 UDF。熊猫 UDF 在速度和处理时间方面更强大。我们将看到如何在 PySpark 中使用这两种类型的 UDF。首先,我们必须从 PySpark 函数中导入udf
。
[In]: from pyspark.sql.functions import udf
现在,我们可以通过使用 lambda 或典型的 Python 函数来应用基本的 UDF。
传统 Python 函数
我们创建了一个简单的 Python 函数,它根据移动品牌返回价格范围的类别:
[In]:
def price_range(brand):
if brand in ['Samsung','Apple']:
return 'High Price'
elif brand =='MI':
return 'Mid Price'
else:
return 'Low Price'
在下一步中,我们创建一个 UDF ( brand_udf
),它使用这个函数并捕获它的数据类型,以便将这个转换应用到 dataframe 的移动列上。
[In]: brand_udf=udf(price_range,StringType())
在最后一步,我们将udf(brand_udf)
应用到 dataframe 的 m obile
列,并创建一个具有新值的新列(price_range
)。
[In]: df.withColumn('price_range',brand_udf(df['mobile'])).show(10,False)
[Out]:
使用 Lambda 函数
不用定义传统的 Python 函数,我们可以利用 lambda 函数,用一行代码创建一个 UDF,如下所示。我们根据用户的年龄将年龄列分为两组(young
、senior
)。
[In]: age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
[In]: df.withColumn("age_group", age_udf(df.age)).show(10,False)
[Out]:
熊猫 UDF(矢量化 UDF)
如前所述,熊猫 UDF 比它们的同类更快更高效。有两种类型的熊猫 UDF:
-
数量
-
分组地图
使用熊猫 UDF 非常类似于使用基本的 UDF。我们必须首先从 PySpark 函数导入pandas_udf
,并将其应用于任何要转换的特定列。
[In]: from pyspark.sql.functions import pandas_udf
在本例中,我们定义了一个 Python 函数,用于计算假设预期寿命为 100 岁的用户的剩余寿命。这是一个非常简单的计算:我们使用 Python 函数从 100 中减去用户的年龄。
[In]:
def remaining_yrs(age):
yrs_left=(100-age)
return yrs_left
[In]: length_udf = pandas_udf(remaining_yrs, IntegerType())
一旦我们使用 Python 函数(remaining_yrs)创建了熊猫 UDF (length _udf
),我们就可以将其应用到age
列并创建一个新列 yrs_left。
[In]:df.withColumn("yrs_left", length_udf(df['age'])).show(10,False)
[Out]:
熊猫 UDF(多列)
我们可能会遇到这样的情况,我们需要多个列作为输入来创建一个新列。因此,下面的例子展示了在数据帧的多列上应用熊猫 UDF 的方法。在这里,我们将创建一个新列,它只是 ratings 和 experience 列的乘积。像往常一样,我们定义一个 Python 函数,并计算两列的乘积。
[In]:
def prod(rating,exp):
x=rating*exp
return x
[In]: prod_udf = pandas_udf(prod, DoubleType())
创建熊猫 UDF 后,我们可以将它应用于两个列(ratings
、experience
)以形成新列(product
)。
[In]:df.withColumn("product",prod_udf(df['ratings'],df['experience'])).show(10,False)
[Out]:
删除重复值
我们可以使用dropDuplicates
方法从数据帧中删除重复的记录。该数据帧中的记录总数为 33,但它还包含 7 个重复记录,这可以通过删除这些重复记录来轻松确认,因为我们只剩下 26 行。
[In]: df.count()
[Out]: 33
[In]: df=df.dropDuplicates()
[In]: df.count()
[Out]: 26
删除列
我们可以利用drop
函数从数据帧中删除任何列。如果我们想从 dataframe 中删除 m obile
列,我们可以将它作为一个参数传递给drop
函数。
[In]: df_new=df.drop('mobile')
[In]: df_new.show()
[Out]:
写入数据
一旦我们完成了处理步骤,我们就可以以所需的格式将干净的数据帧写入所需的位置(本地/云)。
战斗支援车
如果我们想把它保存回原来的 csv 格式作为单个文件,我们可以使用 spark 中的coalesce
函数。
[In]: pwd
[Out]: ' /home/jovyan/work '
[In]: write_uri= ' /home/jovyan/work/df_csv '
[In]: df.coalesce(1).write.format("csv").option("header","true").save(write_uri)
镶木地板
如果数据集很大,并且包含很多列,我们可以选择压缩它,并将其转换为 parquet 文件格式。它减少了数据的总体大小,并在处理数据时优化了性能,因为它处理所需列的子集,而不是整个数据。我们可以很容易地将数据帧转换并保存为拼花格式,只需将格式命名为如下所示的parquet
。
[In]: parquet_uri='/home/jovyan/work/df_parquet'
[In]: df.write.format('parquet').save(parquet_uri)
注意
完整的数据集和代码可以在本书的 GitHub repo 上参考,在 Spark 2.3 和更高版本上执行得最好。
结论
在本章中,我们熟悉了一些使用 PySpark 处理和转换数据的函数和技术。使用 PySpark 对数据进行预处理的方法还有很多,但是本章已经介绍了为机器学习清理和准备数据的基本步骤。
四、线性回归
正如我们在前一章中谈到的机器学习,这是一个非常广阔的领域,有多种算法属于不同的类别,但线性回归是最基本的机器学习算法之一。本章着重于用 PySpark 构建一个线性回归模型,并深入研究 LR 模型的工作原理。它将涵盖在使用 LR 以及不同评估指标之前需要考虑的各种假设。但是在试图理解线性回归之前,我们必须了解变量的类型。
变量
变量以不同的形式捕获数据信息。主要有两类广泛使用的变量,如图 4-1 所示。
图 4-1
变量的类型
我们甚至可以将这些变量进一步细分为子类别,但在本书中我们将坚持这两种类型。
数字变量是那些本质上是定量的值,比如数字(整数/浮点数)。例如,工资记录、考试分数、一个人的年龄或身高以及股票价格都属于数值变量的范畴。
另一方面,分类变量本质上是定性的,主要代表被测量数据的类别。例如,颜色、结果(是/否)、评级(好/差/平均)。
为了建立任何类型的机器学习模型,我们需要输入和输出变量。输入变量是用于建立和训练机器学习模型以预测输出或目标变量的那些值。我们举个简单的例子。假设我们想预测一个人的工资,给定这个人的年龄,使用机器学习。在这种情况下,工资是我们的输出/目标/因变量,因为它取决于年龄,这被称为输入或自变量。现在,输出变量本质上可以是分类的或数字的,并且根据其类型,选择机器学习模型。
现在回到线性回归,它主要用于我们试图预测数字输出变量的情况。线性回归用于预测符合输入数据的直线,指出可能的最佳方式,并有助于预测看不见的数据,但这里要注意的一点是,模型如何仅从“年龄”中学习并预测给定人员的工资金额?当然,这两个变量(工资和年龄)之间需要有某种关系。变量关系有两种主要类型:
-
线性的
-
非线性的
任何两个变量之间的线性关系的概念表明两者在某些方面是成比例的。任何两个变量之间的相关性给了我们一个指示,表明它们之间的线性关系有多强或多弱。相关系数的范围从-1 到+ 1。负相关意味着通过增加一个变量,另一个变量减少。例如,车辆的功率和里程可能是负相关的,因为当我们增加功率时,车辆的里程会下降。另一方面,工资和工作年限是正相关变量的一个例子。非线性关系本质上比较复杂,因此需要额外的细节来预测目标变量。比如自动驾驶汽车,地形、信号系统、行人等输入变量与汽车速度的关系是非线性的。
注意
下一节包括线性回归背后的理论,对许多读者来说可能是多余的。如果是这种情况,请随意跳过这一部分。
理论
既然我们已经了解了变量的基本知识和它们之间的关系,让我们以年龄和工资为例来深入理解线性回归。
线性回归的总体目标是通过数据预测一条直线,使得这些点中的每一个与该直线的垂直距离最小。因此,在这种情况下,我们将预测给定年龄的人的工资。假设我们有四个人的记录,记录了他们的年龄和各自的工资,如表 4-1 所示。
表 4-1
示例数据集
|-你好。不,不
|
年龄
|
薪金(' 0000 美元)
|
| --- | --- | --- |
| one | Twenty | five |
| Two | Thirty | Ten |
| three | Forty | Fifteen |
| four | Fifty | Twenty-two |
我们有一个可以利用的输入变量(年龄)来预测工资(我们将在本书的后面部分介绍),但是让我们后退一步。让我们假设开始时我们所拥有的只是这四个人的工资值。在图 4-2 中绘制了每个人的工资。
图 4-2
工资散点图
现在,如果我们要根据前面几个人的工资来预测第五个人(新人)的工资,最好的预测方法是取现有工资值的平均值。根据这些信息,这将是最好的预测。这就像建立一个机器学习模型,但没有任何输入数据(因为我们使用输出数据作为输入数据)。
让我们继续计算这些给定工资值的平均工资。
平均值。薪水= = 13
所以,下一个人的工资值的最佳预测是 13。图 4-3 展示了每个人的工资值以及平均值(仅使用一个变量时的最佳拟合线)。
图 4-3
最佳拟合线图
图 4-3 所示的平均值线可能是这些数据点在这种情况下的最佳拟合线,因为除了工资本身,我们没有使用任何其他变量。如果我们仔细观察,没有一个早期的工资值位于这条最佳拟合线上;如图 4-4 所示,这似乎与平均工资值有一定的差距。这些也被称为错误。如果我们继续计算这个距离的总和并将它们相加,它变成 0,这是有意义的,因为它是所有数据点的平均值。因此,我们不是简单地将它们相加,而是将每个误差平方,然后将它们相加。
图 4-4
残差图
误差平方和= 64 + 9 + 4 + 81 = 158。
因此,将残差平方相加,得出的总值为 158,即误差平方和(SSE)。
注意
到目前为止,我们还没有使用任何输入变量来计算 SSE。
让我们暂时搁置这个分数,并加入输入变量(人的年龄)来预测这个人的工资。让我们从图 4-5 所示的人的年龄和工资之间的关系开始。
图 4-5
工资与年龄的相关图
正如我们所观察到的,工作经验年限和工资值之间似乎存在明显的正相关关系,这对我们来说是一件好事,因为它表明,由于投入(年龄)和产出(工资)之间存在很强的线性关系,该模型将能够以很高的准确度预测目标值(工资)。如前所述,线性回归的总体目标是得出一条符合数据点的直线,使实际目标值和预测值之间的平方差最小。因为它是一条直线,我们知道在线性代数中直线的方程是
y= mx + c,同样如图 4-6 所示。
图 4-6
直线图
在哪里,
m =直线的斜率()
x =轴上的值
y =轴上的值
c =截距(x = 0 时 y 的值)
由于线性回归也是找出直线,线性回归方程变成
(因为我们仅使用 1 个输入变量,即年龄)
其中:
y=工资(预测)
B0 =截距(年龄为 0 时的工资值)
B1=工资的斜率或系数
x=年龄
现在,你可能会问,是否可以通过数据点绘制多条线(如图 4-7 )以及如何计算出哪条线是最佳拟合线。
图 4-7
穿过数据的可能直线
找出最佳拟合线的第一个标准是它应该通过数据点的质心,如图 4-8 所示。在我们的例子中,质心值为
平均年龄= = 35 岁
平均值(工资)= = 13
图 4-8
数据的质心
第二个标准是它应该能够最小化误差平方和。我们知道我们的回归线方程等于
现在,使用线性回归的目的是得出截距( B 0 )和系数( B 1 )的最佳值,使得残差/误差最小化到最大程度。
通过使用下面的公式,我们可以很容易地找出数据集的值B0&B1。
B1=
b【0】=和 意思是-【b】
表 4-2 展示了使用输入数据计算线性回归的斜率和截距。
表 4-2
斜率和截距的计算
|年龄
|
薪水
|
年龄差异(不同于平均值)
|
薪资差异(差异。来自平均值)
|
协方差(乘积)
|
年龄差异(平方)
|
| --- | --- | --- | --- | --- | --- |
| Twenty | five | -15 | -8 | One hundred and twenty | Two hundred and twenty-five |
| Thirty | Ten | -5 | -3 | Fifteen | Twenty-five |
| Forty | Fifteen | five | Two | Ten | Twenty-five |
| Fifty | Twenty-two | Fifteen | nine | One hundred and thirty-five | Two hundred and twenty-five |
平均年龄= 35 岁
平均工资=13
任何两个变量(年龄和工资)之间的协方差被定义为每个变量(年龄和工资)与其平均值之间的距离的乘积。简而言之,年龄和工资方差的乘积称为协方差。现在我们有了协方差积和年龄方差的平方值,我们可以继续计算线性回归线的斜率和截距值:
B1=
=
=0.56
B0= 13-(0.56 * 35)
= -6.6
我们最终的线性回归方程变成
*工资= -6.6 + (0.56 年龄)
我们现在可以用这个等式预测任何给定年龄的工资值。例如,该模型会预测第一个人的工资如下:
薪金(第一人)= -6.6 + (0.56*20)
= 4.6 ($ ‘0000)
解释
Slope ( B 1 = 0.56)这里的意思是,人的年龄每增加 1 岁,工资也会增加 5600 美元。
截距并不总是从其值中推导出意义。就像在这个例子中,负 6.6 的值表明如果这个人还没有出生(年龄=0),那么这个人的工资将是负 66,000 美元。
图 4-9 显示了我们数据集的最终回归线。
图 4-9
回归线
让我们使用回归方程预测数据中所有四条记录的工资,并比较与实际工资的差异,如表 4-3 所示。
表 4-3
预测值和实际值之间的差异
|年龄
|
薪水
|
预计工资
|
差异/误差
|
| --- | --- | --- | --- |
| Twenty | five | Four point six | -0.4 |
| Thirty | Ten | Ten point two | Zero point two |
| Forty | Fifteen | Fifteen point eight | Zero point eight |
| Fifty | Twenty-two | Twenty-one point four | -0.6 |
简而言之,线性回归得出截距( B 0 )和系数( B 1 , B 2 )的最优值,使得预测值和目标变量之间的差异(误差)最小。
但问题仍然是:这是一个很好的适合吗?
估价
评价回归线的拟合优度有多种方法,但其中一种方法是利用决定系数( r 平方 )值。请记住,当我们仅使用输出变量本身并且其值为 158 时,我们已经计算了误差平方和。现在让我们重新计算这个模型的 SSE,它是我们使用一个输入变量构建的。表 4-4 显示了使用线性回归后新 SSE 的计算。
表 4-4
使用线性回归后上证指数的下降
|年龄
|
薪水
|
预计工资
|
差异/误差
|
平方误差
|
旧上证所
|
| --- | --- | --- | --- | --- | --- |
| Twenty | five | Four point six | -0.4 | Zero point one six | Sixty-four |
| Thirty | Ten | Ten point two | Zero point two | Zero point zero four | nine |
| Forty | Fifteen | Fifteen point eight | Zero point eight | Zero point six four | four |
| Fifty | Twenty-two | Twenty-one point four | -0.6 | Zero point three six | Eighty-one |
正如我们所观察到的,平方差的总和从 158 显著减少到只有 1.2,这是因为使用了线性回归。目标变量(工资)的变化可以借助回归来解释(由于使用了输入变量——年龄)。因此,OLS 致力于减少误差平方和。误差平方和是两种类型的组合:
TSS(总误差平方和)= SSE(误差平方和)+ SSR(剩余误差平方和)
总平方和是实际值和平均值之间的平方差之和,并且总是固定的。在我们的示例中,这等于 158。
SSE 是目标变量的实际值与预测值的平方差,在使用线性回归后,该值减少到 1.2。
SSR 是回归解释的平方和,可以通过(TSS–SSE)计算。
SSR = 158–1.2 = 156.8
r 平方 (决定系数)=== 0.99
这一百分比表明,我们的线性回归模型能够以 99 %的准确度预测给定人员年龄的工资金额。另外 1%可以归因于模型无法解释的误差。我们的线性回归线非常适合模型,但它也可能是过度拟合的情况。当您的模型在训练数据上预测精度很高,但在未知/测试数据上性能下降时,就会发生过度拟合。解决过拟合问题的技术被称为正则化,并且有不同类型的正则化技术。就线性回归而言,可以使用脊、套索或弹性网正则化技术来处理过度拟合。
岭回归也称为 L2 正则化,其重点是将输入要素的系数值限制为接近于零,而拉索回归(L1)则使某些系数为零,以提高模型的概化能力。Elasticnet 是这两种技术的结合。
说到底,回归仍然是一种参数驱动的方法,并且假设关于输入数据点分布的基本模式很少。如果输入数据不符合这些假设,则线性回归模型表现不佳。因此,在使用线性回归模型之前,快速浏览这些假设以了解它们是很重要的。
假设:
-
输入变量和输出变量之间必须有线性关系。
-
独立变量(输入要素)不应相互关联(也称为多重共线性)。
-
残差/误差值之间必须没有相关性。
-
残差和输出变量之间必须有线性关系。
-
残差/误差值必须呈正态分布。
密码
本章的这一节着重于使用 PySpark 和 Jupyter Notebook 从头构建一个线性回归模型。
虽然我们看到了一个简单的例子,只有一个输入变量来理解线性回归,这是很少的情况。大多数情况下,数据集会包含多个变量,因此在这种情况下构建多变量回归模型更有意义。线性回归方程看起来像这样:
注意
完整的数据集和代码可以在本书的 GitHub repo 上参考,在 Spark 2.3 和更高版本上执行得最好。
让我们使用 Spark 的 MLlib 库构建一个线性回归模型,并使用输入特性预测目标变量。
数据信息
我们将在本例中使用的数据集是一个虚拟数据集,总共包含 1,232 行和 6 列。我们必须使用 5 个输入变量,通过线性回归模型来预测目标变量。
步骤 1:创建 SparkSession 对象
我们启动 Jupyter 笔记本并导入 SparkSession,然后创建一个新的 SparkSession 对象来使用 Spark:
[In]: from pyspark.sql import SparkSession
[In]: spark=SparkSession.builder.appName('lin_reg').getOrCreate()
步骤 2:读取数据集
然后,我们使用 Dataframe 在 Spark 中加载和读取数据集。我们必须确保我们已经从数据集可用的同一个目录文件夹中打开了 PySpark,否则我们必须提到数据文件夹的目录路径:
[In]: df=spark.read.csv('Linear_regression_dataset.csv',inferSchema=True,header=True)
步骤 3:探索性数据分析
在本节中,我们将通过查看数据集、验证数据集的形状、各种统计测量以及输入和输出变量之间的相关性来更深入地研究数据集。我们从检查数据集的形状开始。
[In]:print((df.count(), len(df.columns)))
[Out]: (1232, 6)
上面的输出确认了数据集的大小,我们可以验证输入值的数据类型,以检查我们是否需要更改/转换任何列的数据类型。在此示例中,所有列都包含整数值或双精度值。
[In]: df.printSchema()
[Out]:
总共有六列,其中五列是输入列(var_1
到var_5
)和目标列(输出)。我们现在可以使用describe
函数来检查数据集的统计度量。
[In]: df.describe().show(3,False)
[Out]:
这使我们能够对数据集列的分布、中心测量和分布有所了解。然后,我们使用 head 函数查看数据集,并传递我们想要查看的行数。
[In]: df.head(3)
[Out]:
我们可以使用 corr 函数检查输入变量和输出变量之间的相关性:
[In]: from pyspark.sql.functions import corr
[In]: df.select(corr('var_1','output')).show()
[Out] :
var_1
似乎与输出列的相关性最强。
步骤 4:特征工程
这是我们使用 Spark 的 VectorAssembler 创建一个组合所有输入特征的单一向量的部分。它仅创建一个要素来捕获该行的输入值。因此,它不是五个输入列,而是将所有输入列合并成一个特征向量列。
[In]: from pyspark.ml.linalg import Vector
[In]: from pyspark.ml.feature import VectorAssembler
用户可以选择用作输入特征的列数,并且只能通过 VectorAssembler 传递这些列。在我们的例子中,我们将传递所有五个输入列来创建一个单独的特征向量列。
[In]: df.columns
[Out]: ['var_1', 'var_2', 'var_3', 'var_4', 'var_5', 'output']
[In]: vec_assmebler=VectorAssembler(inputCols=['var_1', 'var_2', 'var_3', 'var_4', 'var_5'],outputCol='features')
[In]: features_df=vec_assmebler.transform(df)
[In]: features_df.printSchema()
[Out]:
正如我们可以看到的,我们有一个额外的列(“features”),其中包含所有输入的单个密集向量。
[In]: features_df.select('features').show(5,False)
[Out]:
我们获取数据帧的子集,并仅选择 features 列和 output 列来构建线性回归模型。
[In]: model_df=features_df.select('features','output')
[In]: model_df.show(5,False)
[Out]:
[In]: print((model_df.count(), len(model_df.columns)))
[Out]: (1232, 2)
步骤 5:拆分数据集
我们必须将数据集分成训练和测试数据集,以便训练和评估所建立的线性回归模型的性能。我们将其分成 70/30 的比例,并在 70%的数据集上训练我们的模型。我们可以打印火车的形状和测试数据来验证尺寸。
[In]: train_df,test_df=model_df.randomSplit([0.7,0.3])
[In]: print((train_df.count(), len(train_df.columns)))
[Out]: (882, 2)
[In]: print((test_df.count(), len(test_df.columns)))
[Out]: (350, 2)
步骤 6:建立和训练线性回归模型
在这一部分中,我们使用输入和输出列的功能来构建和定型线性回归模型。我们还可以获取模型的系数(B1,B2,B3,B4,B5)和截距(B0)值。我们还可以使用 r2 评估模型在训练数据上的性能。该模型在训练数据集上给出了非常好的准确度(86%)。
[In]: from pyspark.ml.regression import LinearRegression
[In]: lin_Reg=LinearRegression(labelCol='output')
[In]: lr_model=lin_Reg.fit(train_df)
[In]: print(lr_model.coefficients)
[Out]: [0.000345569740987,6.07805293067e-05,0.000269273376209,-0.713663600176,0.432967466411]
[In]: print(lr_model.intercept)
[Out]: 0.20596014754214345
[In]: training_predictions=lr_model.evaluate(train_df)
[In]: print(training_predictions.r2)
[Out]: 0.8656062610679494
步骤 7:评估测试数据的线性回归模型
整个练习的最后一部分是检查模型在未知或测试数据上的性能。我们使用 evaluate 函数对测试数据进行预测,并可以使用 r2 来检查模型对测试数据的准确性。表现好像和训练差不多。
[In]: test_predictions=lr_model.evaluate(test_df)
[In]: print(test_results.r2)
[Out]: 0.8716898064262081
[In]: print(test_results.meanSquaredError)
[Out]: 0.00014705472365990883
结论
在本章中,我们回顾了使用 PySpark 构建线性回归模型的过程,并解释了寻找最佳系数和截距值的过程。
五、逻辑回归
本章着重于用 PySpark 构建一个逻辑回归模型,并理解逻辑回归背后的思想。逻辑回归用于分类问题。我们已经在前面的章节中看到了分类的细节。虽然用于分类,但还是叫逻辑回归。这是因为在幕后,线性回归方程仍然可以找到输入变量和目标变量之间的关系。线性回归和逻辑回归之间的主要区别是,我们使用某种非线性函数将后者的输出转换为概率,以将其限制在 0 和 1 之间。例如,我们可以使用逻辑回归来预测用户是否会购买该产品。在这种情况下,模型将返回每个用户的购买概率。逻辑回归在许多商业应用中被广泛使用。
可能性
为了理解逻辑回归,我们必须先复习概率的概念。它被定义为在所有可能的结果中,期望的事件或感兴趣的结果发生的几率。举个例子,如果我们掷硬币,得到正面或反面的机会是相等的(50%),如图 5-1 所示。
图 5-1
事件的概率
如果我们掷一个公平的骰子,那么得到(1 到 6)之间任何一个数字的概率等于 16.7%。
如果我们从一个包含四个绿色球和一个蓝色球的袋子中挑选一个球,挑选一个绿色球的概率是 80%。
逻辑回归用于预测每个目标类的概率。在二进制分类(只有两个类)的情况下,它返回与每个记录的每个类相关联的概率。如前所述,它在幕后使用线性回归来捕捉输入和输出变量之间的关系,但我们另外使用一个元素(非线性函数)来将输出从连续形式转换为概率。让我们借助一个例子来理解这一点。让我们考虑一下,我们必须使用模型来预测某个特定用户是否会购买该产品,我们只使用了一个输入变量,即用户在网站上花费的时间。相同的数据在表 5-1 中给出。
表 5-1。
转换数据集
|-你好。不,不
|
花费的时间(分钟)
|
修改的
|
| --- | --- | --- |
| one | one | 不 |
| Two | Two | 不 |
| three | five | 不 |
| four | Fifteen | 是 |
| five | Seventeen | 是 |
| six | Eighteen | 是 |
让我们将这些数据形象化,以便看出转换用户和非转换用户之间的区别,如图 5-2 所示。
图 5-2
转换状态与花费的时间
使用线性回归
让我们尝试使用线性回归而不是逻辑回归来理解逻辑回归在分类场景中更有意义的原因。为了使用线性回归,我们必须将目标变量从分类形式转换成数字形式。因此,让我们为转换后的列重新分配值:
是= 1
否= 0
现在,我们的数据看起来就像表 5-2 中给出的那样。
表 5-2。
抽样资料
|-你好。不,不
|
花费的时间(分钟)
|
修改的
|
| --- | --- | --- |
| one | one | Zero |
| Two | Two | Zero |
| three | five | Zero |
| four | Fifteen | one |
| five | Seventeen | one |
| six | Eighteen | one |
将分类变量转换成数字变量的过程也很关键,我们将在本章的后半部分详细讨论这一点。现在,让我们绘制这些数据点,以便更好地可视化和理解它(图 5-3 )。
图 5-3
转换状态(1 和 0)与花费时间的关系
正如我们所观察到的,在我们的目标列中只有两个值(1 和 0),并且每个点都位于这两个值中的任意一个上。现在,假设我们对这些数据点进行线性回归,得出一条“最佳拟合线”,如图 5-4 所示。
图 5-4
用户回归线
这条线的回归方程是
就用一条直线来区分 1 和 0 值而言,到目前为止一切看起来都不错。看起来线性回归在区分转换用户和非转换用户方面做得很好,但是这种方法有一个小问题。
举个例子,一个新用户在网站上花了 20 秒,我们必须使用线性回归线来预测这个用户是否会转化。我们使用上面的回归方程,并尝试预测 20 秒所用时间的 y 值。
我们可以简单地通过计算来计算 y 的值
或者,我们也可以简单地从耗时轴到最佳拟合线上画一条垂直线来预测 y 的值。显然,y 的预测值 1.7 似乎远远大于 1,如图 5-5 所示。这种方法没有任何意义,因为我们只想预测 0 到 1 之间的值。
图 5-5
使用回归线的预测
因此,如果我们对分类案例使用线性回归,就会产生预测输出值范围从–无穷大到+无穷大的情况。因此,我们需要另一种方法来将这些值限制在 0 和 1 之间。0 和 1 之间的值的概念不再陌生,因为我们已经看到了概率。因此,逻辑回归本质上提出了与概率值相关联的正类和负类之间的决策边界。
使用 Logit
为了实现将输出值转换成概率的目标,我们使用了一种叫做 Logit 的东西。Logit 是一个非线性函数,它对线性方程进行非线性变换,将输出在 0 和 1 之间转换。在逻辑回归中,非线性函数是 Sigmoid 函数,如下所示:
它总是产生 0 到 1 之间的值,与 x 的值无关。
所以,回到我们之前的线性回归方程
我们将输出(y)传递给这个非线性函数(sigmoid ),使其值在 0 和 1 之间变化。
概率=
概率=
使用上述等式,预测值被限制在 0 和 1 之间,输出现在如图 5-6 所示。
图 5-6
逻辑曲线
使用非线性函数的优点是,无论输入值(花费的时间)如何,输出总是转换的概率。这条曲线也被称为逻辑曲线。逻辑回归还假设输入和目标变量之间存在线性关系,因此找出截距和系数的最佳值来捕捉这种关系。
解释(系数)
使用被称为梯度下降的技术找到输入变量的系数,该技术寻找以总误差最小化的方式优化损失函数。我们可以看看 logistic 回归方程,了解系数的解释。
比方说,在计算了我们示例中的数据点之后,我们得到花费时间的系数值为 0.75。
为了理解这个 0.75 意味着什么,我们必须取这个系数的指数值。
e?? 0.75= 2.12
这个 2.12 被认为是一个奇怪的比率,它表明在网站上花费的每单位时间的增加会增加 112%的客户转化几率。
虚拟变量
到目前为止,我们只处理了连续/数值变量,但数据集中出现分类变量是不可避免的。因此,让我们来理解使用分类值进行建模的方法。由于机器学习模型只消耗数字格式的数据,我们必须采用某种技术将分类数据转换成数字形式。我们已经在上面看到了一个例子,其中我们将目标类(Yes/No)转换为数值(1 或 0)。这就是所谓的标签编码,我们为特定列中的每个类别分配唯一的数值。还有一种方法非常有效,被称为实体模型化或热编码。让我们借助一个例子来理解这一点。让我们在现有的示例数据中再添加一列。假设我们有一个包含用户使用的搜索引擎的附加列。因此,我们的数据看起来像这样,如表 5-3 所示。
表 5-3。
分类数据集
|-你好。不,不
|
花费的时间(分钟)
|
搜索引擎
|
修改的
|
| --- | --- | --- | --- |
| one | five | 谷歌 | Zero |
| Two | Two | 堆 | Zero |
| three | Ten | 美国 Yahoo 公司(提供互联网的信息检索服务) | one |
| four | Fifteen | 堆 | one |
| five | one | 美国 Yahoo 公司(提供互联网的信息检索服务) | Zero |
| six | Twelve | 谷歌 | one |
因此,要使用搜索引擎专栏中提供的额外信息,我们必须使用实体模型化将其转换为数字格式。因此,我们将获得额外数量的虚拟变量(列),这将等于搜索引擎列中不同类别的数量。以下步骤解释了将分类特征转换为数字特征的整个过程。
表 5-5。
木乃伊化
|-你好。不,不
|
花费的时间(分钟)
|
如果 _Google
|
色冰
|
SE_Yahoo
|
修改的
|
| --- | --- | --- | --- | --- | --- |
| one | one | one | Zero | Zero | Zero |
| Two | Two | Zero | one | Zero | Zero |
| three | five | Zero | Zero | one | Zero |
| four | Fifteen | Zero | one | Zero | one |
| five | Seventeen | Zero | one | Zero | one |
| six | Eighteen | one | Zero | Zero | one |
-
找出类别列中不同类别的数量。到目前为止,我们只有三个不同的类别(谷歌、必应、雅虎)。
-
Create new columns for each of the distinct categories and add value 1 in the category column for which it is present or else 0 as shown in Table 5-4.
表 5-4。
一个热编码
|-你好。不,不
|
花费的时间(分钟)
|
搜索引擎
|
如果 _Google
|
色冰
|
SE_Yahoo
|
修改的
|
| --- | --- | --- | --- | --- | --- | --- |
| one | one | 谷歌 | one | Zero | Zero | Zero |
| Two | Two | 堆 | Zero | one | Zero | Zero |
| three | five | 美国 Yahoo 公司(提供互联网的信息检索服务) | Zero | Zero | one | Zero |
| four | Fifteen | 堆 | Zero | one | Zero | one |
| five | Seventeen | 美国 Yahoo 公司(提供互联网的信息检索服务) | Zero | one | Zero | one |
| six | Eighteen | 谷歌 | one | Zero | Zero | one | -
删除原始类别列。因此,数据集现在总共包含五列(不包括索引),因为我们有三个额外的虚拟变量,如表 5-5 所示。
整个想法是以不同的方式表示相同的信息,以便机器学习模型也可以从分类值中学习。
模型评估
为了衡量逻辑回归模型的性能,我们可以使用多个指标。最明显的是精度参数。准确性是模型做出的正确预测的百分比。然而,准确性并不总是首选的方法。为了理解逻辑模型的性能,我们应该使用混淆矩阵。它由预测值计数和实际值组成。二进制类的混淆矩阵如表 5-6 所示。
表 5-6。
混淆矩阵
|实际/预测
|
预测类别(是)
|
预测类别(否)
|
| --- | --- | --- |
| 实际类别(是) | 真正数(TP) | 假阴性(FN) |
| 实际类别(无) | 假阳性(FP) | 真阴性(TN) |
让我们理解混淆矩阵中的单个值。
真阳性
这些是实际上属于正类的值,并且模型也正确地预测了它们属于正类。
-
实际类:正(1)
-
ML 模型预测类:正(1)
真正的否定
这些值实际上属于负类,并且模型也正确地预测了它们属于负类。
-
实际类别:负(0)
-
ML 模型预测类:阴性(1)
假阳性
这些值实际上属于负类,但模型错误地预测它们属于正类。
-
实际类别:负(0)
-
ML 模型预测类:正(1)
假阴性
这些值实际上属于正类,但模型错误地预测它们属于负类。
-
实际类:正(1)
-
ML 模型预测类:阴性(1)
准确
准确度是真阳性和真阴性的总和除以记录总数:
但是如前所述,由于目标阶层的不平衡,它并不总是首选指标。大多数时候,目标类频率是偏斜的(与 TP 示例相比,TN 示例的数量更大)。例如,欺诈检测数据集包含 99 %的真实交易和仅 1%的欺诈交易。现在,如果我们的逻辑回归模型预测所有真实交易,没有欺诈案件,它仍然有 99%的准确率。全部的重点是找出关于积极类的表现;因此,我们可以使用几个其他评估指标。
回忆
召回率有助于从正面类的角度评估模型的性能。它表示模型能够正确预测的实际阳性案例占阳性案例总数的百分比。
它谈到了机器学习模型在预测积极类时的质量。那么,在所有积极的类别中,该模型能够正确预测多少呢?该指标被广泛用作分类模型的评估标准。
精确
精度是指模型预测的所有阳性案例中实际阳性案例的数量:
这些也可以作为评价标准。
F1 分数
F1 得分=
截止/阈值概率
因为我们知道逻辑回归模型的输出是概率得分,所以决定预测概率的截止值或阈值是非常重要的。默认情况下,概率阈值设置为 50%。这意味着,如果模型的概率输出低于 50%,模型将预测它属于负类(0),如果它等于并高于 50%,它将被分配正类(1)。
如果阈值限制非常低,那么该模型将预测许多肯定的类别,并且将具有高召回率。相反,如果阈值概率非常高,则模型可能会错过正例,召回率会很低,但精度会更高。在这种情况下,模型将预测很少的阳性病例。决定一个好的阈值通常是具有挑战性的。受试者操作者特征曲线,或 ROC 曲线,可以帮助决定哪个阈值是最好的。
受试者工作特征曲线
ROC 用于决定模型的阈值。如图 5-7 所示,是召回率(也称为灵敏度)和精确度(特异性)之间的关系图。
图 5-7
受试者工作特征曲线
人们希望选择一个在召回率和精确度之间提供平衡的阈值。因此,现在我们已经了解了与逻辑回归相关的各种组件,我们可以继续使用 PySpark 构建逻辑回归模型。
逻辑回归代码
本章的这一节重点介绍如何使用 PySpark 和 Jupyter Notebook 从头构建一个逻辑回归模型。
注意
这本书的 GitHub repo 上提供了完整的数据集和代码,在 Spark 2.3 和更高版本上执行得最好。
让我们使用 Spark 的 MLlib 库建立一个逻辑回归模型,并预测目标类标签。
数据信息
我们将在本例中使用的数据集是一个虚拟数据集,总共包含 20,000 行和 6 列。我们必须使用 5 个输入变量,通过逻辑回归模型来预测目标类别。该数据集包含关于零售体育商品网站的在线用户的信息。这些数据包括用户的国家、使用的平台、年龄、回头客或首次访客,以及在网站上浏览的网页数量。它还包含客户最终是否购买了产品的信息(转换状态)。
步骤 1:创建 Spark 会话对象
我们启动 Jupyter 笔记本并导入 SparkSession,然后创建一个新的 SparkSession 对象来使用 Spark。
[In]: from pyspark.sql import SparkSession
[In]: spark=SparkSession.builder.appName('log_reg').getOrCreate()
步骤 2:读取数据集
然后,我们使用 Dataframe 在 Spark 中加载和读取数据集。我们必须确保我们已经从数据集可用的同一个目录文件夹中打开了 PySpark,否则我们必须提到数据文件夹的目录路径。
[In]: df=spark.read.csv('Log_Reg_dataset.csv',inferSchema=True,header=True)
步骤 3:探索性数据分析
在本节中,我们将通过查看数据集并验证它的形状和变量的各种统计测量值来更深入地研究数据集。我们从检查数据集的形状开始:
[In]:print((df.count(), len(df.columns)))
[Out]: (20000, 6)
因此,上面的输出确认了数据集的大小,然后我们可以验证输入值的数据类型,以检查我们是否需要更改/转换任何列的数据类型。
[In]: df.printSchema()
[Out]: root
|-- Country: string (nullable = true)
|-- Age: integer (nullable = true)
|-- Repeat_Visitor: integer (nullable = true)
|-- Search_Engine: string (nullable = true)
|-- Web_pages_viewed: integer (nullable = true)
|-- Status: integer (nullable = true)
正如我们所看到的,有两个这样的列(Country,Search_Engine),它们本质上是分类的,因此需要转换成数字形式。让我们用 Spark 中的 show 函数来看看数据集。
[In]: df.show(5)
[Out]:
我们现在可以使用 describe 函数来检查数据集的统计度量。
[In]: df.describe().show()
[Out]:
我们可以观察到,访问者的平均年龄接近 28 岁,他们在网站访问期间查看了大约 9 个网页。
让我们研究各个列,以便更深入地了解数据的细节。与 counts 一起使用的 groupBy 函数返回数据中每个类别的出现频率。
[In]: df.groupBy('Country').count().show()
[Out]:
因此,来自印度尼西亚的游客数量最多,其次是印度:
[In]: df.groupBy('Search_Engine').count().show()
[Out]:
雅虎搜索引擎的用户数量最多。
[In]: df.groupBy('Status').count().show()
[Out]:
+------+-----+
|Status|count|
+------+-----+
| 1|10000|
| 0|10000|
+------+-----+
我们有同等数量的用户转化和非转化。
让我们使用groupBy
函数和均值来了解更多关于数据集的信息。
[In]: df.groupBy('Country').mean().show()
[Out]:
我们来自马来西亚的转化率最高,其次是印度。网页平均访问量在马来西亚最高,在巴西最低。
[In]: df.groupBy('Search_Engine').mean().show()
[Out]:
我们从使用谷歌搜索引擎的用户访问者那里获得了最高的转化率。
[In]: df.groupBy(Status).mean().show()
[Out]:
我们可以清楚地看到,转换状态和页面浏览数量以及重复访问之间有着密切的联系。
步骤 4:特征工程
在这一部分,我们将分类变量转换成数字形式,并使用 Spark 的VectorAssembler
创建一个组合了所有输入特征的向量。
[In]: from pyspark.ml.feature import StringIndexer
[In]: from pyspark.ml.feature import VectorAssembler
由于我们要处理两个分类列,我们必须将国家和搜索引擎列转换成数字形式。机器学习模型无法理解分类值。
第一步是使用StringIndexer
into numerical form
标记列。它为列的每个类别分配唯一的值。因此,在下面的例子中,搜索引擎(Yahoo,Google,Bing)的所有三个值都被赋值(0.0,1.0,2.0)。这在名为search_engine_num
的栏目中可见一斑。
[In]: search_engine_indexer =StringIndexer(inputCol="Search_Engine", outputCol="Search_Engine_Num").fit(df)
[In]:df = search_engine_indexer.transform(df)
[In]: df.show(3,False)
[Out]:
[In]: df.groupBy('Search_Engine').count().orderBy('count',ascending=False).show(5,False)
[Out]:
[In]: df.groupBy(‘Search_Engine_Num').count().orderBy('count',ascending=False).show(5,False)
[Out]:
下一步是将这些值中的每一个表示成一个热编码向量的形式。然而,这个向量在表示方面有一点不同,因为它捕获向量中的值和位置。
[In]: from pyspark.ml.feature import OneHotEncoder
[In]:search_engine_encoder=OneHotEncoder(inputCol="Search_Engine_Num", outputCol="Search_Engine_Vector")
[In]: df = search_engine_encoder.transform(df)
[In]: df.show(3,False)
[Out]:
[In]: df.groupBy('Search_Engine_Vector').count().orderBy('count',ascending=False).show(5,False)
[Out]:
我们将用于构建逻辑回归的最后一个特性是Search_Engine_Vector
。让我们理解这些列值代表什么。
(2,[0],[1.0]) represents a vector of length 2 , with 1 value :
Size of Vector – 2
Value contained in vector – 1.0
Position of 1.0 value in vector – 0th place
这种表示法可以节省计算空间,从而加快计算速度。向量的长度等于元素总数减 1,因为每个值只需借助两列就可以很容易地表示出来。例如,如果我们需要使用一种热编码来表示搜索引擎,通常,我们可以这样做,如下所示。
|搜索引擎
|
谷歌
|
美国 Yahoo 公司(提供互联网的信息检索服务)
|
堆
|
| --- | --- | --- | --- |
| 谷歌 | one | Zero | Zero |
| 美国 Yahoo 公司(提供互联网的信息检索服务) | Zero | one | Zero |
| 堆 | Zero | Zero | one |
以优化方式表示上述信息的另一种方式是使用两列而不是三列,如下所示。
|搜索引擎
|
谷歌
|
美国 Yahoo 公司(提供互联网的信息检索服务)
|
| --- | --- | --- |
| 谷歌 | one | Zero |
| 美国 Yahoo 公司(提供互联网的信息检索服务) | Zero | one |
| 堆 | Zero | Zero |
让我们对另一个分类列(Country
)重复相同的过程。
[In]:country_indexer = StringIndexer(inputCol="Country", outputCol="Country_Num").fit(df)
[In]: df = country_indexer.transform(df)
[In]: df.groupBy('Country').count().orderBy('count',ascending=False).show(5,False)
[Out]:
[In]: df.groupBy('Country_Num').count().orderBy('count',ascending=False).show(5,False)
[Out]:
[In]: country_encoder = OneHotEncoder(inputCol="Country_Num", outputCol="Country_Vector")
[In]: df = country_encoder.transform(df)
[In]: df.select(['Country','Country_Num','Country_Vector']).show(3,False)
[Out]:
[In]: df.groupBy('Country_Vector').count().orderBy('count',ascending=False).show(5,False)
[Out]:
既然我们已经将两个分类列转换为数字形式,我们需要将所有输入列组合成一个向量,作为模型的输入特征。
因此,我们选择需要用来创建单个特征向量的输入列,并将输出向量命名为 features。
[In]: df_assembler = VectorAssembler(inputCols=['Search_Engine_Vector','Country_Vector','Age', 'Repeat_Visitor','Web_pages_viewed'], outputCol="features")
[In}:df = df_assembler.transform(df)
[In]: df.printSchema()
[Out]:
root
|-- Country: string (nullable = true)
|-- Age: integer (nullable = true)
|-- Repeat_Visitor: integer (nullable = true)
|-- Search_Engine: string (nullable = true)
|-- Web_pages_viewed: integer (nullable = true)
|-- Status: integer (nullable = true)
|-- Search_Engine_Num: double (nullable = false)
|-- Search_Engine_Vector: vector (nullable = true)
|-- Country_Num: double (nullable = false)
|-- Country_Vector: vector (nullable = true)
|-- features: vector (nullable = true)
正如我们所看到的,现在我们有了一个名为 features 的额外列,它是所有输入要素的组合,表示为一个密集矢量。
[In]: df.select(['features','Status']).show(10,False)
[Out]:
+-----------------------------------+------+
|features |Status|
+-----------------------------------+------+
|[1.0,0.0,0.0,1.0,0.0,41.0,1.0,21.0]|1 |
|[1.0,0.0,0.0,0.0,1.0,28.0,1.0,5.0] |0 |
|(8,[1,4,5,7],[1.0,1.0,40.0,3.0]) |0 |
|(8,[2,5,6,7],[1.0,31.0,1.0,15.0]) |1 |
|(8,[1,5,7],[1.0,32.0,15.0]) |1 |
|(8,[1,4,5,7],[1.0,1.0,32.0,3.0]) |0 |
|(8,[1,4,5,7],[1.0,1.0,32.0,6.0]) |0 |
|(8,[1,2,5,7],[1.0,1.0,27.0,9.0]) |0 |
|(8,[0,2,5,7],[1.0,1.0,32.0,2.0]) |0 |
|(8,[2,5,6,7],[1.0,31.0,1.0,16.0]) |1 |
+-----------------------------------+------+
only showing top 10 rows
让我们只选择 features 列作为输入,Status 列作为输出来训练逻辑回归模型。
[In]: model_df=df.select(['features','Status'])
步骤 5:拆分数据集
为了训练和评估逻辑回归模型的性能,我们必须将数据集分为训练和测试数据集。我们以 75/25 的比例分割它,并在数据集的 75%上训练我们的模型。拆分数据的另一个用途是,我们可以使用 75%的数据来应用交叉验证,以便得出最佳的超参数。交叉验证可以是不同的类型,其中训练数据的一部分被保留用于训练,而剩余部分用于验证目的。K-fold 交叉验证主要用于训练具有最佳超参数的模型。
我们可以打印火车的形状和测试数据来验证尺寸。
[In]: training_df,test_df=model_df.randomSplit([0.75,0.25])
[In]: print(training_df.count())
[Out]: (14907)
[In]: training_df.groupBy('Status').count().show()
[Out]:
+------+-----+
|Status|count|
+------+-----+
| 1| 7417|
| 0| 7490|
+------+-----+
这确保我们在训练和测试集中有一个目标类(Status
)的平衡集。
[In]:print(test_df.count())
[Out]: (5093)
[In]: test_df.groupBy('Status').count().show()
[Out]:
+------+-----+
|Status|count|
+------+-----+
| 1| 2583|
| 0| 2510|
+------+-----+
步骤 6:建立和训练逻辑回归模型
在这一部分中,我们使用功能作为输入列,状态作为输出列来构建和训练逻辑回归模型。
[In]: from pyspark.ml.classification import LogisticRegression
[In]: log_reg=LogisticRegression(labelCol='Status').fit(training_df)
培训结果
我们可以使用 Spark 中的 evaluate 函数访问模型做出的预测,该函数以优化的方式执行所有步骤。这给出了另一个总共包含四列的数据框架,包括预测和概率。prediction
列表示模型已经为给定行预测的类标签,而probability
列包含两个概率(第 0 个索引处的负类的概率和第 1 个索引处的正类的概率)。
[In]: train_results=log_reg.evaluate(training_df).predictions
[In]: train_results.filter(train_results['Status']==1).filter(train_results['prediction']==1).select(['Status','prediction','probability']).show(10,False)
[Out]:
+------+----------+----------------------------------------+
|Status|prediction|probability |
+------+----------+----------------------------------------+
|1 |1.0 |[0.2978572628475072,0.7021427371524929] |
|1 |1.0 |[0.2978572628475072,0.7021427371524929] |
|1 |1.0 |[0.16704676975730415,0.8329532302426959]|
|1 |1.0 |[0.16704676975730415,0.8329532302426959]|
|1 |1.0 |[0.16704676975730415,0.8329532302426959]|
|1 |1.0 |[0.08659913656062515,0.9134008634393749]|
|1 |1.0 |[0.08659913656062515,0.9134008634393749]|
|1 |1.0 |[0.08659913656062515,0.9134008634393749]|
|1 |1.0 |[0.08659913656062515,0.9134008634393749]|
|1 |1.0 |[0.08659913656062515,0.9134008634393749]|
+------+----------+----------------------------------------+
因此,在上述结果中,第 0 个指标的概率是针对Status = 0
的,第 1 个指标的概率是针对Status =1
的。
步骤 7:评估测试数据的线性回归模型
整个练习的最后一部分是检查模型在未知或测试数据上的性能。我们再次利用 evaluate 函数对测试进行预测。
我们将预测数据帧分配给结果,结果数据帧现在包含五列。
[In]:results=log_reg.evaluate(test_df).predictions
[In]: results.printSchema()
[Out]:
root
|-- features: vector (nullable = true)
|-- Status: integer (nullable = true)
|-- rawPrediction: vector (nullable = true)
|-- probability: vector (nullable = true)
|-- prediction: double (nullable = false)
我们可以使用 select 关键字过滤我们想要查看的列。
[In]: results.select(['Status','prediction']).show(10,False)
[Out]:
+------+----------+
|Status|prediction|
+------+----------+
|0 |0.0 |
|0 |0.0 |
|0 |0.0 |
|0 |0.0 |
|1 |0.0 |
|0 |0.0 |
|1 |1.0 |
|0 |1.0 |
|1 |1.0 |
|1 |1.0 |
+------+----------+
only showing top 10 rows
由于这是一个分类问题,我们将使用混淆矩阵来衡量模型的性能。
混淆矩阵
我们将手动为真阳性、真阴性、假阳性和假阴性创建变量,以更好地理解它们,而不是使用直接的内置函数。
[In]:tp = results[(results.Status == 1) & (results.prediction == 1)].count()
[In]:tn = results[(results.Status == 0) & (results.prediction == 0)].count()
[In]:fp = results[(results.Status == 0) & (results.prediction == 1)].count()
[In]:fn = results[(results.Status == 1) & (results.prediction == 0)].count()
准确
正如本章已经讨论过的,精确度是评估任何分类器的最基本的度量标准;然而,由于对目标类平衡的依赖性,这不是模型性能的正确指标。
[In]: accuracy=float((true_postives+true_negatives) /(results.count()))
[In]:print(accuracy)
[Out]: 0.9374255065554231
我们建立的模型的准确率大约为 94%。
回忆
召回率显示了在所有的正类观察结果中,我们能够正确预测的正类案例的数量。
[In]: recall = float(true_postives)/(true_postives + false_negatives)
[In]:print(recall)
[Out]: 0.937524870672503
模型的召回率在 0.94 左右。
精确
精确率是指在所有预测的阳性观察值中,正确预测的真阳性的数量:
[In]: precision = float(true_postives) / (true_postives + false_positives)
[In]: print(precision)
[Out]: 0.9371519490851233
因此,召回率和精确率也在相同的范围内,这是因为我们的目标类别得到了很好的平衡。
结论
在本章中,我们回顾了理解逻辑回归的构建模块、将分类列转换为数字特征以及使用 PySpark 从头构建逻辑回归模型的过程。
六、随机森林
这一章主要讲述用 PySpark 构建随机森林(RF)进行分类。我们将了解它们的各个方面以及预测是如何发生的;但在了解更多关于随机森林的知识之前,我们必须了解 RF 的构建模块,即决策树(DT)。决策树也用于分类/回归。但是就准确性而言,由于各种原因,随机森林胜过 DT 分类器,我们将在本章后面讨论这些原因。让我们了解更多关于决策树的知识。
决策图表
决策树属于机器学习的监督类别,使用频率表进行预测。决策树的一个优点是它可以处理分类变量和数值变量。顾名思义,它以一种树状结构运行,并根据各种分裂形成这些规则,最终做出预测。决策树中使用的算法是 J. R. Quinlan 开发的 ID3。
我们可以将决策树分解成不同的组件,如图 6-1 所示。
图 6-1
决策图表
树从中分支的最顶端的分裂节点被称为根节点;在上面的例子中,年龄是根节点。圆圈中表示的值称为叶节点或预测。让我们用一个样本数据集来理解决策树实际上是如何工作的。
表 6-1 中显示的数据包含了一些不同年龄组和属性的人的样本数据。基于这些属性要做出的最终决定是保险费是否应该偏高。这是一个典型的分类案例,我们将使用决策树对其进行分类。我们有四个输入栏(年龄组、吸烟者、医疗状况、工资水平)。
表 6-1
示例数据集
|年龄层
|
吸烟者
|
医疗条件
|
薪资水平
|
保险费
|
| --- | --- | --- | --- | --- |
| 老的 | 是 | 是 | 高的 | 高的 |
| 十几岁的青少年 | 是 | 是 | 中等 | 高的 |
| 年纪轻的 | 是 | 是 | 中等 | 低的 |
| 老的 | 不 | 是 | 高的 | 高的 |
| 年纪轻的 | 是 | 是 | 高的 | 低的 |
| 十几岁的青少年 | 不 | 是 | 低的 | 高的 |
| 十几岁的青少年 | 不 | 不 | 低的 | 低的 |
| 老的 | 不 | 不 | 低的 | 高的 |
| 十几岁的青少年 | 不 | 是 | 中等 | 高的 |
| 年纪轻的 | 不 | 是 | 低的 | 高的 |
| 年纪轻的 | 是 | 不 | 高的 | 低的 |
| 十几岁的青少年 | 是 | 不 | 中等 | 低的 |
| 年纪轻的 | 不 | 不 | 中等 | 高的 |
| 老的 | 是 | 不 | 中等 | 高的 |
熵
决策树生成这些数据的子集,每个子集包含相同的类值(同质);为了计算同质性,我们使用熵。这也可以使用基尼指数和分类误差等其他指标来计算,但我们将使用熵来理解决策树是如何工作的。计算熵的公式是
-p log2p-q log2q
图 6-2 表明如果子集完全纯,熵等于零;这意味着它只属于一个类,如果子集被平均分成两个类,则它等于 1
图 6-2
熵
如果我们想计算我们的目标变量(保险费)的熵,我们首先要计算每一类的概率,然后用上面的公式计算熵。
|保险费
|
| --- |
| 高(9) | 低(5) |
高类别的概率等于 9/14 =0.64
低类别的概率等于 5/14 =0.36
熵=p(高)log2(p(高)p(低)log2(p(低))
=——0.64′??【日志】(0.64))——0.36′??【日志】(0.36))
= 0.94
为了构建决策树,我们需要计算两种熵:
-
目标熵(保险费)
-
具有属性的目标的熵(例如。保险费-吸烟者)
我们已经看到了目标的熵,所以让我们计算具有输入特征的目标的第二个熵。例如,让我们考虑吸烟者的特征。
|熵计算
目标-保险费
特征-吸烟者
|
保险费(目标)
|
| --- | --- |
|
高(9)
|
低(5)
|
| --- | --- |
| 吸烟者(特写) | 是(7) | three | four |
| 第七项 | six | one |
P是 == 0.5
Pno== 0.5
= 0.99
= 0.59
=0.79
类似地,我们计算所有其他属性的熵:
熵 ( 目标,年龄组 ) = 0.69
熵 ( 目标,医疗状况 ) = 0.89
熵 ( 目标,薪资水平 ) = 0.91
信息增益
信息增益(IG)用于在决策树中进行分裂。提供最大信息增益的属性用于分割子集。信息增益告诉我们,就预测而言,哪一个特征是最重要的。从熵的角度来说,IG 是目标在分割前和分割后的熵的变化。
= 0.94 − 0.79
= 0.15
= 0.94 – 0.69
=0.25
= 0.94 – 0.89
=0.05
= 0.94 – 0.91
=0.03
我们可以观察到,年龄组属性给出了最大的信息增益;因此,决策树的根节点应该是年龄组,第一次拆分发生在该属性上,如图 6-3 所示。
图 6-3
决策树分裂
寻找提供最大信息增益的下一个属性的过程递归地继续,并且在决策树中进行进一步的分裂。最后,决策树可能看起来如图 6-4 所示。
图 6-4
决策树拆分
决策树提供的优点是,它可以通过跟踪根节点到任何叶节点而容易地转换成一组规则,因此可以容易地用于分类。存在与决策树相关联的超参数集,这些超参数集提供了以不同方式构建树的更多选项。其中之一是最大深度,它允许我们决定决策树的深度;树越深,树的裂缝就越多,有可能过度拟合。
随机森林
现在我们知道了决策树是如何工作的,我们可以继续学习随机森林。顾名思义,随机森林是由很多树组成的:很多决策树。它们非常受欢迎,有时是监督机器学习的首选方法。随机森林也可以用于分类和回归。他们将来自许多单独决策树的投票组合起来,然后用多数票预测该类,或者在回归的情况下取平均值。这真的很有效,因为弱学习者最终会聚集在一起做出强预测。重要性在于这些决策树的形成方式。“随机”这个名称在 RF 中是有原因的,因为这些树是由一组随机的特征和一组随机的训练样本组成的。现在,每个决策树都使用一组略有不同的数据点进行训练,试图了解输入和输出之间的关系,最终与使用其他数据集进行训练的其他决策树的预测相结合,从而形成随机森林。如果我们举一个与上面相似的例子,创建一个有五棵决策树的随机森林,它可能看起来如图 6-5 所示。
图 6-5
个体决策树
现在,这些决策树中的每一个都使用了数据子集和特征子集来进行训练。这也称为“打包”技术,即引导聚合。每棵树都对预测进行投票排序,投票数最多的类是随机森林分类器的最终预测,如图 6-6 所示。
图 6-6
随机森林
随机森林提供的一些优势如下:
-
特征重要性:随机森林可以根据预测能力给出用于训练的每个特征的重要性。这提供了一个很好的机会来选择相关的特性,去掉较弱的特性。所有特征重要性的总和总是等于 1。
-
更高的准确性:由于它从单个决策树收集投票,随机森林的预测能力比单个决策树相对更高。
-
较少过拟合:单个分类器的结果被平均或最大化投票,因此减少了过拟合的机会。
随机森林的一个缺点是,与决策树相比,它很难可视化,并且在计算方面涉及更多,因为它构建了多个单独的分类器,
密码
本章的这一节重点介绍如何使用 PySpark 和 Jupyter Notebook 从头构建一个随机森林分类器。
注意
这本书的 GitHub repo 上提供了完整的数据集和代码,在 Spark 2.0 和更高版本上执行得最好。
让我们使用 Spark 的 MLlib 库构建一个随机森林模型,并使用输入特性预测目标变量。
数据信息
我们将在这个例子中使用的数据集是一个有几千行六列的开源数据集。我们必须使用五个输入变量来预测使用随机森林模型的目标变量。
步骤 1:创建 SparkSession 对象
我们启动 Jupyter 笔记本并导入 SparkSession,然后创建一个新的 SparkSession 对象来使用 Spark。
[In]: from pyspark.sql import SparkSession
[In]: spark=SparkSession.builder.appName('random_forest').getOrCreate()
步骤 2:读取数据集
然后,我们使用 Dataframe 在 Spark 中加载和读取数据集。我们必须确保我们已经从数据集可用的同一个目录文件夹中打开了 PySpark,否则我们必须提到数据文件夹的目录路径。
[In]: df=spark.read.csv('affairs.csv',inferSchema=True,header=True)
步骤 3:探索性数据分析
在本节中,我们将通过查看数据集并验证数据集的形状和变量的各种统计度量来更深入地研究数据集。我们从检查数据集的形状开始。
[In]: print((df.count(), len(df.columns)))
[Out]: (6366, 6)
因此,上面的输出确认了数据集的大小,然后我们可以验证输入值的数据类型,以检查我们是否需要更改/转换任何列的数据类型。
[In]: df.printSchema()
[Out]: root
|-- rate_marriage: integer (nullable = true)
|-- age: double (nullable = true)
|-- yrs_married: double (nullable = true)
|-- children: double (nullable = true)
|-- religious: integer (nullable = true)
|-- affairs: integer (nullable = true)
正如我们所看到的,没有需要转换成数字形式的分类列。让我们用 Spark 中的 show 函数来看看数据集:
[In]: df.show(5)
[Out]:
我们现在可以使用 describe 函数来检查数据集的统计度量。
[In]: df.describe().select('summary','rate_marriage','age','yrs_married','children','religious').show()
[Out]:
我们可以观察到,人的平均年龄接近 29 岁,结婚 9 年。
让我们研究各个列,以便更深入地了解数据的细节。与 counts 一起使用的 groupBy 函数返回数据中每个类别的出现频率。
[In]: df.groupBy('affairs').count().show()
[Out]:
所以,在总人口中,有超过 33%的人有婚外情。
[In]: df.groupBy('rate_marriage').count().show()
[Out]:
大多数人对他们的婚姻评价很高(4 或 5),其余的人评价较低。让我们再深入一点,了解婚姻评级是否与婚外情变量有关。
[In]: df.groupBy('rate_marriage','affairs').count().orderBy('rate_marriage','affairs','count',ascending=True).show()
[Out]:
显然,这些数据表明,对婚姻评价低的人有外遇的比例很高。这可能被证明是预测的一个有用特征。我们将以类似的方式探索其他变量。
[In]: df.groupBy('religious','affairs').count().orderBy('religious','affairs','count',ascending=True).show()
[Out]:
从宗教角度的评分以及对宗教特征评分较低和对婚外情评分较高的人数来看,我们也有类似的情况。
[In]: df.groupBy('children','affairs').count().orderBy('children','affairs','count',ascending=True).show()
[Out]:
上表没有清楚地表明子女数量和卷入婚外情机会之间关系的任何趋势。让我们使用 groupBy 函数和 mean 来了解数据集的更多信息。
[In]: df.groupBy('affairs').mean().show()
[Out]:
所以,从年龄的角度来看,有婚外情的人对他们的婚姻评价很低,甚至有点偏高。他们结婚的时间也更长,也更不信教。
步骤 4:特征工程
这是我们使用 Spark 的 VectorAssembler 创建一个组合所有输入特征的单一向量的部分。
[In]: from pyspark.ml.feature import VectorAssembler
我们需要将所有的输入列组合成一个向量,作为模型的输入特征。因此,我们选择需要用来创建单个特征向量的输入列,并将输出向量命名为 features。
[In]: df_assembler = VectorAssembler(inputCols=['rate_marriage', 'age', 'yrs_married', 'children', 'religious'], outputCol="features")
[In}:df = df_assembler.transform(df)
[In]: df.printSchema()
[Out]:
root
|-- rate_marriage: integer (nullable = true)
|-- age: double (nullable = true)
|-- yrs_married: double (nullable = true)
|-- children: double (nullable = true)
|-- religious: integer (nullable = true)
|-- affairs: integer (nullable = true)
|-- features: vector (nullable = true)
正如我们所看到的,现在我们有了一个名为 features 的额外列,它是所有输入要素的组合,表示为一个密集矢量。
[In]: df.select(['features','affairs']).show(10,False)
[Out]:
让我们只选择 features 列作为输入,事务列作为输出来训练随机森林模型。
[In]: model_df=df.select(['features','affairs'])
步骤 5:拆分数据集
为了训练和评估随机森林模型的性能,我们必须将数据集分为训练和测试数据集。我们将其分成 75/25 的比例,并在数据集的 75%上训练我们的模型。我们可以打印火车的形状和测试数据来验证大小。
[In]: train_df,test_df=model_df.randomSplit([0.75,0.25])
[In]: print(train_df.count())
[Out]: 4775
[In]: train_df.groupBy('affairs').count().show()
[Out]:
+-------+-----+
|affairs|count|
+-------+-----+
| 1| 1560|
| 0| 3215|
+-------+-----+
这确保我们将目标类(“事务”)的集合值平衡到训练集和测试集中。
[In]: test_df.groupBy('affairs').count().show()
[Out]:
+-------+-----+
|affairs|count|
+-------+-----+
| 1| 493|
| 0| 1098|
+-------+-----+
步骤 6:建立和训练随机森林模型
在这一部分中,我们使用输入和状态等特征作为输出列来构建和训练随机森林模型。
[In]: from pyspark.ml.classification import RandomForestClassifier
[In]: rf_classifier=RandomForestClassifier(labelCol='affairs',numTrees=50).fit(train_df)
有许多超参数可以设置来调整模型的性能,但我们在这里选择的是默认参数,除了我们想要构建的决策树的数量。
步骤 7:测试数据的评估
一旦我们在训练数据集上训练了我们的模型,我们就可以在测试集上评估它的性能。
[In]: rf_predictions=rf_classifier.transform(test_df)
[In]: rf_predictions.show()
[Out]:
预测表中的第一列是测试数据的输入要素。第二列是测试数据的实际标签或输出。第三列(rawPrediction)表示两种可能输出的置信度。第四列是每个类别标签的条件概率,最后一列是随机森林分类器的预测。我们可以对预测列应用一个 groupBy 函数,以找出对正类和负类的预测数量。
[In]: rf_predictions.groupBy('prediction').count().show()
[Out]:
+----------+-----+
|prediction|count|
+----------+-----+
| 0.0| 1257|
| 1.0| 334|
+----------+-----+
为了评估这些预测,我们将导入分类评估器。
[In]: from pyspark.ml.evaluation import MulticlassClassificationEvaluator
[In]: from pyspark.ml.evaluation import BinaryClassificationEvaluator
准确
[In]: rf_accuracy=MulticlassClassificationEvaluator(labelCol='affairs',metricName='accuracy').evaluate(rf_predictions)
[In]: print('The accuracy of RF on test data is {0:.0%}'.format(rf_accuracy))
[Out]: The accuracy of RF on test data is 73%
精确
[In]: rf_precision=MulticlassClassificationEvaluator(labelCol='affairs',metricName='weightedPrecision').evaluate(rf_predictions)
[In]: print('The precision rate on test data is {0:.0%}'.format(rf_precision))
[Out]: The precision rate on test data is 71%
罗马纪元
[In]: rf_auc=BinaryClassificationEvaluator(labelCol='affairs').evaluate(rf_predictions)
[In]: print( rf_auc)
[Out]: 0.738
如前所述,RF 给出了每个特征在预测能力方面的重要性,这对于找出对预测贡献最大的关键变量非常有用。
[In]: rf_classifier.featureImportances
[Out]: (5,[0,1,2,3,4],[0.563965247822,0.0367408623003,0.243756511958,0.0657893200779,0.0897480578415])
我们使用了五个特征,并且可以使用特征重要性函数来找出重要性。要了解哪个输入要素映射到哪个索引值,我们可以使用元数据信息。
[In]: df.schema["features"].metadata["ml_attr"]["attrs"]
[Out]:
{'idx': 0, 'name': 'rate_marriage'},
{'idx': 1, 'name': 'age'},
{'idx': 2, 'name': 'yrs_married'},
{'idx': 3, 'name': 'children'},
{'idx': 4, 'name': 'religious'}}
因此,从预测的角度来看,结婚率是最重要的特征,其次是结婚年龄。最不重要的变量似乎是年龄。
步骤 8:保存模型
有时,在训练完模型后,我们只需要调用模型进行预测,因此保留模型对象并重用它进行预测是非常有意义的。这包括两个部分。
-
保存 ML 模型
-
加载 ML 模型
[In]: from pyspark.ml.classification import RandomForestClassificationModel
[In]: rf_classifier.save("/home/jovyan/work/RF_model")
This way we saved the model as object locally.The next step is to load the model again for predictions
[In]: rf=RandomForestClassificationModel.load("/home/jovyan/work/RF_model")
[In]: new_preditions=rf.transform(new_df)
新的预测表将包含具有模型预测的列
结论
在本章中,我们回顾了理解随机森林的构建块并在 PySpark 中创建 ML 模型以进行分类以及评估指标(如准确度、精密度和 auc)的过程。我们还讲述了如何在本地保存 ML 模型对象并在预测中重用它。
七、推荐系统
在实体店中可以观察到的一个常见趋势是,我们在购物时有销售人员指导和推荐我们相关的产品。另一方面,在在线零售平台上,有无数不同的产品可供选择,我们必须自己导航才能找到合适的产品。现在的情况是,用户有太多的选项和选择,但是他们不喜欢花太多的时间浏览整个目录。因此,推荐系统(RS)的作用对于推荐相关项目和推动客户转化变得至关重要。
传统实体店使用货架图来排列商品,这样可以增加高销量商品的可见性并增加收入,而在线零售店需要根据每个客户的偏好来保持动态,而不是对每个人都保持相同。
推荐系统主要用于以个性化的方式向正确的用户自动推荐正确的内容或产品,以增强整体体验。推荐系统在使用大量数据和学习理解特定用户的偏好方面非常强大。推荐可以帮助用户轻松浏览数以百万计的产品或大量内容(文章/视频/电影),并向他们展示他们可能喜欢或购买的正确商品/信息。因此,简单地说,RS 代表用户帮助发现信息。现在,这取决于用户来决定 rs 是否在推荐方面做得很好,他们可以选择选择产品/内容或放弃并继续前进。用户的每一个决定(积极的或消极的)都有助于根据最新数据重新训练 rs,以便能够给出更好的建议。在这一章中,我们将回顾 RS 是如何工作的,以及提出这些建议时所使用的不同类型的技术。我们还将使用 PySpark 构建一个推荐系统。
推荐
在向用户推荐各种事物的意义上,推荐系统可以用于多种目的。例如,其中一些可能属于以下类别:
-
零售产品
-
乔布斯
-
关系/朋友
-
电影/音乐/视频/书籍/文章
-
广告(ad 的复数)
“推荐什么”部分完全取决于使用 RS 的环境,可以通过提供用户最有可能购买的商品或通过在适当的时间展示相关内容来增加参与度,从而帮助企业增加收入。RS 关注的关键方面是,被推荐的产品或内容应该是用户可能喜欢但自己不会发现的东西。除此之外,RS 还需要各种各样的推荐元素来保持足够的趣味性。一些当今企业大量使用 RS 的例子,如亚马逊产品、脸书的朋友建议、LinkedIn 的“你可能认识的人”、网飞的电影、YouTube 的视频、Spotify 的音乐和 Coursera 的课程。
从商业角度来看,这些建议的影响被证明是巨大的,因此花费了更多的时间来使这些规则更加有效和相关。RS 在零售环境中提供的一些直接优势包括:
-
收入增加
-
用户的积极评价和评级
-
参与度提高
对于其他垂直行业,如广告推荐和其他内容推荐,RS 非常有助于帮助他们找到适合用户的东西,从而增加采用率和订阅量。如果没有 RS,以个性化方式向数百万用户推荐在线内容或向每个用户提供通用内容可能会令人难以置信地偏离目标,并对用户产生负面影响。
现在我们知道了 RS 的用法和特性,我们可以看看不同类型的 RS。可以构建的 RS 主要有五种类型:
-
基于流行度的 RS
-
基于内容的遥感
-
基于协同过滤的粗糙集
-
混合遥感
-
基于粗糙集的关联规则挖掘
除了最后一项,即基于 RS 的关联规则挖掘,我们将简要介绍每一项,因为它超出了本书的范围。
基于流行度的 RS
这是可以用来向用户推荐产品或内容的最基本和最简单的 RS。它根据大多数用户的购买/观看/喜欢/下载来推荐项目/内容。虽然它实现起来简单易行,但它不会产生相关的结果,因为对每个用户的推荐都是一样的,但它有时会优于一些更复杂的 RS。RS 的实现方式是简单地根据各种参数对项目进行排名,并推荐列表中排名最高的项目。如前所述,项目或内容可以按以下方式排序:
-
下载次数
-
购买次数
-
浏览次数
-
最高评级
-
共享次数
-
喜欢的次数
这种 RS 直接向客户推荐最畅销或最受关注/购买的商品,从而增加客户转化的机会。这种 RS 的局限性在于它不是超个性化的。
基于内容的遥感
这种类型的 RS 向用户推荐用户过去喜欢过的类似项目。因此,整个想法是计算任意两个项目之间的相似性得分,并基于用户的兴趣简档推荐给用户。我们首先为每个项目创建项目配置文件。现在,可以通过多种方式创建这些项目配置文件,但最常用的方法是包含关于项目的详细信息或属性的信息。例如,电影的项目简档可以具有各种属性的值,例如恐怖、艺术、喜剧、动作、戏剧和商业,如下所示。
|电影 ID
|
恐怖
|
艺术
|
喜剧
|
行动
|
戏剧
|
商业
|
| --- | --- | --- | --- | --- | --- | --- |
| Two thousand three hundred and ten | Zero point zero one | Zero point three | Zero point eight | Zero | Zero point five | Zero point nine |
上面是一个项目配置文件的例子,每个项目都有一个相似的向量来表示它的属性。现在,让我们假设用户已经观看了 10 部这样的电影,并且非常喜欢它们。因此,对于该特定用户,我们最终得到了表 7-1 中所示的项目矩阵。
表 7-1。
电影数据
|电影 ID
|
恐怖
|
艺术
|
喜剧
|
行动
|
戏剧
|
商业
|
| --- | --- | --- | --- | --- | --- | --- |
| Two thousand three hundred and ten | Zero point zero one | Zero point three | Zero point eight | Zero | Zero point five | Zero point nine |
| Two thousand six hundred and thirty-one | Zero | Zero point four five | Zero point eight | Zero | Zero point five | Zero point six five |
| Two thousand four hundred and forty-four | Zero point two | Zero | Zero point eight | Zero | Zero point five | Zero point seven |
| Two thousand nine hundred and seventy-four | Zero point six | Zero point three | Zero | Zero point six | Zero point five | Zero point three |
| Two thousand one hundred and fifty-one | Zero point nine | Zero point two | Zero | Zero point seven | Zero point five | Zero point nine |
| Two thousand eight hundred and seventy-six | Zero | Zero point three | Zero point eight | Zero | Zero point five | Zero point nine |
| Two thousand three hundred and forty-five | Zero | Zero point three | Zero point eight | Zero | Zero point five | Zero point nine |
| Two thousand three hundred and nine | Zero point seven | Zero | Zero | Zero point eight | Zero point four | Zero point five |
| Two thousand three hundred and sixty-six | Zero point one | Zero point one five | Zero point eight | Zero | Zero point five | Zero point six |
| Two thousand three hundred and eighty-eight | Zero | Zero point three | Zero point eight five | Zero | Zero point eight | Zero point nine |
用户概要
基于内容的 RC 中的另一个组件是用户简档,它是使用用户喜欢或评价的项目简档创建的。假设用户喜欢表 7-1 中的电影,用户简档可能看起来像单个向量,它只是项目向量的平均值。用户配置文件可能如下所示。
|用户标识
|
恐怖
|
艺术
|
喜剧
|
行动
|
戏剧
|
商业
|
| --- | --- | --- | --- | --- | --- | --- |
| 1A92 | Zero point two five one | Zero point two three | Zero point five six five | Zero point two one | Zero point five two | Zero point seven two five |
这种创建用户简档的方法是最基本的方法之一,还有其他复杂的方法来创建更丰富的用户简档,如标准化值、加权值等。下一步是根据之前的偏好推荐用户可能喜欢的项目(电影)。因此,用户简档和项目简档之间的相似性得分被计算并相应地排序。相似度得分越多,用户喜欢该电影的几率越高。有几种方法可以计算相似性得分。
欧几里得距离
用户简档和项目简档都是高维向量,因此为了计算两者之间的相似性,我们需要计算两个向量之间的距离。使用下面的公式可以很容易地计算出 n 维向量的欧几里德距离:
距离值越高,两个向量越不相似。因此,计算用户简档和所有其他项目之间的距离,并按降序排列。以这种方式向用户推荐前几个项目。
余弦相似性
计算用户和项目简档之间的相似性得分的另一种方式是余弦相似性。它测量的不是距离,而是两个向量(用户简档向量和项目简档向量)之间的角度。两个向量之间的角度越小,它们彼此越相似。余弦相似度可以使用下面的公式计算出来:
是(x,y)=cos(θ)= xy / |x||y|
让我们来看看基于内容的 RS 的优缺点。
优点:
-
基于内容的 RC 独立于其他用户的数据工作,因此可以应用于个人的历史数据。
-
RC 背后的基本原理很容易理解,因为推荐是基于用户简档和项目简档之间的相似性得分。
-
也可以仅仅基于用户的历史兴趣和偏好向用户推荐新的和未知的项目。
缺点:
-
项目配置文件可能有偏差,可能无法反映准确的属性值,并可能导致不正确的建议。
-
推荐完全取决于用户的历史,并且只能推荐与历史观看/喜欢的项目相似的项目,而不考虑访问者的新兴趣或喜欢。
基于协同过滤的粗糙集
基于 CF 的 RS 不需要项目属性或描述来进行推荐;相反,它作用于用户项目交互。这些互动可以通过各种方式来衡量,如评级、购买的商品、花费的时间、在另一个平台上的分享等。在深入探讨 CF 之前,让我们后退一步,思考一下我们是如何在日常生活中做出某些决定的,例如以下这些决定:
-
看哪部电影
-
读哪本书
-
去哪家餐馆
-
去哪个地方旅游
我们问我们的朋友,对!我们向在某些方面与我们相似、品味和爱好与我们相同的人寻求推荐。我们的利益在某些领域是一致的,所以我们相信他们的建议。这些人可能是我们的家庭成员、朋友、同事、亲戚或社区成员。在现实生活中,很容易知道这个圈子里的人都是些什么人,但当涉及到在线推荐时,协同过滤中的关键任务是找到与你最相似的用户。每个用户可以由包含用户项目交互的反馈值的向量来表示。让我们先了解用户项目矩阵,以理解 CF 方法。
用户项目矩阵
用户项目矩阵顾名思义。在行中,我们有所有的唯一用户;沿着列,我们有所有独特的项目。这些值填充有反馈或交互分数,以突出用户对该产品的喜欢或不喜欢。一个简单的用户条目矩阵可能类似于表 7-2 所示。
表 7-2。
用户项目矩阵
|用户标识
|
项目 1
|
项目 2
|
项目 3
|
项目 4
|
项目 5
|
项目 n
|
| --- | --- | --- | --- | --- | --- | --- |
| 14SD | one | four | | | five | |
| 26BB | | three | three | | | one |
| 24DG | one | four | one | | five | Two |
| 59YU | | Two | | | five | |
| 21HT | three | Two | one | Two | five | |
| 公元前 68 年 | | one | | | | five |
| 26DF | one | four | | three | three | |
| 25TR | one | four | | | five | |
| 33XF | five | five | five | one | five | five |
| 73QS | one | | three | | | one |
正如您所观察到的,用户项目矩阵通常非常稀疏,因为有数百万个项目,并且每个用户不会与每个项目进行交互;所以这个矩阵包含了很多空值。矩阵中的值通常是基于用户与该特定项目的交互而推导出的反馈值。在 UI 矩阵中可以考虑两种类型的反馈。
明确的反馈
这种类型的反馈通常是当用户在交互之后给项目评级并且已经体验了项目特征时。评级可以有多种类型。
-
用 1-5 级评分
-
向他人推荐的简单评分项目(是或否或从不)
-
喜欢该项目(是或否)
显式反馈数据包含非常有限的数据点,因为即使在购买或使用商品后,也只有很小比例的用户会花时间给出评级。一个完美的例子可以是一部电影,因为很少有用户在观看后给出评级。因此,仅基于显式反馈数据构建 RS 会将我们置于一个棘手的境地,尽管数据本身噪声较小,但有时不足以构建 RS。
隐性反馈
这种反馈不是直接的,主要是从用户在在线平台上的活动中推断出来的,并且基于与项目的交互。例如,如果用户已经购买了该商品,将其添加到购物车中,查看了该商品,并且花了大量时间查看关于该商品的信息,这表明用户对该商品有更高的兴趣。隐式反馈值很容易收集,并且当每个用户通过在线平台导航时,他们可以获得大量的数据点。隐式反馈面临的挑战是,它包含大量嘈杂的数据,因此不会在推荐中增加太多价值。
现在我们已经了解了 UI 矩阵和进入该矩阵的值的类型,我们可以看到不同类型的协同过滤(CF)。主要有两种 CF:
-
基于最近邻的 CF
-
基于潜在因素的 CF
基于最近邻的 CF
这种 CF 的工作方式是通过找到与活跃用户(对于我们试图推荐的用户)喜欢或不喜欢相同项目的最相似用户来找出用户的 k 个最近邻居。最近邻的协同过滤包括两个步骤。第一步是找到 k 个最近的邻居,第二步是预测活跃用户喜欢特定项目的评级或可能性。可以使用我们在本章中讨论过的一些早期技术来找出 k-最近邻。余弦相似性或欧几里德距离等指标可以帮助我们根据两组用户喜欢或不喜欢的共同项目,从用户总数中找到与活跃用户最相似的用户。也可以使用的另一个度量是 Jaccard 相似性。让我们看一个例子来理解这个指标——回到之前的用户项目矩阵,只取五个用户的数据,如表 7-3 所示。
表 7-3。
用户项目矩阵
|用户标识
|
项目 1
|
项目 2
|
项目 3
|
项目 4
|
项目 5
|
项目 n
|
| --- | --- | --- | --- | --- | --- | --- |
| 14SD | one | four | | | five | |
| 26BB | | three | three | | | one |
| 24DG | one | four | one | | five | Two |
| 59YU | | Two | | | five | |
| 26DF | one | four | | three | three | |
假设我们总共有五个用户,我们想找到离活动用户最近的两个邻居(14SD)。Jaccard 的相似性可以通过使用
是(x,y)= | rx’ry |/| rx’ry |
因此,这是任意两个用户共同评价的项目数除以两个用户评价的项目总数:
sim(用户 1,用户 2) = 1 / 5 = 0.2,因为他们只对项目 2 进行了共同评价)。
其余四个用户与活跃用户的相似性得分将类似于表 7-4 所示。
表 7-4。
用户相似性得分
|用户标识
|
相似性得分
|
| --- | --- |
| 14SD | one |
| 26BB | Zero point two |
| 24DG | Zero point six |
| 59YU | Zero point six seven seven |
| 26DF | Zero point seven five |
因此,根据 Jaccard 相似性,前两个最近的邻居是第四和第五用户。不过,这种方法有一个主要问题,因为 Jaccard 相似性在计算相似性得分时不考虑反馈值,而只考虑已评级的常见项目。因此,可能存在这样一种可能性,即用户可能对许多项目进行了共同评级,但一个人可能对它们进行了高评级,而另一个人可能对它们进行了低评级。Jaccard 相似性得分仍然可能以两个用户的高分结束,这是违反直觉的。在上面的例子中,很明显,活动用户与第三用户(24DG)最相似,因为他们对三个常见项目具有完全相同的评级,而第三用户甚至没有出现在前两个最近的邻居中。因此,我们可以选择其他度量来计算 k-最近邻。
缺少值
用户项目矩阵将包含许多缺失值,原因很简单,因为有许多项目,并且不是每个用户都与每个项目交互。有几种方法可以处理 UI 矩阵中缺失的值。
-
用 0 替换丢失的值。
-
用用户的平均评分替换缺失值。
对共同项目的评级越相似,邻居离活跃用户越近。同样,有两类基于最近邻的 CF
-
基于用户的 CF
-
基于项目的 CF
这两个 RS 之间的唯一区别是,在基于用户的 CF 中,我们找到 k 个最近的用户,而在基于项目的 CF 中,我们找到 k 个最近的项目推荐给用户。我们将看到推荐在基于用户的 RS 中是如何工作的。
顾名思义,在基于用户的 CF 中,整个思路就是找到与活跃用户最相似的用户,将相似用户已经购买/评价很高的商品推荐给活跃用户,而这些商品是他还没有看过/买过/试过的。这种 RS 的假设是,如果两个或更多的用户对一堆项目有相同的意见,那么他们很可能对其他项目也有相同的意见。让我们看一个例子来理解基于用户的协同过滤:有三个用户,我们想向其中的活跃用户推荐一个新项目。其余两个用户是与活动用户在项目的好恶方面的前两个最近邻居,如图 7-1 所示。
图 7-1
活跃用户和最近邻居
这三个用户都对某个特定的相机品牌给予了很高的评价,根据图 7-2 所示的相似性得分,前两个用户是与活跃用户最相似的用户。
图 7-2
所有用户都喜欢一个项目
现在,前两个用户对另一个项目(Xbox 360)的评价也非常高,第三个用户尚未与之互动,也没有看到如图 7-3 所示。使用该信息,我们试图预测活动用户将给予新项目(XBOX 360)的评级,这也是该特定项目(Xbox 360)的最近邻居的评级的加权平均值。
图 7-3
最近的邻居也喜欢另一个项目
然后,基于用户的 CF 向活跃用户推荐另一个项目(XBOX 360 ),因为他最有可能对该项目评价较高,因为最近的邻居也对该项目评价较高,如图 7-4 所示。
图 7-4
主动用户推荐
基于潜在因素的 CF
这种协作过滤也使用用户项目矩阵,但是不是寻找最近的邻居和预测评级,而是试图将 UI 矩阵分解成两个潜在的因素矩阵。潜在因素是从原始值导出的值。它们与观察到的变量有着内在的联系。这些新矩阵的秩要低得多,并且包含潜在的因素。这也称为矩阵分解。我们举个例子来理解矩阵因式分解的过程。我们可以将秩为 r 的 m×n 大小的矩阵“A”分解成两个更小的秩矩阵 X,Y,使得 X 和 Y 的点积产生原始的 A 矩阵。如果我们有表 7-5 所示的矩阵 A,
表 7-5。
潜在因素计算
|one
|
Two
|
three
|
five
|
| --- | --- | --- | --- |
| Two | four | eight | Twelve |
| three | six | seven | Thirteen |
然后我们可以把所有的列值写成第一列和第三列(A1 和 A3)的线性组合。
A1 = 1 * A1 + 0 * A3
A2 = 2 * A1 + 0 * A3
A3 = 0 * A1 + 1 * A3
A4 = 2 * A1 + 1 * A3
现在,我们可以创建两个小秩矩阵,使得这两个矩阵的乘积返回原始矩阵 a。
X =
|one
|
three
|
| --- | --- |
| Two | eight |
| three | seven |
Y =
|one
|
Two
|
Zero
|
Two
|
| --- | --- | --- | --- |
| Zero | Zero | one | one |
x 包含 A1 和 A3 的列值,Y 包含线性组合的系数。
X 和 Y 之间的点积返回到矩阵‘A’(原始矩阵)
考虑到表 7-2 中所示的相同用户项目矩阵,我们将其因式分解或分解成两个更小的秩矩阵。
-
用户潜在因素矩阵
-
项目潜在因素矩阵
用户潜在因素矩阵包含映射到这些潜在因素的所有用户,类似地,项目潜在因素矩阵包含映射到每个潜在因素的列中的所有项目。寻找这些潜在因素的过程是使用机器学习优化技术来完成的,例如交替最小二乘法。用户项目矩阵被分解成潜在因素矩阵,使得用户对任何项目的评级是用户潜在因素值和项目潜在因素值之间的乘积。主要目标是最小化整个用户项目矩阵评级和预测项目评级的误差平方和。例如,第二用户(26BB)对项目 2 的预测评级将是
评级(用户 2,项目 2)= 1
在每一个预测的收视率上都会有一定量的误差,因此成本函数变成了预测收视率和实际收视率之间的误差平方和。训练推荐模型包括以这样一种方式学习这些潜在因素,即它最小化总体评级的 SSE。我们可以用 ALS 方法找到最低的 SSE。ALS 的工作方式是首先固定用户潜在因素值,并尝试改变项目潜在因素值,以使总体 SSE 降低。在下一步中,项目潜在因素值保持固定,并且用户潜在因素值被更新以进一步降低 SSE。这在用户矩阵和项目矩阵之间保持交替,直到 SSE 不再减少。
优点:
-
不需要项目的内容信息,并且可以基于有价值的用户项目交互来进行推荐。
-
基于其他用户的个性化体验。
限制:
-
冷启动问题:如果用户没有物品交互的历史数据。则 RC 不能预测新用户的 k 个最近邻居,并且不能做出推荐。
-
缺失值:由于项目数量庞大,很少有用户与所有项目进行交互,因此有些项目从未被用户评级,也无法推荐。
-
无法推荐新的或未分级的项目:如果项目是新的且尚未被用户看到,则在其他用户与之交互之前,无法向现有用户推荐该项目。
-
准确性差:它不能很好地执行,因为许多组件都在不断变化,如用户的兴趣,有限的商品保质期,以及很少的商品评级。
混合推荐系统
顾名思义,混合推荐系统包括来自多个推荐系统的输入,这使得它在向用户提供有意义的推荐方面更加强大和相关。正如我们所看到的,使用个人简历有一些限制,但结合起来,他们克服了这些限制,因此能够推荐用户认为更有用和个性化的项目或信息。混合 RS 可以通过特定的方式构建,以满足业务需求。其中一种方法是构建单个 RS,并将多个 RS 输出的建议组合起来,然后推荐给用户,如图 7-5 所示。
图 7-5
综合建议
另一种方法是利用基于内容的推荐器的优势,并将它们用作基于协同过滤的推荐的输入,以向用户提供更好的推荐。这种方法也可以反过来,协同过滤可以用作基于内容的推荐的输入,如图 7-6 所示。
图 7-6
混合建议
混合推荐还包括使用其他类型的推荐,例如基于人口统计的和基于知识的,以增强其推荐的性能。混合 RS 已经成为各种业务不可或缺的一部分,帮助他们的用户消费正确的内容,因此获得了很多价值。
密码
本章的这一节重点介绍使用 PySpark 和 Jupyter Notebook 中的 ALS 方法从头构建 RS。
注意
完整的数据集和代码可以在本书的 GitHub repo 上参考,在 Spark 2.0 和更高版本上执行得最好。
让我们使用 Spark 的 MLlib 库构建一个推荐器模型,并预测任何给定用户对某个项目的评分。
数据信息
我们将在本章使用的数据集是一个著名的开源电影镜头数据集的子集,包含总共 10 万条记录,有三列(User_Id、title、rating)。我们将使用 75%的数据来训练我们的推荐模型,并在剩余的 25%用户评级上测试它。
步骤 1:创建 SparkSession 对象
我们启动 Jupyter 笔记本并导入 SparkSession,然后创建一个新的 SparkSession 对象来使用 Spark:
[In]: from pyspark.sql import SparkSession
[In]: spark=SparkSession.builder.appName('lin_reg').getOrCreate()
步骤 2:读取数据集
然后,我们使用 dataframe 在 Spark 中加载和读取数据集。我们必须确保我们已经从数据集可用的同一个目录文件夹中打开了 PySpark,否则我们必须提到数据文件夹的目录路径。
[In]:
df=spark.read.csv('movie_ratings_df.csv',inferSchema=True,header=True)
步骤 3:探索性数据分析
在本节中,我们将通过查看数据集、验证数据集的形状,以及获得已评级的电影数量和每个用户已评级的电影数量来研究数据集。
[In]: print((df.count(), len(df.columns)))
[Out]: (100000,3)
因此,上面的输出确认了数据集的大小,然后我们可以验证输入值的数据类型,以检查我们是否需要更改/转换任何列的数据类型。
[In]: df.printSchema()
[Out]: root
|-- userId: integer (nullable = true)
|-- title: string (nullable = true)
|-- rating: integer (nullable = true)
总共有三列,其中两列是数字,标题是分类。使用 PySpark 构建 RS 的关键是我们需要数字形式的 user_id 和 item_id。因此,我们稍后会将电影标题转换成数值。我们现在使用 rand 函数来查看数据帧的几行,以随机顺序打乱记录。
[In]: df.orderBy(rand()).show(10,False)
[Out]:
[In]: df.groupBy('userId').count().orderBy('count',ascending=False).show(10,False)
[Out]:
[In]: df.groupBy('userId').count().orderBy('count',ascending=True).show(10,False)
[Out]:
记录数最高的用户评价了 737 部电影,每个用户至少评价了 20 部电影。
[In]: df.groupBy('title').count().orderBy('count',ascending=False).show(10,False)
[Out]:
收视率最高的电影是星球大战 (1977),被评为 583 次,每部电影至少有 1 名用户评为。
步骤 4:特征工程
我们现在使用 StringIndexer 将电影标题列从分类值转换为数值。我们从 PySpark 库中导入 stringIndexer 和 Indextostring。
[In]: from pyspark.sql.functions import *
[In]: from pyspark.ml.feature import StringIndexer,IndexToString
接下来,我们通过提到输入列和输出列来创建 stringindexer 对象。然后,我们将对象放在数据帧上,并将其应用于电影标题列,以创建带有数值的新数据帧。
[In]: stringIndexer = StringIndexer(inputCol="title", outputCol="title_new")
[In]: model = stringIndexer.fit(df)
[In]: indexed = model.transform(df)
让我们通过查看新数据帧的几行来验证标题列的数值。
[In]: indexed.show(10)
[Out]:
正如我们所看到的,我们现在有了一个额外的列(title_new ),用数值表示电影标题。如果 user_id 也是分类类型,我们必须重复相同的过程。为了验证电影计数,我们在一个新的数据帧上重新运行 groupBy。
[In]: indexed.groupBy('title_new').count().orderBy('count',ascending=False).show(10,False)
[Out]:
步骤 5:拆分数据集
既然我们已经为构建推荐器模型准备了数据,我们可以将数据集分成训练集和测试集。我们将其分成 75 比 25 的比例来训练模型并测试其准确性。
[In]: train,test=indexed.randomSplit([0.75,0.25])
[In]: train.count()
[Out]: 75104
[In]: test.count()
[Out]: 24876
步骤 6:建立和训练推荐模型
我们从 PySpark ml 库中导入 ALS 函数,并在训练数据集上构建模型。可以调整多个超参数来提高模型的性能。其中两个重要的是 non negative =“True”不会在推荐中产生负面评级,cold start strategy =“drop”会阻止任何 NaN 评级预测。
[In]: from pyspark.ml.recommendation import ALS
[In]: rec=ALS(maxIter=10,regParam=0.01,userCol='userId',itemCol='title_new',ratingCol='rating',nonnegative=True,coldStartStrategy="drop")
[In]: rec_model=rec.fit(train)
步骤 7:对测试数据的预测和评估
整个练习的最后一部分是检查模型在未知或测试数据上的性能。我们使用 transform 函数对测试数据进行预测,并使用 RegressionEvaluate 来检查模型对测试数据的 RMSE 值。
[In]: predicted_ratings=rec_model.transform(test)
[In]: predicted_ratings.printSchema()
root
|-- userId: integer (nullable = true)
|-- title: string (nullable = true)
|-- rating: integer (nullable = true)
|-- title_new: double (nullable = false)
|-- prediction: float (nullable = false)
[In]: predicted_ratings.orderBy(rand()).show(10)
[Out]:
[xIn]: from pyspark.ml.evaluation import RegressionEvaluator
[In]: evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')
[In]: rmse=evaluator.evaluate(predictions)
[In] : print(rmse)
[Out]: 1.0293574739493354
RMSE 不是很高;我们在实际评分和预测评分上犯了一个点的错误。这可以通过调整模型参数和使用混合方法来进一步改善。
第八步:推荐活跃用户可能喜欢的热门电影
在检查了模型的性能并调整了超参数之后,我们可以继续向用户推荐他们没有看过但可能喜欢的顶级电影。第一步是在数据帧中创建独特电影的列表。
[In]: unique_movies=indexed.select('title_new').distinct()
[In]: unique_movies.count()
[Out]: 1664
所以,我们总共有 1664 部不同的电影。
[In]: a = unique_movies.alias('a')
我们可以在数据集中选择任何需要推荐其他电影的用户。在我们的例子中,我们使用 userId = 85。
[In]: user_id=85
我们将过滤该活跃用户已经评级或看过的电影。
[In]: watched_movies=indexed.filter(indexed['userId'] == user_id).select('title_new').distinct()
[In]: watched_movies.count()
[Out]: 287
[In]: b=watched_movies.alias('b')
因此,在该活跃用户已经评级的 1,664 部电影中,总共有 287 部独特的电影。因此,我们想从剩余的 1377 个项目中推荐电影。我们现在合并这两个表,通过从连接的表中过滤空值来查找我们可以推荐的电影。
[In]: total_movies = a.join(b, a.title_new == b.title_new,how='left')
[In]: total_movies.show(10,False)
[Out]:
[In]: remaining_movies=total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()
[In]: remaining_movies.count()
[Out]: 1377
[In]: remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))
[In]: remaining_movies.show(10,False)
[Out]:
最后,我们现在可以使用我们之前构建的推荐者模型,对活跃用户的剩余电影数据集进行预测。我们只过滤几个预测评级最高的推荐。
[In]: recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False)
[In]: recommendations.show(5,False)
[Out]:
因此,电影标题 1433 和 1322 对于该活动用户具有最高的预测评级(85)。我们可以通过将电影标题重新添加到推荐中来使其更加直观。我们使用 Indextostring 函数创建一个额外的列来返回电影标题。
[In]:
movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)
[In]: final_recommendations=movie_title.transform(recommendations)
[In]: final_recommendations.show(10,False)
[Out]:
所以,对 userId (85)的推荐是 Boys,Les (1997)和 F aust (1994)。这可以很好地封装在一个函数中,该函数依次执行上述步骤,并为活跃用户生成推荐。GitHub repo 上有完整的代码,内置了这个函数。
结论
在本章中,我们讨论了各种类型的推荐模型,以及每种模型的优点和局限性。然后,我们在 PySpark 中创建了一个基于协同过滤的推荐系统,使用 ALS 方法向用户推荐电影。
八、聚类
到目前为止,在前面的章节中,我们已经看到了有监督的机器学习,其中目标变量或标签是我们已知的,我们试图根据输入特征来预测输出。无监督学习在某种意义上是不同的,因为没有标记的数据,我们不试图预测任何输出;相反,我们试图找到有趣的模式,并在数据中找出组。相似的值被分组在一起。
当我们加入一所新的学校或大学,我们会遇到许多新面孔,每个人看起来都不一样。我们几乎不认识研究所里的任何人,最初也没有成立小组。慢慢地,我们开始花时间和其他人在一起,团体开始发展。我们与许多不同的人打交道,并弄清楚他们与我们有多相似和不相似。几个月后,我们几乎在自己的朋友群中安顿下来。群体中的朋友/成员具有相似的属性/爱好/品味,因此会呆在一起。聚类有点类似于这种基于定义组的属性集来形成组的方法。
从集群开始
我们可以对任何类型的数据进行聚类,形成相似的观察结果组,并使用它来做出更好的决策。在早期,客户细分通常是通过基于规则的方法来完成的,这需要大量的人工工作,并且只能使用有限数量的变量。例如,如果企业想要进行客户细分,他们会考虑多达 10 个变量,如年龄、性别、工资、地点等。,并创建仍能提供合理性能的基于规则的细分市场;但在今天的情况下,这将变得非常无效。一个原因是数据可用性丰富,另一个原因是动态的客户行为。有成千上万的其他变量可以被认为是产生这些机器学习驱动的片段,这些片段更加丰富和有意义。
当我们开始聚类时,每个观察都是不同的,不属于任何组,而是基于每个观察的属性有多相似。我们以这样的方式对它们进行分组,即每组包含最相似的记录,并且任意两组之间存在尽可能多的差异。那么,我们如何衡量两个观察值是相似还是不同呢?
有多种方法可以计算任意两个观测值之间的距离。首先,我们认为任何观测值都是一种向量形式,包含如下所示的观测值(A)。
|年龄
|
工资(万美元)
|
重量(千克)
|
高度(英尺。)
|
| --- | --- | --- | --- |
| Thirty-two | eight | Sixty-five | six |
现在,假设我们想要计算这个观察/记录与任何其他观察(B)的距离,其他观察(B)也包含类似的属性,如下所示。
|年龄
|
工资(万美元)
|
重量(千克)
|
高度(英尺。)
|
| --- | --- | --- | --- |
| Forty | Fifteen | Ninety | five |
我们可以用欧几里得方法来测量距离,这很简单。
它也被称为笛卡尔距离。我们试图计算任意两点间直线的距离;如果这些点之间的距离很小,它们更可能是相似的,而如果距离很大,它们彼此不相似,如图 8-1 所示。
图 8-1
基于欧氏距离的相似性
任意两点之间的欧几里德距离可以使用下面的公式计算:
Dist ( A , B ) = 27.18
因此,观察值 A 和 B 之间的欧几里德距离是 27.18。计算观测值之间距离的其他技术如下:
-
曼哈顿距离
-
马哈拉诺比斯距离
-
闵可夫斯基距离
-
切比雪夫距离
-
余弦距离
聚类的目标是具有最小的簇内距离和最大的簇间差异。基于我们用来进行聚类的距离方法,我们可能会得到不同的组,因此,确保选择与业务问题相一致的正确距离度量是至关重要的。在研究不同的集群技术之前,让我们快速回顾一下集群的一些应用。
应用程序
如今,聚类被用于从客户细分到异常检测的各种用例中。企业广泛使用机器学习驱动的聚类来分析客户和细分,以围绕这些结果创建市场战略。聚类通过在一个聚类中找到相似的对象和彼此远离的不相似的对象来驱动大量搜索引擎的结果。它基于搜索查询推荐最接近的相似结果
基于数据类型和业务需求,可以通过多种方式进行集群。最常用的是 K-means 和层次聚类。
k 均值
k’代表我们想要在给定数据集中形成的聚类或组的数量。这种类型的聚类包括预先决定聚类的数量。在研究 K-means 聚类如何工作之前,让我们先熟悉几个术语。
-
图心
-
变化
质心是指在一个簇或组的中心的中心数据点。它也是聚类中最具代表性的点,因为它是与聚类中其他点距离最远的数据点。图 8-2 显示了三个随机集群的质心(用十字表示)。
图 8-2
星团的质心
每个聚类或组包含不同数量的最接近聚类质心的数据点。一旦单个数据点改变聚类,聚类的质心值也会改变。改变组内的中心位置,产生新的质心,如图 8-3 所示。
图 8-3
新群的新质心
聚类的整体思想是最小化类内距离,即数据点与类的质心的内部距离,并最大化类间距离,即两个不同类的质心之间的距离。
方差是该聚类内质心和数据点之间的聚类内距离的总和,如图 8-4 所示。方差随着聚类数的增加而不断减小。聚类越多,每个聚类中的数据点数量就越少,因此可变性就越小。
图 8-4
在调整距离中
K-means 聚类总共由四个步骤组成,以形成数据集中的内部组。我们将考虑一个样本数据集来理解 K 均值聚类算法是如何工作的。数据集包含一些用户,他们的年龄和体重值如表 8-1 所示。现在我们将使用 K-means 聚类来得出有意义的聚类并理解算法。
表 8-1
K 均值的样本数据集
|用户标识
|
年龄
|
重量
|
| --- | --- | --- |
| one | Eighteen | Eighty |
| Two | Forty | Sixty |
| three | Thirty-five | One hundred |
| four | Twenty | Forty-five |
| five | Forty-five | One hundred and twenty |
| six | Thirty-two | Sixty-five |
| seven | Seventeen | Fifty |
| eight | Fifty-five | Fifty-five |
| nine | Sixty | Ninety |
| Ten | Ninety | Fifty |
如果我们在二维空间中绘制这些用户,我们可以看到最初没有点属于任何组,我们的目的是在这组用户中找到聚类(我们可以尝试两个或三个),使得每个组包含相似的用户。每个用户由年龄和体重表示,如图 8-5 所示。
图 8-5
聚类前的用户
第一步:决定 K
它从决定簇的数量(K)开始。大多数情况下,我们在开始时不能确定正确的组数,但是我们可以使用一种基于可变性的称为肘方法的方法来找到最佳的组数。对于这个例子,为了简单起见,让我们从 K=2 开始。因此,我们在这个样本数据中寻找两个集群。
步骤 2:随机初始化质心
下一步是随机将任意两个点视为新聚类的质心。这些可以随机选择,所以我们选择用户号 5 和用户号 10 作为新群集中的两个质心,如表 8-2 所示。
表 8-2
K 均值的样本数据集
|用户标识
|
年龄
|
重量
|
| --- | --- | --- |
| one | Eighteen | Eighty |
| Two | Forty | Sixty |
| three | Thirty-five | One hundred |
| four | Twenty | Forty-five |
| 5(质心 1) | Forty-five | One hundred and twenty |
| six | Thirty-two | Sixty-five |
| seven | Seventeen | Fifty |
| eight | Fifty-five | Fifty-five |
| nine | Sixty | Ninety |
| 10(质心 2) | Ninety | Fifty |
质心可以用重量和年龄值来表示,如图 8-6 所示。
图 8-6
两个簇的随机质心
步骤 3:为每个值分配分类号
在这一步,我们计算每个点到质心的距离。在这个例子中,我们计算每个用户到两个质心点的欧几里德平方距离。根据距离值,我们继续决定用户属于哪个特定的集群(1 或 2)。无论用户靠近哪个质心(距离更小),都将成为该聚类的一部分。为每个用户计算欧几里得平方距离,如表 8-3 所示。用户 5 和用户 10 到各自质心的距离为零,因为它们是与质心相同的点。
表 8-3
基于距质心距离的聚类分配
|用户标识
|
年龄
|
重量
|
质心 1 的 ED*
|
质心 2 的 ED*
|
串
|
| --- | --- | --- | --- | --- | --- |
| one | Eighteen | Eighty | Forty-eight | seventy-eight | one |
| Two | Forty | Sixty | Sixty | Fifty-one | Two |
| three | Thirty-five | One hundred | Twenty-two | Seventy-four | one |
| four | Twenty | Forty-five | Seventy-nine | Seventy | Two |
| five | Forty-five | One hundred and twenty | Zero | Eighty-three | one |
| six | Thirty-two | Sixty-five | Fifty-seven | Sixty | one |
| seven | Seventeen | Fifty | Seventy-five | Seventy-three | Two |
| eight | Fifty-five | Fifty-five | Sixty-six | Thirty-five | Two |
| nine | Sixty | Ninety | Thirty-four | Fifty | one |
| Ten | Ninety | Fifty | Eighty-three | Zero | Two |
| (*欧几里德距离) |
因此,根据距质心的距离,我们将每个用户分配到簇 1 或簇 2。集群 1 包含五个用户,集群 2 也包含五个用户。初始集群如图 8-7 所示。
图 8-7
初始聚类和质心
如前所述,在聚类中包含或排除新的数据点之后,聚类的质心必然会改变。由于早期的质心(C1,C2)不再位于簇的中心,我们在下一步计算新的质心。
步骤 4:计算新的质心和重新分配集群
K-means 聚类的最后一步是计算聚类的新质心,并根据与新质心的距离将聚类重新分配给每个值。让我们计算群集 1 和群集 2 的新质心。为了计算聚类 1 的质心,我们只需取那些属于聚类 1 的值的年龄和体重的平均值,如表 8-4 所示。
表 8-4
聚类 1 的新质心计算
|用户标识
|
年龄
|
重量
|
| --- | --- | --- |
| one | Eighteen | Eighty |
| three | Thirty-five | One hundred |
| five | Forty-five | One hundred and twenty |
| six | Thirty-two | Sixty-five |
| nine | Sixty | Ninety |
| 平均值 | 38 | 91 |
群集 2 的质心计算也以类似的方式完成,如表 8-5 所示。
表 8-5
群集 2 的新质心计算
|用户标识
|
年龄
|
重量
|
| --- | --- | --- |
| Two | Forty | Sixty |
| four | Twenty | Forty-five |
| seven | Seventeen | Fifty |
| eight | Fifty-five | Fifty-five |
| Ten | Ninety | Fifty |
| 平均值 | 44.4 | 52 |
现在我们有了每个聚类的新质心值,用十字表示,如图 8-8 所示。箭头表示质心在群集内的移动。
图 8-8
两个星团的新质心
对于每个簇的质心,我们重复步骤 3,计算每个用户与新质心的欧几里德平方距离,并找出最近的质心。然后,我们根据离质心的距离将用户重新分配到集群 1 或集群 2。在这种情况下,只有一个值(用户 6)将其集群从 1 更改为 2,如表 8-6 所示。
表 8-6
集群的重新分配
|用户标识
|
年龄
|
重量
|
质心 1 的 ED*
|
质心 2 的 ED*
|
串
|
| --- | --- | --- | --- | --- | --- |
| one | Eighteen | Eighty | Twenty-three | Thirty-eight | one |
| Two | Forty | Sixty | Thirty-one | nine | Two |
| three | Thirty-five | One hundred | nine | forty-nine | one |
| four | Twenty | Forty-five | forty-nine | Twenty-five | Two |
| five | Forty-five | One hundred and twenty | Thirty | sixty-eight | one |
| six | Thirty-two | Sixty-five | Twenty-seven | Eighteen | Two |
| seven | Seventeen | Fifty | Forty-six | Twenty-seven | Two |
| eight | Fifty-five | Fifty-five | Forty | Eleven | Two |
| nine | Sixty | Ninety | Twenty-two | Forty-one | one |
| Ten | Ninety | Fifty | Sixty-six | Forty-six | Two |
现在,根据到每个集群质心的距离,集群 1 只剩下四个用户,集群 2 包含六个用户,如图 8-9 所示。
图 8-9
集群的重新分配
我们不断重复上述步骤,直到集群分配不再有变化。新星团的质心如表 8-7 所示。
表 8-7
质心的计算
|用户标识
|
年龄
|
重量
|
| --- | --- | --- |
| one | Eighteen | Eighty |
| three | Thirty-five | One hundred |
| five | Forty-five | One hundred and twenty |
| nine | Sixty | Ninety |
| 平均值 | Thirty-nine point five | Ninety-seven point five |
用户标识
|
年龄
|
重量
|
| --- | --- | --- |
| Two | Forty | Sixty |
| four | Twenty | Forty-five |
| six | Thirty-two | Sixty-five |
| seven | Seventeen | Fifty |
| eight | Fifty-five | Fifty-five |
| Ten | Ninety | Fifty |
| 平均值 | Forty-two point three three | Fifty-four point one seven |
当我们执行这些步骤时,质心的移动变得越来越小,并且这些值几乎成为特定聚类的一部分,如图 8-10 所示。
图 8-10
集群的重新分配
正如我们所观察到的,即使在质心发生变化之后,点也不再发生变化,这就完成了 K-means 聚类。结果可能会有所不同,因为它是基于第一组随机质心。为了重现结果,我们也可以自己设置起点。具有值的最终聚类如图 8-11 所示。
图 8-11
最终聚类
聚类 1 包含在身高属性上处于平均水平但在体重变量上似乎非常高的用户,而聚类 2 似乎将那些高于平均水平但非常在意自己体重的用户分组在一起,如图 8-12 所示。
图 8-12
最终聚类的属性
决定聚类数(K)
大多数时候,选择最佳数量的集群是相当棘手的,因为我们需要对数据集和业务问题的背景有深刻的理解。此外,当谈到无监督学习时,没有正确或错误的答案。与另一种方法相比,一种方法可能会产生不同数量的簇。我们必须试着找出哪种方法效果最好,以及创建的集群是否与决策足够相关。每个聚类可以用几个重要的属性来表示,这些属性表示或给出关于该特定聚类的信息。但是,有一种方法可以选择数据集的最佳聚类数。这种方法被称为肘法。
肘方法有助于我们测量大量聚类数据的总方差。聚类数量越多,方差就越小。如果我们的聚类数与数据集中的记录数相等,那么可变性将为零,因为每个点与其自身的距离为零。可变性或 SSE(误差平方和)以及“K”值如图 8-13 所示。
图 8-13
肘法
正如我们所观察到的,在 K 值为 3 和 4 之间有一种肘形结构。总方差(组内差异)会突然减少,之后方差会缓慢下降。事实上,它在 K=9 值之后变平。因此,如果我们使用肘方法,K =3 的值是最有意义的,因为它可以用较少的聚类数量捕获最多的可变性。
分层聚类
这是另一种类型的无监督机器学习技术,它不同于 K-means,因为我们不需要事先知道聚类的数量。有两种类型的分层聚类。
-
聚集聚类(自下而上的方法)
-
分裂聚类(自上而下的方法)
我们将讨论凝聚聚类,因为它是主要类型。首先假设每个数据点都是一个独立的聚类,然后逐渐将最近的值组合到相同的聚类中,直到所有的值都成为一个聚类的一部分。这是一种自底向上的方法,它计算每个聚类之间的距离,并将两个最接近的聚类合并为一个。让我们借助可视化来理解凝聚聚类。假设我们最初有七个数据点(A1-A7),需要使用聚集聚类将它们分组到包含相似值的簇中,如图 8-14 所示。
图 8-14
每个值作为一个单独的集群
在初始阶段(步骤 1),每个点被视为一个单独的聚类。在下一步中,计算每个点之间的距离,并将最近的点组合成单个聚类。在本例中,A1 和 A2、A5 和 A6 彼此距离最近,因此形成如图 8-15 所示的单个集群。
图 8-15
最近的聚类合并在一起
在使用层次聚类时,可以通过多种方式来确定最佳聚类数。一种方法是使用 elbow 方法本身,另一种方法是使用一种叫做树状图的东西。它用于可视化聚类之间的可变性(欧几里德距离)。在树状图中,垂直线的高度代表点或簇与底部列出的数据点之间的距离。每个点绘制在 X 轴上,距离表示在 Y 轴上(长度)。它是数据点的分层表示。在本例中,第 2 步的树状图如图 8-16 所示。
图 8-16
系统树图
在步骤 3 中,重复计算聚类之间的距离的练习,并将最近的聚类组合成单个聚类。这次 A3 与(A1,A2)合并,A4 与(A5,A6)合并,如图 8-17 所示。
图 8-17
最近的聚类合并在一起
第三步后的树状图如图 8-18 所示。
图 8-18
步骤 3 后的树状图
在步骤 4 中,计算唯一剩余的点 A7 之间的距离,并发现其更靠近群集(A4,A5,A6)。它与图 8-19 所示的同一个集群合并。
图 8-19
簇状构造
在最后一个阶段(步骤 5),所有点被组合成一个单独的簇(A1,A2,A3,A4,A5,A6,A7),如图 8-20 所示。
图 8-20
凝聚聚类
有时很难通过树状图确定正确的聚类数,因为它可能变得非常复杂,并且很难根据用于聚类的数据集进行解释。与 K-means 相比,层次聚类在大型数据集上效果不佳。聚类对数据点的规模也非常敏感,因此总是建议在聚类之前进行数据缩放。还有其他类型的聚类可用于将相似的数据点分组在一起,如下所示:
-
高斯混合模型聚类
-
模糊 C 均值聚类
但以上方法不在本书讨论范围之内。我们现在开始使用 PySpark 中的 K-means 数据集来构建集群。
密码
本章的这一节将介绍使用 PySpark 和 Jupyter Notebook 进行 K-Means 聚类。
注意
完整的数据集和代码可以在本书的 GitHub repo 上参考,在 Spark 2.0 和更高版本上执行得最好。
在本练习中,我们考虑最标准化的开源数据集 IRIS 数据集,用于捕获聚类数并比较监督和非监督性能。
数据信息
我们将在本章使用的数据集是著名的开源 IRIS 数据集,包含总共 150 条记录,共 5 列(萼片长度、萼片宽度、花瓣长度、花瓣宽度、物种)。每种类型有 50 个记录。我们将尝试在不使用物种标签信息的情况下,将这些物种分组。
步骤 1:创建 SparkSession 对象
我们启动 Jupyter Notebook 并导入 SparkSession,然后创建一个新的 SparkSession 对象来使用 Spark:
[In]: from pyspark.sql import SparkSession
[In]: spark=SparkSession.builder.appName('K_means').getOrCreate()
步骤 2:读取数据集
然后,我们使用 dataframe 在 Spark 中加载和读取数据集。我们必须确保我们已经从数据集可用的同一个目录文件夹中打开了 PySpark,否则我们必须提到数据文件夹的目录路径。
[In]:
df=spark.read.csv('iris_dataset.csv',inferSchema=True,header=True)
步骤 3:探索性数据分析
在本节中,我们通过查看数据集并验证其形状来探索数据集。
[In]:print((df.count(), len(df.columns)))
[Out]: (150,3)
因此,上面的输出确认了数据集的大小,然后我们可以验证输入值的数据类型,以检查我们是否需要更改/转换任何列的数据类型。
[In]: df.printSchema()
[Out]: root
|-- sepal_length: double (nullable = true)
|-- sepal_width: double (nullable = true)
|-- petal_length: double (nullable = true)
|-- petal_width: double (nullable = true)
|-- species: string (nullable = true)
总共有五列,其中四列是数字列,标签列是分类列。
[In]: from pyspark.sql.functions import rand
[In]: df.orderBy(rand()).show(10,False)
[Out]:
+------------+-----------+------------+-----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|species |
+------------+-----------+------------+-----------+----------+
|5.5 |2.6 |4.4 |1.2 |versicolor|
|4.5 |2.3 |1.3 |0.3 |setosa |
|5.1 |3.7 |1.5 |0.4 |setosa |
|7.7 |3.0 |6.1 |2.3 |virginica |
|5.5 |2.5 |4.0 |1.3 |versicolor|
|6.3 |2.3 |4.4 |1.3 |versicolor|
|6.2 |2.9 |4.3 |1.3 |versicolor|
|6.3 |2.5 |4.9 |1.5 |versicolor|
|4.7 |3.2 |1.3 |0.2 |setosa |
|6.1 |2.8 |4.0 |1.3 |versicolor|
+------------+-----------+------------+-----------+----------+
[In]: df.groupBy('species').count().orderBy('count').show(10,False)
[Out]:
+----------+-----+
|species |count|
+----------+-----+
|virginica |50 |
|setosa |50 |
|versicolor|50 |
+----------+-----+
因此,它确认了数据集中每个物种都有相同数量的记录
步骤 4:特征工程
这是我们使用 Spark 的 VectorAssembler 创建一个组合所有输入特征的单一向量的部分。它仅创建一个要素来捕获该特定行的输入值。因此,不是四个输入列(我们不考虑标签列,因为它是一种无监督的机器学习技术),它本质上是以列表的形式将它转换为具有四个输入值的单个列。
[In]: from pyspark.ml.linalg import Vector
[In]: from pyspark.ml.feature import VectorAssembler
[In]: input_cols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
[In]: vec_assembler = VectorAssembler(inputCols = input_cols, outputCol="features")
[In]: final_data = vec_assembler.transform(df)
步骤 5:构建 K-Means 聚类模型
最终数据包含可用于运行 K 均值聚类的输入向量。由于我们需要在使用 K-means 之前预先声明' K '的值,所以我们可以使用 elbow 方法来计算出' K '的正确值。为了使用肘方法,我们对不同的“K”值运行 K 均值聚类。首先,我们从 PySpark 库中导入 K-means,并创建一个空列表来捕获 K 的每个值的可变性或 SSE(在聚类距离内)。
[In]:from pyspark.ml.clustering import KMeans
[In]:errors=[]
[In]:
for k in range(2,10):
kmeans = KMeans(featuresCol='features',k=k)
model = kmeans.fit(final_data)
intra_distance = model.computeCost(final_data)
errors.append(intra_distance)
注意
“K”的最小值应该为 2,以便能够构建聚类。
现在,我们可以使用 numpy 和 matplotlib 绘制星团内距离与星团数量的关系。
[In]: import pandas as pd
[In]: import numpy as np
[In]: import matplotlib.pyplot as plt
[In]: cluster_number = range(2,10)
[In]: plt.xlabel('Number of Clusters (K)')
[In]: plt.ylabel('SSE')
[In]: plt.scatter(cluster_number,errors)
[In]: plt.show()
[Out]:
在这种情况下,k=3 似乎是最佳的聚类数,因为我们可以看到 3 到 4 个值之间的肘形结构。我们使用 k=3 构建最终的集群。
[In]: kmeans = KMeans(featuresCol='features',k=3)
[In]: model = kmeans.fit(final_data)
[In]: model.transform(final_data).groupBy('prediction').count().show()
[Out]:
+----------+-----+
|prediction|count|
+----------+-----+
| 1| 50|
| 2| 38|
| 0| 62|
+----------+-----+
基于 IRIS 数据集,K-Means 聚类为我们提供了三个不同的聚类。我们肯定是做了一些错误的分配,因为只有一个类别有 50 条记录,其余的类别都混在一起了。我们可以使用 transform 函数将分类编号分配给原始数据集,并使用 groupBy 函数来验证分组。
[In]: predictions=model.transform(final_data)
[In]: predictions.groupBy('species','prediction').count().show()
[Out]:
+----------+----------+-----+
| species|prediction|count|
+----------+----------+-----+
| virginica| 2| 14|
| setosa| 0| 50|
| virginica| 1| 36|
|versicolor| 1| 3|
|versicolor| 2| 47|
+----------+----------+-----+
正如可以观察到的,setosa 物种与 versicolor 完美地分组在一起,几乎被捕获在同一个簇中,但 verginica 似乎属于两个不同的组。K-means 每次都会产生不同的结果,因为它每次都随机选择起始点(质心)。因此,在 K-means 聚类中得到的结果可能与这些结果完全不同,除非我们使用种子来重现这些结果。种子确保分割和初始质心值在整个分析过程中保持一致。
步骤 6:集群的可视化
在最后一步,我们可以借助 Python 的 matplotlib 库来可视化新的集群。为此,我们首先将 Spark 数据帧转换成 Pandas 数据帧。
[In]: pandas_df = predictions.toPandas()
[In]: pandas_df.head()
我们导入所需的库来绘制第三个可视化并观察集群。
[In]: from mpl_toolkits.mplot3d import Axes3D
[In]: cluster_vis = plt.figure(figsize=(12,10)).gca(projection='3d')
[In]: cluster_vis.scatter(pandas_df.sepal_length, pandas_df.sepal_width, pandas_df.petal_length, c=pandas_df.prediction,depthshade=False)
[In]: plt.show()
结论
在这一章中,我们讨论了不同类型的无监督机器学习技术,并使用 PySpark 中的 K-means 算法构建了聚类。K-Means 使用随机质心初始化对数据点进行分组,而分层聚类侧重于将整个数据点合并到单个聚类中。我们还介绍了各种确定最佳聚类数的技术,如肘方法和树状图,它们在对数据点进行分组时使用方差优化。
九、自然语言处理
介绍
本章揭示了一些使用 PySpark 处理文本数据的基本技术。今天的文本形式的数据正在以闪电般的速度产生,多个社交媒体平台为用户提供了分享他们的意见、建议、评论等的选项。专注于让机器学习和理解文本数据以执行一些有用任务的领域被称为自然语言处理(NLP)。文本数据可以是结构化的,也可以是非结构化的,我们必须应用多个步骤来做好分析准备。NLP 已经为多种应用做出了巨大贡献。目前,NLP 的许多应用被企业大量使用,例如聊天机器人、语音识别、语言翻译、推荐系统、垃圾邮件检测和情感分析。本章演示了处理文本数据并对其应用机器学习算法的一系列步骤。它还展示了序列嵌入,可以作为传统的分类输入特征的替代。
NLP 中涉及的步骤
进行 NLP 分析没有正确的方法,因为人们可以探索多种方法并采用不同的方法来处理文本数据。然而,从机器学习的角度来看,要使文本数据为分析做好准备,需要五个主要步骤。NLP 中涉及的五个主要步骤是:
-
阅读文集
-
标记化
-
清除/停用字词删除
-
堵塞物
-
转换成数字形式
在进入加载和清理文本数据的步骤之前,让我们先熟悉一个叫做语料库的术语,因为它会在本章的其余部分不断出现。
文集
语料库被称为文本文档的全部集合。例如,假设我们有数千封电子邮件需要处理和分析。这组电子邮件被称为语料库,因为它包含所有的文本文档。文本处理的下一步是标记化。
标记化
将文本文档中的给定句子或单词集合分成单独的/个别的单词的方法被称为标记化。它删除了不必要的字符,如标点符号。例如,如果我们有这样一个句子:
输入:他真的很喜欢伦敦这座城市。他在那里还要呆两天。
令牌:
他真的很喜欢伦敦,他在那里呆了两天多
对于上面的输入句子,我们最终得到了 13 个标记。
让我们看看如何使用 PySpark 进行标记化。第一步是创建包含文本数据的 dataframe。
[In]: df=spark.createDataFrame([(1,'I really liked this movie'),
(2,'I would recommend this movie to my friends'),
(3,'movie was alright but acting was horrible'),
(4,'I am never watching that movie ever again')],
['user_id','review'])
[In]: df.show(4,False)
[Out]:
+-------+------------------------------------------+
|user_id|review |
+-------+------------------------------------------+
|1 |I really liked this movie |
|2 |I would recommend this movie to my friends|
|3 |movie was alright but acting was horrible |
|4 |I am never watching that movie ever again |
+-------+------------------------------------------+
在这个数据框架中,我们有四个句子用于标记化。下一步是从 Spark 库中导入 Tokenizer。然后,我们必须传递输入列,并在标记化之后命名输出列。我们使用 transform 函数来对 review 列应用标记化。
[In]: from pyspark.ml.feature import Tokenizer
[In]: tokenization=Tokenizer(inputCol='review',outputCol='tokens')
[In]: tokenized_df=tokenization.transform(df)
[In]: tokenized_df.show(4,False)
[Out]:
我们得到一个名为 tokens 的新列,其中包含每个句子的标记。
停用词移除
正如您所观察到的,tokens 列包含非常常见的单词,如“this”、“the”、“to”、“was”、“that”等。这些词被称为停用词,它们似乎对分析没有什么价值。如果在分析中使用它们,会增加计算开销,但不会增加太多价值或洞察力。因此,从标记中去掉这些停用词总是一个好主意。在 PySpark 中,我们使用停用词移除器来移除停用词。
[In]: from pyspark.ml.feature import StopWordsRemover
[In]: stopword_removal=StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')
We then pass the tokens as the input column and name the output column as refined tokens.
[In]: refined_df=stopword_removal.transform(tokenized_df)
[In]: refined_df.select(['user_id','tokens','refined_tokens']).show(4,False)
[Out]:
正如您所观察到的,像' I ',' this ',' was ',' am ',' but ',' that '这样的停用词都从 tokens 列中删除了。
一袋单词
这是一种方法,通过这种方法,我们可以将文本数据表示为数字形式,以供机器学习或任何其他分析使用。文本数据通常是非结构化的,并且长度各不相同。BOW(单词包)允许我们通过考虑单词在文本文档中的出现来将文本形式转换成数字向量形式。举个例子,
医生 1:生活中最好的事情就是旅行
医生 2:旅行是最好的良药
医生 3:一个人应该经常旅行
词汇:
出现在所有文档中的唯一单词的列表称为词汇表。在上面的例子中,我们有 13 个独特的单词,它们是词汇表的一部分。每个文档可以由这个固定大小的向量 13 来表示。
| 这 | 最好的 | 东西 | 在 | 生活 | 存在 | 到 | 旅行 | 医学 | 一个 | 应该 | 更多 | 时常 |另一个元素是使用布尔值表示特定文档中的单词。
(1 或 0)。
文件 1:
|这
|
最好的
|
东西
|
在
|
生活
|
存在
|
到
|
旅行
|
医学
|
一个
|
应该
|
更多
|
时常
|
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| one | one | one | one | one | one | one | one | Zero | Zero | Zero | Zero | Zero |
文件 2:
|这
|
最好的
|
东西
|
在
|
生活
|
存在
|
到
|
旅行
|
医学
|
一个
|
应该
|
更多
|
时常
|
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| one | one | Zero | Zero | Zero | one | Zero | one | one | Zero | Zero | Zero | Zero |
文档 3:
|这
|
最好的
|
东西
|
在
|
生活
|
存在
|
到
|
旅行
|
医学
|
一个
|
应该
|
更多
|
时常
|
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| Zero | Zero | Zero | Zero | Zero | Zero | Zero | one | Zero | one | one | one | one |
BOW 不考虑文档中单词的顺序和单词的语义,因此是将文本数据表示为数字形式的最基本的方法。还有其他方法可以将文本数据转换成数字形式,这将在下一节中提到。我们将使用 PySpark 来研究这些方法。
计数矢量器
在 BOW 中,我们看到简单地用 1 或 0 来表示单词的出现,而没有考虑单词的频率。相反,计数矢量器对特定文档中出现的单词进行计数。我们将使用之前在使用标记化时创建的相同文本文档。我们首先导入计数矢量器。
[In]: from pyspark.ml.feature import CountVectorizer
[In]: count_vec=CountVectorizer(inputCol='refined_tokens',outputCol='features')
[In]: cv_df=count_vec.fit(refined_df).transform(refined_df)
[In]: cv_df.select(['user_id','refined_tokens','features']).show(4,False)
[Out]:
正如我们所观察到的,每个句子都被表示为一个密集的向量。它显示了向量长度是 11,并且第一个句子在第 0、第 4 和第 9 个索引处包含 3 个值。
为了验证计数矢量器的词汇,我们可以简单地使用词汇函数。
[In]: count_vec.fit(refined_df).vocabulary
[Out]:
['movie',
'horrible',
'really',
'alright',
'liked',
'friends',
'recommend',
'never',
'ever',
'acting',
'watching']
因此,上述句子的词汇量为 11,如果你仔细观察这些特征,它们类似于我们在 PySpark 中用于机器学习的输入特征向量。使用计数矢量器方法的缺点是它不考虑单词在其他文档中的共现。简单来说,出现频率越高的单词对特征向量的影响越大。因此,将文本数据转换为数字形式的另一种方法称为词频-逆文档频率(TF-IDF)。
TF-以色列国防军
这种方法试图根据其他文档来标准化单词出现的频率。整个想法是,如果一个单词在同一文档中出现的次数多,就给这个单词更大的权重,但是如果这个单词在其他文档中出现的次数也多,就惩罚这个单词。这表明一个单词在整个语料库中是常见的,并且不如它在当前文档中的频率所表明的那样重要。
词频:根据当前文档中的词频进行评分。
逆向文档频率:基于包含当前单词的文档的频率进行评分。
现在,我们在 PySpark 中使用相同的细化 DF 数据框架创建基于 TF-IDF 的要素。
[In]: from pyspark.ml.feature import HashingTF,IDF
[In]: hashing_vec=HashingTF(inputCol='refined_tokens',outputCol='tf_features')
[In]: hashing_df=hashing_vec.transform(refined_df)
[In]: hashing_df.select(['user_id','refined_tokens','tf_features']).show(4,False)
[Out]:
[In]: tf_idf_vec=IDF(inputCol='tf_features',outputCol='tf_idf_features')
[In]: tf_idf_df=tf_idf_vec.fit(hashing_df).transform(hashing_df)
[In]: tf_idf_df.select(['user_id','tf_idf_features']).show(4,False)
[Out]:
使用机器学习的文本分类
现在,我们已经基本了解了处理文本处理和要素矢量化所涉及的步骤,我们可以构建一个文本分类模型,并使用它来预测文本数据。我们将使用的数据集是开源的电影镜头评论数据,我们将预测任何给定评论的情感类别(正面或负面)。让我们先从读取文本数据开始,并创建一个 Spark 数据帧。
[In]: text_df=spark.read.csv('Movie_reviews.csv',inferSchema=True,header=True,sep=',')
[In]: text_df.printSchema()
[Out]:
root
|-- Review: string (nullable = true)
|-- Sentiment: string (nullable = true)
您可以观察 StringType 中的情感列,我们将需要它将它转换为 Integer 或 float 类型。
[In]: text_df.count()
[Out]: 7087
我们有近 7000 条记录,其中一些可能没有正确标记。因此,我们只过滤那些标记正确的记录。
[In]: text_df=text_df.filter(((text_df.Sentiment =='1') | (text_df.Sentiment =='0')))
[In]: text_df.count()
[Out]: 6990
一些记录被过滤掉了,我们现在剩下 6,990 条记录进行分析。下一步是验证每门课的大量复习。
[In]: text_df.groupBy('Sentiment').count().show()
[Out]:
+---------+-----+
|Sentiment|count|
+---------+-----+
| 0| 3081|
| 1| 3909|
+---------+-----+
我们在这里处理一个平衡的数据集,因为两个类有几乎相似数量的评论。让我们看看数据集中的一些记录。
[In]: from pyspark.sql.functions import rand
[In]: text_df.orderBy(rand()).show(10,False)
[Out]:
在下一步中,我们创建一个新的整数类型的标签列,并删除原来的字符串类型的情感列。
[In]: text_df=text_df.withColumn("Label", text_df.Sentiment.cast('float')).drop('Sentiment')
[In]: text_df.orderBy(rand()).show(10,False)
[Out]:
我们还包括一个额外的列,用于记录评论的长度。
[In]: from pyspark.sql.functions import length
[In]: text_df=text_df.withColumn('length',length(text_df['Review']))
[In]: text_df.orderBy(rand()).show(10,False)
[Out]:
[In]: text_df.groupBy('Label').agg({'Length':'mean'}).show()
[Out]:
+-----+-----------------+
|Label| avg(Length)|
+-----+-----------------+
| 1.0|47.61882834484523|
| 0.0|50.95845504706264|
+-----+-----------------+
正面和负面评论的平均长度没有太大差异。下一步是开始标记化过程并删除停用词。
[In]: tokenization=Tokenizer(inputCol='Review',outputCol='tokens')
[In]: tokenized_df=tokenization.transform(text_df)
[In]: stopword_removal=StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')
[In]: refined_text_df=stopword_removal.transform(tokenized_df)
因为我们现在只处理标记而不是整个审查,所以在每个审查中捕获一些标记比使用审查的长度更有意义。我们创建了另一个列(令牌计数),给出了每行中的令牌数。
[In]: from pyspark.sql.functions import udf
[In]: from pyspark.sql.types import IntegerType
[In]: from pyspark.sql.functions import *
[In]: len_udf = udf(lambda s: len(s), IntegerType())
[In]: refined_text_df = refined_text_df.withColumn("token_count", len_udf(col('refined_tokens')))
[In]: refined_text_df.orderBy(rand()).show(10)
[Out]:
现在我们已经有了去除停用词后的精炼标记,我们可以使用上述任何一种方法将文本转换成数字特征。在这种情况下,我们使用计数矢量器对机器学习模型进行特征矢量化。
[In]:count_vec=CountVectorizer(inputCol='refined_tokens',outputCol='features')
[In]: cv_text_df=count_vec.fit(refined_text_df).transform(refined_text_df)
[In]: cv_text_df.select(['refined_tokens','token_count','features','Label']).show(10)
[Out]:
+--------------------+-----------+--------------------+-----+
| refined_tokens|token_count| features|Label|
+--------------------+-----------+--------------------+-----+
|[da, vinci, code,...| 5|(2302,[0,1,4,43,2...| 1.0|
|[first, clive, cu...| 9|(2302,[11,51,229,...| 1.0|
|[liked, da, vinci...| 5|(2302,[0,1,4,53,3...| 1.0|
|[liked, da, vinci...| 5|(2302,[0,1,4,53,3...| 1.0|
|[liked, da, vinci...| 8|(2302,[0,1,4,53,6...| 1.0|
|[even, exaggerati...| 6|(2302,[46,229,271...| 1.0|
|[loved, da, vinci...| 8|(2302,[0,1,22,30,...| 1.0|
|[thought, da, vin...| 7|(2302,[0,1,4,228,...| 1.0|
|[da, vinci, code,...| 6|(2302,[0,1,4,33,2...| 1.0|
|[thought, da, vin...| 7|(2302,[0,1,4,223,...| 1.0|
+--------------------+-----------+--------------------+-----+
[In]: model_text_df=cv_text_df.select(['features','token_count','Label'])
一旦我们有了每行的特征向量,我们就可以利用 VectorAssembler 为机器学习模型创建输入特征。
[In]: from pyspark.ml.feature import VectorAssembler
[In]: df_assembler = VectorAssembler(inputCols=['features','token_count'],outputCol='features_vec')
[In]: model_text_df = df_assembler.transform(model_text_df)
[In]: model_text_df.printSchema()
[Out]:
root
|-- features: vector (nullable = true)
|-- token_count: integer (nullable = true)
|-- Label: float (nullable = true)
|-- features_vec: vector (nullable = true)
我们可以对这些数据使用任何分类模型,但是我们继续训练逻辑回归模型。
[In]: from pyspark.ml.classification import LogisticRegression
[In]: training_df,test_df=model_text_df.randomSplit([0.75,0.25])
为了验证在训练和测试数据集中是否存在足够的记录,我们可以对标签列应用 groupBy 函数。
[In]: training_df.groupBy('Label').count().show()
[Out]:
+-----+-----+
|Label|count|
+-----+-----+
| 1.0| 2979|
| 0.0| 2335|
+-----+-----+
[In]: test_df.groupBy('Label').count().show()
[Out]:
+-----+-----+
|Label|count|
+-----+-----+
| 1.0| 930|
| 0.0| 746|
+-----+-----+
[In]: log_reg=LogisticRegression(featuresCol='features_vec',labelCol='Label').fit(training_df)
在训练模型之后,我们在测试数据集上评估模型的性能。
[In]: results=log_reg.evaluate(test_df).predictions
[In]: results.show()
[Out]:
[In]: from pyspark.ml.evaluation import BinaryClassificationEvaluator
[In]: true_postives = results[(results.Label == 1) & (results.prediction == 1)].count()
[In]: true_negatives = results[(results.Label == 0) & (results.prediction == 0)].count()
[In]: false_positives = results[(results.Label == 0) & (results.prediction == 1)].count()
[In]: false_negatives = results[(results.Label == 1) & (results.prediction == 0)].count()
该模型的性能似乎相当好,它能够很容易地区分正面和负面的评论。
[In]: recall = float(true_postives)/(true_postives + false_negatives)
[In]:print(recall)
[Out]: 0.986021505376344
[In]: precision = float(true_postives) / (true_postives + false_positives)
[In]: print(precision)
[Out]: 0.9572025052192067
[In]: accuracy=float((true_postives+true_negatives) /(results.count()))
[In]: print(accuracy)
[Out]: 0.9677804295942721
序列嵌入
每天有数百万人访问商业网站,每个人都采取不同的步骤来寻找正确的信息/产品。然而,他们中的大多数人出于某种原因失望或沮丧地离开,很少有人能在网站中找到正确的页面。在这种情况下,很难确定潜在客户是否真的得到了他想要的信息。此外,这些观众的个人旅程不能相互比较,因为每个人都做了一套不同的活动。那么,我们怎样才能更多地了解这些旅程,并对这些游客进行相互比较呢?序列嵌入是一种强大的方式,它不仅为我们提供了灵活性来比较任何两个不同的观众在相似性方面的整个旅程,还可以预测他们转换的概率。序列嵌入本质上帮助我们摆脱使用传统特征来进行预测,并且不仅考虑用户活动的顺序,还考虑花费在每个独特页面上的平均时间,以转化为更健壮的特征;它还用于跨多个用例的监督机器学习(下一个可能的行动预测,转换与非转换,产品分类)。在序列嵌入等高级功能上使用传统的机器学习模型,我们可以在预测准确性方面取得巨大的成果,但真正的好处在于可视化所有这些用户旅程,并观察这些路径与理想路径有多么不同。
本章的这一部分将展开在 PySpark 中为每个用户的旅程创建序列嵌入的过程。
嵌入
到目前为止,我们已经看到使用计数矢量化、TF-IDF 和哈希矢量化等技术将文本数据表示成数字形式。然而,上述技术都没有考虑单词的语义或单词出现的上下文。嵌入在捕捉单词的上下文并以这样一种方式表示它们方面是独特的,即具有相似含义的单词用相似种类的嵌入来表示。有两种方法来计算嵌入。
-
跳过克
-
连续单词包(CBOW)
这两种方法给出的嵌入值只不过是神经网络中隐藏层的权重。根据需要,这些嵌入向量的大小可以是 100 或更大。word2vec 给出每个单词的嵌入值,而 doc2vec 给出整个句子的嵌入值。序列嵌入类似于 doc2vec,并且是出现在句子中的单词的单独嵌入的加权平均的结果。
让我们用一个样本数据集来说明我们如何从用户的在线零售之旅中创建序列嵌入。
[In]: spark=SparkSession.builder.appName('seq_embedding').getOrCreate()
[In]:
df = spark.read.csv('embedding_dataset.csv',header=True,inferSchema=True)
[In]: df.count()
[Out]: 1096955
数据集中的记录总数接近一百万,并且有 10 万个唯一用户。还跟踪每个用户在每个网页上花费的时间以及用户是否购买该产品的最终状态。
[In]: df.printSchema()
[Out]:
root
|-- user_id: string (nullable = true)
|-- page: string (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- visit_number: integer (nullable = true)
|-- time_spent: double (nullable = true)
|-- converted: integer (nullable = true)
[In]: df.select('user_id').distinct().count()
[Out]: 104087
[In]: df.groupBy('page').count().orderBy('count',ascending=False).show(10,False)
[Out]:
+-------------+------+
|page |count |
+-------------+------+
|product info |767131|
|homepage |142456|
|added to cart|67087 |
|others |39919 |
|offers |32003 |
|buy |24916 |
|reviews |23443 |
+-------------+------+
[In]: df.select(['user_id','page','visit_number','time_spent','converted']).show(10,False)
[Out]:
序列嵌入的整体思想是将用户在他或她的在线旅程中采取的一系列步骤转换成页面序列,该页面序列可用于计算嵌入分数。第一步是在用户的旅程中移除任何连续的重复页面。我们创建了一个额外的列来捕获用户的上一页。Window 是 spark 中的一个功能,它有助于将特定的逻辑应用于数据集中的单个或一组行。
[In]:w = Window.partitionBy("user_id").orderBy('timestamp')
[In]: df = df.withColumn("previous_page", lag("page", 1, 'started').over(w))
[In]: df.select('user_id','timestamp','previous_page','page').show(10,False)
[Out]:
[In]:
def indicator(page, prev_page):
if page == prev_page:
return 0
else:
return 1
[In]:page_udf = udf(indicator,IntegerType())
[In]: df = df.withColumn("indicator",page_udf(col('page'),col('previous_page'))) \
.withColumn('indicator_cummulative',sum(col('indicator')).over(w))
现在,我们创建一个函数来检查当前页面是否与前一个页面相似,并在一个新的列指示器中指示相同的内容。指示器累积栏用于跟踪用户旅程中不同页面的数量。
[In]: df.select('previous_page','page','indicator','indicator_cummulative').show(20,False)
[Out]:
我们不断创建新的窗口对象来进一步划分数据,以便为每个用户构建序列。
[In]: w2=Window.partitionBy(["user_id",'indicator_cummulative']).orderBy('timestamp')
[In]: df= df.withColumn('time_spent_cummulative',sum(col('time_spent')).over(w2))
[In]: df.select('timestamp','previous_page','page','indicator','indicator_cummulative','time_spent','time_spent_cummulative').show(20,False)
在下一阶段,我们计算在相似页面上花费的总时间,以便只保留一个记录来表示连续的页面。
[In]: w3 = Window.partitionBy(["user_id",'indicator_cummulative']).orderBy(col('timestamp').desc())
[In]: df = df.withColumn('final_page',first('page').over(w3))\
.withColumn('final_time_spent',first('time_spent_cummulative').over(w3))
[In]: df.select(['time_spent_cummulative','indicator_cummulative','page','final_page','final_time_spent']).show(10,False)
[Out]:
[In]: aggregations=[]
[In]: aggregations.append(max(col('final_page')).alias('page_emb'))
[In]: aggregations.append(max(col('final_time_spent')).alias('time_spent_emb'))
[In]: aggregations.append(max(col('converted')).alias('converted_emb'))
[In]: df_embedding = df.select(['user_id','indicator_cummulative','final_page','final_time_spent','converted']).groupBy(['user_id','indicator_cummulative']).agg(*aggregations)
[In]: w4 = Window.partitionBy(["user_id"]).orderBy('indicator_cummulative')
[In]: w5 = Window.partitionBy(["user_id"]).orderBy(col('indicator_cummulative').desc())
最后,我们使用一个收集列表将用户旅程中的所有页面合并到一个列表中,并计算花费的时间。因此,我们以页面列表和花费时间列表的形式结束用户旅程。
[In]:df_embedding = df_embedding.withColumn('journey_page', collect_list(col('page_emb')).over(w4))\
.withColumn('journey_time_temp', collect_list(col('time_spent_emb')).over(w4)) \
.withColumn('journey_page_final',first('journey_page').over(w5))\
.withColumn('journey_time_final',first('journey_time_temp').over(w5)) \
.select(['user_id','journey_page_final','journey_time_final','converted_emb'])
我们只继续独特的用户旅程。每个用户由一个单独的旅程和花费的时间向量表示。
[In]: df_embedding = df_embedding.dropDuplicates()
[In]: df_embedding.count()
[Out]: 104087
[In]: df_embedding.select('user_id').distinct().count()
[Out]: 104087
[In]: df_embedding.select('user_id','journey_page_final','journey_time_final').show(10)
[Out]:
现在我们有了用户旅程和花费时间的列表,我们将这个数据帧转换成 Pandas 数据帧,并使用这些旅程序列构建一个 word2vec 模型。为了使用 word2vec,我们必须先安装一个 gensim 库。为了简单起见,我们使用 100 的嵌入大小。
[In]: pd_df_emb0 = df_embedding.toPandas()
[In]: pd_df_embedding = pd_df_embedding.reset_index(drop=True)
[In]: !pip install gensim
[In]: from gensim.models import Word2Vec
[In]: EMBEDDING_SIZE = 100
[In]: model = Word2Vec(pd_df_embedding['journey_page_final'], size=EMBEDDING_SIZE)
[In]: print(model)
[Out]: Word2Vec(vocab=7, size=100, alpha=0.025)
正如我们所观察到的,词汇量为 7,因为我们只处理了 7 个页面类别。这些页面类别中的每一个现在都可以借助大小为 100 的嵌入向量来表示。
[In]: page_categories = list(model.wv.vocab)
[In]: print(page_categories)
[Out]:
['product info', 'homepage', 'added to cart', 'others', 'reviews', 'offers', 'buy']
[In]: print(model['reviews'])
[Out]:
[In]: model['offers'].shape
[Out]: (100,)
要创建嵌入矩阵,我们可以使用一个模型,并传递模型词汇表;这将产生大小为(7,100)的矩阵。)
[In]: X = model[model.wv.vocab]
[In]: X.shape
[Out]: (7,100)
为了更好地理解这些页面类别之间的关系,我们可以使用降维技术(PCA)并在二维空间上绘制这七个页面嵌入。
[In]: pca = PCA(n_components=2)
[In]: result = pca.fit_transform(X)
[In]: plt.figure(figsize=(10,10))
[In]: plt.scatter(result[:, 0], result[:, 1])
[In]: for i,page_category in enumerate(page_categories):
plt.annotate(page_category,horizontalalignment='right', verticalalignment="top",xy=(result[i, 0], result[i, 1]))
[In]: plt.show()
我们可以清楚地看到,“购买”和“添加到购物车”的嵌入在相似性方面彼此接近,而主页和产品信息也彼此接近。当涉及到通过嵌入表示时,提供和评论是完全分开的。这些单独的嵌入可以被组合并使用机器学习用于用户旅程比较和分类。
注意
这本书的 GitHub repo 上提供了完整的数据集和代码,在 Spark 2.3 和更高版本上执行得最好。
结论
在这一章中,我们熟悉了文本处理和为机器学习创建特征向量的步骤。我们还经历了从在线用户旅程数据创建序列嵌入的过程,以比较各种用户旅程。
标签:教程,机器,PySpark,df,用户,---,Zero,我们,Out From: https://www.cnblogs.com/apachecn/p/18443300