聚类算法 0基础小白也能懂(附代码)
啥是聚类算法
聚类(Clustering)是最常见的无监督学习算法,它指的是按照某个特定标准(如距离)把一个数据集分割成不同的类或簇,使得同一个簇内的数据对象的相似性尽可能大,同时不在同一个簇中的数据对象的差异性也尽可能地大。也即聚类后同一类的数据尽可能聚集到一起,不同类数据尽量分离。
聚类算法在很多场景下都有应用,例如新闻自动分组,用户分群,图像分割等等。很多时候,无监督的聚类算法,得到的聚类结果还可以作为特征在后续监督学习中应用,提升整体效果。
聚类问题
我们先对比区分一下聚类和分类:
聚类是一种无监督学习( 数据不需要标签,所有的样本都没有类别信息。模型基于输入数据本身的特征进行分组),而分类是一种有监督的学习(训练数据需要有明确的标签,也就是目标变量。每个样本都对应一个类别标签,用于指导模型学习。)。
聚类只需要人工指定相似度的标准和类别数就可以,而分类需要从训练集学习分类的方法。
比如下面:
- 分类数量为3;
- 分类标准是物理距离;
- 分好的类分别用红、绿、蓝表示。
际上,除了物理距离,现实生活中任何你能想到、计算机可以通过运算和逻辑进行判断的规则,都可以作为分类标准。
下面我们用图像压缩作为例子来解释一下。最左边是一张彩色照片,大小约1Mb,通过压缩可以把它变成几十到几百Kb,这就是压缩软件的实现过程。那么压缩软件的实现原理是什么呢?其中一种就是聚类算法。
从原始图片到压缩存储的过程如下图所示:
聚类算法同样可以用于图像分割。图像中每一个像素点是一个 \(3\) 维向量,对应 \([R,G,B]\) 像素值。给定聚类中类别个数 \(K\),算法用 \(K\) 个不同的颜色来表示原来的图像,每个像素点用 \(K\) 个颜色中一个表示。具体如下:
对于文档、新闻、商品而言,很多时候我们会使用嵌套的归类方法,这是一种层次化聚类:
主流聚类算法
我们先对聚类算法做个了解,主流的聚类算法可以分成两类:划分聚类(Partitioning Clustering)和层次聚类(Hierarchical Clustering)。他们的主要区别如图中所示:
划分聚类算法会给出一系列扁平结构的簇(分开的几个类),它们之间没有任何显式的结构来表明彼此的关联性。
常见算法有 K-Means / K-Medoids、Gaussian Mixture Model (高斯混合模型)、Spectral Clustering(谱聚类)、Centroid-based Clustering等。
层次聚类会输出一个具有层次结构的簇集合,因此能够比划分聚类输出的无结构簇集合提供更丰富的信息。层次聚类可以认为是是嵌套的划分聚类。
常见算法有 Single-linkage、Complete-linkage、Connectivity-based Clustering等。
这两类算法在聚类过程中用到的具体算法不一样,后文我们会重点展开讲一下 K-Means 算法、Single-linkage 算法和 Complete-linkage 算法。
K-Means聚类算法
我们提到了聚类算法要把 \(n\) 个数据点按照分布分成 \(K\) 类(很多算法的 \(k\) 是人为提前设定的)。我们希望通过聚类算法得到 \(k\) 个中心点,以及每个数据点属于哪个中心点的划分。
- 中心点可以通过迭代算法来找到,满足条件:所有的数据点到聚类中心的距离之和是最小的。
- 中心点确定后,每个数据点属于离它最近的中心点。
在进入「如何寻找中心点」这个核心问题之前,我们先解决几个小问题:
Q1:数据点到中心点的距离如何计算?
我们一般选择几何距离,就是L2距离的平方。
Q2:中心点是否唯一,或者说,是不是存在全局最优解?
对于多个中心点的情况,全局最优是一个相当难的问题。理论上存在一个全局最优解,但是不一定能找到。既然全局最优解不好找,那我们退而求其次,看能不能找到局部最优解。
Q3:聚类结果如何表示?
采用空间分割的方式:将空间分割成多个多边形,每个多边形对应一个 cluster中心。
K-Means算法步骤
K-Means 采用 EM算法 迭代确定中心点。流程分两步:
① 更新中心点:初始化的时候以随机取点作为起始点;迭代过程中,取同一类的所有数据点的重心(或质心)作为新中心点。
② 分配数据点:把所有的数据点分配到离它最近的中心点。
重复上面的两个步骤,一直到中心点不再改变为止。过程如图所示:
-
左侧Assignments:一开始随机选取三个点,作为三个类的中心,基于其它点和这三个中心点的距离分配簇;每一类重新计算和分配中心。
-
右侧Refitted Means:根据新的中心点,重新分配所有的数据点(原来属于绿色中心点的1个点,此次迭代后变成属于红色中心点了)。
下图的示例展示了 K-Means 动态迭代收敛的过程:
改进K-Means至K-Medoids算法
K-Means缺点:
缺点1:中心点是所有同一类数据点的质心,所以聚类中心点可能不属于数据集的样本点。
缺点2:计算距离时我们用的是L2距离的平方。对离群点很敏感,噪声(Noisy Data)和离群点(Outlier)会把中心点拉偏,甚至改变分割线的位置。
针对 K-Means 算法的缺点改进得到了 K-Medoids 算法:
(1)限制聚类中心点必须来自数据点。
求中心点的计算方法,由原来的直接计算重心,变成计算完重心后,在重心附近找一个数据点作为新的中心点。
K-Medoids 重拟合步骤比直接求平均的 K-Means 要复杂一些。
(2)为避免平方计算对离群点的敏感,把平方变成绝对值。
总结来说,K-Medoids 算法的迭代过程与 K-Means 是一致的,不同点如下所示:
- 起始点不是随机点,而是任意选择数据集中的点。
- 距离使用L1距离,而不是L2距离。
- 新的中心点,也不是同类所有点的重心,而是同一类别所有数据点中,离其它点最近的点。
- 复杂度方面,相比于 K-Means 的 公式 ,K-Medoids 更新中心点的复杂度 公式 要更高一些。
k-kmedoid实现代码
import numpy as np
from sklearn.metrics import pairwise_distances
import random
class KMedoids:
def __init__(self, n_clusters=3, max_iter=300, random_state=None):
"""
初始化 K-Medoids 模型。
:param n_clusters: 聚类簇的数量
:param max_iter: 最大迭代次数
:param random_state: 随机种子,用于控制随机性
"""
self.n_clusters = n_clusters
self.max_iter = max_iter
self.random_state = random_state
self.medoids_ = None # 存储簇的中心(即 Medoids)
self.labels_ = None # 存储每个样本的簇标签
def fit(self, X):
"""
训练 K-Medoids 模型
:param X: 输入数据,形状为 (n_samples, n_features)
:return: self
"""
# 设置随机种子,确保结果的可重复性
if self.random_state:
np.random.seed(self.random_state)
# 初始化随机选取 n_clusters 个样本点作为 Medoids
n_samples = X.shape[0]
medoids_idx = np.random.choice(n_samples, self.n_clusters, replace=False)
self.medoids_ = X[medoids_idx]
for iteration in range(self.max_iter):
# 计算每个样本与每个 Medoid 之间的距离
distances = pairwise_distances(X, self.medoids_)
# 对每个样本分配到最近的 Medoid(即分配簇标签)
labels = np.argmin(distances, axis=1)
# 计算当前的总成本(即每个样本到其分配的 Medoid 的距离之和)
current_cost = np.sum([np.linalg.norm(X[i] - self.medoids_[labels[i]]) for i in range(n_samples)])
# 对每个簇更新 Medoid
new_medoids_idx = []
for k in range(self.n_clusters):
# 获取当前簇中的所有样本
cluster_points = X[labels == k]
if len(cluster_points) == 0: # 防止空簇
continue
# 计算簇中每个点到其他点的距离之和
intra_cluster_distances = pairwise_distances(cluster_points)
total_distances = np.sum(intra_cluster_distances, axis=1)
# 选择距离和最小的点作为新的 Medoid
new_medoid_idx = np.argmin(total_distances)
new_medoids_idx.append(np.where(X == cluster_points[new_medoid_idx])[0][0])
new_medoids_idx = np.array(new_medoids_idx)
new_medoids = X[new_medoids_idx]
# 计算新的总成本
new_distances = pairwise_distances(X, new_medoids)
new_labels = np.argmin(new_distances, axis=1)
new_cost = np.sum([np.linalg.norm(X[i] - new_medoids[new_labels[i]]) for i in range(n_samples)])
# 如果没有发生变化,则退出循环
if np.array_equal(new_medoids, self.medoids_) or new_cost >= current_cost:
break
# 否则更新 Medoids 和标签
self.medoids_ = new_medoids
self.labels_ = new_labels
return self
def predict(self, X):
"""
使用训练好的模型进行预测
:param X: 输入数据,形状为 (n_samples, n_features)
:return: 返回每个样本的簇标签
"""
distances = pairwise_distances(X, self.medoids_)
return np.argmin(distances, axis=1)
# 测试 K-Medoids 算法
if __name__ == "__main__":
from sklearn.datasets import make_blobs
import matplotlib.pyplot as plt
# 生成测试数据,形成三个簇
X, y = make_blobs(n_samples=300, centers=3, cluster_std=0.60, random_state=0)
# 实例化 KMedoids 模型
kmedoids = KMedoids(n_clusters=3, random_state=42)
kmedoids.fit(X)
# 获取聚类结果
labels = kmedoids.labels_
medoids = kmedoids.medoids_
# 绘制聚类结果
plt.scatter(X[:, 0], X[:, 1], c=labels, cmap='rainbow', s=30)
plt.scatter(medoids[:, 0], medoids[:, 1], c='black', s=100, marker='x', label='Medoids')
plt.title("K-Medoids Clustering")
plt.legend()
plt.show()
结果如下
层次聚类算法
相比于 K-Means 这类划分聚类,我们有另外一类层次化聚类算法。
划分聚类得到的是划分清晰的几个类,而层次聚类最后得到的是一个树状层次化结构。
从层次化聚类转换为划分聚类很简单,在层次化聚类的某一层进行切割,就得到1个划分聚类。如下图所示:
Single-Linkage 算法
这个算法是构造一棵二叉树,用叶节点代表数据,而二叉树的每一个内部节点代表一个聚类,有内到外来构建。如图所示:
这是一个从下而上的聚类。这棵树是先有叶子,顺着叶子逐渐长树枝,树枝越长越大一直到树根。
如果叶子很多,这个生长过程需要合并的类就会很多。图中有 11 个数据点,一共发生了 10 次合并。
Single-Linkage实现代码
import numpy as np
from scipy.spatial.distance import pdist, squareform
class SingleLinkage:
def __init__(self, n_clusters=2):
"""
初始化 Single-Linkage 模型。
:param n_clusters: 最终聚类簇的数量。
"""
self.n_clusters = n_clusters
self.labels_ = None # 保存每个样本的聚类标签
self.linkage_matrix_ = None # 保存层次聚类的链接矩阵
def fit(self, X):
"""
对输入数据进行层次聚类
:param X: 输入数据,形状为 (n_samples, n_features)
:return: self
"""
n_samples = X.shape[0]
# 计算所有数据点之间的距离矩阵
distances = squareform(pdist(X, metric='euclidean')) # 欧氏距离
# 初始化簇,每个点为一个簇
clusters = {i: [i] for i in range(n_samples)}
# 初始化链接矩阵,用于记录聚类过程
linkage_matrix = []
# 进行合并操作,直到剩余簇的数量等于目标簇数量
while len(clusters) > self.n_clusters:
min_dist = np.inf
to_merge = (None, None)
# 寻找最近的两个簇
cluster_keys = list(clusters.keys())
for i in range(len(cluster_keys)):
for j in range(i + 1, len(cluster_keys)):
c1 = cluster_keys[i]
c2 = cluster_keys[j]
# 计算簇之间的最小距离(Single-Linkage)
for p1 in clusters[c1]:
for p2 in clusters[c2]:
dist = distances[p1, p2]
if dist < min_dist:
min_dist = dist
to_merge = (c1, c2)
# 合并最近的两个簇
c1, c2 = to_merge
clusters[c1].extend(clusters[c2]) # 将c2中的点并入c1
del clusters[c2] # 删除c2簇
# 记录合并过程(使用样本ID、最小距离和合并后簇的大小)
linkage_matrix.append([c1, c2, min_dist, len(clusters[c1])])
self.linkage_matrix_ = np.array(linkage_matrix)
# 最后,给每个样本分配聚类标签
self.labels_ = np.zeros(n_samples)
for cluster_id, points in enumerate(clusters.values()):
for point in points:
self.labels_[point] = cluster_id
return self
# 测试 Single-Linkage 聚类算法
if __name__ == "__main__":
import matplotlib.pyplot as plt
from sklearn.datasets import make_blobs
# 生成测试数据
X, y = make_blobs(n_samples=100, centers=3, cluster_std=0.60, random_state=0)
# 实例化 Single-Linkage 模型
model = SingleLinkage(n_clusters=3)
model.fit(X)
# 获取聚类标签
labels = model.labels_
# 绘制聚类结果
plt.scatter(X[:, 0], X[:, 1], c=labels, cmap='rainbow', s=30)
plt.title("Single-Linkage Clustering")
plt.show()
结果如下
Complete-Linkage算法
与 Single-Linkage 算法相似,Complete-Linkage 的迭代思路是一样的,不同的是在合并类时,Single-Linkage 是用两个类中距离最小的两个点作为类之间的距离,而 Complete-Linkage 恰恰相反,用距离最远的两个数据点之间的距离作为这两个类之间的距离。
应该是由外到内构建。
DB-SCAN算法
在前面的内容中我们介绍了划分聚类和层次聚类的算法,接下来我们学习另外一个聚类算法:DB-SCAN 算法。
DB-SCAN 是一个基于密度的聚类。如下图中这样不规则形态的点,如果用 K-Means,效果不会很好。而通过 DB-SCAN 就可以很好地把在同一密度区域的点聚在一类中。
DB-SCAN算法的关键概念
核心对象(Core Object),也就是密度达到一定程度的点。
- 若\(x_j\)的\(\epsilon\)邻域至少包含 MinPts 个样本,即\(|N_{\epsilon}(x_j)\geq{MinPts}|\),则\(x_j\)是一个核心对象。
密度直达(directly density-reachable),密度可达(density-reachable):核心对象之间可以是密度直达或者密度可达。
- 若\(x_i\)位于\(x_j\)的\(\epsilon\)邻域中,且\(x_j\)是核心对象,则称 \(x_j\)由\(x_i\)密度直达。
- 密度可达就是好几个密度直达的点互相传递,只要能传递到就是密度可达
密度相连(density-connected):所有密度可达的核心点就构成密度相连。
密度相连是通过一个核心点双向的连接,密度可达是单向的。
DB-SCAN实现代码
import numpy as np
from sklearn.neighbors import NearestNeighbors
# 定义DBSCAN算法
class DBSCAN:
def __init__(self, eps=0.5, min_samples=5):
"""
初始化DBSCAN聚类算法。
:param eps: 邻域半径,即每个点在多远的距离内才算邻居。
:param min_samples: 一个点要成为核心点所需的最小邻居数。
"""
self.eps = eps # 邻域半径
self.min_samples = min_samples # 密度阈值,即一个点要成为核心点所需的最小邻居数
self.labels_ = None # 簇标签,初始状态下为None,将在fit过程中分配
self.core_samples_indices_ = None # 核心点索引,初始为None
self.n_clusters_ = 0 # 聚类的数量,初始为0
def fit(self, X):
"""
对数据进行聚类。
:param X: 输入数据,是一个二维数组,形状为 (n_samples, n_features)。
:return: 返回自身,包含聚类结果。
"""
n_samples = len(X) # 获取样本数量
self.labels_ = -1 * np.ones(n_samples) # 初始化标签,初始值为-1,表示噪声点
# 使用 sklearn 的 NearestNeighbors 查找每个点的邻居
neighbors_model = NearestNeighbors(radius=self.eps) # 构建邻居模型,指定半径eps
neighbors_model.fit(X) # 训练邻居模型
# 查找每个点在 eps 半径内的邻居点
neighbors = neighbors_model.radius_neighbors(X, return_distance=False)
visited = np.zeros(n_samples, dtype=bool) # 记录每个点是否被访问过,初始为未访问
cluster_id = 0 # 簇的初始ID为0
# 遍历每个数据点
for i in range(n_samples):
if visited[i]:
# 如果该点已经被访问过,则跳过
continue
visited[i] = True # 标记该点已被访问
neighbors_i = neighbors[i] # 获取第 i 个点的邻居
if len(neighbors_i) < self.min_samples:
# 如果邻居数量小于 min_samples,则标记为噪声点
self.labels_[i] = -1
else:
# 如果是核心点,扩展为一个新的簇
self._expand_cluster(X, neighbors, i, cluster_id, visited)
cluster_id += 1 # 新的簇创建后,簇ID加1
self.n_clusters_ = cluster_id # 最终簇的数量
# 将非噪声点的索引作为核心点
self.core_samples_indices_ = np.where(self.labels_ != -1)[0]
return self
def _expand_cluster(self, X, neighbors, i, cluster_id, visited):
"""
将第 i 个点扩展为新的簇。
:param X: 输入数据。
:param neighbors: 每个点的邻居列表。
:param i: 当前点的索引。
:param cluster_id: 当前簇的ID。
:param visited: 记录每个点是否被访问。
"""
# 将当前点 i 的标签设置为当前簇ID
self.labels_[i] = cluster_id
# 使用该点的邻居作为种子点列表
seeds = list(neighbors[i])
# 不断扩展簇,直到没有新的点可以加入
while seeds:
current_point = seeds.pop(0) # 从种子列表中取出一个点
if not visited[current_point]:
# 如果该点还没有被访问,则标记为已访问
visited[current_point] = True
current_neighbors = neighbors[current_point] # 获取该点的邻居
if len(current_neighbors) >= self.min_samples:
# 如果邻居数量大于或等于min_samples,则继续扩展,加入到种子列表
seeds.extend(current_neighbors)
# 如果该点还未归属到某个簇,则将其分配给当前簇
if self.labels_[current_point] == -1:
self.labels_[current_point] = cluster_id
# 测试DBSCAN算法
if __name__ == "__main__":
# 从 sklearn 导入 make_blobs 用于生成测试数据
from sklearn.datasets import make_blobs
import matplotlib.pyplot as plt
# 生成 300 个点,分布在 4 个中心周围,使用随机种子0保持结果一致
X, _ = make_blobs(n_samples=300, centers=4, cluster_std=0.6, random_state=0)
# 实例化并训练 DBSCAN 模型,设置 eps = 0.3,min_samples = 5
dbscan = DBSCAN(eps=0.3, min_samples=5)
dbscan.fit(X)
# 绘制聚类结果,颜色根据标签显示不同的簇
plt.scatter(X[:, 0], X[:, 1], c=dbscan.labels_, cmap='rainbow', s=20)
plt.title("DBSCAN Clustering")
plt.show()
结果如下
标签:self,小白,cluster,算法,samples,聚类,clusters From: https://www.cnblogs.com/Mephostopheles/p/18405402