首页 > 编程语言 >iEnhancer-ENCC_test_layer1.py源码阅读

iEnhancer-ENCC_test_layer1.py源码阅读

时间:2023-05-20 10:56:24浏览次数:40  
标签:inputs layer1 ENCC self labels arr 源码 mer data

一、基本准备

1、导入包:与train一致

  In [ ]:
import numpy as np

import torch 
from torch import nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

import pickle
import time
import math
import os
import csv
import glob

from sklearn import metrics
from sklearn.model_selection import KFold, StratifiedKFold
   

2、设置超参数

  In [ ]:
FILE_MODEL_TMP = "model_tmp.pkl"

MY_RANDOM_STATE = 5
torch.manual_seed(MY_RANDOM_STATE)

# NUMBER_EPOCHS = 20
# LEARNING_RATE = 1e-4

SAMPLE_LENGTH = 200
AVGPOOL1D_KERNEL_SIZE = 4
CONV1D_KERNEL_SIZE = 3
CONV1D_FEATURE_SIZE_BLOCK1 = 32
CONV1D_FEATURE_SIZE_BLOCK2 = 64
CONV1D_FEATURE_SIZE_BLOCK3 = 128

FULLY_CONNECTED_LAYER_SIZE = 256

# 这里不一样
MODEL_DIR = '../Train/model_layer1_seed' + str(MY_RANDOM_STATE)
if not os.path.exists(MODEL_DIR):
    os.makedirs(MODEL_DIR)
   

二、定义所需使用函数:与train一致

  In [ ]:
def one_hot(index, dimension):
    data = np.zeros((dimension))
    data[index] = 1
    return data
  In [ ]:
def load_text_file(file_text):
    with open(file_text) as f:
        lines = f.readlines()
        my_data = [line.strip().upper() for line in lines[1::2]]
        return my_data
  In [ ]:
my_dict = {'A': 0,
        'C': 1, 
        'G': 2,
        'T':3,
        'a':0,
        'c':1,
        'g':2,
        't':3}
   

定义了一个名为EnhancerDataset的Python类,它是PyTorch中Dataset类的子类。EnhancerDataset类通过定义,主要用于处理DNA序列数据和相关的标签数据,用于训练深度学习模型。具体实现方式如下:

  • 构造函数__init__接受两个参数X和Y,分别表示DNA序列数据和相关的标签数据

  • __getitem__:

    • 从X中取出一个样本(DNA序列),同时从Y中取出相应的标签
    • 将DNA序列转换为一个one-hot编码的矩阵,其中每一列对应于DNA序列中的一个字符,每一行对应于字符的one-hot编码的矩阵,其中每一列对应于DNA序列中的一个字符,每一行对应于字符的one-hot编码
    • 从DNA序列中提取出1-mer、2-mer和3-mer,分别对应于单个字符、相邻两个字符和相邻三个字符的频率
    • 最后,它将one-hot编码矩阵和提取出的1-mer、2-mer和3-mer矩阵进行拼接,得到一个维度为(7, 200)的特征矩阵。这个特征矩阵被转换为PyTorch张量,并和标签一起返回
  • 三个成员函数extract_1_mer、extract_2_mer和extract_3_mer,用于从DNA序列中提取1-mer、2-mer和3-mer

    • extract_1_mer函数统计DNA序列中每种字符(A、C、G、T)的出现次数,然后将出现次数除以序列长度得到该字符的频率
    • extract_2_mer函数统计DNA序列中每种相邻两个字符(AA、AC、AG、AT等)的出现次数,然后将出现次数除以序列长度减1得到相邻两个字符的频率
    • extract_3_mer函数统计DNA序列中每种相邻三个字符(AAA、AAC、AAG、AAT等)的出现次数,然后将出现次数除以序列长度得到相邻三个字符的频率
  • __len__:用于返回数据集的大小,即DNA序列的个数

  In [ ]:
