1.什么是AutoML?
AutoML(自动化机器学习)是一种技术方法,旨在将机器学习的流程自动化,包括特征工程、模型选择、超参数优化等步骤。它通过简化机器学习过程,使得非专业人员或数据科学团队能更快、更便捷地构建并优化机器学习模型,特别适合于模型快速部署和大规模应用。
2.AutoML的核心步骤
- 数据预处理:自动处理缺失值、编码分类变量、标准化数据等。
- 特征工程:自动创建和选择特征,以提高模型性能。
- 模型选择:通过自动化流程选择最适合数据的模型(如决策树、随机森林、XGBoost等)。
- 超参数优化:通过网格搜索、贝叶斯优化或进化算法等,自动调节超参数以提高模型效果。
- 模型评估与选择:根据不同模型的表现选择最佳模型,并基于验证集评估性能。
- 部署与监控:在模型上线后,自动化工具可以持续监控和更新模型。
AutoML执行过程:
3.AutoML的常见工具
- Auto-sklearn:基于scikit-learn的开源AutoML工具。
- TPOT:基于遗传算法的Python库,能自动搜索最佳模型与特征工程。
- H2O AutoML:支持多种算法、数据预处理,适用于大数据。
- Google Cloud AutoML:适合Google云平台用户,提供计算资源和自动化建模。
- Microsoft Azure AutoML:专为Azure用户设计的AutoML工具。
- FLAML:一个轻量级快速库,适合小型数据集和资源受限的环境。
4.AutoML工具分析
1. Auto-sklearn
Auto-sklearn 是一个基于 scikit-learn 的 AutoML 工具,主要用于分类和回归任务
优点:
- 开源免费:基于 Python 和 scikit-learn 构建,开源,适合自主研究和应用。
- 集成丰富:包含丰富的scikit-learn模型库和特征工程预处理器。
- 自动化模型组合:使用集成学习方法,通过模型加权组合获得更优结果。
- 内置超参数优化:自动执行模型选择和超参数优化,使用贝叶斯优化加速搜索。
缺点:
- 资源消耗较高:复杂模型和大数据集下训练时间较长。
- 模型支持有限:仅支持scikit-learn模型,不支持深度学习模型。
使用场景:
- 小规模数据集的分类或回归任务:适合科研应用、小规模数据建模或快速原型验证。
2. TPOT (Tree-based Pipeline Optimization Tool)
TPOT 是基于遗传算法的AutoML工具,能够自动优化和组合机器学习模型。
优点:
- 遗传算法搜索:使用遗传算法进行模型选择和特征工程,有助于发掘复杂的模型组合。
- 可读性强:最终模型以Python代码形式导出,便于调试和理解。
- 特征工程能力强:能自动化复杂的特征工程处理。
缺点:
- 训练时间长:遗传算法计算量较大,尤其是在大规模数据集上。
- 较少模型支持:不支持深度学习模型,主要用于经典机器学习任务。
使用场景:
- 特征工程复杂的任务:适合需要大量特征转换的任务,例如文本或图像的基础特征提取。
- 结构化数据的分类和回归任务:适用于结构化数据场景,例如金融、医疗、营销等领域。
3. H2O AutoML
H2O AutoML 是由 H2O.ai 开发的,支持大规模数据集的自动化机器学习平台,能处理分类、回归等任务。
优点:
- 丰富的模型支持:包含随机森林、GBM、XGBoost等多个集成学习模型。
- 大数据支持:适合大规模数据,能很好地适应分布式和云计算环境。
- 模型堆叠:支持自动化模型堆叠,通过多个模型的组合提高性能。
缺点:
- 较高的资源需求:在大规模数据集上运行时需要较多的计算资源。
- 用户界面不如其他工具友好:相对来说需要较强的专业知识。
使用场景:
- 大规模数据的回归和分类任务:适合对大数据进行建模的场景,如电商推荐、金融预测、用户行为分析等。
- 需要模型堆叠的场景:适合需要组合多个模型来提升预测性能的场景
4. Google Cloud AutoML
Google Cloud AutoML 是谷歌云平台的AutoML服务,支持图像、文本、表格等多种数据类型,提供了强大的计算资源。
优点:
- 云计算资源支持:适合需要高计算能力的用户,无需本地计算资源。
- 多种数据类型支持:支持结构化数据、文本、图像等多种数据格式。
- 深度学习支持:可以自动优化深度学习模型结构,适用于复杂任务。
缺点:
- 付费服务:Google Cloud AutoML 是付费的,适合预算充足的企业用户。
- 数据隐私问题:数据需上传到云端,可能涉及数据隐私问题。
使用场景:
- 企业级自动化建模:适合预算充足的企业,特别是需要处理图像和自然语言处理任务。
- 无本地计算资源:适合无大型计算资源的中小型团队。
5. Microsoft Azure AutoML
Azure AutoML 是微软提供的云端AutoML服务,支持多种机器学习任务,包括分类、回归、时间序列预测等。
优点:
- 集成微软生态:与Azure其他服务无缝集成,如Azure ML Studio和Azure Data Lake。
- 时间序列预测支持:内置时间序列预测,适合时序数据的建模。
- 自动化特征工程:内置丰富的特征工程模块,能快速提升模型效果。
缺点:
- 依赖Azure云平台:主要适用于Azure云用户,不适合离线使用。
- 费用较高:和其他云服务类似,按计算资源收费。
使用场景:
- 时间序列数据建模:适合金融、物流等行业的时间序列预测任务。
- 企业级自动化建模:适合已经在Azure生态中的企业用户。
6. FLAML (Fast Lightweight AutoML)
FLAML 是由微软开发的轻量级AutoML库,专注于快速构建和优化模型,特别适合小型数据集和资源受限的环境。
优点:
- 轻量化:计算资源需求低,适合资源受限的本地环境。
- 支持多种任务:支持分类、回归、时间序列和多目标优化等任务。
- 可调节的模型训练速度:可以通过限制搜索空间加速建模过程。
缺点:
- 模型选择较少:不支持深度学习模型,主要用于经典机器学习任务。
- 性能略逊于大规模AutoML工具:在特定任务上可能不如H2O等性能表现。
使用场景:
- 资源受限环境:适合小型团队、科研项目或资源有限的环境。
- 快速原型设计:适用于快速迭代和测试的场景,尤其是在处理小型数据集时。
7.实现一个原生Auto ML
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import to_categorical
from torch.utils.data import DataLoader, TensorDataset
from xgboost import XGBClassifier
def load_data_from_folder(folder_path):
"""读取指定文件夹中的所有CSV文件并合并为一个DataFrame"""
all_data = pd.DataFrame()
for file_name in os.listdir(folder_path):
if file_name.endswith('normalized.csv'):
file_path = os.path.join(folder_path, file_name)
data = pd.read_csv(file_path)
all_data = pd.concat([all_data, data], ignore_index=True)
return all_data
def split_data(data):
"""将数据分为特征和标签,并进行训练集和测试集的划分"""
X = data.iloc[:, 1:] # 所有特征列
y = data.iloc[:, 0] # 标签列
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
return X_train, X_test, y_train, y_test
def split_data_for_kan(data):
"""将数据分为特征和标签,并进行训练集和测试集的划分"""
X = data.iloc[:, 1:].values # 转换为 numpy 数组
y = data.iloc[:, 0].values # 转换为 numpy 数组
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
return X_train, X_test, y_train, y_test
def create_data_loaders_for_kan(X_train, y_train, X_test, y_test, batch_size=32):
"""创建数据加载器"""
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
return train_loader, test_loader
def encode_labels(y_train, y_test):
"""对标签列进行编码,转换为数值类型"""
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)
y_test_encoded = label_encoder.transform(y_test)
return y_train_encoded, y_test_encoded, label_encoder
def preprocess_data(X_train, X_test):
"""对特征进行标准化处理"""
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
return X_train, X_test
def train_random_forest(X_train, y_train):
"""训练随机森林分类模型并返回训练好的模型"""
model = RandomForestClassifier(random_state=42)
model.fit(X_train, y_train)
return model
def train_xgboost(X_train, y_train):
"""训练XGBoost分类模型并返回训练好的模型"""
model = XGBClassifier(eval_metric='mlogloss', random_state=42)
model.fit(X_train, y_train)
return model
def train_ann_model(input_dim, output_dim):
"""构建并返回一个简单的ANN模型"""
model = Sequential([
Dense(64, activation='relu', input_dim=input_dim),
Dense(32, activation='relu'),
Dense(output_dim, activation='softmax')
])
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
return model
# 定义KAN模型
class KAN(nn.Module):
def __init__(self, input_dim, output_dim):
super(KAN, self).__init__()
self.hidden1 = nn.Linear(input_dim, 256)
self.hidden2 = nn.Linear(256, 128)
self.hidden3 = nn.Linear(128, 64)
self.output = nn.Linear(64, output_dim)
self.activation = nn.ReLU()
def forward(self, x):
x = self.activation(self.hidden1(x))
x = self.activation(self.hidden2(x))
x = self.activation(self.hidden3(x))
x = self.output(x)
return x
def train_kan_model(model, train_loader, criterion, optimizer, num_epochs=100):
"""训练模型"""
for epoch in range(num_epochs):
model.train()
for X_batch, y_batch in train_loader:
optimizer.zero_grad()
outputs = model(X_batch)
loss = criterion(outputs, y_batch)
loss.backward()
optimizer.step()
def evaluate_kan_model(model, X_train_tensor, X_test_tensor):
"""评估模型,返回训练集和测试集预测结果"""
model.eval()
with torch.no_grad():
train_outputs = model(X_train_tensor)
test_outputs = model(X_test_tensor)
_, train_preds = torch.max(train_outputs, 1)
_, test_preds = torch.max(test_outputs, 1)
return train_preds, test_preds
def calculate_metrics(y_true, y_pred):
"""计算并返回模型的评估指标"""
accuracy = accuracy_score(y_true, y_pred)
recall = recall_score(y_true, y_pred, average='macro', zero_division=0)
precision = precision_score(y_true, y_pred, average='macro', zero_division=0)
f1 = f1_score(y_true, y_pred, average='macro', zero_division=0)
# 处理多分类情况
tn, fp, fn, tp = 0, 0, 0, 0
if len(confusion_matrix(y_true, y_pred)) == 2:
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
# 计算漏报率和误报率
cm = confusion_matrix(y_true, y_pred)
miss_rate, false_alarm_rate = None, None
if cm.shape == (2, 2): # 针对二分类的情况
tn, fp, fn, tp = cm.ravel()
miss_rate = fn / (fn + tp) if (fn + tp) > 0 else 0
false_alarm_rate = fp / (fp + tn) if (fp + tn) > 0 else 0
else: # 多分类情况
miss_rate = sum(cm[i][j] for i in range(len(cm)) for j in range(len(cm)) if i != j) / cm.sum()
false_alarm_rate = sum(cm[j][i] for i in range(len(cm)) for j in range(len(cm)) if i != j) / cm.sum()
return {
"Accuracy": accuracy,
"Miss Rate": miss_rate,
"False Alarm Rate": false_alarm_rate,
"F1 Score": f1,
"Recall": recall
}
def save_metrics_to_csv(train_metrics, test_metrics, save_path):
"""将训练集和测试集的评估指标保存为CSV文件"""
results_df = pd.DataFrame({
"Metric": ["Accuracy", "Miss Rate", "False Alarm Rate", "F1 Score", "Recall"],
"Training Set": [train_metrics["Accuracy"], train_metrics["Miss Rate"],
train_metrics["False Alarm Rate"], train_metrics["F1 Score"],
train_metrics["Recall"]],
"Test Set": [test_metrics["Accuracy"], test_metrics["Miss Rate"],
test_metrics["False Alarm Rate"], test_metrics["F1 Score"],
test_metrics["Recall"]]
})
results_df.to_csv(save_path, index=False)
print(f"评估结果已保存到: {save_path}")
def compare_models(folder_path, result_file_path):
"""对比多种模型性能,选择最优模型"""
# 加载数据
all_data = load_data_from_folder(folder_path)
X_train, X_test, y_train, y_test = split_data(all_data)
# 对标签进行编码
y_train_encoded, y_test_encoded, label_encoder = encode_labels(y_train, y_test)
# 存储各个模型的评估结果
model_performance = {}
# 1. 随机森林模型
rf_model = train_random_forest(X_train, y_train_encoded)
rf_train_pred = rf_model.predict(X_train)
rf_test_pred = rf_model.predict(X_test)
rf_train_metrics = calculate_metrics(y_train_encoded, rf_train_pred)
rf_test_metrics = calculate_metrics(y_test_encoded, rf_test_pred)
model_performance["RandomForest"] = (rf_train_metrics, rf_test_metrics)
# 2. XGBoost模型
xgb_model = train_xgboost(X_train, y_train_encoded)
xgb_train_pred = xgb_model.predict(X_train)
xgb_test_pred = xgb_model.predict(X_test)
xgb_train_metrics = calculate_metrics(y_train_encoded, xgb_train_pred)
xgb_test_metrics = calculate_metrics(y_test_encoded, xgb_test_pred)
model_performance["XGBoost"] = (xgb_train_metrics, xgb_test_metrics)
# 3. ANN模型
X_train_preprocessed, X_test_preprocessed = preprocess_data(X_train, X_test)
y_train_onehot = to_categorical(y_train_encoded)
y_test_onehot = to_categorical(y_test_encoded)
ann_model = train_ann_model(X_train_preprocessed.shape[1], y_train_onehot.shape[1])
ann_model.fit(X_train_preprocessed, y_train_onehot, epochs=20, batch_size=32, verbose=1)
ann_train_pred = np.argmax(ann_model.predict(X_train_preprocessed), axis=1)
ann_test_pred = np.argmax(ann_model.predict(X_test_preprocessed), axis=1)
ann_train_metrics = calculate_metrics(y_train_encoded, ann_train_pred)
ann_test_metrics = calculate_metrics(y_test_encoded, ann_test_pred)
model_performance["ANN"] = (ann_train_metrics, ann_test_metrics)
# 4. KAN模型
X_train_kan, X_test_kan, y_train_kan, y_test_kan = split_data_for_kan(all_data)
y_train_kan_encoded, y_test_kan_encoded, label_encoder = encode_labels(y_train_kan, y_test_kan)
train_loader, test_loader = create_data_loaders_for_kan(X_train_kan, y_train_kan_encoded, X_test_kan,
y_test_kan_encoded)
input_dim = X_train_kan.shape[1]
output_dim = len(np.unique(y_train_kan_encoded))
kan_model = KAN(input_dim, output_dim)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(kan_model.parameters(), lr=0.001)
train_kan_model(kan_model, train_loader, criterion, optimizer)
X_train_tensor = torch.tensor(X_train_kan, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_kan, dtype=torch.float32)
kan_train_pred, kan_test_pred = evaluate_kan_model(kan_model, X_train_tensor, X_test_tensor)
kan_train_metrics = calculate_metrics(y_train_kan_encoded, kan_train_pred.numpy())
kan_test_metrics = calculate_metrics(y_test_kan_encoded, kan_test_pred.numpy())
model_performance["KAN"] = (kan_train_metrics, kan_test_metrics)
# 比较各个模型的测试集 `Accuracy`,选出性能最优的模型
best_model_name = None
best_accuracy = 0
for model_name, (train_metrics, test_metrics) in model_performance.items():
if test_metrics["Accuracy"] > best_accuracy:
best_model_name = model_name
best_accuracy = test_metrics["Accuracy"]
# 输出最佳模型
best_train_metrics, best_test_metrics = model_performance[best_model_name]
print(f"最佳模型: {best_model_name}")
print("训练集性能:", best_train_metrics)
print("测试集性能:", best_test_metrics)
# 保存最佳模型的评估指标
save_metrics_to_csv(best_train_metrics, best_test_metrics, result_file_path)
return best_model_name, best_train_metrics, best_test_metrics
标签:--,Auto,模型,kan,metrics,train,ML,test,model
From: https://blog.csdn.net/m0_69435612/article/details/143590906