首页 > 其他分享 >[BIG2015] 2. 基于操作码序列和TextCNN分类

[BIG2015] 2. 基于操作码序列和TextCNN分类

时间:2023-12-29 20:57:05浏览次数:25  
标签:loss val list TextCNN 操作码 train BIG2015 import size

目录

导入包:

import pandas as pd
import os
import numpy as np
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import CountVectorizer          #For Bag of words
from sklearn.feature_extraction.text import TfidfVectorizer          #For TF-IDF
from sklearn.model_selection import train_test_split
from catboost import CatBoostClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder
from preprocessing.build_vocab import build_vocab
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

import gensim
from gensim.models import Word2Vec     #For Word2Vec  
from gensim.corpora import Dictionary 

import os
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torch.utils.data import TensorDataset, DataLoader
from tqdm import tqdm

构建词表

读取数据:

# Locations of the BIG2015 benchmark, the opcode CSV and the opcode output folder.
dataset_folder = "../../benchmarks/BIG2015/"
data_path = "../../benchmarks/BIG2015/opcode/opcode.csv"
data_folder = "../../benchmarks/BIG2015/opcode/"

df = pd.read_csv(data_path)
# Integer-encode the class labels.
label = LabelEncoder().fit_transform(df['label'])
# Replace missing opcode cells with an empty string. `fillna` returns a new
# object, so its result must be assigned; the previous bare `df.fillna("")`
# call discarded the result and left the NaN values in place.
df['opcode'] = df['opcode'].fillna("")
# Opcode column, one entry per sample.
data = df['opcode']
# NOTE(review): later cells read `opcode_seq_list`, which is not defined in the
# visible code; presumably it is this column as a list of strings - confirm.
opcode_seq_list = data.tolist()

image.png
构建词表:

  • 使用 gensim 的 Dictionary类处理语料,输入类似 [['add', 'push'], ...],需要先将单个文档(一段话)分割为列表。
  • 因为在整个训练数据集上构建词表,所以没有加 unknown 标记;只加了 pad 标记,用于将单个文档填充到固定长度。
# Build the vocabulary.
vocab_save_path = "../../benchmarks/BIG2015/word2id.json"
# Dictionary consumes tokenised documents, so split every sequence on whitespace first.
opcode_seq_list_split = [sequence.split() for sequence in opcode_seq_list]

dct = Dictionary(opcode_seq_list_split)
print(len(opcode_seq_list_split))  # 10868
print(dct)  # Dictionary<735 unique tokens: ['add', 'and', 'call', 'cmp', 'db']...>
print(len(dct.token2id))  # 735

# Register "pad" with id 0 as a special token.
special_tokens = {"pad": 0}
dct.patch_with_special_tokens(special_tokens)
print(len(dct.token2id))  # 736

语料库中文档的长度分布:

# Number of opcodes in every tokenised document.
len_list = list(map(len, opcode_seq_list_split))
len_list

plt.hist(len_list)
# plt.xticks(range(0, 10000, 100)) 


# Share of documents shorter than 1000 and 10000 opcodes respectively.
total_docs = len(len_list)
below_1k = sum(value < 1000 for value in len_list)
below_10k = sum(value < 10000 for value in len_list)
print(f"小于 1000 的元素占比为: {below_1k / total_docs:.2f}")
print(f"小于 10000 的元素占比为: {below_10k / total_docs:.2f}")

image.png

以 json 格式 保存词汇表:

import json
# Write the token -> id mapping to disk as indented JSON.
with open(vocab_save_path, mode="w") as vocab_file:
    json.dump(dct.token2id, vocab_file, indent=4)

构建整数索引语料

使用doc2idx 将语料转换为整数索引语料,之后对每句话填充或者截断到固定长度。

int_opcode_list = []  # integer-index corpus, one fixed-length row per document
desired_size = 1000  # target sequence length; the only place the length is configured
pad_id = dct.token2id["pad"]  # id of the padding token registered in the vocabulary
for opcode in opcode_seq_list_split:
    # Truncate to at most `desired_size` ids (slicing a shorter list is a no-op) ...
    int_opcode = dct.doc2idx(opcode)[:desired_size]
    # ... then right-pad short documents so every row has exactly `desired_size` ids.
    int_opcode += [pad_id] * (desired_size - len(int_opcode))
    int_opcode_list.append(int_opcode)
print(len(int_opcode_list))

保存标签和转换后的数据:

# Persist the fixed-length integer corpus next to the opcode CSV.
np.save(os.path.join(data_folder, "opcode_int_top1000.npy"), np.array(int_opcode_list))
# Save the labels under `dataset_folder` (same value as the literal used before)
# so the path cannot drift from the np.load call that reads them back.
np.save(os.path.join(dataset_folder, "label.npy"), np.array(label))

构建 dataset 和 dataloader

读取保存的数据并划分训练集、验证集和测试集。代码先划出 20% 的数据,再将其对半分为测试集和验证集,因此训练集、验证集、测试集的实际比例为 8:1:1。

# Load the arrays saved in the previous step.
corpus_path = os.path.join(data_folder, "opcode_int_top1000.npy")
label_path = os.path.join(dataset_folder, "label.npy")
data = np.load(corpus_path)
label = np.load(label_path)
# NOTE(review): the original notebook recorded "(12695, 1000) (10868,)" here, but
# train_test_split needs matching row counts - confirm the real shapes.
print(data.shape, label.shape)

# Hold out 20% of the samples, then halve the hold-out into test and validation
# parts, i.e. train : val : test = 8 : 1 : 1.
x_train, x_temp, y_train, y_temp = train_test_split(
    data, label, test_size=0.2, random_state=42
)
x_test, x_val, y_test, y_val = train_test_split(
    x_temp, y_temp, test_size=0.5, random_state=42
)
print(x_train.shape, x_val.shape, x_test.shape)

构建 dataloader:

# dataloader
def build_dataloader(x_train=None, y_train=None, x_val=None, y_val=None, x_test=None, y_test=None,
                     batch_size=32, num_workers=1):
    """Wrap the train/validation/test arrays in PyTorch ``DataLoader`` objects.

    Args:
        x_train, x_val, x_test: NumPy feature arrays, converted with
            ``torch.from_numpy``.
        y_train, y_val, y_test: NumPy label arrays; flattened to 1-D before
            conversion.
        batch_size: Number of samples per batch (defaults to the previously
            hard-coded 32).
        num_workers: Worker count passed to every loader (defaults to the
            previously hard-coded 1).

    Returns:
        tuple: ``(train_loader, val_loader, test_loader)``.

    Raises:
        ValueError: If any of the six arrays is ``None``.
    """
    splits = {
        "x_train": x_train, "y_train": y_train,
        "x_val": x_val, "y_val": y_val,
        "x_test": x_test, "y_test": y_test,
    }
    missing = [name for name, array in splits.items() if array is None]
    if missing:
        # Fail with a clear message instead of an AttributeError on None.reshape.
        raise ValueError(f"missing required arrays: {', '.join(missing)}")

    def _make_loader(features, targets, shuffle):
        # Pair every feature row with its flattened label.
        dataset = TensorDataset(torch.from_numpy(features), torch.from_numpy(targets.reshape(-1)))
        return DataLoader(
            dataset=dataset,
            batch_size=batch_size,
            shuffle=shuffle,
            num_workers=num_workers,
        )

    # Only the training data is shuffled; evaluation order is kept deterministic.
    train_loader = _make_loader(x_train, y_train, shuffle=True)
    val_loader = _make_loader(x_val, y_val, shuffle=False)
    test_loader = _make_loader(x_test, y_test, shuffle=False)
    return train_loader, val_loader, test_loader

构建训练函数和推理函数

在训练函数和推理函数中:

  • 得到所有预测结果后和真实标签计算 accuracy, precision, recall, f1