class EnhancerDataset(Dataset):
    # X: list Enhancer sequence (每个序列200个字符)
    # Y: list label [0, 1]; 0: 负样本, 1: 正样本
    def __init__(self, X, Y):
        self.X = X
        self.Y = Y

    def __getitem__(self, index):
        label = self.Y[index]
        sample = self.X[index]
        
        values = np.zeros((4, SAMPLE_LENGTH))
        for i in range(SAMPLE_LENGTH):
            char_idx = my_dict[sample[i]]
            values[char_idx, i] = 1 
        
        values_one_mer = self.extract_1_mer(sample)
        #values = np.concatenate((values, values_one_mer), axis=0)
        values_two_mer = self.extract_2_mer(sample)
        #values = np.concatenate((values, values_two_mer), axis=0)
        values_three_mer = self.extract_3_mer(sample)
        #values = np.concatenate((values, values_three_mer), axis=0)
        values = np.concatenate((values, values_one_mer, values_two_mer, 
                        values_three_mer), axis=0)
        
        input = torch.from_numpy(values)
        return input, label
    
    def extract_1_mer(self, sample):
        my_count = {'A': 0.0, 'C': 0.0, 'G': 0.0, 'T': 0.0}        
        values = np.zeros((1, SAMPLE_LENGTH))
        for i in range(SAMPLE_LENGTH):
            my_count[sample[i]] += 1
        
        #for one_mer in my_count:
        #    print("one mer: ", one_mer, " : ", my_count[one_mer])
        
        for i in range(SAMPLE_LENGTH):
            values[0, i] = my_count[sample[i]] / SAMPLE_LENGTH;
        
        #print("values: ", values)    
        return values
    
    def extract_2_mer(self, sample):
        my_count = {'AA': 0.0, 'AC': 0.0, 'AG': 0.0, 'AT': 0.0,
                    'CA': 0.0, 'CC': 0.0, 'CG': 0.0, 'CT': 0.0,
                    'GA': 0.0, 'GC': 0.0, 'GG': 0.0, 'GT': 0.0,
                    'TA': 0.0, 'TC': 0.0, 'TG': 0.0, 'TT': 0.0} 
        values = np.zeros((2, SAMPLE_LENGTH))
        for i in range(SAMPLE_LENGTH - 1):
            two_mer = sample[i:i+2]
            #print("two_mer: ", two_mer)
            my_count[two_mer] += 1
        
        #for two_mer in my_count:
        #    print("two mer: ", two_mer, " : ", my_count[two_mer])
        
        values = np.zeros((2, SAMPLE_LENGTH))
        for i in range(1,SAMPLE_LENGTH-1):
            two_mer_left = sample[i-1:i+1]
            two_mer_right = sample[i:i+2]
            
            values[0, i] = my_count[two_mer_left] / (SAMPLE_LENGTH - 1);
            values[1, i] = my_count[two_mer_right] / (SAMPLE_LENGTH - 1);
        
        #print("values: ", values) 
        return values
    
    def extract_3_mer(self, sample):
        my_count = {}
                                        
        for firchCh in ['A', 'C', 'G', 'T']:
            for secondCh in ['A', 'C', 'G', 'T']:
                for thirdCh in ['A', 'C', 'G', 'T']:
                    three_mer = firchCh + secondCh + thirdCh
                    my_count[three_mer] = 0.0
        for i in range(SAMPLE_LENGTH - 2):
            three_mer = sample[i:i+3]
            #print("two_mer: ", two_mer)
            my_count[three_mer] += 1
                    
        values = np.zeros((1, SAMPLE_LENGTH))
        for i in range(1,SAMPLE_LENGTH-2):
            three_mer = sample[i-1:i+2]
            values[0, i] = my_count[three_mer] / SAMPLE_LENGTH;
                    
        return values
        
    def __len__(self):
        #return 100
        return len(self.X)
   

三、搭建CNN模型:没有使用优化器

   

该模型为一个卷积神经网络,通过多层卷积和池化操作,最终将特征映射转换为一个实数,并经过 Sigmoid 函数输出二元分类结果。其中,CONV1D_FEATURE_SIZE_BLOCK1、CONV1D_FEATURE_SIZE_BLOCK2、CONV1D_KERNEL_SIZE、AVGPOOL1D_KERNEL_SIZE、FULLY_CONNECTED_LAYER_SIZE、LEARNING_RATE 等超参数已定义和赋值

  In [ ]:
# import torch.nn as nn
# import torch.nn.functional as F
# import torch

