一、基本准备¶
1、导入包:与train一致¶
In [ ]:import numpy as np
import torch
from torch import nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import pickle
import time
import math
import os
import csv
import glob
from sklearn import metrics
from sklearn.model_selection import KFold, StratifiedKFold
2、设置超参数¶
In [ ]:FILE_MODEL_TMP = "model_tmp.pkl"
MY_RANDOM_STATE = 5
torch.manual_seed(MY_RANDOM_STATE)
# NUMBER_EPOCHS = 20
# LEARNING_RATE = 1e-4
SAMPLE_LENGTH = 200
AVGPOOL1D_KERNEL_SIZE = 4
CONV1D_KERNEL_SIZE = 3
CONV1D_FEATURE_SIZE_BLOCK1 = 32
CONV1D_FEATURE_SIZE_BLOCK2 = 64
CONV1D_FEATURE_SIZE_BLOCK3 = 128
FULLY_CONNECTED_LAYER_SIZE = 256
# 这里不一样
MODEL_DIR = '../Train/model_layer1_seed' + str(MY_RANDOM_STATE)
if not os.path.exists(MODEL_DIR):
os.makedirs(MODEL_DIR)
二、定义所需使用函数:与train一致¶
In [ ]:def one_hot(index, dimension):
data = np.zeros((dimension))
data[index] = 1
return data
In [ ]:
def load_text_file(file_text):
with open(file_text) as f:
lines = f.readlines()
my_data = [line.strip().upper() for line in lines[1::2]]
return my_data
In [ ]:
my_dict = {'A': 0,
'C': 1,
'G': 2,
'T':3,
'a':0,
'c':1,
'g':2,
't':3}
定义了一个名为EnhancerDataset的Python类,它是PyTorch中Dataset类的子类。EnhancerDataset类通过定义,主要用于处理DNA序列数据和相关的标签数据,用于训练深度学习模型。具体实现方式如下:
-
构造函数__init__接受两个参数X和Y,分别表示DNA序列数据和相关的标签数据
-
__getitem__:
- 从X中取出一个样本(DNA序列),同时从Y中取出相应的标签
- 将DNA序列转换为一个one-hot编码的矩阵,其中每一列对应于DNA序列中的一个字符,每一行对应于字符的one-hot编码的矩阵,其中每一列对应于DNA序列中的一个字符,每一行对应于字符的one-hot编码
- 从DNA序列中提取出1-mer、2-mer和3-mer,分别对应于单个字符、相邻两个字符和相邻三个字符的频率
- 最后,它将one-hot编码矩阵和提取出的1-mer、2-mer和3-mer矩阵进行拼接,得到一个维度为(7, 200)的特征矩阵。这个特征矩阵被转换为PyTorch张量,并和标签一起返回
-
三个成员函数extract_1_mer、extract_2_mer和extract_3_mer,用于从DNA序列中提取1-mer、2-mer和3-mer
- extract_1_mer函数统计DNA序列中每种字符(A、C、G、T)的出现次数,然后将出现次数除以序列长度得到该字符的频率
- extract_2_mer函数统计DNA序列中每种相邻两个字符(AA、AC、AG、AT等)的出现次数,然后将出现次数除以序列长度减1得到相邻两个字符的频率
- extract_3_mer函数统计DNA序列中每种相邻三个字符(AAA、AAC、AAG、AAT等)的出现次数,然后将出现次数除以序列长度得到相邻三个字符的频率
-
__len__:用于返回数据集的大小,即DNA序列的个数
class EnhancerDataset(Dataset):
# X: list Enhancer sequence (每个序列200个字符)
# Y: list label [0, 1]; 0: 负样本, 1: 正样本
def __init__(self, X, Y):
self.X = X
self.Y = Y
def __getitem__(self, index):
label = self.Y[index]
sample = self.X[index]
values = np.zeros((4, SAMPLE_LENGTH))
for i in range(SAMPLE_LENGTH):
char_idx = my_dict[sample[i]]
values[char_idx, i] = 1
values_one_mer = self.extract_1_mer(sample)
#values = np.concatenate((values, values_one_mer), axis=0)
values_two_mer = self.extract_2_mer(sample)
#values = np.concatenate((values, values_two_mer), axis=0)
values_three_mer = self.extract_3_mer(sample)
#values = np.concatenate((values, values_three_mer), axis=0)
values = np.concatenate((values, values_one_mer, values_two_mer,
values_three_mer), axis=0)
input = torch.from_numpy(values)
return input, label
def extract_1_mer(self, sample):
my_count = {'A': 0.0, 'C': 0.0, 'G': 0.0, 'T': 0.0}
values = np.zeros((1, SAMPLE_LENGTH))
for i in range(SAMPLE_LENGTH):
my_count[sample[i]] += 1
#for one_mer in my_count:
# print("one mer: ", one_mer, " : ", my_count[one_mer])
for i in range(SAMPLE_LENGTH):
values[0, i] = my_count[sample[i]] / SAMPLE_LENGTH;
#print("values: ", values)
return values
def extract_2_mer(self, sample):
my_count = {'AA': 0.0, 'AC': 0.0, 'AG': 0.0, 'AT': 0.0,
'CA': 0.0, 'CC': 0.0, 'CG': 0.0, 'CT': 0.0,
'GA': 0.0, 'GC': 0.0, 'GG': 0.0, 'GT': 0.0,
'TA': 0.0, 'TC': 0.0, 'TG': 0.0, 'TT': 0.0}
values = np.zeros((2, SAMPLE_LENGTH))
for i in range(SAMPLE_LENGTH - 1):
two_mer = sample[i:i+2]
#print("two_mer: ", two_mer)
my_count[two_mer] += 1
#for two_mer in my_count:
# print("two mer: ", two_mer, " : ", my_count[two_mer])
values = np.zeros((2, SAMPLE_LENGTH))
for i in range(1,SAMPLE_LENGTH-1):
two_mer_left = sample[i-1:i+1]
two_mer_right = sample[i:i+2]
values[0, i] = my_count[two_mer_left] / (SAMPLE_LENGTH - 1);
values[1, i] = my_count[two_mer_right] / (SAMPLE_LENGTH - 1);
#print("values: ", values)
return values
def extract_3_mer(self, sample):
my_count = {}
for firchCh in ['A', 'C', 'G', 'T']:
for secondCh in ['A', 'C', 'G', 'T']:
for thirdCh in ['A', 'C', 'G', 'T']:
three_mer = firchCh + secondCh + thirdCh
my_count[three_mer] = 0.0
for i in range(SAMPLE_LENGTH - 2):
three_mer = sample[i:i+3]
#print("two_mer: ", two_mer)
my_count[three_mer] += 1
values = np.zeros((1, SAMPLE_LENGTH))
for i in range(1,SAMPLE_LENGTH-2):
three_mer = sample[i-1:i+2]
values[0, i] = my_count[three_mer] / SAMPLE_LENGTH;
return values
def __len__(self):
#return 100
return len(self.X)
三、搭建CNN模型:没有使用优化器¶
该模型为一个卷积神经网络,通过多层卷积和池化操作,最终将特征映射转换为一个实数,并经过 Sigmoid 函数输出二元分类结果。其中,CONV1D_FEATURE_SIZE_BLOCK1、CONV1D_FEATURE_SIZE_BLOCK2、CONV1D_KERNEL_SIZE、AVGPOOL1D_KERNEL_SIZE、FULLY_CONNECTED_LAYER_SIZE、LEARNING_RATE 等超参数已定义和赋值
In [ ]:# import torch.nn as nn
# import torch.nn.functional as F
# import torch
# 定义模型类
class EnhancerCnnModel(nn.Module):
def __init__(self):
super(EnhancerCnnModel, self).__init__()
# 第一种卷积+第一种BN层
self.c1_1 = nn.Conv1d(8, CONV1D_FEATURE_SIZE_BLOCK1, CONV1D_KERNEL_SIZE, padding=1)
self.c1_1bn = nn.BatchNorm1d(CONV1D_FEATURE_SIZE_BLOCK1)
# 第一种卷积+第一种BN层
self.c1_2 = nn.Conv1d(CONV1D_FEATURE_SIZE_BLOCK1, CONV1D_FEATURE_SIZE_BLOCK1,
CONV1D_KERNEL_SIZE, padding=1)
self.c1_2bn = nn.BatchNorm1d(CONV1D_FEATURE_SIZE_BLOCK1)
# 第一种卷积+第一种BN层
self.c1_3 = nn.Conv1d(CONV1D_FEATURE_SIZE_BLOCK1, CONV1D_FEATURE_SIZE_BLOCK1,
CONV1D_KERNEL_SIZE, padding=1)
self.c1_3bn = nn.BatchNorm1d(CONV1D_FEATURE_SIZE_BLOCK1)
# 池化层
self.p1 = nn.MaxPool1d(AVGPOOL1D_KERNEL_SIZE)
# 第二种卷积+第二种BN层
self.c2_1 = nn.Conv1d(CONV1D_FEATURE_SIZE_BLOCK1,
CONV1D_FEATURE_SIZE_BLOCK2,
CONV1D_KERNEL_SIZE, padding=1)
self.c2_1bn = nn.BatchNorm1d(CONV1D_FEATURE_SIZE_BLOCK2)
# 第二种卷积+第二种BN层
self.c2_2 = nn.Conv1d(CONV1D_FEATURE_SIZE_BLOCK2, CONV1D_FEATURE_SIZE_BLOCK2,
CONV1D_KERNEL_SIZE, padding=1)
self.c2_2bn = nn.BatchNorm1d(CONV1D_FEATURE_SIZE_BLOCK2)
# 第二种卷积+第二种BN层
self.c2_3 = nn.Conv1d(CONV1D_FEATURE_SIZE_BLOCK2, CONV1D_FEATURE_SIZE_BLOCK2,
CONV1D_KERNEL_SIZE, padding=1)
self.c2_3bn = nn.BatchNorm1d(CONV1D_FEATURE_SIZE_BLOCK2)
# 池化层
self.p2 = nn.MaxPool1d(AVGPOOL1D_KERNEL_SIZE)
# 全连接层
self.fc = nn.Linear(768, FULLY_CONNECTED_LAYER_SIZE)
# 输出层
self.out = nn.Linear(FULLY_CONNECTED_LAYER_SIZE, 1)
# 定义损失函数
self.criterion = nn.BCELoss()
# 定义优化器
# self.optimizer = torch.optim.Adam(self.parameters(), lr=LEARNING_RATE)
# 前向传播函数
def forward(self, inputs):
batch_size = inputs.size(0)
# 将输入的形状从(batch_size x seq_len) 转换为 (batch_size x input_size x seq_len),以适应卷积层的输入
#inputs = inputs.transpose(1,2)
#print("inputs size: ", inputs.size())
output = F.relu(self.c1_1bn(self.c1_1(inputs)))
output = F.relu(self.c1_2bn(self.c1_2(output)))
output = F.relu(self.c1_3bn(self.c1_3(output)))
output = self.p1(output)
#print("After p1: ", output.shape)
output = F.relu(self.c2_1bn(self.c2_1(output)))
output = F.relu(self.c2_2bn(self.c2_2(output)))
output = F.relu(self.c2_3bn(self.c2_3(output)))
output = self.p2(output)
#print("After p2: ", output.shape)
output = output.view(batch_size, -1)
#print("Reshape : ", output.shape)
output = F.relu(self.fc(output))
#print("After FC layer: ", output.shape)
output = torch.sigmoid(self.out(output))
#print("Final output (After sigmoid): ", output.shape)
#print("Final output: ", output)
return output
四、构建训练函数:test没有这一块¶
下面函数用于训练一个 epoch,即遍历一遍训练集中的所有数据并更新模型参数。其中,该函数接受三个参数:model 表示要训练的模型;train_loader 表示训练集数据的 DataLoader,用于加载数据;learning_rate 表示当前训练的学习率。
在函数开始时,将模型设置为训练模式,并更新学习率。然后遍历训练集中的每一批数据,获取输入和标签,并将其转换为 float 类型。如果 GPU 可用,将输入和标签转换为 Variable 对象并移动到 GPU 上。然后清除参数的梯度,进行前向传播,计算损失函数,反向传播,更新参数,并累加损失。最后计算训练集的平均损失并返回。
In [ ]:# # 训练一个 epoch 的函数
# def train_one_epoch(model, train_loader, learning_rate):
# # 设置模型为训练模式
# model.train()
# # 更新学习率
# for param_group in model.optimizer.param_groups:
# param_group['lr'] = learning_rate
# epoch_loss_train = 0.0
# nb_train = 0
# # 遍历训练集中的每一批数据
# for i, data in enumerate(train_loader, 0):
# # 获取输入和标签
# inputs, labels = data
# #print("\ninputs: ", inputs.shape)
# #print("labels: ", labels.shape)
# inputs_length = inputs.size()[0]
# nb_train += inputs_length
# inputs = inputs.float()
# labels = labels.float()
# #######################
# # 使用 GPU 进行计算 #
# #######################
# if torch.cuda.is_available():
# inputs = Variable(inputs.cuda())
# labels = Variable(labels.cuda())
# else:
# inputs = Variable(inputs)
# labels = Variable(labels)
# # 清除参数的梯度
# model.optimizer.zero_grad()
# # 进行前向传播
# outputs = model(inputs)
# # 计算损失函数
# loss = model.criterion(outputs, labels)
# #print("loss: ", loss)
# # 反向传播
# loss.backward()
# # 更新参数
# model.optimizer.step()
# # 累加损失
# epoch_loss_train = epoch_loss_train + loss.item() * inputs_length
# print("nb_train: ", nb_train)
# # 计算平均损失
# epoch_loss_train_avg = epoch_loss_train / nb_train
# return epoch_loss_train_avg
五、构建评估函数:完全一致, calculate_confusion_matrix也一致¶
评估给定神经网络模型在给定数据集上的性能表现
-
定义函数 evaluate(file_model, loader),它有两个输入参数:file_model 表示预训练的模型文件路径,loader 表示数据集加载器
-
如果 CUDA 可用,则将模型 model 移动到 GPU 上,以便在 GPU 上进行计算
-
加载预训练的模型权重,将模型 model 的参数设置为预训练的参数
-
设置模型 model 为评估模式,即关闭 Dropout 和 Batch Normalization 等随机性操作
-
定义变量 epoch_loss 和 nb_samples,分别用于累加每个 batch 的损失和样本数
-
定义空列表 arr_labels、arr_labels_hyp 和 arr_prob,分别用于存储真实标签、预测标签和模型的预测概率
-
遍历数据集 loader,对于每个 batch:
- 获取输入数据 inputs 和标签 labels
- 计算 inputs 的长度 inputs_length,并将其累加到 nb_samples 中
- 将标签 labels 的维度从 (batch_size, 1) 转换为 (batch_size,),并将其转换为 numpy 数组后添加到 arr_labels 列表中
- 将输入数据 inputs 和标签 labels 转换为 float 类型,以便于后续计算
- 如果 CUDA 可用,则将输入数据 inputs 和标签 labels 移动到 GPU 上,以便于在 GPU 上进行计算
- 将输入数据 inputs 作为输入,通过模型 model 进行前向传递,得到输出 outputs
- 计算模型的损失 loss,并将其累加到变量 epoch_loss 中
- 将模型的预测概率从 tensor 转换为 numpy 数组,并将其添加到 arr_prob 列表中
-
打印样本总数 nb_samples 和平均损失 epoch_loss_avg
-
计算 AUC 值,并打印输出
-
将模型的预测概率转换为二分类标签,并将其添加到 arr_labels_hyp 列表中
-
将真实标签 arr_labels 和预测标签 arr_labels_hyp 转换为整型,并分别存储到变量 arr_labels 和 arr_labels_hyp 中
-
调用函数 calculate_confusion_matrix(arr_labels, arr_labels_hyp),计算准确率、混淆矩阵、灵敏度、特异度和 MCC 值,并将它们存储到变量 acc、confusion_matrix、sensitivity、specificity 和 mcc 中
函数在后面进行了定义
- 将评估指标存储到字典 result 中,并返回该字典
- 打印输出准确率 acc
def evaluate(file_model, loader):
# 定义一个 EnhancerCnnModel 的实例 model
model = EnhancerCnnModel()
# 如果 CUDA 可用,则将 model 移动到 GPU 上
if torch.cuda.is_available():
model.cuda()
# 加载预训练的模型权重
model.load_state_dict(torch.load(file_model))
# 设置模型为评估模式
model.eval()
epoch_loss = 0.0
nb_samples = 0
arr_labels = []
arr_labels_hyp = []
arr_prob = []
# 遍历数据集
for i, data in enumerate(loader, 0):
# 获取输入和标签
inputs, labels = data
# inputs 的长度
inputs_length = inputs.size()[0]
nb_samples += inputs_length
# 将标签添加到 arr_labels 列表中
arr_labels += labels.squeeze(1).data.cpu().numpy().tolist()
# 将 inputs 和 labels 转换为 float 类型
inputs = inputs.float()
labels = labels.float()
# 如果 CUDA 可用,则将 inputs 和 labels 移动到 GPU 上
if torch.cuda.is_available():
inputs = Variable(inputs.cuda())
labels = Variable(labels.cuda())
else:
inputs = Variable(inputs)
labels = Variable(labels)
# 前向传递
outputs = model(inputs)
# 计算损失使用模型 model 的损失函数来计算该模型在给定输出 outputs 和
# 标签 labels 下的损失
loss = model.criterion(outputs, labels)
# 累加 epoch_loss
epoch_loss = epoch_loss + loss.item() * inputs_length
# 将模型的预测结果添加到 arr_prob 列表中
arr_prob += outputs.squeeze(1).data.cpu().numpy().tolist()
# 输出样本总数和平均损失
print("nb_samples: ", nb_samples)
epoch_loss_avg = epoch_loss / nb_samples
print("epoch loss avg: ", epoch_loss_avg)
# 输出预测概率和真实标签的列表长度
print("arr_prob: ", len(arr_prob))
print("arr_labels: ", len(arr_labels))
# 计算 AUC 值
auc = metrics.roc_auc_score(arr_labels, arr_prob)
print("auc: ", auc)
# 将预测概率转换为二分类标签,阈值为 0.5
arr_labels_hyp = [int(prob > 0.5) for prob in arr_prob]
arr_labels = [int(label) for label in arr_labels]
# 计算各种分类指标
acc, confusion_matrix, sensitivity, specificity, mcc = calculate_confusion_matrix(arr_labels, arr_labels_hyp)
# 将结果存储到字典中
result = {'epoch_loss_avg': epoch_loss_avg,
'acc' : acc,
'confusion_matrix' : confusion_matrix,
'sensitivity' : sensitivity,
'specificity' : specificity,
'mcc' : mcc,
'auc' : auc,
'arr_prob': arr_prob,
'arr_labels': arr_labels,
'arr_labels_hyp':arr_labels_hyp
}
# 输出 accuracy
print("acc: ", acc)
# 返回结果字典
return result
定义函数 calculate_confusion_matrix(arr_labels, arr_labels_hyp)¶
函数的作用是计算混淆矩阵和相关的分类指标,用于评估模型在给定数据集上的性能表现。 函数的输入参数为 arr_labels 和 arr_labels_hyp,分别表示真实标签的列表和模型的预测标签的列表
混淆矩阵(confusion matrix)是一种常见的二分类模型性能评估工具,用于衡量模型的分类能力和错误分类情况。混淆矩阵的行表示真实标签,列表示模型预测的标签,每个元素表示对应的样本数。在二分类问题中,混淆矩阵通常被表示为下面的形式:
真实情况 | ||
---|---|---|
正例 | 反例 | |
预测情况 | ||
正例 | TP | FP |
反例 | FN | TN |
通过混淆矩阵,我们可以计算出许多有用的评价指标,例如准确率、精确率、召回率、F1 值、特异度、灵敏度、MCC 值等
In [ ]:def calculate_confusion_matrix(arr_labels, arr_labels_hyp):
# 初始化变量
corrects = 0 # 正确分类的样本数
confusion_matrix = np.zeros((2, 2)) # 混淆矩阵
# 遍历所有样本
for i in range(len(arr_labels)):
# 更新混淆矩阵
confusion_matrix[arr_labels_hyp[i]][arr_labels[i]] += 1
# 统计正确分类的样本数
if arr_labels[i] == arr_labels_hyp[i]:
corrects = corrects + 1
# 计算分类指标
acc = corrects * 1.0 / len(arr_labels) # 准确率
specificity = confusion_matrix[0][0] / (confusion_matrix[0][0] + confusion_matrix[1][0]) # 特异度
sensitivity = confusion_matrix[1][1] / (confusion_matrix[0][1] + confusion_matrix[1][1]) # 灵敏度
tp = confusion_matrix[1][1] # 真正例数
tn = confusion_matrix[0][0] # 真反例数
fp = confusion_matrix[1][0] # 假正例数
fn = confusion_matrix[0][1] # 假反例数
mcc = (tp * tn - fp * fn ) / math.sqrt((tp + fp)*(tp + fn)*(tn + fp)*(tn + fn)) # MCC值
# 返回分类指标
return acc, confusion_matrix, sensitivity, specificity, mcc
六、检查数据集中的样本:这里开始不同,这一步同train中的第七步¶
检查数据集中的每个样本是否符合要求,即是否由长度为 200 的序列组成
具体过程如下:
- 从字典 dataset 中获取测试集 testset
- 对测试集每个样本进行检查,如果样本长度不为 200,则将错误样本数量加 1
- 输出检查结果,如果测试集中有错误样本,则输出错误样本数量;否则输出“OK”
这个函数的主要作用是确保数据集中的每个样本都符合要求,以避免在测试过程中出现错误。如果数据集中的样本长度不为 200,则可能会导致模型无法处理这些样本或者产生错误的预测结果。因此,在进行测试之前,需要先对数据集进行检查,以确保数据集中的每个样本都符合要求。
In [ ]:# 将测试集的数据和标签分别存储在 dataset 字典的 "data_test" 和 "label_test" 键下
# dataset = {"data_test" : testset["data"], "label_test" : testset["label"]}
# 定义一个函数 check_dataset,用于检查数据集是否符合要求
def check_dataset(dataset):
print("\n==> Checking dataset") # 打印提示信息
# 检查 data_test 数据是否符合要求
data_test = dataset["data_test"] # 获取 data_test 数据
nb_error_samples = 0 # 初始化错误样本数量为 0
for sample in data_test: # 遍历每个样本
if len(sample) != 200: # 如果样本长度不为 200
nb_error_samples += 1 # 错误样本数量加 1
if nb_error_samples > 0: # 如果存在错误样本
print("data_teset error: ", nb_error_samples) # 打印错误信息和错误样本数量
else: # 如果不存在错误样本
print("data_test : OK!") # 打印测试通过的信息
七、读取测试文件并将其转换为模型可用的数据格式¶
文件名为 test_strong_enhancer.txt 或 test_weak_enhancer.txt
# 文件格式如下,每个样本包括一个样本名称和四行 DNA 序列,需要将这四行序列拼接在一起
# >Chr11_6627824_6628024
# ATGCTGCCAGAAGGAAAAGGGGTGGAATTAATGAAACTGGAAGGTTGTGGTGCTGGTTTGAGGAG
# TAAAGTATGGGGGCCAAAGTTGGCTATATGCTGGATATGAAGAGGGGGTTAATTCCTTGCAGGTC
# TTCTTGAGATAGAAGTCCAGGCCCTGAGGTGGCAGGCAGCCTGATAGTGAACAGAACCCTTGTGC
# CCATA
In [ ]:
def read_test_file(filename):
text_file = open(filename) # 打开文件
lines = text_file.readlines() # 读取文件中所有行的内容到列表 lines 中
my_data =[] # 初始化空列表 my_data,用于存储处理后的数据
for i in range(0, len(lines), 2): # 每次跳两行,遍历每个样本
# 读取第一行 DNA 序列名称并去除多余空格和换行符
seq_name = lines[i].strip()
# 拼接第二行 DNA 序列,并使用 strip() 方法去除多余空格和换行符,再转换为大写字母形式
text = lines[i+1].strip().upper()
my_data.append(text) # 将处理后的数据添加到 my_data 列表中
return my_data # 返回处理后的数据列表 my_data
八、准备测试数据¶
调用 read_test_file 函数读取两个 DNA 序列文件 enhancer_200_process_rename_test.fasta 和 noenhancer_200_process_rename_test.fasta 中的数据,并将读取的数据拼接在一起,形成测试集。
最后,该函数返回一个字典 testset,其中包含测试集数据和标签。
In [ ]:def prepare_test_data():
print("\n ==> Loading test set") # 打印提示信息
# 加载增强子数据集
data_strong = read_test_file('./Test/enhancer_200_process_rename_test.fasta') # 调用函数 read_test_file 读取文件
print("data_strong: ", len(data_strong)) # 打印加载的数据集大小
#print("data_strong: ", data_strong)
# data_weak = read_test_file('test_weak_enhancer.txt')
# print("data_weak: ", len(data_weak))
data_enhancer = data_strong # + data_weak
print("data_enhancer: ", len(data_enhancer))
# 加载非增强子数据集
data_non_enhancer = read_test_file('./Test/noenhancer_200_process_rename_test.fasta') # 调用函数 read_test_file 读取文件
print("data_non_enhancer: ", len(data_non_enhancer)) # 打印加载的数据集大小
# 构建测试集
label_enhancer = np.ones((len(data_strong),1)) # 增强子数据集对应标签为 1
label_non_enhancer = np.zeros((len(data_non_enhancer), 1)) # 非增强子数据集对应标签为 0
data = np.concatenate((data_strong, data_non_enhancer)) # 将增强子数据集和非增强子数据集拼接在一起,构成测试集
label = np.concatenate((label_enhancer, label_non_enhancer))
testset = {"data" : data, "label" : label} # 将测试集数据和标签存储在字典 testset 中
return testset # 返回测试集数据和标签字典 testset
九、定义函数 testing,用于测试模型性能¶
这段代码实现了对模型性能的测试和评估,并将测试结果写入 CSV 文件中。
首先,该函数将测试数据集转化为 EnhancerDataset 类型,并使用 DataLoader 加载数据集,每次迭代使用 32 个样本,关闭数据集随机重排,使用 4 个工作进程。
然后,该函数创建一个 CSV 文件,用于保存测试结果。接着,该函数获取所有模型文件的文件名,并按字典序排序。对于每个模型,该函数调用 evaluate 函数对该模型进行测试,并返回测试结果。该函数将该模型预测的概率矩阵添加到 y_prob_mtx 列表中,并将该模型的测试结果写入 CSV 文件中。
接下来,该函数将 y_prob_mtx 转换为 numpy 的 ndarray 类型。对 y_prob_mtx 进行集成,计算每个样本预测为正例的概率平均值,并将概率值大于 0.5 的样本预测为正例,其余样本预测为负例。然后,该函数获取测试集的真实标签,并计算集成模型的 ROC AUC、Accuracy、Sensitivity 和 Specificity 分数。其中,ROC AUC 是一种衡量模型性能的指标,用于评估模型预测正负样本之间的优劣程度。Accuracy 表示模型预测的正确率,Sensitivity 和 Specificity 分别表示模型对正类和负类的识别能力。
最后,该函数将集成模型的 ROC AUC、Accuracy、Sensitivity 和 Specificity 分数打印出来,并将它们写入 CSV 文件中。
In [ ]:# 将测试数据集转化为 EnhancerDataset 类型,并使用 DataLoader 加载数据集,
# 每次迭代使用 32 个样本,关闭数据集随机重排,使用 4 个工作进程
def testing():
test_dataset = EnhancerDataset(testset["data"], testset["label"])
test_loader = DataLoader(dataset=test_dataset, batch_size=32,
shuffle=False, num_workers=4)
# 创建一个 CSV 文件,用于保存测试结果
with open(testresult_fn, mode='w') as outfile:
outfile = csv.writer(outfile, delimiter=',')
outfile.writerow(['model_fn', 'Accuracy score', 'AUC score', 'Sensitivity', 'Specificity'])
# 获取所有模型文件的文件名,并按字典序排序
list_model_fn = sorted(glob.glob(MODEL_DIR+"/enhancer_*.pkl"))
#print(list_model_fn)
y_prob_mtx = [] # 初始化 y_prob_mtx,用于存储每个模型所预测的概率矩阵
for model_fn in list_model_fn: # 遍历每个模型
print(model_fn)
result = evaluate(model_fn, test_loader) # 调前 evaluate 函数对该模型进行测试,并返回测试结果
#print(result['arr_prob'])
y_prob_mtx.append(result['arr_prob']) # 将该模型预测的概率矩阵添加到 y_prob_mtx 列表中
# 将该模型的测试结果写入 CSV 文件中
with open(testresult_fn, mode='a') as outfile:
outfile = csv.writer(outfile, delimiter=',')
outfile.writerow([model_fn, result['acc'], result['auc'],
result['sensitivity'], result['specificity']])
# 将 y_prob_mtx 转换为 numpy 的 ndarray 类型
y_prob_mtx = np.array(y_prob_mtx)
print("y_prob_mtx: ", y_prob_mtx.shape)
#print("y_prob_mtx: ", y_prob_mtx)
# 对 y_prob_mtx 进行集成,计算每个样本预测为正例的概率平均值,并将概率值大于 0.5 的样本预测为正例,其余样本预测为负例
y_prob_ensemble = [np.mean(y_prob_mtx[:,col]) for col in range(np.size(y_prob_mtx, 1))]
y_pred_ensemble = [np.float(each > 0.5) for each in y_prob_ensemble]
y_true = testset["label"] # 获取测试集的真实标签
# 计算集成模型的 ROC AUC、Accuracy、Sensitivity 和 Specificity 分数
auc_score_ensemble = metrics.roc_auc_score(y_true, y_prob_ensemble)
accuracy_score_ensemble = metrics.accuracy_score(y_true, y_pred_ensemble)
cm = metrics.confusion_matrix(y_true, y_pred_ensemble) # 计算混淆矩阵
specificity_ensemble = cm[0,0]/(cm[0,0] + cm[0,1]) # 计算 Specificity
sensitivity_ensemble = cm[1,1]/(cm[1,1] + cm[1,0]) # 计算 Sensitivity
# 将集成模型的 ROC AUC、Accuracy、Sensitivity 和 Specificity 分数打印出来
print("Accuracy score (Testing Set) = ", accuracy_score_ensemble)
print("ROC AUC score (Testing Set) = ", auc_score_ensemble)
print("Sensitivity (Testing Set) = ", sensitivity_ensemble)
print("Specificity (Testing Set) = ", specificity_ensemble)
# 将集成模型的 ROC AUC、Accuracy、Sensitivity 和 Specificity 分数写入 CSV 文件中
with open(testresult_fn, mode='a') as outfile:
outfile = csv.writer(outfile, delimiter=',')
outfile.writerow(["ensemble", accuracy_score_ensemble, auc_score_ensemble, sensitivity_ensemble, specificity_ensemble])
十、运行整个程序¶
In [ ]:# if __name__== "__main__":
# testresult_fn = MODEL_DIR + "/test_result.csv"
# testset = prepare_test_data()
# testing()
标签:inputs,layer1,ENCC,self,labels,arr,源码,mer,data
From: https://www.cnblogs.com/cauwj/p/17416899.html