import os

os.environ["CUDA_VISIBLE_DEVICES"] = "2"

import random

import numpy as np
import pandas as pd
import tensorflow as tf
from random import shuffle
from sklearn.metrics import confusion_matrix, f1_score
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer, TFBertModel
from transformers import RobertaTokenizer, TFRobertaModel  # kept: may be used by other variants of this script

# Seed Python / NumPy / TensorFlow RNGs for reproducibility.
seed_value = 42
np.random.seed(seed_value)
random.seed(seed_value)
tf.random.set_seed(seed_value)
os.environ['TF_DETERMINISTIC_OPS'] = '1'

# Load the pretrained BERT model and tokenizer from a local checkpoint.
bert_model_name = './bert'
tokenizer = BertTokenizer.from_pretrained(bert_model_name)
bert_model = TFBertModel.from_pretrained(bert_model_name)


def action_recall_accuracy(y_pred, y_true):
    """Print per-class recall, precision, F1 and the macro-averaged F1.

    Args:
        y_pred: sequence of predicted class labels.
        y_true: sequence of ground-truth class labels.
    """
    cm = confusion_matrix(y_true, y_pred)
    num_classes = cm.shape[0]

    # sklearn convention: rows of cm are true labels, columns are predictions.
    # diagonal / row-sum    -> recall    (correct / actually in class i)
    # diagonal / column-sum -> precision (correct / predicted as class i)
    # NOTE(fix): the original printed these under swapped/incorrect labels
    # ("准确率" for recall, "召回率" for precision).
    recalls = []
    precisions = []
    for i in range(num_classes):
        row_total = cm[i, :].sum()
        col_total = cm[:, i].sum()
        # Guard against division by zero when a class never occurs / is never predicted.
        recalls.append(cm[i, i] / row_total if row_total else 0.0)
        precisions.append(cm[i, i] / col_total if col_total else 0.0)

    for i in range(num_classes):
        print(f"类别 {i} 的召回率: {recalls[i]:.3f}")
        print(f"类别 {i} 的精确率: {precisions[i]:.3f}")

    # Compute every per-class F1 in a single call instead of once per class.
    scores = f1_score(y_true, y_pred, average=None)
    for i, f1 in enumerate(scores):
        print(f"类别 {i} 的F1分数: {f1:.3f}")

    # Macro average of the per-class F1 scores.
    average_f1 = sum(scores) / len(scores)
    print(f"各类别F1-score的平均值: {average_f1:.3f}")


def encode_texts(query, title, tokenizer, max_length=128):
    """Tokenize a (query, title) pair into BERT input tensors.

    Returns:
        (input_ids, attention_mask) — TF tensors of shape (1, max_length).
    """
    encoded_dict = tokenizer.encode_plus(
        query,
        title,
        add_special_tokens=True,   # add [CLS], [SEP] markers
        max_length=max_length,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='tf',       # return TensorFlow tensors
    )
    return encoded_dict['input_ids'], encoded_dict['attention_mask']


def build_model(bert_model, num_features):
    """Build a binary classifier on [CLS] embedding + numeric features.

    Args:
        bert_model: a TFBertModel used as the text encoder.
        num_features: number of numeric features per sample.

    Returns:
        A compiled tf.keras.Model with inputs
        {input_ids, attention_mask, numeric_features} and a sigmoid output.
    """
    input_ids = tf.keras.layers.Input(shape=(128,), dtype=tf.int32, name='input_ids')
    attention_mask = tf.keras.layers.Input(shape=(128,), dtype=tf.int32, name='attention_mask')

    bert_output = bert_model(input_ids, attention_mask=attention_mask)
    cls_output = bert_output.last_hidden_state[:, 0, :]  # [CLS] token vector
    dense2 = tf.keras.layers.Dense(16, activation='relu')(cls_output)

    # Numeric-feature input branch.
    numeric_input = tf.keras.layers.Input(
        shape=(num_features,), dtype=tf.float32, name='numeric_features')

    # Concatenate the compressed BERT representation with the numeric features.
    concatenated = tf.keras.layers.Concatenate()([numeric_input, dense2])

    # Small DNN head on top of the fused representation.
    dense3 = tf.keras.layers.Dense(128, activation='relu')(concatenated)
    dense4 = tf.keras.layers.Dense(64, activation='relu')(dense3)
    dense5 = tf.keras.layers.Dense(32, activation='relu')(dense4)
    output = tf.keras.layers.Dense(1, activation='sigmoid')(dense5)  # binary classification

    model = tf.keras.Model(inputs=[input_ids, attention_mask, numeric_input], outputs=output)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=2e-5),
        loss='binary_crossentropy',
        metrics=['accuracy', tf.keras.metrics.AUC(name='auc')],
    )
    return model