# 定义模型类
class EnhancerCnnModel(nn.Module):
    def __init__(self):
        super(EnhancerCnnModel, self).__init__()
        
        # 第一种卷积+第一种BN层
        self.c1_1 = nn.Conv1d(8, CONV1D_FEATURE_SIZE_BLOCK1, CONV1D_KERNEL_SIZE, padding=1)
        self.c1_1bn = nn.BatchNorm1d(CONV1D_FEATURE_SIZE_BLOCK1)
        
        # 第一种卷积+第一种BN层
        self.c1_2 = nn.Conv1d(CONV1D_FEATURE_SIZE_BLOCK1, CONV1D_FEATURE_SIZE_BLOCK1, 
            CONV1D_KERNEL_SIZE, padding=1)
        self.c1_2bn = nn.BatchNorm1d(CONV1D_FEATURE_SIZE_BLOCK1)
        
        # 第一种卷积+第一种BN层
        self.c1_3 = nn.Conv1d(CONV1D_FEATURE_SIZE_BLOCK1, CONV1D_FEATURE_SIZE_BLOCK1, 
            CONV1D_KERNEL_SIZE, padding=1)    
        self.c1_3bn = nn.BatchNorm1d(CONV1D_FEATURE_SIZE_BLOCK1)
        
        # 池化层
        self.p1 = nn.MaxPool1d(AVGPOOL1D_KERNEL_SIZE)
        
        # 第二种卷积+第二种BN层
        self.c2_1 = nn.Conv1d(CONV1D_FEATURE_SIZE_BLOCK1, 
            CONV1D_FEATURE_SIZE_BLOCK2, 
            CONV1D_KERNEL_SIZE, padding=1)
        self.c2_1bn = nn.BatchNorm1d(CONV1D_FEATURE_SIZE_BLOCK2)
        
        # 第二种卷积+第二种BN层
        self.c2_2 = nn.Conv1d(CONV1D_FEATURE_SIZE_BLOCK2, CONV1D_FEATURE_SIZE_BLOCK2, 
            CONV1D_KERNEL_SIZE, padding=1)
        self.c2_2bn = nn.BatchNorm1d(CONV1D_FEATURE_SIZE_BLOCK2)
        
        # 第二种卷积+第二种BN层
        self.c2_3 = nn.Conv1d(CONV1D_FEATURE_SIZE_BLOCK2, CONV1D_FEATURE_SIZE_BLOCK2, 
            CONV1D_KERNEL_SIZE, padding=1)
        self.c2_3bn = nn.BatchNorm1d(CONV1D_FEATURE_SIZE_BLOCK2)
        
        # 池化层
        self.p2 = nn.MaxPool1d(AVGPOOL1D_KERNEL_SIZE)
        
        # 全连接层
        self.fc = nn.Linear(768, FULLY_CONNECTED_LAYER_SIZE)        
        # 输出层
        self.out = nn.Linear(FULLY_CONNECTED_LAYER_SIZE, 1)
        
        # 定义损失函数
        self.criterion = nn.BCELoss()
        # 定义优化器
        # self.optimizer = torch.optim.Adam(self.parameters(), lr=LEARNING_RATE)
    
    # 前向传播函数
    def forward(self, inputs):
        batch_size = inputs.size(0)
        # 将输入的形状从(batch_size x seq_len) 转换为 (batch_size x input_size x seq_len),以适应卷积层的输入
        #inputs = inputs.transpose(1,2)
        #print("inputs size: ", inputs.size())        
        output = F.relu(self.c1_1bn(self.c1_1(inputs)))
        output = F.relu(self.c1_2bn(self.c1_2(output)))
        output = F.relu(self.c1_3bn(self.c1_3(output)))
        output = self.p1(output)
        #print("After p1: ", output.shape) 
        
        output = F.relu(self.c2_1bn(self.c2_1(output)))
        output = F.relu(self.c2_2bn(self.c2_2(output)))
        output = F.relu(self.c2_3bn(self.c2_3(output)))
        output = self.p2(output)
        #print("After p2: ", output.shape)
        
        output = output.view(batch_size, -1)
        #print("Reshape : ", output.shape)
        
        output = F.relu(self.fc(output))
        #print("After FC layer: ", output.shape)  
        
        output = torch.sigmoid(self.out(output))
        #print("Final output (After sigmoid): ", output.shape)
        #print("Final output: ", output)
        
        return output 
   

四、构建训练函数:test没有这一块

下面函数用于训练一个 epoch,即遍历一遍训练集中的所有数据并更新模型参数。其中,该函数接受三个参数:model 表示要训练的模型;train_loader 表示训练集数据的 DataLoader,用于加载数据;learning_rate 表示当前训练的学习率。

在函数开始时,将模型设置为训练模式,并更新学习率。然后遍历训练集中的每一批数据,获取输入和标签,并将其转换为 float 类型。如果 GPU 可用,将输入和标签转换为 Variable 对象并移动到 GPU 上。然后清除参数的梯度,进行前向传播,计算损失函数,反向传播,更新参数,并累加损失。最后计算训练集的平均损失并返回。

  In [ ]:
# # 训练一个 epoch 的函数
# def train_one_epoch(model, train_loader, learning_rate):
#     # 设置模型为训练模式
#     model.train()
#     # 更新学习率
#     for param_group in model.optimizer.param_groups:
#         param_group['lr'] = learning_rate
    
#     epoch_loss_train = 0.0
#     nb_train = 0
#     # 遍历训练集中的每一批数据
#     for i, data in enumerate(train_loader, 0):
#         # 获取输入和标签
#         inputs, labels = data   
#         #print("\ninputs: ", inputs.shape)
#         #print("labels: ", labels.shape)
        
#         inputs_length = inputs.size()[0]
#         nb_train += inputs_length
        
#         inputs = inputs.float()
#         labels = labels.float()
        
