异常检测(Anomaly detection)是机器学习中一种常见应用,其目标是识别数据集中的异常或不寻常模式。尽管通常被归类为非监督学习问题,异常检测却具有与监督学习相似的特征。在异常检测中,我们通常处理的是未标记的数据,即没有明确的标签指示哪些样本是异常的。相反,算法需要根据数据本身的特征来确定异常。这使得异常检测成为一项挑战,因为异常通常是稀有事件,不易获取大量标记的异常数据以进行训练。尽管异常检测被归类为非监督学习,但我们可以将其视为一种“半监督”学习,因为我们通常有一些正常样本,但没有足够的异常样本。这种情况下,我们可以利用正常样本来构建模型,然后将其应用于整个数据集以检测异常。
一、问题来源
假想你是一个飞机引擎制造商,当你生产的飞机引擎从生产线上流出时,你需要进行QA(质量控制测试),而作为这个测试的一部分,你测量了飞机引擎的一些特征变量,比如引擎运转时产生的热量,或者引擎的振动等。如果你生产了\(m\)个引擎的话,那么你就有了一个数据集,从\(X^{(1)}\)到\(X^{(m)}\),将这些数据绘制成图表,如下图1中红色X所示。这里每个叉,都是无标签的数据。那么,异常检测问题可以定义为:我们假设后来有一天,你有一个新的飞机引擎从生产线上流出,而你的新飞机引擎有特征变量\(x_{test}\),我们希望知道这个新的飞机引擎是否有某种异常,或者说,我们希望判断这个引擎是否需要进一步测试。因为,如果它看起来像一个正常的引擎,那么我们可以直接将它运送到客户那里,而不需要进一步的测试。从直觉上先来理解下异常检测问题,现在假设,这个新的飞机引擎,如下图2,它出现在比较靠近中心的位置(绿X),那么它看起来是 OK 的,但如果它出现的位置距离数据集的中心比较偏远,那么它有可能是 anomaly,我们可能需要对它再进行测试一下。
图1 | 图2 |
---|---|
让我们定义这个问题:
假设我们有数据集 \(x^{(1)},x^{(2)},..,x^{(m)}\),通常我们假定 \(m\) 个样本都是正常的,然后我们希望有一个算法告诉我们,一个新的样本数据 \(x_{test}\) 是否异常,我们要采取的做法是,给定无标签的训练集,我们将对数据建模,即 \(p(x)\),也就是说,我们将对 \(x\)的概率分布建模,其中 这些\(x\) 是特征变量,如引擎运转时产生的热量,或者引擎的振动等。因此,当我们建立了 \(x\) 的概率模型之后,我们就有对于一个新的飞机引擎 \(x_{test}\),它的概率\(p(x_{test})\)如果低于一个阈值 \(\varepsilon\),那么就将其标记为异常(anomaly)。这意味着,对于整个数据集,这个点出现的概率非常的低。反之,如果概率大于等于给定的阈值 \(\varepsilon\),我们就认为它是正常的(normal)。这种方法称为密度估计,可表示为:$$\quad p(x_{test}) \begin{cases} < \varepsilon & Anomaly \\ \geq \varepsilon & Normal \end{cases}$$
如下图3,给定图中这样的训练集,如果你建立了一个模型,你将很可能发现飞机引擎,即模型\(p(x)\) 会认为在中心区域的这些点概率相当大,而稍微远离中心区域的点概率会小点,更远地方的点,概率会更小,而在蓝色圈外面的点,将成为异常点。
图3 | 图4 |
---|---|
异常检测算法的实际应用有很多,最常见的应用是欺诈检测。假设有很多用户,每个用户都在从事不同的活动,也许是在个人网站上,也许是在一个实体工厂之类的地方,可以通过对不同的用户活动计算特征变量,如\(x_1\)是用户登陆的频率,\(x_2\)也许是用户访问某个页面的次数或者交易次数,\(x_3\)是用户在论坛上发贴的次数,\(x_4\)是用户的打字速度等等,然后建立一个模型\(p(x)\),这时可以用它来发现网站上的行为异常的用户。只需要看哪些用户的\(p(x)\)概率小于\(\varepsilon\),接下来拿来这些用户的档案做进一步筛选,或者要求这些用户验证他们的身份等等,从而让你的网站防御异常行为或者欺诈盗号等行为。
异常检测的另一个例子是在工业生产领域,事实上我们之前所说的飞机引擎的问题,通过模型\(p(x)\)找到异常的飞机引擎,然后要求进一步细查这些引擎的质量。 此外对于数据中心的计算机监控也是异常检测的应之一,如果你管理一个计算机集群或者一个数据中心,其中有许多计算机,我们可以为每台计算机计算特征变量如:计算机的内存消耗、硬盘访问量、CPU负载或者一些更加复杂的特征,例如一台计算机的CPU负载与网络流量的比值,那么给定正常情况下数据中心中计算机的特征变量,可以建立\(p(x)\)模型,如果有一个计算机它的概率\(p(x)\)非常小,这时可以认为这个计算机运行不正常,从而进一步要求系统管理员查看其工作状况,目前这种技术实际正在被各大数据中心使用用来监测大量计算机可能发生的异常。
二、基于统计学的异常检测
异常点(outlier)是一个数据对象,它明显不同于其他的数据对象,就好像它是被不同的机制产生的一样。
2.1 正态分布一元离群点的检测方法
一元正态分布函数:$$p(x) = {1 \over {\sqrt {2\pi } \sigma }}{e^{ - {{{{(x - u)}^2}} \over {2{\sigma ^2}}}}}$$
其中,\(\mu\)为数据的均值,\(\sigma\)为数据的标准差,\(\sigma\)越小,对应的图像越尖。
假设有\(n\) 个点 \((x_{1},...,x_{n})\),那么可以估计出这\(n\)个点的均值\(\mu\) 和方差\(\sigma\)。均值和方差分别被定义为:
在正态分布的假设下,区域 \(\mu\pm 3\sigma\)包含了99.7% 的数据,如果某个值距离分布的均值\(\mu\)超过了\(3\sigma\),那么这个值就可以被简单的标记为一个异常点(outlier)。
2.2 多元正态分布离群点的检测方法
涉及两个或者两个以上变量的数据称为多元数据,很多一元离群点的检测方法都可以扩展到高维空间中,从而处理多元数据。
- 基于一元正态分布的离群点检测方法
假设\(n\)维的数据集合形如 \(x_{i}=(x_{i,1},...,x_{i,n}), i\in \{1,...,m\}\),那么可以计算每个维度的均值和方差\(\mu_{j},\sigma_{j}, j\in\{1,...,n\}\)。 具体来说,对于\(j\in \{1,...,n\}\),可以计算
\[\mu_{j}=\sum_{i=1}^{m}x_{i,j}/m \quad \sigma_{j}^{2}=\sum_{i=1}^{m}(x_{i,j}-\mu_{j})^{2}/m \]在正态分布的假设下,如果有一个新的数据 \(x\),可以计算概率\(p(x)\) 如下:
\[p(x)=\prod_{j=1}^{n} p(x_{j};\mu_{j},\sigma_{j}^{2})=\prod_{j=1}^{n}\frac{1}{\sqrt{2\pi}\sigma_{j}}\exp(-\frac{(x_{j}-\mu_{j})^{2}}{2\sigma_{j}^{2}}) \]根据概率值的大小就可以判断 \(x\) 是否属于异常值。
- 多维正态分布——高斯分布的离群点检测方法
假设 \(n\) 维的数据集合 \(x=(x_{1},...,x_{n})\),可以计算\(n\)维的均值向量
和 \(n\times n\) 的协方差矩阵:
\[\Sigma=[Cov(x_{i},x_{j})], i,j \in \{1,...,n\} \]如果有一个新的数据 \(x_{n+1}=(x_{n+1,1},...,x_{n+1,n})\),可以计算我们首先计算所有特征的平均值,然后再估计协方差矩阵:
\[\Sigma = \frac{1}{n} \sum_{i=1}^{n} (x_{n+1,i} - \mu_i)(x_{n+1,i} - \mu_i)^T = \frac{1}{n}(x - \mu)^T (x - \mu) \]注意\(\mu\)是一个向量,其每一个单元都是原特征矩阵中一行数据的均值。最后我们计算多元高斯分布的$ p(x)$:
\[p(x) = \frac{1}{(2\pi)^{n/2} |\Sigma|^{1/2}} \exp\left(-\frac{1}{2}(x - \mu)^T \Sigma^{-1} (x - \mu)\right) \]其中:\(|\Sigma|\) 是 \(\Sigma\) 的行列式;$ \Sigma^{-1}$ 是 $\Sigma $的逆矩阵。
异常判定:当 $p(x) < \epsilon $ 时,判定为异常,其中$ \epsilon $ 是预先设定的阈值。如果这个分数低于某个阈值 $ \epsilon $,那么该样本就被认为是异常的。
import numpy as np
import matplotlib.pyplot as plt
# 生成虚构的二维数据
np.random.seed(0)
mean = [0, 0]
cov = [[1, 0.5], [0.5, 1]]
x = np.random.multivariate_normal(mean, cov, 300)
# 新数据点
new_point = np.array([7, 6])
# 可视化生成的数据及新数据点
plt.figure(figsize=(8, 5))
plt.scatter(x[:, 0], x[:, 1], edgecolors='b', label='Data')
plt.scatter(new_point[0], new_point[1], color='r', marker='x', label='New Point (7, 6)')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.title('Generated Data with New Point')
plt.legend()
plt.show()
def estimate_gaussian(x):
"""估计高斯分布的均值和方差"""
mu = x.mean(axis=0) # 求每列的均值
sigma2 = x.var(axis=0) # 求每列的方差,这里自由度为m
return mu, sigma2
def multivariate_gaussian(x, mu, sigma2):
"""多元高斯分布密度函数"""
p = np.zeros((x.shape[0], 1))
n = len(mu)
if np.ndim(sigma2) == 1:
sigma2 = np.diag(sigma2) # 对角阵
for i in range(x.shape[0]):
p[i] = (2 * np.pi) ** (-n / 2) * np.linalg.det(sigma2) ** (-1 / 2) * np.exp(
-0.5 * (x[i, :] - mu).T @ np.linalg.inv(sigma2) @ (x[i, :] - mu))
return p
# 估计高斯分布的参数
mu, sigma2 = estimate_gaussian(x)
# 计算概率密度
p = multivariate_gaussian(x, mu, sigma2)
# 计算新数据点的概率密度
p_new_point = multivariate_gaussian(new_point.reshape(1, -1), mu, sigma2)
# 选择最优阈值
def select_threshold(p):
"""选择最优阈值"""
bestF1 = 0
bestEpsilon = 0
for epsilon in np.linspace(min(p), max(p), 1001):
y_predict = np.zeros(p.shape)
y_predict[p < epsilon] = 1 # 把小于阈值的设为1(异常)
tp = np.sum(y_predict) # 真正例
precision = tp / np.sum(y_predict == 1)
recall = tp / len(p)
F1 = (2 * precision * recall) / (precision + recall)
if F1 > bestF1:
bestF1 = F1
bestEpsilon = epsilon
return bestEpsilon, bestF1
epsilon, F1 = select_threshold(p)
print("最优阈值:", epsilon)
print("最优F1分数:", F1)
# 解释最优F1分数
print("\nF1 分数是精确率(Precision)和召回率(Recall)的调和平均。")
print("精确率衡量了模型在预测为异常时的准确性,即异常点中真正的异常点的比例。")
print("召回率衡量了模型能够发现真正的异常点的能力,即所有真正异常点中被发现的比例。")
print("F1 分数越高,说明模型在精确率和召回率之间取得了更好的平衡。")
print("在这个例子中,最优 F1 分数为", F1, ",表示模型在判定异常点时达到了较好的平衡。")
# 判断新数据点是否为异常点
if p_new_point < epsilon:
print("新数据点 (7, 6) 是异常点。")
else:
print("新数据点 (7, 6) 不是异常点。")
2.3 \(\chi^{2}\) 统计量检测多元离群点
在正态分布的假设下,\(\chi^{2}\) 统计量可以用来检测多元离群点。对于某个对象\(\bold{a}\),\(\chi^{2}\) 统计量是
\[\chi^{2}=\sum_{i=1}^{n}(a_{i}-E_{i})^{2}/E_{i} \]其中,\(a_{i}\)是\(\bold{a}\)在第 \(i\) 维上的取值,\(E_{i}\)是所有对象在第 \(i\) 维的均值,\(n\)是维度。如果对象\(\bold{a}\)的\(\chi^{2}\) 统计量很大,那么该对象就可以认为是离群点。
三、其他检查方法
3.1 基于聚类的方法
基于聚类的异常检测方法通常依赖下列假设,1)正常数据实例属于数据中的一个簇,而异常数据实例不属于任何簇;2)正常数据实例靠近它们最近的簇质心,而异常数据离它们最近的簇质心很远;3)正常数据实例属于大而密集的簇,而异常数据实例要么属于小簇,要么属于稀疏簇;通过将数据归分到不同的簇中,异常数据则是那些属于小簇或者不属于任何一簇或者远离簇中心的数据。
将距离簇中心较远的数据作为异常点:这类方法有 SOM、K-means、最大期望( expectation maximization,EM)及基于语义异常因子( semantic anomaly factor)算法等;将聚类所得小簇数据作为异常点:代表方法有K-means聚类;将不属于任何一簇作为异常点:代表方法有 DBSCAN、ROCK、SNN 聚类。
3.2 基于深度的方法
该方法将数据映射到 \(k\) 维空间的分层结构中,并假设异常值分布在外围,而正常数据点靠近分层结构的中心(深度越高)。半空间深度法( ISODEPTH 法) ,通过计算每个点的深度,并根据深度值判断异常数据点。最小椭球估计 ( minimum volume ellipsoid estimator,MVE)法。根据大多数数据点( 通常为 > 50% ) 的概率分布模型拟合出一个实线椭圆形所示的最小椭圆形球体的边界,不在此边界范围内的数据点将被判断为异常点。
图5 | 图6 |
---|---|
3.3 基于神经网络的方法
代表方法有自动编码器( autoencoder,AE) ,长短期记忆神经网络(LSTM)等。LSTM可用于时间序列数据的异常检测:利用历史序列数据训练模型,检测与预测值差异较大的异常点。Autoencoder异常检测 Autoencoder本质上使用了一个神经网络来产生一个高维输入的低维表示。Autoencoder与主成分分析PCA类似,但是Autoencoder在使用非线性激活函数时克服了PCA线性的限制。算法的基本上假设是异常点服从不同的分布。根据正常数据训练出来的Autoencoder,能够将正常样本重建还原,但是却无法将异于正常分布的数据点较好地还原,导致其基于重构误差较大。当重构误差大于某个阈值时,将其标记为异常值。
四、信用卡反欺诈
项目为kaggle上经典的信用卡欺诈检测,该数据集质量高,正负样本比例非常悬殊。我们在此项目主要用了无监督的Autoencoder新颖点检测,根据重构误差识别异常欺诈样本。
import warnings
warnings.filterwarnings("ignore")
import pandas as pd
import numpy as np
import pickle
import matplotlib.pyplot as plt
import tensorflow as tf
import seaborn as sns
plt.style.use('seaborn')
from sklearn.model_selection import train_test_split
from keras.models import Model, load_model
from keras.layers import Input, Dense
from keras.callbacks import ModelCheckpoint
from keras import regularizers
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_curve, auc, precision_recall_curve
import sys # 导入sys模块
# 读取数据 :信用卡欺诈数据集地址https://www.kaggle.com/mlg-ulb/creditcardfraud
d = pd.read_csv('creditcard.csv')
# 处理缺失值
d.fillna(d.mean(), inplace=True)
# 查看样本比例
num_nonfraud = np.sum(d['Class'] == 0)
num_fraud = np.sum(d['Class'] == 1)
plt.bar(['Fraud', 'non-fraud'], [num_fraud, num_nonfraud], color='dodgerblue')
plt.show()
# 删除时间列,对Amount进行标准化
data = d.drop(['Time'], axis=1)
data['Amount'] = StandardScaler().fit_transform(data[['Amount']])
# 为无监督新颖点检测方法,只提取负样本,并且按照8:2切成训练集和测试集
mask = (data['Class'] == 0)
X_train, X_test = train_test_split(data[mask], test_size=0.2, random_state=0)
X_train = X_train.drop(['Class'], axis=1).values
X_test = X_test.drop(['Class'], axis=1).values
# 提取所有正样本,作为测试集的一部分
X_fraud = data[~mask].drop(['Class'], axis=1).values
# 构建Autoencoder网络模型
# 隐藏层节点数分别为16,8,8,16
# epoch为5,batch size为32
input_dim = X_train.shape[1]
encoding_dim = 16
num_epoch = 5
batch_size = 32
input_layer = Input(shape=(input_dim, ))
encoder = Dense(encoding_dim, activation="tanh",
activity_regularizer=regularizers.l1(10e-5))(input_layer)
encoder = Dense(int(encoding_dim / 2), activation="relu")(encoder)
decoder = Dense(int(encoding_dim / 2), activation='tanh')(encoder)
decoder = Dense(input_dim, activation='relu')(decoder)
autoencoder = Model(inputs=input_layer, outputs=decoder)
autoencoder.compile(optimizer='adam',
loss='mean_squared_error',
metrics=['mae'])
# 模型保存为model.keras,并开始训练模型
checkpointer = ModelCheckpoint(filepath="model.keras",
verbose=0,
save_best_only=True)
history = autoencoder.fit(X_train, X_train,
epochs=num_epoch,
batch_size=batch_size,
shuffle=True,
validation_data=(X_test, X_test),
verbose=1,
callbacks=[checkpointer]).history
# 读取模型
autoencoder = load_model('model.keras')
# 利用autoencoder重建测试集
pred_test = autoencoder.predict(X_test)
# 重建欺诈样本
pred_fraud = autoencoder.predict(X_fraud)
# 计算重构MSE和MAE误差
mse_test = np.mean(np.power(X_test - pred_test, 2), axis=1)
mse_fraud = np.mean(np.power(X_fraud - pred_fraud, 2), axis=1)
mae_test = np.mean(np.abs(X_test - pred_test), axis=1)
mae_fraud = np.mean(np.abs(X_fraud - pred_fraud), axis=1)
mse_df = pd.DataFrame()
mse_df['Class'] = [0] * len(mse_test) + [1] * len(mse_fraud)
mse_df['MSE'] = np.hstack([mse_test, mse_fraud])
mse_df['MAE'] = np.hstack([mae_test, mae_fraud])
mse_df = mse_df.sample(frac=1).reset_index(drop=True)
# 输出标签为Fraud的数据索引
fraud_indices = mse_df[mse_df['Class'] == 1].index
print("Label为Fraud的数据索引:", fraud_indices)
# 分别画出测试集中正样本和负样本的还原误差MAE和MSE
markers = ['o', '^']
colors = ['dodgerblue', 'coral']
labels = ['Non-fraud', 'Fraud']
plt.figure(figsize=(14, 5))
plt.subplot(121)
for flag in [1, 0]:
temp = mse_df[mse_df['Class'] == flag]
plt.scatter(temp.index,
temp['MAE'],
alpha=0.7,
marker=markers[flag],
c=colors[flag],
label=labels[flag])
plt.title('Reconstruction MAE')
plt.ylabel('Reconstruction MAE'); plt.xlabel('Index')
plt.subplot(122)
for flag in [1, 0]:
temp = mse_df[mse_df['Class'] == flag]
plt.scatter(temp.index,
temp['MSE'],
alpha=0.7,
marker=markers[flag],
c=colors[flag],
label=labels[flag])
plt.legend(loc=[1, 0], fontsize=12); plt.title('Reconstruction MSE')
plt.ylabel('Reconstruction MSE'); plt.xlabel('Index')
plt.show()
# 画出Precision-Recall曲线
plt.figure(figsize=(14, 6))
for i, metric in enumerate(['MAE', 'MSE']):
plt.subplot(1, 2, i+1)
precision, recall, _ = precision_recall_curve(mse_df['Class'], mse_df[metric])
pr_auc = auc(recall, precision)
plt.title('Precision-Recall curve based on %s\nAUC = %0.2f'%(metric, pr_auc))
plt.plot(recall[:-2], precision[:-2], c='coral', lw=4)
plt.xlabel('Recall'); plt.ylabel('Precision')
plt.show()
图5 | 图6 |
---|---|
总结
异常检测作为一项关键技术,在当今多样化的应用场景中扮演着越来越重要的角色。异常检测已经渗透到多个行业中。在网络安全领域,它能够有效识别网络中的恶意活动、攻击行为和潜在的系统漏洞。金融行业同样受益于异常检测技术,它可以识别交易中的欺诈行为、洗钱活动以及其他违规操作。生物医学领域中,异常检测技术的应用有助于在医疗检查中发现异常结果,如肿瘤和疾病等,从而提高诊断的准确性。物流行业也利用异常检测来识别和处理包裹丢失、延误或损坏等问题,提升服务质量和效率。
随着技术的发展,未来的异常检测将呈现出几个显著的发展趋势。首先,深度学习技术,包括自编码器和生成对抗网络等,将在异常检测领域发挥更大的作用,提供更为精准的识别能力。其次,随着多模态数据的兴起,异常检测将面临图像、文本、音频等多种类型数据的综合分析挑战,这要求算法具备更高的适应性和灵活性。此外,提高异常检测的解释性,增强算法的可解释性和可信度,也将成为未来研究的重点。然而,异常检测在发展的同时,也面临着一些挑战。数据不均衡问题尤为突出,异常数据相对于正常数据的稀缺性,可能导致算法性能的下降。高维数据的存在同样对算法的效能构成了挑战,需要研究者采用特殊的处理方法来优化性能。实时性能的需求也对算法的计算复杂度和延迟提出了更高的要求,尤其是在需要快速响应的场景中。