# Use the first CUDA device when one is available, otherwise fall back to the CPU.
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
def train(epoch, model, train_loader, optimizer, criterion):
    """Run one training epoch and report the loss and classification metrics.

    Args:
        epoch: Epoch number; only used in the progress-bar description.
        model: Network being optimised, called as ``model(batch)``.
        train_loader: Iterable yielding ``(batch, labels)`` tensor pairs; both
            are moved to the module-level ``device``.
        optimizer: Optimiser; gradients are zeroed and one step is taken per batch.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.

    Returns:
        tuple: ``(train_loss, accuracy, precision, recall, f1)``. ``train_loss``
        is the sum of ``loss * batch_size`` over all batches divided by the
        number of samples, returned as a detached tensor. Precision, recall and
        F1 are macro-averaged with ``zero_division=0``.
    """
    model.train()

    total_samples = 0
    train_loss = 0
    predictions_all = []
    labels_all = []
    for batch, labels in tqdm(train_loader, ncols=100, desc=f"epoch: {epoch},   training"):
        batch, labels = batch.to(device), labels.to(device)
        optimizer.zero_grad()

        outputs = model(batch)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Detach before accumulating: summing the raw loss tensor would chain
        # the autograd history of every batch into `train_loss` for the epoch.
        train_loss += loss.detach() * batch.size(0)
        _, predictions = outputs.max(1)
        # Collect predictions and labels so the metrics cover the whole epoch.
        predictions_all.extend(predictions.cpu().numpy())
        labels_all.extend(labels.cpu().numpy())

        total_samples += labels.size(0)

    # scikit-learn metrics take (y_true, y_pred) in that order.
    accuracy = accuracy_score(labels_all, predictions_all)
    train_loss = train_loss / total_samples
    precision, recall, f1, _ = precision_recall_fscore_support(labels_all, predictions_all, average='macro', zero_division=0)
    return train_loss, accuracy, precision, recall, f1
def val(epoch, model, val_loader, criterion):
    """Evaluate the model on a data loader without updating its weights.

    Args:
        epoch: Epoch number; only used in the progress-bar description.
        model: Network to evaluate, called as ``model(batch)``.
        val_loader: Iterable yielding ``(batch, labels)`` tensor pairs; both are
            moved to the module-level ``device``.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.

    Returns:
        tuple: ``(val_loss, accuracy, precision, recall, f1)``. ``val_loss`` is
        the sum of ``loss * batch_size`` over all batches divided by the number
        of samples. Precision, recall and F1 are macro-averaged with
        ``zero_division=0``.
    """
    model.eval()

    seen = 0
    val_loss = 0
    all_preds, all_targets = [], []
    progress = tqdm(val_loader, desc=f"epoch: {epoch}, validating", ncols=100)
    with torch.no_grad():
        for batch, labels in progress:
            batch = batch.to(device)
            labels = labels.to(device)

            outputs = model(batch)
            val_loss += criterion(outputs, labels) * batch.size(0)

            # Index of the largest logit per sample is the predicted class.
            predictions = outputs.max(1)[1]
            all_preds.extend(predictions.cpu().numpy())
            all_targets.extend(labels.cpu().numpy())

            seen += labels.size(0)

        accuracy = accuracy_score(all_preds, all_targets)
        val_loss = val_loss / seen
        precision, recall, f1, _ = precision_recall_fscore_support(
            all_targets, all_preds, average='macro', zero_division=0
        )

        return val_loss, accuracy, precision, recall, f1

构建 TextCNN 模型:

import torch
import torch.nn as nn
import torch.nn.functional as F
class TextCNN(nn.Module):
    """Text classifier: embedding, parallel 1-D convolutions, max-over-time pooling, linear head.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each embedding vector (conv input channels).
        num_filters: Output channels of every convolution.
        filter_sizes: Iterable of kernel sizes, one convolution per entry.
        output_dim: Size of the final linear layer's output.
        dropout: Dropout probability applied to the concatenated features.
    """

    def __init__(self, vocab_size, embedding_dim, num_filters, filter_sizes, output_dim, dropout):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # One convolution branch per kernel size.
        branches = []
        for width in filter_sizes:
            branches.append(
                nn.Conv1d(in_channels=embedding_dim, out_channels=num_filters, kernel_size=width)
            )
        self.convs = nn.ModuleList(branches)

        self.fc = nn.Linear(num_filters * len(filter_sizes), output_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """Map token ids of shape [batch_size, seq_len] to outputs of shape [batch_size, output_dim]."""
        # [batch_size, seq_len, embedding_dim] -> [batch_size, embedding_dim, seq_len]
        channels_first = self.embedding(text).transpose(1, 2)

        features = []
        for conv in self.convs:
            # [batch_size, num_filters, L_out]
            activated = F.relu(conv(channels_first))
            # Max over the whole length axis -> [batch_size, num_filters]
            features.append(F.max_pool1d(activated, activated.size(-1)).squeeze(-1))

        # [batch_size, len(filter_sizes) * num_filters]
        merged = self.dropout(torch.cat(features, dim=1))

        return self.fc(merged)

画图函数:

def plot_data(save_path, list1, list2, label="Loss"):
    """Plot a train curve and a validation curve per epoch, save the figure and show it.

    Args:
        save_path: Directory that receives ``{label}_curve.png``.
        list1: Per-epoch training values.
        list2: Per-epoch validation values.
        label: Metric name used in the legend, title, y-axis and file name.
    """
    # Epoch numbers start at 1 and follow the length of the training series.
    epoch_axis = range(1, len(list1) + 1)
    target_file = os.path.join(save_path, f'{label}_curve.png')

    # Both series share the same x axis.
    plt.plot(epoch_axis, list1, label=f'Train {label}')
    plt.plot(epoch_axis, list2, label=f'Validate {label}')

    # Annotate the figure.
    plt.title(f'{label} Curve')
    plt.xlabel('Epoch')
    plt.ylabel(f'{label}')
    plt.legend()

    # Write the image to disk before displaying and releasing the figure.
    plt.savefig(target_file, format='png')
    plt.show()
    plt.close()

训练、推理和结果分析

训练和验证代码:

  • 使用 GPU 的情况下,训练时间较短,在训练完成后可以加上推理代码。
epochs = 10
output_dir = "../outputs/big2015/"
vocab_size = 736  # vocabulary size printed after adding the "pad" token
embedding_dim = 128
num_filters = 300  # number of filters per kernel size
filter_sizes = [3, 4, 5, 6]  # four kernel sizes, one convolution branch each
output_dim = 9  # size of the model output (number of classes)
dropout = 0.2  # dropout probability

# data
train_loader, val_loader, test_loader = build_dataloader(x_train, y_train, x_val, y_val, x_test, y_test)

# model
model = TextCNN(vocab_size, embedding_dim, num_filters, filter_sizes, output_dim, dropout)
model.to(device)
print(model)

# loss
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# train
train_loss_list, train_acc_list, val_loss_list, val_acc_list = [], [], [], []
best_epoch = 0
best_val_loss = float('inf')
best_val_acc = 0.0  # start at 0 so the first progress line does not print "inf%"
patience = 5
wait = 0  # epochs since the last improvement; initialised before the loop reads it
# Create the output folder up front so both the checkpoint and the plots can be written.
save_dir = os.path.join(output_dir, "test")
os.makedirs(save_dir, exist_ok=True)
# range(1, epochs + 1) runs exactly `epochs` epochs, numbered from 1.
for epoch in range(1, epochs + 1):
    train_loss, train_acc, train_precision, train_recall, train_f1 = train(epoch, model, train_loader, optimizer, criterion)
    print('Epoch: {}, Train Loss: {:.4f}, Train Acc: {:.2f}%, Train Precision: {:.2f}%, Train Recall: {:.2f}%, Train F1: {:.2f}%'.format((epoch), train_loss, 100*train_acc, 100*train_precision, 100*train_recall, 100*train_f1))

    val_loss, val_acc, val_precision, val_recall, val_f1 = val(epoch, model, val_loader, criterion)
    print('Epoch: {}, Val Loss: {:.4f}, Val Acc: {:.2f}%, Val Precision: {:.2f}%, Val Recall: {:.2f}%, Val F1: {:.2f}%, (Best Val Acc: {:.2f}%)'.format(epoch, val_loss, 100*val_acc, 100*val_precision, 100*val_recall, 100*val_f1, 100*best_val_acc))

    # Work with plain floats from here on (accepts tensors as well as numbers).
    train_loss, val_loss = float(train_loss), float(val_loss)

    # Early-stopping bookkeeping, driven by the validation loss.
    if best_val_loss > val_loss:
        best_epoch = epoch
        # The accuracy at the lowest validation loss is recorded as best_val_acc.
        best_val_acc = val_acc
        best_val_loss = val_loss
        wait = 0  # improvement: reset the patience counter

        # Save the checkpoint first, then report success.
        torch.save(model.state_dict(), os.path.join(save_dir, "textcnn.pth"))
        print(f"--> save model success: {save_dir}")
    else:
        wait += 1  # no improvement this epoch

    print(f"--> Best Epoch: {best_epoch}, Best Val Acc: {best_val_acc}, Best Val Loss: {best_val_loss}")
    train_loss_list.append(train_loss)
    train_acc_list.append(train_acc)
    val_loss_list.append(val_loss)
    val_acc_list.append(val_acc)

    print("-------------------------------------------------")
    if wait >= patience:
        # Leave the loop (rather than exiting the process) so the curves below are still drawn.
        print(f'Early stopping at epoch {epoch}...')
        break
plot_data(save_dir, train_acc_list, val_acc_list, label="Acc")
plot_data(save_dir, train_loss_list, val_loss_list, label='Loss')

image.png

image.png

  • 第 7 个 epoch 开始有过拟合倾向,但总体过拟合不严重。
  • 限制长度为 1000,使用 TextCNN,验证集准确率可以达到 97% 以上。
  • 提取操作码时,没有去除 dd dw这类数据定义指令,部分操作码序列长度很长,后续可以考虑去掉这类指令,并合并处理语义相近的指令,可以进一步缩小序列长度,实验效果是否能有提升需要再验证。
  • 一个非常有挑战性的问题,指令的参数是否能帮助检测/分类任务。

标签:loss,val,list,TextCNN,操作码,train,BIG2015,import,size
From: https://www.cnblogs.com/handsome6/p/17935644.html

相关文章

  • 【组成原理-指令】扩展操作码的树形解法
    仿照哈夫曼树(或前缀编码,Prefix-free)的解法,目前先不解释具体怎么画了,直接放例题,大家自己慢慢品味吧。【例1】某指令系统指令长16位,操作码字段为4位,地址码字段为4位,采用扩展操作码技术,形成三地址指令15条、二地址指令15条、一地址指令15条、零地址指令16条。【解......
  • TextRCNN、TextCNN、RNN…你都掌握了吗?一文总结文本分类必备经典模型(一)
     本专栏将逐一盘点自然语言处理、计算机视觉等领域下的常见任务,并对在这些任务上取得过SOTA的经典模型逐一详解。前往SOTA!模型资源站(sota.jiqizhixin.com)即可获取本文中包含的模型实现代码、预训练模型及API等资源。本文将分3期进行连载,共介绍 20 个在文本分类任务上......
  • TextCNN、DCNN、AttentionXML…你都掌握了吗?一文总结文本分类必备经典模型(二)
    https://mp.weixin.qq.com/s/f5SkoWD4BY_HDWfPi5R5ng 本专栏将逐一盘点自然语言处理、计算机视觉等领域下的常见任务,并对在这些任务上取得过SOTA的经典模型逐一详解。前往SOTA!模型资源站(sota.jiqizhixin.com)即可获取本文中包含的模型实现代码、预训练模型及API等资源。本......
  • TextCNN和TextRNN:原理与实践
    1.TextCNN原理CNN的核心点在于可以捕获信息的局部相关性,具体到文本分类任务中可以利用CNN来提取句子中类似N-Gram的关键信息。(1)一维卷积:使用不同尺寸的kernel_size来模拟语言模型中的N-Gram,提取句子中的信息。即TextCNN中的卷积用的是一维卷积,通过不同kernel_size的滤波器获取......
  • 扩展操作码
    扩展操作码指令格式:定长指令字结构+可变长操作码。设计操作码指令格式时,必须关注:不允许短码是长码的前缀,就是短的操作码不能与长的操作码的前面部分的代码相同。数据结构:哈夫曼树是一个左子节点永远小于右子节点的二叉树,哈夫曼编码是一种数据压缩手段,该原理就是:假设向左......
  • 操作码序列
    操作码序列通常对PE格式文件(.exe文件等),用IDAPro反汇编得到对应的asm(包含汇编代码)文件。从asm文件中可以提取操作码、函数调用等信息作为特征训练机器学习和深度......
  • 【特征】操作码序列
    【特征】操作码序列通常对PE格式文件(.exe文件等),用IDAPro反汇编得到对应的asm(包含汇编代码)文件。从asm文件中可以提取操作码、函数调用等信息作为特征训练机器学......
  • 【小结】操作码序列
    【小结】操作码序列通常对PE格式文件(.exe文件等),用IDAPro反汇编得到对应的asm(包含汇编代码)文件。从asm文件中可以提取操作码、函数调用等信息作为特征训练机器学......
  • 10.4 汇编语言的语法是“操作码+操作数”
     在汇编语言中,1行表示对CPU的一个指令。汇编语言指令的语法结构是操作码+操作数(或只有操作码没有操作数的指令)。 能够使用任何形式的操作码,是由CPU的种类决定的。 ......
  • 10.4汇编语言的语法是“操作码+操作数”
       在汇编语言中,1行表示对CPU的一个指令。汇编语言指令的语法结构是操作码+操作数(也存在只有操作码没有操作数的指令)。   能够使用任何形式的操作码,是由CPU的......