#         #######################
#         #  使用 GPU 进行计算  #
#         #######################
#         if torch.cuda.is_available():
#             inputs = Variable(inputs.cuda())
#             labels = Variable(labels.cuda())            
#         else:
#             inputs = Variable(inputs)
#             labels = Variable(labels)
        
#         # 清除参数的梯度
#         model.optimizer.zero_grad()
#         # 进行前向传播
#         outputs = model(inputs)
        
#         # 计算损失函数
#         loss = model.criterion(outputs, labels)
#         #print("loss: ", loss)
#         # 反向传播
#         loss.backward()
#         # 更新参数
#         model.optimizer.step()
        
#         # 累加损失
#         epoch_loss_train = epoch_loss_train + loss.item() * inputs_length
    
#     print("nb_train: ", nb_train)
#     # 计算平均损失
#     epoch_loss_train_avg = epoch_loss_train / nb_train
#     return epoch_loss_train_avg
   

五、构建评估函数:完全一致, calculate_confusion_matrix也一致

评估给定神经网络模型在给定数据集上的性能表现

  • 定义函数 evaluate(file_model, loader),它有两个输入参数:file_model 表示预训练的模型文件路径,loader 表示数据集加载器

  • 如果 CUDA 可用,则将模型 model 移动到 GPU 上,以便在 GPU 上进行计算

  • 加载预训练的模型权重,将模型 model 的参数设置为预训练的参数

  • 设置模型 model 为评估模式,即关闭 Dropout 和 Batch Normalization 等随机性操作

  • 定义变量 epoch_loss 和 nb_samples,分别用于累加每个 batch 的损失和样本数

  • 定义空列表 arr_labels、arr_labels_hyp 和 arr_prob,分别用于存储真实标签、预测标签和模型的预测概率

  • 遍历数据集 loader,对于每个 batch:

    • 获取输入数据 inputs 和标签 labels
    • 计算 inputs 的长度 inputs_length,并将其累加到 nb_samples 中
    • 将标签 labels 的维度从 (batch_size, 1) 转换为 (batch_size,),并将其转换为 numpy 数组后添加到 arr_labels 列表中
    • 将输入数据 inputs 和标签 labels 转换为 float 类型,以便于后续计算
    • 如果 CUDA 可用,则将输入数据 inputs 和标签 labels 移动到 GPU 上,以便于在 GPU 上进行计算
    • 将输入数据 inputs 作为输入,通过模型 model 进行前向传递,得到输出 outputs
    • 计算模型的损失 loss,并将其累加到变量 epoch_loss 中
    • 将模型的预测概率从 tensor 转换为 numpy 数组,并将其添加到 arr_prob 列表中
  • 打印样本总数 nb_samples 和平均损失 epoch_loss_avg

  • 计算 AUC 值,并打印输出

  • 将模型的预测概率转换为二分类标签,并将其添加到 arr_labels_hyp 列表中

  • 将真实标签 arr_labels 和预测标签 arr_labels_hyp 转换为整型,并分别存储到变量 arr_labels 和 arr_labels_hyp 中

  • 调用函数 calculate_confusion_matrix(arr_labels, arr_labels_hyp),计算准确率、混淆矩阵、灵敏度、特异度和 MCC 值,并将它们存储到变量 acc、confusion_matrix、sensitivity、specificity 和 mcc 中

函数在后面进行了定义

  • 将评估指标存储到字典 result 中,并返回该字典
  • 打印输出准确率 acc
  In [ ]:
def evaluate(file_model, loader):
    # 定义一个 EnhancerCnnModel 的实例 model
    model = EnhancerCnnModel()
    
    # 如果 CUDA 可用,则将 model 移动到 GPU 上
    if torch.cuda.is_available():
        model.cuda()
    
    # 加载预训练的模型权重
    model.load_state_dict(torch.load(file_model))
    
    # 设置模型为评估模式
    model.eval()    
    
    epoch_loss = 0.0
    nb_samples = 0
    
    arr_labels = []
    arr_labels_hyp = []
    arr_prob = []
    
    # 遍历数据集
    for i, data in enumerate(loader, 0):
        # 获取输入和标签
        inputs, labels = data
        
        # inputs 的长度
        inputs_length = inputs.size()[0]
        nb_samples += inputs_length
        
        # 将标签添加到 arr_labels 列表中
        arr_labels += labels.squeeze(1).data.cpu().numpy().tolist()

        # 将 inputs 和 labels 转换为 float 类型
        inputs = inputs.float()
        labels = labels.float()
        
        # 如果 CUDA 可用,则将 inputs 和 labels 移动到 GPU 上
        if torch.cuda.is_available():
            inputs = Variable(inputs.cuda())
            labels = Variable(labels.cuda())
        else:
            inputs = Variable(inputs)
            labels = Variable(labels)
        
        # 前向传递
        outputs = model(inputs)
        
        # 计算损失使用模型 model 的损失函数来计算该模型在给定输出 outputs 和
        # 标签 labels 下的损失
        loss = model.criterion(outputs, labels)
        
        # 累加 epoch_loss
        epoch_loss = epoch_loss + loss.item() * inputs_length
        
        # 将模型的预测结果添加到 arr_prob 列表中
        arr_prob += outputs.squeeze(1).data.cpu().numpy().tolist()
    
    # 输出样本总数和平均损失
    print("nb_samples: ", nb_samples)
    epoch_loss_avg = epoch_loss / nb_samples
    print("epoch loss avg: ", epoch_loss_avg)
        
    # 输出预测概率和真实标签的列表长度
    print("arr_prob: ", len(arr_prob))
    print("arr_labels: ", len(arr_labels))
    
    # 计算 AUC 值
    auc = metrics.roc_auc_score(arr_labels, arr_prob)
    print("auc: ", auc)
    
    # 将预测概率转换为二分类标签,阈值为 0.5
    arr_labels_hyp = [int(prob > 0.5) for prob in arr_prob]
    arr_labels = [int(label) for label in arr_labels]
    
    # 计算各种分类指标
    acc, confusion_matrix, sensitivity, specificity, mcc = calculate_confusion_matrix(arr_labels, arr_labels_hyp)
    
    # 将结果存储到字典中
    result = {'epoch_loss_avg': epoch_loss_avg,
              'acc' : acc,
              'confusion_matrix' : confusion_matrix,
              'sensitivity' : sensitivity,
              'specificity' : specificity,
              'mcc' : mcc,
              'auc' : auc,
              'arr_prob': arr_prob,
              'arr_labels': arr_labels,
              'arr_labels_hyp':arr_labels_hyp
              }
    
    # 输出 accuracy
    print("acc: ", acc)
    
    # 返回结果字典
    return result
   

定义函数 calculate_confusion_matrix(arr_labels, arr_labels_hyp)

函数的作用是计算混淆矩阵和相关的分类指标,用于评估模型在给定数据集上的性能表现。 函数的输入参数为 arr_labels 和 arr_labels_hyp,分别表示真实标签的列表和模型的预测标签的列表

混淆矩阵(confusion matrix)是一种常见的二分类模型性能评估工具,用于衡量模型的分类能力和错误分类情况。混淆矩阵的行表示真实标签,列表示模型预测的标签,每个元素表示对应的样本数。在二分类问题中,混淆矩阵通常被表示为下面的形式:

  真实情况  
  正例 反例
预测情况    
正例 TP FP
反例 FN TN

通过混淆矩阵,我们可以计算出许多有用的评价指标,例如准确率、精确率、召回率、F1 值、特异度、灵敏度、MCC 值等

  In [ ]:
def calculate_confusion_matrix(arr_labels, arr_labels_hyp):
    # 初始化变量
    corrects = 0  # 正确分类的样本数
    confusion_matrix = np.zeros((2, 2))  # 混淆矩阵

    # 遍历所有样本
    for i in range(len(arr_labels)):
        # 更新混淆矩阵
        confusion_matrix[arr_labels_hyp[i]][arr_labels[i]] += 1

        # 统计正确分类的样本数
        if arr_labels[i] == arr_labels_hyp[i]:
            corrects = corrects + 1

    # 计算分类指标
    acc = corrects * 1.0 / len(arr_labels)  # 准确率
    specificity = confusion_matrix[0][0] / (confusion_matrix[0][0] + confusion_matrix[1][0])  # 特异度
    sensitivity = confusion_matrix[1][1] / (confusion_matrix[0][1] + confusion_matrix[1][1])  # 灵敏度
    tp = confusion_matrix[1][1]  # 真正例数
    tn = confusion_matrix[0][0]  # 真反例数
    fp = confusion_matrix[1][0]  # 假正例数
    fn = confusion_matrix[0][1]  # 假反例数
    mcc = (tp * tn - fp * fn ) / math.sqrt((tp + fp)*(tp + fn)*(tn + fp)*(tn + fn))  # MCC值

    # 返回分类指标
    return acc, confusion_matrix, sensitivity, specificity, mcc
   

六、检查数据集中的样本:这里开始不同,这一步同train中的第七步

检查数据集中的每个样本是否符合要求,即是否由长度为 200 的序列组成

具体过程如下:

  • 从字典 dataset 中获取测试集 testset
  • 对测试集每个样本进行检查,如果样本长度不为 200,则将错误样本数量加 1
  • 输出检查结果,如果测试集中有错误样本,则输出错误样本数量;否则输出“OK”

这个函数的主要作用是确保数据集中的每个样本都符合要求,以避免在测试过程中出现错误。如果数据集中的样本长度不为 200,则可能会导致模型无法处理这些样本或者产生错误的预测结果。因此,在进行测试之前,需要先对数据集进行检查,以确保数据集中的每个样本都符合要求。

  In [ ]:
# 将测试集的数据和标签分别存储在 dataset 字典的 "data_test" 和 "label_test" 键下
# dataset = {"data_test" : testset["data"], "label_test" : testset["label"]}

# 定义一个函数 check_dataset,用于检查数据集是否符合要求
def check_dataset(dataset):
    print("\n==> Checking dataset")  # 打印提示信息
    
    # 检查 data_test 数据是否符合要求
    data_test = dataset["data_test"]  # 获取 data_test 数据
    nb_error_samples = 0  # 初始化错误样本数量为 0
    for sample in data_test:  # 遍历每个样本
        if len(sample) != 200:  # 如果样本长度不为 200
            nb_error_samples += 1  # 错误样本数量加 1
    if nb_error_samples > 0:  # 如果存在错误样本
        print("data_teset error: ", nb_error_samples)  # 打印错误信息和错误样本数量
    else:  # 如果不存在错误样本
        print("data_test : OK!")  # 打印测试通过的信息
   

七、读取测试文件并将其转换为模型可用的数据格式

文件名为 test_strong_enhancer.txt 或 test_weak_enhancer.txt

# 文件格式如下,每个样本包括一个样本名称和四行 DNA 序列,需要将这四行序列拼接在一起
# >Chr11_6627824_6628024
# ATGCTGCCAGAAGGAAAAGGGGTGGAATTAATGAAACTGGAAGGTTGTGGTGCTGGTTTGAGGAG
# TAAAGTATGGGGGCCAAAGTTGGCTATATGCTGGATATGAAGAGGGGGTTAATTCCTTGCAGGTC
# TTCTTGAGATAGAAGTCCAGGCCCTGAGGTGGCAGGCAGCCTGATAGTGAACAGAACCCTTGTGC
# CCATA
  In [ ]:
def read_test_file(filename):
    text_file = open(filename)  # 打开文件
    lines = text_file.readlines()  # 读取文件中所有行的内容到列表 lines 中
    my_data =[]  # 初始化空列表 my_data,用于存储处理后的数据
    
    for i in range(0, len(lines), 2):  # 每次跳两行,遍历每个样本
        # 读取第一行 DNA 序列名称并去除多余空格和换行符
        seq_name = lines[i].strip()
        # 拼接第二行 DNA 序列,并使用 strip() 方法去除多余空格和换行符,再转换为大写字母形式
        text = lines[i+1].strip().upper()
        my_data.append(text)  # 将处理后的数据添加到 my_data 列表中
    
    return my_data  # 返回处理后的数据列表 my_data
   

八、准备测试数据

调用 read_test_file 函数读取两个 DNA 序列文件 enhancer_200_process_rename_test.fasta 和 noenhancer_200_process_rename_test.fasta 中的数据,并将读取的数据拼接在一起,形成测试集。

最后,该函数返回一个字典 testset,其中包含测试集数据和标签。

  In [ ]:
def prepare_test_data():
    print("\n ==> Loading test set")  # 打印提示信息
    
    # 加载增强子数据集
    data_strong = read_test_file('./Test/enhancer_200_process_rename_test.fasta')  # 调用函数 read_test_file 读取文件
    print("data_strong: ", len(data_strong))  # 打印加载的数据集大小
    #print("data_strong: ", data_strong)

    # data_weak = read_test_file('test_weak_enhancer.txt')
    # print("data_weak: ", len(data_weak))
    
    data_enhancer = data_strong # + data_weak
    print("data_enhancer: ", len(data_enhancer))
    
    # 加载非增强子数据集
    data_non_enhancer = read_test_file('./Test/noenhancer_200_process_rename_test.fasta')  # 调用函数 read_test_file 读取文件
    print("data_non_enhancer: ", len(data_non_enhancer))  # 打印加载的数据集大小
    
    # 构建测试集
    label_enhancer = np.ones((len(data_strong),1))  # 增强子数据集对应标签为 1
    label_non_enhancer = np.zeros((len(data_non_enhancer), 1))  # 非增强子数据集对应标签为 0
    
    data = np.concatenate((data_strong, data_non_enhancer))     # 将增强子数据集和非增强子数据集拼接在一起,构成测试集
    label = np.concatenate((label_enhancer, label_non_enhancer))
    
    testset = {"data" : data, "label" : label}  # 将测试集数据和标签存储在字典 testset 中
    
    return testset  # 返回测试集数据和标签字典 testset
   

九、定义函数 testing,用于测试模型性能

这段代码实现了对模型性能的测试和评估,并将测试结果写入 CSV 文件中。

首先,该函数将测试数据集转化为 EnhancerDataset 类型,并使用 DataLoader 加载数据集,每次迭代使用 32 个样本,关闭数据集随机重排,使用 4 个工作进程。