def load_dataset(file_path, tokenizer, max_length=128):
    """Read a CSV of (query, title, numeric..., label) rows into model inputs.

    The rows are shuffled (seeded via the module-level random seed).
    Numeric features are taken from columns 2..-2 of each row.

    Returns:
        ({'input_ids', 'attention_mask', 'numeric_features'}, labels) where
        labels is a TF tensor and numeric_features a float ndarray.
    """
    data = pd.read_csv(file_path)

    all_data = []
    for _, row in data.iterrows():
        query = row['query']
        title = row['title']
        label = int(row["label"])
        # Numeric features live between the text columns and the trailing column.
        features = row.iloc[2:-1].values.astype(float)
        all_data.append([query, title, label, features])
    shuffle(all_data)

    queries, titles, labels, numeric_features = [], [], [], []
    for query, title, label, features in all_data:
        queries.append(query)
        titles.append(title)
        labels.append(label)
        numeric_features.append(features)

    input_ids_list = []
    attention_mask_list = []
    for query, title in zip(queries, titles):
        input_ids, attention_mask = encode_texts(query, title, tokenizer, max_length)
        input_ids_list.append(input_ids)
        attention_mask_list.append(attention_mask)

    input_ids = tf.concat(input_ids_list, axis=0)
    attention_masks = tf.concat(attention_mask_list, axis=0)
    labels = tf.convert_to_tensor(labels)
    numeric_features = np.array(numeric_features)

    return (
        {
            'input_ids': input_ids,
            'attention_mask': attention_masks,
            'numeric_features': numeric_features,
        },
        labels,
    )


# Load training and test data.
train_data, train_labels = load_dataset("train_new.csv", tokenizer)
test_data, test_labels = load_dataset('test_seo_124.csv', tokenizer)

# Convert TF tensors to numpy arrays for sklearn's train_test_split.
train_input_ids_np = train_data['input_ids'].numpy()
train_attention_masks_np = train_data['attention_mask'].numpy()
train_numeric_features_np = train_data['numeric_features']
train_labels_np = train_labels.numpy()

# Carve a small validation split off the (already shuffled) training data.
(train_input_ids, val_input_ids,
 train_attention_masks, val_attention_masks,
 train_numeric_features, val_numeric_features,
 train_labels, val_labels) = train_test_split(
    train_input_ids_np, train_attention_masks_np,
    train_numeric_features_np, train_labels_np,
    test_size=0.01, random_state=42, shuffle=False)

# Convert numpy arrays back to TF tensors.
train_inputs = {
    'input_ids': tf.convert_to_tensor(train_input_ids),
    'attention_mask': tf.convert_to_tensor(train_attention_masks),
    'numeric_features': tf.convert_to_tensor(train_numeric_features),
}
val_inputs = {
    'input_ids': tf.convert_to_tensor(val_input_ids),
    'attention_mask': tf.convert_to_tensor(val_attention_masks),
    'numeric_features': tf.convert_to_tensor(val_numeric_features),
}
train_labels = tf.convert_to_tensor(train_labels)
val_labels = tf.convert_to_tensor(val_labels)

# Instantiate the model.
model = build_model(bert_model, num_features=train_numeric_features_np.shape[1])
model.summary()

# Class weights to de-emphasize the positive class (currently unused below).
neg_weight = 1.0
pos_weight = 0.5  # lower positive-class weight trades recall for precision
class_weight = {0: neg_weight, 1: pos_weight}

# Train the model one epoch at a time so per-epoch evaluation could be added.
epochs = 1
batch_size = 32
true_labels = pd.read_csv('test_seo_124.csv')['label'].astype('int32')

for epoch in range(epochs):
    print(f"Epoch {epoch + 1}/{epochs}")
    history = model.fit(
        x={
            'input_ids': train_inputs['input_ids'],
            'attention_mask': train_inputs['attention_mask'],
            'numeric_features': train_inputs['numeric_features'],
        },
        y=train_labels,
        validation_data=(
            {
                'input_ids': val_inputs['input_ids'],
                'attention_mask': val_inputs['attention_mask'],
                'numeric_features': val_inputs['numeric_features'],
            },
            val_labels,
        ),
        epochs=1,        # one epoch per outer-loop iteration
        batch_size=batch_size,
        shuffle=False,
        # class_weight=class_weight  # optionally re-weight the classes
    )

# Evaluate on the held-out test set.
loss, accuracy, auc = model.evaluate(test_data, test_labels)
print(f"Test loss: {loss}, Test accuracy: {accuracy}, Test AUC: {auc}")

# Decision threshold; raise it to trade recall for fewer false positives.
threshold = 0.5

# Detailed per-class metrics on the test set.
predictions = model.predict(test_data)
pred_labels = [int(p > threshold) for p in predictions[:, 0]]
true_labels = list(np.array(true_labels))
action_recall_accuracy(pred_labels, true_labels)
# 标签: bert, attention, features, 代码, dnn, ids, train, tf, input
# From: https://www.cnblogs.com/qiaoqifa/p/18260561