Graduation Project Progress
Currently focused on writing a common quantization template.
lenet_mnist
Both the unquantized and the quantized versions train successfully.
cifar10_vgg16
The unquantized model cannot even be trained. I have put the code in the directory (it is also in the appendix below). I tried printing the outputs of several layers: the effect of the default initialization seems to be that, whatever the input, the activations converge toward the same values as they pass from layer to layer, so the final outputs are essentially identical.
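A quick way to confirm this diagnosis (a sketch of how I would check it, assuming the Cifar10Net class and imports from the appendix listing; the helper name is hypothetical): register forward hooks that print the standard deviation of every layer's output and watch whether it collapses toward zero with depth.

def report_activation_spread(net, sample):
    # Hook every leaf module and print the spread of its output; a std that
    # shrinks toward 0 with depth means all inputs map to nearly the same
    # activations, matching the symptom described above.
    handles = []
    for name, module in net.named_modules():
        if len(list(module.children())) == 0:  # leaf modules only
            handles.append(module.register_forward_hook(
                lambda m, inp, out, name=name: print(f"{name}: std={out.std():.6f}")))
    with torch.no_grad():
        net(sample)
    for h in handles:
        h.remove()

# e.g. report_activation_spread(Cifar10Net(), torch.randn(4, 3, 32, 32))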
imagenet_vgg16
The unquantized model was obtained by taking the official pretrained model and renaming its layers. For the quantized model, so far training quantizes only one layer, and accuracy already drops to around 0.001, so the problem most likely lies in the template itself.
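For the record, one way the renaming could be done (a sketch of the idea only; the actual conversion script is not included in this post, and RenamedVGG16 is a hypothetical name): flatten the official torchvision VGG16 into sequentially named layers so the template can walk them as layer1, layer2, ...

import torch.nn as nn
from torchvision.models import vgg16

class RenamedVGG16(nn.Module):
    def __init__(self):
        super(RenamedVGG16, self).__init__()
        ref = vgg16(pretrained=True)
        # features -> avgpool -> flatten -> classifier, renamed layer1..layerN
        modules = list(ref.features) + [ref.avgpool, nn.Flatten()] + list(ref.classifier)
        for i, m in enumerate(modules, start=1):
            setattr(self, 'layer' + str(i), m)
        self.layer_number = len(modules)

    def forward(self, x):
        for i in range(1, self.layer_number + 1):
            x = getattr(self, 'layer' + str(i))(x)
        return x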
Appendix
Quantization template (using LeNet as the example, with no special requirements added)
As long as the original model names each layer sequentially as layer1, layer2, ..., in theory the code of the inheriting quantized model does not need to change at all. A short usage sketch follows the listing.
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Function
import torchvision
import torchvision.transforms as tf
import os
from torch.utils.data import DataLoader
from torchsummary import summary
import matplotlib.pyplot as plt
train_data = torchvision.datasets.MNIST(root='./dataset',train=True,transform=tf.ToTensor(),download=True)
test_data = torchvision.datasets.MNIST(root='./dataset',train=False,transform=tf.ToTensor(),download=True)
print('Training samples: %d, test samples: %d' % (len(train_data), len(test_data)))
batch_size = 64
train_iter = DataLoader(train_data,batch_size=batch_size,shuffle=True,num_workers=0)
test_iter = DataLoader(test_data,batch_size=batch_size,shuffle=False,num_workers=0)
def dequantize(tq, qt):
    return tq / qt

# Running per-layer maxima, tracked separately for inputs, weights, biases and outputs.
max_value = {}
max_value['input'] = {}
max_value['weight'] = {}
max_value['bias'] = {}
max_value['output'] = {}

def quantize(t, signed, part, layer=0):
    # Track the running max of |t| for this layer/part across calls.
    t_max = torch.max(torch.abs(t))
    if layer in max_value[part]:
        t_max = max_value[part][layer] if (max_value[part][layer] > t_max) else t_max
    max_value[part][layer] = t_max
    # Choose the scale so the running max maps to the edge of the 8-bit range.
    if signed:
        qt = 255 / 2 / t_max
    else:
        qt = 255 / t_max
    tq = t * qt
    if signed:
        tq.clamp_(-128, 127).round_()
    else:
        tq.clamp_(0, 255).round_()
    return tq, qt
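# Round-trip example (illustrative values, not from the original post):
#   t = torch.tensor([0.0, 0.5, 1.0])
#   tq, qt = quantize(t, False, 'input', 0)  # qt = 255.0, tq = [0., 128., 255.]
#   dequantize(tq, qt)                       # ~ [0.0000, 0.5020, 1.0000]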
def print_size_of_model(model):
    torch.save(model.state_dict(), "temp.p")
    print('Size (MB):', os.path.getsize("temp.p") / 1e6)
    os.remove('temp.p')
class LeNet(nn.Module):
    def __init__(self):
        super(LeNet, self).__init__()
        self.layer1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5, stride=1, padding=0)
        self.layer2 = nn.ReLU()
        self.layer3 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.layer4 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5, stride=1, padding=0)
        self.layer5 = nn.ReLU()
        self.layer6 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.layer7 = nn.Flatten()
        self.layer8 = nn.Linear(in_features=256, out_features=120)
        self.layer9 = nn.ReLU()
        self.layer10 = nn.Linear(in_features=120, out_features=84)
        self.layer11 = nn.ReLU()
        self.layer12 = nn.Linear(in_features=84, out_features=10)
        self.layer_number = 12

    def forward(self, x):
        for i in range(1, self.layer_number + 1):
            module = getattr(self, "layer" + str(i))
            x = module(x)
        return x
class Fakequant(Function):
    """Fake quantization with a straight-through estimator: quantize then
    dequantize in the forward pass, pass gradients through unchanged."""
    @staticmethod
    def forward(ctx, t, signed, part, layer):
        tq, qt = quantize(t, signed, part, layer)
        return dequantize(tq, qt)

    @staticmethod
    def backward(ctx, grad_output):
        # Straight-through: gradient w.r.t. the tensor, None for the non-tensor args.
        return grad_output, None, None, None
class qat_LeNet(LeNet):
    def __init__(self):
        super(qat_LeNet, self).__init__()
        # Allocate scale-factor slots for every quantizable layer.
        for layer in range(1, self.layer_number + 1):
            module = getattr(self, "layer" + str(layer))
            if module.__class__.__name__ in ['Conv2d', 'Linear']:
                setattr(self, 'layer' + str(layer) + '_qw', 0)
                setattr(self, 'layer' + str(layer) + '_qb', 0)
                setattr(self, 'layer' + str(layer) + '_qy', 0)
                setattr(self, 'layer' + str(layer) + '_qx', 0)

    def forward(self, x):
        # QAT pass: fake-quantize the inputs, weights, biases and outputs
        # of every Conv2d/Linear layer; all other layers run unchanged.
        for layer in range(1, self.layer_number + 1):
            module = getattr(self, "layer" + str(layer))
            if module.__class__.__name__ == 'Conv2d':
                _, x_factor = quantize(x, False, 'input', layer)
                x = Fakequant.apply(x, False, 'input', layer)
                setattr(self, 'layer' + str(layer) + '_qx', x_factor)
                x = F.conv2d(x, Fakequant.apply(module.weight, True, 'weight', layer),
                             Fakequant.apply(module.bias, True, 'bias', layer),
                             stride=module.stride, padding=module.padding,
                             dilation=module.dilation)
                x = F.relu(x)
                _, y_factor = quantize(x, False, 'output', layer)
                x = Fakequant.apply(x, False, 'output', layer)
                setattr(self, 'layer' + str(layer) + '_qy', y_factor)
            elif module.__class__.__name__ == 'Linear':
                _, x_factor = quantize(x, False, 'input', layer)
                x = Fakequant.apply(x, False, 'input', layer)
                setattr(self, 'layer' + str(layer) + '_qx', x_factor)
                x = F.linear(x, Fakequant.apply(module.weight, True, 'weight', layer),
                             Fakequant.apply(module.bias, True, 'bias', layer))
                x = F.relu(x)
                _, y_factor = quantize(x, False, 'output', layer)
                x = Fakequant.apply(x, False, 'output', layer)
                setattr(self, 'layer' + str(layer) + '_qy', y_factor)
            else:
                x = module(x)
        return x

    def save_qat(self):
        # Freeze the trained weights/biases into their integer representation
        # and record the scale factors.
        for layer in range(1, self.layer_number + 1):
            module = getattr(self, "layer" + str(layer))
            if module.__class__.__name__ in ['Conv2d', 'Linear']:
                module.weight.data, qw = quantize(module.weight.data, True, 'weight', layer)
                module.bias.data, qb = quantize(module.bias.data, True, 'bias', layer)
                setattr(self, 'layer' + str(layer) + '_qw', qw)
                setattr(self, 'layer' + str(layer) + '_qb', qb)

    def qat_forward(self, x):
        # Integer-domain inference using the recorded scale factors.
        for layer in range(1, self.layer_number + 1):
            module = getattr(self, "layer" + str(layer))
            if module.__class__.__name__ == 'Conv2d':
                x = F.conv2d(x, module.weight.data / getattr(self, 'layer' + str(layer) + '_qw'),
                             module.bias.data / getattr(self, 'layer' + str(layer) + '_qb'),
                             stride=module.stride, padding=module.padding,
                             dilation=module.dilation)
                x = x * getattr(self, 'layer' + str(layer) + '_qy') / getattr(self, 'layer' + str(layer) + '_qx')
                x = F.relu(x)
                x.clamp_(0, 255).round_()
            elif module.__class__.__name__ == 'Linear':
                x = F.linear(x, module.weight.data / getattr(self, 'layer' + str(layer) + '_qw'),
                             module.bias.data / getattr(self, 'layer' + str(layer) + '_qb'))
                x = x * getattr(self, 'layer' + str(layer) + '_qy') / getattr(self, 'layer' + str(layer) + '_qx')
                x = F.relu(x)
                x.clamp_(0, 255).round_()
            else:
                x = module(x)
        return x
def train(net, device, trainloader, qtrain_epoch, lr, save_dir):
    net = net.to(device)
    net.train()
    optimizer = torch.optim.SGD(net.parameters(), lr=lr, momentum=0.9)
    criterion = nn.CrossEntropyLoss()
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)
    for epoch in range(qtrain_epoch):
        running_loss = 0.0
        total = 0.0
        for i, data in enumerate(trainloader, 0):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = net(inputs)
            optimizer.zero_grad()
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            total += labels.size(0)
        # scheduler.step()
        print("epoch: {}, loss: {}".format(epoch + 1, running_loss / total))
        if not os.path.isdir(save_dir):
            os.mkdir(save_dir)
        torch.save(net.state_dict(), save_dir + '/checkpoint_quant_epoch_{}.pkl'.format(epoch + 1))
    return net
def eval1(net, device, testloader, save_file):
    # Evaluate the fake-quantized (float) forward pass.
    net = net.to(device)
    net.eval()
    running_corrects = 0.0
    total = 0.0
    with torch.no_grad():
        for i, data in enumerate(testloader):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = net(inputs)
            _, preds = torch.max(outputs, 1)
            running_corrects += torch.sum(preds == labels.data)
            total += labels.size(0)
    print('Test accuracy: %.6f' % (running_corrects / total))
    torch.save(net.state_dict(), save_file)
    return net
def eval2(net, device, testloader, save_file):
    # Evaluate the integer-domain qat_forward pass on quantized inputs.
    net = net.to(device)
    net.eval()
    running_corrects = 0.0
    total = 0.0
    with torch.no_grad():
        for i, data in enumerate(testloader):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            inputs, _ = quantize(inputs, False, 'input', 1)
            outputs = net.qat_forward(inputs)
            _, preds = torch.max(outputs, 1)
            running_corrects += torch.sum(preds == labels.data)
            total += labels.size(0)
    print('Test accuracy: %.6f' % (running_corrects / total))
    torch.save(net, save_file)
    return net
def main():
    # net = qat_LeNet()
    net = torch.load('test.pkl')
    device = torch.device("cuda:0")
    # net = train(net, device, train_iter, 5, 0.01, "qat_epoch_pkl")
    # net.save_qat()
    # print(net.layer1.weight.data)
    eval2(net, device, test_iter, './test.pkl')

if __name__ == '__main__':
    main()
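For reference, the full flow that main() exercises piecemeal through its commented-out lines would look like this (a usage sketch; the epoch count and learning rate are the values from the listing, the eval1 file name is illustrative):

def full_qat_flow():
    net = qat_LeNet()
    device = torch.device("cuda:0")
    net = train(net, device, train_iter, 5, 0.01, "qat_epoch_pkl")  # QAT with fake quantization
    eval1(net, device, test_iter, './fakequant_eval.pkl')           # accuracy of the fake-quantized float model
    net.save_qat()                                                  # freeze weights/biases to integers
    eval2(net, device, test_iter, './test.pkl')                     # accuracy of the integer-domain model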
Unquantized cifar10_vgg16 model code
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Function
import torchvision
import torchvision.transforms as transforms
import os
import _pickle as cPickle
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),  # convert images to tensors in [0,1]
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))  # map [0,1] to [-1,1]
])
transform_test = transforms.Compose([
    transforms.ToTensor(),  # convert images to tensors in [0,1]
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))  # map [0,1] to [-1,1]
])
trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=False, transform=transform_train)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=256,
                                          shuffle=True, num_workers=8)
testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=False, transform=transform_test)
testloader = torch.utils.data.DataLoader(testset, batch_size=256,
                                         shuffle=False, num_workers=8)
print('Training samples: %d, test samples: %d' % (len(trainset), len(testset)))
class Cifar10Net(nn.Module):
    def __init__(self):
        super(Cifar10Net, self).__init__()
        self.layer1 = nn.Conv2d(3, 64, 3, 1, 1)
        self.layer2 = nn.ReLU(inplace=True)
        self.layer3 = nn.Conv2d(64, 64, 3, 1, 1)
        self.layer4 = nn.ReLU(inplace=True)
        self.layer5 = nn.Conv2d(64, 64, 3, 1, 1)
        self.layer6 = nn.ReLU(inplace=True)
        self.layer7 = nn.Conv2d(64, 64, 3, 1, 1)
        self.layer8 = nn.ReLU(inplace=True)
        self.layer9 = nn.MaxPool2d((2, 2))
        self.layer10 = nn.Conv2d(64, 64, 3, 1, 1)
        self.layer11 = nn.ReLU(inplace=True)
        self.layer12 = nn.Conv2d(64, 64, 3, 1, 1)
        self.layer13 = nn.ReLU(inplace=True)
        self.layer14 = nn.Conv2d(64, 64, 3, 1, 1)
        self.layer15 = nn.ReLU(inplace=True)
        self.layer16 = nn.Conv2d(64, 128, 3, 1, 1)
        self.layer17 = nn.ReLU(inplace=True)
        self.layer18 = nn.MaxPool2d((2, 2))
        self.layer19 = nn.Conv2d(128, 128, 3, 1, 1)
        self.layer20 = nn.ReLU(inplace=True)
        self.layer21 = nn.Conv2d(128, 128, 3, 1, 1)
        self.layer22 = nn.ReLU(inplace=True)
        self.layer23 = nn.Conv2d(128, 128, 3, 1, 1)
        self.layer24 = nn.ReLU(inplace=True)
        self.layer25 = nn.MaxPool2d((2, 2))
        self.layer26 = nn.Conv2d(128, 128, 3, 1, 1)
        self.layer27 = nn.ReLU(inplace=True)
        self.layer28 = nn.Conv2d(128, 128, 3, 1, 1)
        self.layer29 = nn.ReLU(inplace=True)
        self.layer30 = nn.Conv2d(128, 256, 3, 1, 1)
        self.layer31 = nn.ReLU(inplace=True)
        self.layer32 = nn.Conv2d(256, 256, 3, 1, 1)
        self.layer33 = nn.ReLU(inplace=True)
        self.layer34 = nn.MaxPool2d((2, 2))
        self.layer35 = nn.Flatten()
        self.layer36 = nn.Linear(1024, 256)
        self.layer37 = nn.ReLU(inplace=True)
        self.layer38 = nn.Linear(256, 256)
        self.layer39 = nn.ReLU(inplace=True)
        self.layer40 = nn.Linear(256, 10)
        self.layer41 = nn.ReLU(inplace=True)
        self.layer_number = 41

    def forward(self, x):
        for layer in range(1, self.layer_number + 1):
            module = getattr(self, "layer" + str(layer))
            x = module(x)
            # Stash intermediate activations for the debug prints in train().
            if layer == 9:
                self.test1 = x
            if layer == 18:
                self.test2 = x
            if layer == 25:
                self.test3 = x
            if layer == 35:
                self.test4 = x
        return x
def train(net, device, trainloader, qtrain_epoch, lr, save_dir):
    net = net.to(device)
    net.train()
    optimizer = torch.optim.SGD(net.parameters(), lr=lr, momentum=0.9)
    criterion = nn.CrossEntropyLoss()
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)
    for epoch in range(qtrain_epoch):
        running_loss = 0.0
        total = 0.0
        for i, data in enumerate(trainloader, 0):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = net(inputs)
            # Debug prints: the inputs and the activations stashed after each stage.
            print("=========================inputs")
            print(inputs)
            print("=========================test1")
            print(net.test1)
            print("=========================test2")
            print(net.test2)
            print("=========================test3")
            print(net.test3)
            print("=========================test4")
            print(net.test4)
            print("=========================outputs")
            print(outputs)
            optimizer.zero_grad()
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            total += labels.size(0)
        # scheduler.step()
        # print(net.layer1.weight.data)
        print("epoch: {}, loss: {}".format(epoch + 1, running_loss / total))
        if not os.path.isdir(save_dir):
            os.mkdir(save_dir)
        torch.save(net.state_dict(), save_dir + '/checkpoint_quant_epoch_{}.pkl'.format(epoch + 1))
    return net
def eval1(net, device, testloader, save_file):
    net = net.to(device)
    net.eval()
    running_corrects = 0.0
    total = 0.0
    with torch.no_grad():
        for i, data in enumerate(testloader):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = net(inputs)
            _, preds = torch.max(outputs, 1)
            running_corrects += torch.sum(preds == labels.data)
            total += labels.size(0)
    print('eval_acc: {}'.format(running_corrects / total))
    torch.save(net.state_dict(), save_file)
def main():
    net = Cifar10Net()
    # net.load_state_dict(torch.load('./tmp.pkl'))
    device = torch.device("cuda:0")
    net = train(net, device, trainloader, 1, 0.0001, "qat_epoch_pkl")
    eval1(net, device, testloader, './checkpoint_quant_eval.pkl')

if __name__ == '__main__':
    main()
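If the default initialization really is what collapses the activations, a standard remedy to try (my suggestion, not something tested in this post) is Kaiming/He initialization, which is designed for deep ReLU stacks; the ReLU applied to the final logits (layer41) would also be worth suspecting. A minimal sketch, with init_weights as a hypothetical helper name:

def init_weights(m):
    # Hypothetical fix: He initialization for conv/linear layers feeding ReLUs.
    if isinstance(m, (nn.Conv2d, nn.Linear)):
        nn.init.kaiming_normal_(m.weight, nonlinearity='relu')
        if m.bias is not None:
            nn.init.zeros_(m.bias)

net = Cifar10Net()
net.apply(init_weights)  # walks every submodule and re-initializes it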