然后,该函数创建一个 CSV 文件,用于保存测试结果。接着,该函数获取所有模型文件的文件名,并按字典序排序。对于每个模型,该函数调用 evaluate 函数对该模型进行测试,并返回测试结果。该函数将该模型预测的概率矩阵添加到 y_prob_mtx 列表中,并将该模型的测试结果写入 CSV 文件中。

接下来,该函数将 y_prob_mtx 转换为 numpy 的 ndarray 类型。对 y_prob_mtx 进行集成,计算每个样本预测为正例的概率平均值,并将概率值大于 0.5 的样本预测为正例,其余样本预测为负例。然后,该函数获取测试集的真实标签,并计算集成模型的 ROC AUC、Accuracy、Sensitivity 和 Specificity 分数。其中,ROC AUC 是一种衡量模型性能的指标,用于评估模型预测正负样本之间的优劣程度。Accuracy 表示模型预测的正确率,Sensitivity 和 Specificity 分别表示模型对正类和负类的识别能力。

最后,该函数将集成模型的 ROC AUC、Accuracy、Sensitivity 和 Specificity 分数打印出来,并将它们写入 CSV 文件中。

  In [ ]:
# 将测试数据集转化为 EnhancerDataset 类型,并使用 DataLoader 加载数据集,
# 每次迭代使用 32 个样本,关闭数据集随机重排,使用 4 个工作进程
def testing():
    test_dataset = EnhancerDataset(testset["data"], testset["label"])
    test_loader = DataLoader(dataset=test_dataset, batch_size=32,                              
                              shuffle=False, num_workers=4)
    
    # 创建一个 CSV 文件,用于保存测试结果
    with open(testresult_fn, mode='w') as outfile:
        outfile = csv.writer(outfile, delimiter=',')
        outfile.writerow(['model_fn', 'Accuracy score', 'AUC score', 'Sensitivity', 'Specificity'])
        
    # 获取所有模型文件的文件名,并按字典序排序
    list_model_fn = sorted(glob.glob(MODEL_DIR+"/enhancer_*.pkl"))
    #print(list_model_fn)
    y_prob_mtx = []  # 初始化 y_prob_mtx,用于存储每个模型所预测的概率矩阵
    
    for model_fn in list_model_fn:  # 遍历每个模型
        print(model_fn)
        result = evaluate(model_fn, test_loader)  # 调前 evaluate 函数对该模型进行测试,并返回测试结果
        #print(result['arr_prob'])
        y_prob_mtx.append(result['arr_prob'])  # 将该模型预测的概率矩阵添加到 y_prob_mtx 列表中
        
        # 将该模型的测试结果写入 CSV 文件中
        with open(testresult_fn, mode='a') as outfile:
            outfile = csv.writer(outfile, delimiter=',')
            outfile.writerow([model_fn, result['acc'], result['auc'], 
                result['sensitivity'], result['specificity']])
    
    # 将 y_prob_mtx 转换为 numpy 的 ndarray 类型
    y_prob_mtx = np.array(y_prob_mtx)
    print("y_prob_mtx: ", y_prob_mtx.shape)
    #print("y_prob_mtx: ", y_prob_mtx)
    
    # 对 y_prob_mtx 进行集成,计算每个样本预测为正例的概率平均值,并将概率值大于 0.5 的样本预测为正例,其余样本预测为负例
    y_prob_ensemble = [np.mean(y_prob_mtx[:,col]) for col in range(np.size(y_prob_mtx, 1))] 
    y_pred_ensemble = [np.float(each > 0.5) for each in y_prob_ensemble]
    
    y_true = testset["label"]  # 获取测试集的真实标签
    
    # 计算集成模型的 ROC AUC、Accuracy、Sensitivity 和 Specificity 分数
    auc_score_ensemble = metrics.roc_auc_score(y_true, y_prob_ensemble)
    accuracy_score_ensemble = metrics.accuracy_score(y_true, y_pred_ensemble)
    
    cm = metrics.confusion_matrix(y_true, y_pred_ensemble)  # 计算混淆矩阵
    specificity_ensemble = cm[0,0]/(cm[0,0] + cm[0,1])  # 计算 Specificity
    sensitivity_ensemble = cm[1,1]/(cm[1,1] + cm[1,0])  # 计算 Sensitivity

    # 将集成模型的 ROC AUC、Accuracy、Sensitivity 和 Specificity 分数打印出来
    print("Accuracy score (Testing Set) = ", accuracy_score_ensemble)
    print("ROC AUC score  (Testing Set) = ", auc_score_ensemble)
    print("Sensitivity    (Testing Set) = ", sensitivity_ensemble)
    print("Specificity    (Testing Set) = ", specificity_ensemble)
    
    # 将集成模型的 ROC AUC、Accuracy、Sensitivity 和 Specificity 分数写入 CSV 文件中
    with open(testresult_fn, mode='a') as outfile:
        outfile = csv.writer(outfile, delimiter=',')
        outfile.writerow(["ensemble", accuracy_score_ensemble, auc_score_ensemble, sensitivity_ensemble, specificity_ensemble])
   

