1 pandas 读 csv
import torch
from torch import nn
import numpy as np
import pandas as pd
from copy import deepcopy
device = "cuda" if torch.cuda.is_available() else "cpu"
# 读 csv
data_all = pd.read_csv('./CFD_data/record_data0.csv')
# 提取某一列
colume = np.array(data_all[['colume_name']], dtype=np.float32).reshape(-1, 1)
# 提取某一个值
value = data[data['食物种类']=='主食']['卡路里'].item()
# 数据操作
c = np.concatenate([a[1:], b[:-1]], axis=1)
c = torch.cat([a, b], axis=1)
# 存 csv
c.to_csv('./CFD_data/flow_rate.csv', index=False)
2 NN 的搭建、训练与评估
搭建:使用 nn.Sequential
。
# model
NN_model = nn.Sequential(
nn.Linear(6, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, 1),
)
# 优化器
optimizer = torch.optim.Adam(NN_model.parameters(), lr=0.001)
训练:
def NN_train(train_x, train_y, model, loss_fn, optimizer, epoches, batch_size, save_path):
"""
训练网络
输入:
train_x, train_y: 训练集
model: 网络模型
loss_fn: 损失函数
optimizer: 优化器
epoches: epoches 个数
batch_size: mini batch 大小
save_path: 模型保存路径
"""
# 切换到train模式
model.train()
losses = []
for epoch in range(epoches):
batch_loss = []
for start in range(0, len(train_x), batch_size): # mini batch
end = start + batch_size if start + batch_size < len(train_x) else len(train_x)
xx = torch.tensor(train_x[start:end], dtype=torch.float, requires_grad=True)
yy = torch.tensor(train_y[start:end], dtype=torch.float, requires_grad=True)
xx, yy = xx.to(device), yy.to(device) # 加载到 device
pred = model(xx) # 输入数据到模型里得到输出
loss = loss_fn(pred, yy) # 计算输出和标签的 loss
optimizer.zero_grad() # 清零
loss.backward() # 反向推导
optimizer.step() # 步进优化器
batch_loss.append(loss.data.numpy())
if epoch % max(1, epoches//8) == 0:
print(f"Training Error in epoch {epoch}: {np.mean(batch_loss):>8f}")
torch.save(model.state_dict(), save_path) # 保存模型
测试:
def NN_test(test_x, test_y, model, save_path, loss_fn):
"""
测试网络
输入:
test_x, test_y: 测试集
model: 网络模型
loss_fn: 损失函数
save_path: 模型保存路径
"""
model.load_state_dict(torch.load(save_path)) # 加载模型
model.eval() # 切换到测试模型
MSE_loss_fn = nn.MSELoss() # MSE loss function
test_loss, MSE = 0, 0 # 记录 loss 和 MSE
# 梯度截断
with torch.no_grad():
test_x, test_y = torch.tensor(test_x).to(device), torch.tensor(test_y).to(device) # 加载到 device
pred = model(test_x) # 输入数据到模型里得到输出
test_loss = loss_fn(pred, test_y).item() # 计算输出和标签的 loss
MSE = MSE_loss_fn(pred, test_y).item() # MSE
print(f"Test Error: \n Avg loss: {test_loss:>8f}, MSE: {MSE:>8f}\n")
print(f"Test Result: \n Prediction: {pred[:5]}, \n Y: {test_y[:5]}, \n diff: {test_y[:5]-pred[:5]}\n")
测试 ensemble model(平均值):
def NN_test_ensemble(test_x, test_y, loaded_model_list, loss_fn):
for model in loaded_model_list:
model.eval() # 切换到测试模型
MSE_loss_fn = nn.MSELoss() # MSE loss function
test_loss, MSE = 0, 0 # 记录 loss 和 MSE
# 梯度截断
with torch.no_grad():
test_x, test_y = torch.tensor(test_x).to(device), torch.tensor(test_y).to(device) # 加载到 device
pred = torch.zeros(test_y.shape)
for model in loaded_model_list:
pred += model(test_x) # 输入数据到模型里得到输出
pred /= len(loaded_model_list)
test_loss = loss_fn(pred, test_y).item() # 计算输出和标签的 loss
MSE = MSE_loss_fn(pred, test_y).item() # MSE
print(f"Test Error: \n Avg loss: {test_loss:>8f}, MSE: {MSE:>8f}\n")
print(f"Test Result: \n Prediction: {pred[:5]}, \n Y: {test_y[:5]}, \n diff: {test_y[:5]-pred[:5]}\n")
标签:loss,DL,存档,torch,PyTorch,pred,test,model,MSE
From: https://www.cnblogs.com/moonout/p/17172065.html