十、运行整个程序

  In [ ]:
# if __name__== "__main__":    
#     testresult_fn = MODEL_DIR + "/test_result.csv"
    
#     testset = prepare_test_data()    
    
#     testing()

标签:inputs,layer1,ENCC,self,labels,arr,源码,mer,data
From: https://www.cnblogs.com/cauwj/p/17416899.html

相关文章

  • java基于springboot+vue的漫画网站管理系统,附源码+数据库+lw文档+PPT,适合毕业设计、课
    1、项目介绍考虑到实际生活中在漫画网站方面的需要以及对该系统认真的分析,将系统权限按管理员和用户这两类涉及用户划分。(a)管理员;管理员使用本系统涉到的功能主要有:首页、个人中心、用户管理、漫画分类管理、漫画投稿管理、分类管理、排行榜管理、交流论坛、系统管理等功能......
  • 模块与包,反序列化源码解析,drf请求响应,视图组件两个视图基类
    0模块与包的使用#模块与包 -模块:一个py文件,被别的py文件导入使用,这个py文件称之为模块,运行的这个py文件称之为脚本文件-包:一个文件夹下有__init__.py#模块与包的导入问题 '''0导入模块有相对导入和绝对导入,绝对的路径是从环境变量开始的1......
  • .NET 通过源码深究依赖注入原理
    依赖注入(DI)是.NET中一个非常重要的软件设计模式,它可以帮助我们更好地管理和组织组件,提高代码的可读性,扩展性和可测试性。在日常工作中,我们一定遇见过这些问题或者疑惑。Singleton服务为什么不能依赖Scoped服务?多个构造函数的选择机制?源码是如何识别循环依赖的?虽然我们可......
  • drf之反序列化校验源码分析 、 断言 、drf之请求和响应
    目录一、反序列化校验源码分析入口:总结:二、断言三、drf之请求3.1Request类对象的分析.data.query_params其他的属性用起来跟之前一样3.2请求,能够接受的编码格式限制只能接受某种或某几种编码格式限制方式一:在视图类上写---》只是局部视图类有效限制方式二:在配置文件中写---》全......
  • DCC32命令行方式编译delphi工程源码
    本文链接地址:http://blog.csdn.net/sushengmiyan/article/details/10284879作者:苏生米沿 一、首先找到这个可执行文件,熟悉delphi的人应该很容易就找到,打开你安装delphi的目录,如我的路径C:\ProgramFiles\Delphi_2007\bin\DCC32.EXE二、拷贝一份出来,我将其放在了我的测试目录下......
  • 2023最新OneTool多平台助手程序源码
    2023最新OneTool多平台助手程序源码开心可用版本:https://download.csdn.net/download/mo3408/87799108OneTool 是一款功能强大的多平台助手,目前最新版本为199911(1.9.1)。除此之外,该应用程序还拥有其他好玩的功能,等着您们来搭建测试。可以帮助用户快速完成各种任务。例如网......
  • APP中RN页面热更新流程-ReactNative源码分析
    平时使用WebStorm或VSCode对RN工程中的文件修改后,在键盘上按一下快捷cmd+s进行文件保存,此时当前调试的RN页面就会自动进行刷新,这是RN开发相比于原生开发一个很大的优点:热更新。那么,从按一下快捷cmd+s到RN页面展示出最新的JS页面,这个过程是怎样发生的呢?下面根据时间顺序来梳理一下......
  • Sentinel基本使用与源码分析
    系列文章目录和关于我一丶什么是SentinelSentinel官网Sentinel是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。流量整形:限制流......
  • 东邻到家小程序|东邻到家小程序源码|东邻到家小程序开发功能
    上门服务这几年已经越来越火爆,不论是家政、按摩、美甲等等都在不断的发展上门服务,这几年东邻到家小程序系统在不断的摸索阶段,对于系统各方面的需求也在不断提升,东郊到家小程序通过线上匹配用户和技师的需求,让人们可以更加容易的享受上门服务,下面小编就给大家介绍下东邻到家小程序系......
  • RocketMQ源码(三):服务端NameSrv启动流程
    有关Namesrv的概念及功能,详见RocketMQ(三):架构设计中技术架构组成namesrv,这里不再赘述。RocketMQ中Namesrv启动入口:org.apache.rocketmq.namesrv.NamesrvStartup。Namesrv启动,NamesrvStartup#main0()核心伪代码:1publicstaticNamesrvControllermain0(String[......