I. Multi-GPU parallel training framework
lightning-hydra-template
This post mainly uses the open-source lightning-hydra-template framework from GitHub. The framework has a few minor issues, which are resolved below.
1. Add the lightning-hydra-template framework from GitHub to your own repository, then clone it from your repository onto the server.
2. Modify the extras function in src/utils/utils.py, appending a config-fixing step at the end, as follows:
def extras(cfg: DictConfig) -> None:
    """Applies optional utilities before the task is started.

    Utilities:
    - Ignoring python warnings
    - Setting tags from command line
    - Rich config printing
    """
    # return if no `extras` config
    if not cfg.get("extras"):
        log.warning("Extras config not found! <cfg.extras=null>")
        return

    # disable python warnings
    if cfg.extras.get("ignore_warnings"):
        log.info("Disabling python warnings! <cfg.extras.ignore_warnings=True>")
        warnings.filterwarnings("ignore")

    # prompt user to input tags from command line if none are provided in the config
    if cfg.extras.get("enforce_tags"):
        log.info("Enforcing tags! <cfg.extras.enforce_tags=True>")
        rich_utils.enforce_tags(cfg, save_to_file=True)

    # pretty print config tree using Rich library
    if cfg.extras.get("print_config"):
        log.info("Printing config tree with Rich! <cfg.extras.print_config=True>")
        rich_utils.print_config_tree(cfg, resolve=True, save_to_file=True)

    def fix_DictConfig(cfg: DictConfig):
        """Fix all vars in the cfg config.

        This is an in-place operation.
        """
        keys = list(cfg.keys())
        for k in keys:
            if type(cfg[k]) is DictConfig:
                fix_DictConfig(cfg[k])
            else:
                setattr(cfg, k, getattr(cfg, k))

    fix_DictConfig(cfg)
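What the appended step does: OmegaConf resolves ${...} interpolations lazily on access, so the recursive setattr(cfg, k, getattr(cfg, k)) loop reads every leaf value and writes the resolved result back, materializing it in place before the config is consumed elsewhere (e.g. by spawned DDP workers or loggers). A minimal sketch of that effect with a toy config (not the template's):

from omegaconf import OmegaConf

# Toy config to illustrate what fix_DictConfig does.
cfg = OmegaConf.create({"paths": {"data_dir": "/data"}, "data": {"dir": "${paths.data_dir}/cifar10"}})
print(OmegaConf.to_yaml(cfg))  # data.dir is stored as the interpolation "${paths.data_dir}/cifar10"

# Reading a value resolves it; writing it back stores the concrete string in place,
# which is exactly what fix_DictConfig does recursively for every key.
cfg.data.dir = cfg.data.dir
print(OmegaConf.to_yaml(cfg))  # data.dir is now the literal "/data/cifar10"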
1. Dataset
1. Create a new dataset YAML config file under configs/data,
for example cifar10.yaml:
_target_: src.data.cifar10_datamodule.CIFAR10DataModule # later we create src/data/cifar10_datamodule.py, which defines the LightningDataModule class CIFAR10DataModule
data_dir: ${paths.data_dir} # configurable in configs/paths/default.yaml; data_dir is the directory holding the dataset
batch_size: 128
train_val_test_split: [45_000, 5_000, 10_000] # dataset split sizes, used by CIFAR10DataModule
num_workers: 0
pin_memory: False # keep False for small datasets; set True for large datasets to reduce the cost of copying data from CPU to GPU
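Hydra later turns this file into a live datamodule object via hydra.utils.instantiate (this is what train.py does below). A minimal sketch of that mechanism, assuming the file sits at configs/data/cifar10.yaml and the snippet is run from the project root:

# A sketch of how Hydra instantiates the datamodule from the YAML above.
import hydra
from omegaconf import OmegaConf

cfg = OmegaConf.load("configs/data/cifar10.yaml")
cfg.data_dir = "data/"  # ${paths.data_dir} only resolves inside the full composed config
dm = hydra.utils.instantiate(cfg)
print(type(dm).__name__)  # CIFAR10DataModule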
2. Create a new datamodule definition file under src/data:
cifar10_datamodule.py
from typing import Any, Dict, Optional, Tuple

import torch
from lightning import LightningDataModule
from torch.utils.data import ConcatDataset, DataLoader, Dataset, random_split
from torchvision.datasets import CIFAR10
from torchvision.transforms import transforms


class CIFAR10DataModule(LightningDataModule):
    def __init__(
        self,
        data_dir: str = "data/",  # default value, overridden by cfg.paths.data_dir
        train_val_test_split: Tuple[int, int, int] = (45_000, 5_000, 10_000),
        batch_size: int = 64,
        num_workers: int = 0,
        pin_memory: bool = False,
    ):
        super().__init__()

        # this line allows to access init params with 'self.hparams' attribute
        # also ensures init params will be stored in ckpt
        self.save_hyperparameters(logger=False)

        # data transformations
        # NOTE: these normalization stats are carried over from the template's MNIST example;
        # CIFAR-10 per-channel stats are roughly (0.4914, 0.4822, 0.4465) / (0.2470, 0.2435, 0.2616)
        self.transforms = transforms.Compose(
            [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
        )

        self.data_train: Optional[Dataset] = None
        self.data_val: Optional[Dataset] = None
        self.data_test: Optional[Dataset] = None

    @property
    def num_classes(self):
        return 10

    def prepare_data(self):
        CIFAR10(root=self.hparams.data_dir, train=True, download=True)
        CIFAR10(root=self.hparams.data_dir, train=False, download=True)

    def setup(self, stage: Optional[str] = None):
        """Load data. Set variables: `self.data_train`, `self.data_val`, `self.data_test`.

        This method is called by lightning with both `trainer.fit()` and `trainer.test()`, so be
        careful not to execute things like random split twice!
        """
        # load and split datasets only if not loaded already
        if not self.data_train and not self.data_val and not self.data_test:
            trainset = CIFAR10(self.hparams.data_dir, train=True, transform=self.transforms)
            testset = CIFAR10(self.hparams.data_dir, train=False, transform=self.transforms)
            # concatenate the two splits, then re-split into train/val/test
            # (in general a test set has no labels)
            dataset = ConcatDataset(datasets=[trainset, testset])
            self.data_train, self.data_val, self.data_test = random_split(
                dataset=dataset,
                lengths=self.hparams.train_val_test_split,
                generator=torch.Generator().manual_seed(513),
            )

    def train_dataloader(self):
        return DataLoader(
            dataset=self.data_train,
            batch_size=self.hparams.batch_size,
            num_workers=self.hparams.num_workers,
            pin_memory=self.hparams.pin_memory,
            shuffle=True,
        )

    def val_dataloader(self):
        return DataLoader(
            dataset=self.data_val,
            batch_size=self.hparams.batch_size,
            num_workers=self.hparams.num_workers,
            pin_memory=self.hparams.pin_memory,
            shuffle=False,  # no shuffling for the validation set
        )

    def test_dataloader(self):
        return DataLoader(
            dataset=self.data_test,
            batch_size=self.hparams.batch_size,
            num_workers=self.hparams.num_workers,
            pin_memory=self.hparams.pin_memory,
            shuffle=False,  # no shuffling for the test set
        )

    def teardown(self, stage: Optional[str] = None):
        """Clean up after fit or test."""
        pass

    def state_dict(self):
        """Extra things to save to checkpoint."""
        return {}  # nothing extra to save

    def load_state_dict(self, state_dict: Dict[str, Any]):
        """Things to do when loading checkpoint."""
        pass  # this datamodule has no state to restore


if __name__ == "__main__":
    _ = CIFAR10DataModule()
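Beyond the bare constructor call in the __main__ block, a quick smoke test can be run as a standalone script (a sketch; the batch size and path are illustrative, and it assumes src is importable, e.g. when run from the project root):

# Standalone sanity check of the datamodule.
from src.data.cifar10_datamodule import CIFAR10DataModule

dm = CIFAR10DataModule(data_dir="data/", batch_size=8)
dm.prepare_data()          # downloads CIFAR-10 into data/ if it is not there yet
dm.setup()
xb, yb = next(iter(dm.train_dataloader()))
print(xb.shape, yb.shape)  # expected: torch.Size([8, 3, 32, 32]) torch.Size([8])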
2. Model
Rewrite the model into the standard Lightning layout.
Original ViT model definition:
import torch
from torch import nn, einsum
import torch.nn.functional as F
from einops import rearrange, repeat
from einops.layers.torch import Rearrange


def pair(t):
    return t if isinstance(t, tuple) else (t, t)


class PreNorm(nn.Module):
    def __init__(self, dim, fn) -> None:
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.fn = fn

    def forward(self, x, **kwargs):
        return self.fn(self.norm(x), **kwargs)


class FeedForward(nn.Module):
    def __init__(self, dim, hidden_dim, dropout=0.):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, dim),
            nn.Dropout(dropout)
        )

    def forward(self, x):
        return self.net(x)


class Attention(nn.Module):
    def __init__(self, dim, heads=8, dim_head=64, dropout=0.) -> None:
        super().__init__()
        inner_dim = dim_head * heads
        project_out = not (heads == 1 and dim_head == dim)

        self.heads = heads
        self.scale = dim_head ** -0.5

        self.attend = nn.Softmax(dim=-1)
        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias=False)
        self.to_out = nn.Sequential(
            nn.Linear(inner_dim, dim),
            nn.Dropout(dropout),
        ) if project_out else nn.Identity()

    def forward(self, x):
        b, n, _, h = *x.shape, self.heads
        qkv = self.to_qkv(x).chunk(3, dim=-1)  # (b, n(65), dim*3) ---> [3 * (b, n, dim)]
        q, k, v = map(lambda t: rearrange(t, 'b n (h d) -> b h n d', h=h), qkv)  # q, k, v: (b, h, n, dim_head(64))

        dots = einsum('b h i d, b h j d -> b h i j', q, k) * self.scale
        attn = self.attend(dots)

        out = einsum('b h i j, b h j d -> b h i d', attn, v)
        out = rearrange(out, 'b h n d -> b n (h d)')
        return self.to_out(out)


class TransformerEncoder(nn.Module):
    def __init__(self, dim, depth, heads, dim_head, mlp_dim, dropout=0.):
        super().__init__()
        self.layers = nn.ModuleList([])
        for _ in range(depth):
            self.layers.append(nn.ModuleList([
                PreNorm(dim, Attention(dim, heads=heads, dim_head=dim_head, dropout=dropout)),
                PreNorm(dim, FeedForward(dim, mlp_dim, dropout=dropout))
            ]))

    def forward(self, x):
        for attn, ff in self.layers:
            x = attn(x) + x
            x = ff(x) + x
        return x


class ViT(nn.Module):
    def __init__(self, *, image_size, patch_size, num_classes, dim, depth, heads, mlp_dim, pool='cls', channels=3, dim_head=64, dropout=0., emb_dropout=0.):
        super().__init__()
        image_height, image_width = pair(image_size)
        patch_height, patch_width = pair(patch_size)
        assert image_height % patch_height == 0 and image_width % patch_width == 0

        num_patches = (image_height // patch_height) * (image_width // patch_width)
        patch_dim = channels * patch_height * patch_width
        assert pool in {'cls', 'mean'}

        self.to_patch_embedding = nn.Sequential(
            Rearrange('b c (h p1) (w p2) -> b (h w) (p1 p2 c)', p1=patch_height, p2=patch_width),
            nn.Linear(patch_dim, dim)
        )
        self.pos_embedding = nn.Parameter(torch.randn(1, num_patches + 1, dim))
        self.cls_token = nn.Parameter(torch.randn(1, 1, dim))  # nn.Parameter() defines a learnable parameter
        self.dropout = nn.Dropout(emb_dropout)

        self.transformer = TransformerEncoder(dim, depth, heads, dim_head, mlp_dim, dropout)

        self.pool = pool
        self.to_latent = nn.Identity()
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(dim),
            nn.Linear(dim, num_classes)
        )

    def forward(self, img):
        x = self.to_patch_embedding(img)  # b c (h p1) (w p2) -> b (h w) (p1 p2 c) -> b (h w) dim
        b, n, _ = x.shape  # b is the batch size, n is the number of patches, _ is the embedding dim
        cls_tokens = repeat(self.cls_token, '() n d -> b n d', b=b)  # self.cls_token: (1, 1, dim) -> cls_tokens: (batchSize, 1, dim)
        x = torch.cat((cls_tokens, x), dim=1)  # prepend the cls_token to the patch tokens (b, 65, dim)
        x += self.pos_embedding[:, :(n + 1)]  # add the positional embedding (simple addition) (b, 65, dim)
        x = self.dropout(x)
        x = self.transformer(x)  # (b, 65, dim)
        x = x.mean(dim=1) if self.pool == 'mean' else x[:, 0]  # (b, dim)
        x = self.to_latent(x)  # Identity (b, dim)
        # print(x.shape)
        return self.mlp_head(x)  # (b, num_classes)


if __name__ == "__main__":
    model_vit = ViT(
        image_size=256,
        patch_size=32,
        num_classes=1000,
        dim=1024,
        depth=6,
        heads=16,
        mlp_dim=2048,
        dropout=0.1,
        emb_dropout=0.1
    )

    img = torch.randn(16, 3, 256, 256)
    preds = model_vit(img)
    print(preds.shape)  # (16, 1000)
Split into components + model format
The model is assembled from individual components, and then an optimizer and a loss function are attached.
The component parts of the code above are therefore: TransformerEncoder, FeedForward, PreNorm, and Attention. (For convenience, all modules that make up the models go into vit_components.py; components shared by several models would go into common.py.) Since there is only one model here, everything goes into vit_components.py.
src/models/components/vit_components.py
import torch
from torch import nn, einsum
import torch.nn.functional as F
from einops import rearrange, repeat  # einops: Einstein-notation operations on tensors
from einops.layers.torch import Rearrange


def pair(t):
    return t if isinstance(t, tuple) else (t, t)  # return t as a pair


class PreNorm(nn.Module):
    """
    PreNorm: apply LayerNorm before the wrapped operation.

    Benefits:
    - improves generalization and training stability
    - standardizes the features of each sample to zero mean and unit variance,
      reducing correlation between feature dimensions and improving generalization
    - alleviates vanishing/exploding gradients in deep networks
    - normalizes per sample, so the result does not depend on the mini-batch size (unlike BatchNorm)
    """
    def __init__(self, dim, fn) -> None:  # the feature dimension must be given
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.fn = fn

    def forward(self, x, **kwargs):
        return self.fn(self.norm(x), **kwargs)


class FeedForward(nn.Module):
    """
    FeedForward: feed-forward layer.

    Typically dim -> hidden_dim -> output_dim: two fully connected layers with a
    non-linearity in between. A non-linear activation usually follows a fully connected
    layer; GELU is used here as it handles vanishing gradients better.
    """
    def __init__(self, dim, hidden_dim, dropout=0.):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, dim),
            nn.Dropout(dropout)
        )

    def forward(self, x):
        return self.net(x)


class Attention(nn.Module):
    """
    Attention: self-attention layer.
    """
    def __init__(self, dim, heads=8, dim_head=64, dropout=0.) -> None:
        super().__init__()
        inner_dim = dim_head * heads
        project_out = not (heads == 1 and dim_head == dim)

        self.heads = heads
        self.scale = dim_head ** -0.5

        self.attend = nn.Softmax(dim=-1)
        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias=False)
        self.to_out = nn.Sequential(
            nn.Linear(inner_dim, dim),
            nn.Dropout(dropout),
        ) if project_out else nn.Identity()

    def forward(self, x):
        b, n, _, h = *x.shape, self.heads
        qkv = self.to_qkv(x).chunk(3, dim=-1)  # (b, n(65), dim*3) ---> [3 * (b, n, dim)]
        q, k, v = map(lambda t: rearrange(t, 'b n (h d) -> b h n d', h=h), qkv)  # q, k, v: (b, h, n, dim_head(64))

        dots = einsum('b h i d, b h j d -> b h i j', q, k) * self.scale
        attn = self.attend(dots)

        out = einsum('b h i j, b h j d -> b h i d', attn, v)
        out = rearrange(out, 'b h n d -> b n (h d)')
        return self.to_out(out)


class TransformerEncoder(nn.Module):
    def __init__(self, dim, depth, heads, dim_head, mlp_dim, dropout=0.):
        super().__init__()
        self.layers = nn.ModuleList([])
        for _ in range(depth):
            self.layers.append(nn.ModuleList([
                PreNorm(dim, Attention(dim, heads=heads, dim_head=dim_head, dropout=dropout)),
                PreNorm(dim, FeedForward(dim, mlp_dim, dropout=dropout))
            ]))

    def forward(self, x):
        for attn, ff in self.layers:
            x = attn(x) + x
            x = ff(x) + x
        return x
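A quick shape check of the components above can be run as a standalone sketch (dimensions are illustrative; it assumes src is importable, e.g. when run from the project root):

# Shape sanity check for the attention and encoder blocks.
import torch
from src.models.components.vit_components import Attention, TransformerEncoder

x = torch.randn(2, 17, 64)                      # (batch, tokens, dim)
attn = Attention(dim=64, heads=8, dim_head=16)
enc = TransformerEncoder(dim=64, depth=2, heads=8, dim_head=16, mlp_dim=128)
print(attn(x).shape, enc(x).shape)              # both torch.Size([2, 17, 64])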
src/models/components/vit_net.py (placed under components/ to match the _target_ used in model/vit.yaml below)
import torch
from torch import nn
from einops import repeat
from einops.layers.torch import Rearrange

from src.models.components.vit_components import *


class ViT(nn.Module):
    def __init__(self, *, image_size, patch_size, num_classes, dim, depth, heads, mlp_dim, pool='cls', channels=3, dim_head=64, dropout=0., emb_dropout=0.):
        super().__init__()
        image_height, image_width = pair(image_size)
        patch_height, patch_width = pair(patch_size)
        assert image_height % patch_height == 0 and image_width % patch_width == 0

        num_patches = (image_height // patch_height) * (image_width // patch_width)
        patch_dim = channels * patch_height * patch_width
        assert pool in {'cls', 'mean'}

        self.to_patch_embedding = nn.Sequential(
            Rearrange('b c (h p1) (w p2) -> b (h w) (p1 p2 c)', p1=patch_height, p2=patch_width),
            nn.Linear(patch_dim, dim)
        )
        self.pos_embedding = nn.Parameter(torch.randn(1, num_patches + 1, dim))
        self.cls_token = nn.Parameter(torch.randn(1, 1, dim))  # nn.Parameter() defines a learnable parameter
        self.dropout = nn.Dropout(emb_dropout)

        self.transformer = TransformerEncoder(dim, depth, heads, dim_head, mlp_dim, dropout)

        self.pool = pool
        self.to_latent = nn.Identity()
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(dim),
            nn.Linear(dim, num_classes)
        )

    def forward(self, img):
        x = self.to_patch_embedding(img)  # b c (h p1) (w p2) -> b (h w) (p1 p2 c) -> b (h w) dim
        b, n, _ = x.shape  # b is the batch size, n is the number of patches, _ is the embedding dim
        cls_tokens = repeat(self.cls_token, '() n d -> b n d', b=b)  # self.cls_token: (1, 1, dim) -> cls_tokens: (batchSize, 1, dim)
        x = torch.cat((cls_tokens, x), dim=1)  # prepend the cls_token to the patch tokens (b, 65, dim)
        x += self.pos_embedding[:, :(n + 1)]  # add the positional embedding (simple addition) (b, 65, dim)
        x = self.dropout(x)
        x = self.transformer(x)  # (b, 65, dim)
        x = x.mean(dim=1) if self.pool == 'mean' else x[:, 0]  # (b, dim)
        x = self.to_latent(x)  # Identity (b, dim)
        # print(x.shape)
        return self.mlp_head(x)  # (b, num_classes)


if __name__ == "__main__":
    model_vit = ViT(
        image_size=256,
        patch_size=32,
        num_classes=1000,
        dim=1024,
        depth=6,
        heads=16,
        mlp_dim=2048,
        dropout=0.1,
        emb_dropout=0.1
    )

    img = torch.randn(16, 3, 256, 256)
    preds = model_vit(img)
    print(preds.shape)  # (16, 1000)
3. Modify the training-related config files (training config YAML and model YAML)
configs/train_cifar10.yaml
# @package _global_

# specify here default configuration
# order of defaults determines the order in which configs override each other
defaults:
  - _self_
  - data: cifar10.yaml # keep this entry if the dataset does not change
  - model: vit.yaml # change this entry when the model changes
  - callbacks: default.yaml
  - logger: null # set logger here or use command line (e.g. `python train.py logger=tensorboard`)
  - trainer: default.yaml
  - paths: default.yaml
  - extras: default.yaml
  - hydra: default.yaml

  # experiment configs allow for version control of specific hyperparameters
  # e.g. best hyperparameters for given model and datamodule
  - experiment: null

  # config for hyperparameter optimization
  - hparams_search: null

  # optional local config for machine/user specific settings
  # it's optional since it doesn't need to exist and is excluded from version control
  - optional local: default.yaml

  # debugging config (enable through command line, e.g. `python train.py debug=default`)
  - debug: null

# task name, determines output directory path
task_name: "train"

# tags to help you identify your experiments
# you can overwrite this in experiment configs
# overwrite from command line with `python train.py tags="[first_tag, second_tag]"`
tags: ["dev"]

# set False to skip model training
train: True

# evaluate on test set, using best model weights achieved during training
# lightning chooses best weights based on the metric specified in checkpoint callback
test: True

# compile model for faster training with pytorch 2.0
compile: False

# simply provide checkpoint path to resume training
ckpt_path: null

# seed for random number generators in pytorch, numpy and python.random
seed: 20131
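To check how Hydra composes this file with the data, model, and trainer groups without launching a run, a sketch using Hydra's compose API can help (it assumes the snippet lives in a file at the project root, so that "configs" is the correct relative config_path):

# Inspect the composed config without training.
from hydra import compose, initialize

with initialize(version_base="1.3", config_path="configs"):
    cfg = compose(config_name="train_cifar10.yaml", overrides=["trainer.max_epochs=50"])
    print(cfg.data.batch_size, cfg.model.net.image_size, cfg.trainer.max_epochs)  # 128 32 50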
configs/model/vit.yaml
_target_: src.models.vit_module.VITLitModule

optimizer:
  _target_: torch.optim.Adam # optimizer class
  _partial_: true
  lr: 0.001
  weight_decay: 0.0

scheduler:
  _target_: torch.optim.lr_scheduler.ReduceLROnPlateau
  _partial_: true
  mode: min
  factor: 0.1
  patience: 10

net: # the net object passed to the _target_ module
  _target_: src.models.components.vit_net.ViT
  image_size: 32 # 32x32 input images (CIFAR-10)
  channels: 3
  patch_size: 8
  dim: 64 # feature dimension
  depth: 6
  heads: 8 # number of attention heads
  dim_head: 16
  mlp_dim: 128
  dropout: 0.
  emb_dropout: 0.
  num_classes: 10
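The config above targets src.models.vit_module.VITLitModule, which the post does not list. Below is a minimal sketch of what such a module could look like, following the pattern of the template's mnist_module.py; the class layout, metric choices, and logged keys are assumptions, not the author's actual code.

# A sketch of src/models/vit_module.py (not shown in the original post), modeled on the
# template's mnist_module.py. Names and logged metric keys are assumptions.
import torch
from lightning import LightningModule
from torchmetrics import MaxMetric
from torchmetrics.classification.accuracy import Accuracy


class VITLitModule(LightningModule):
    def __init__(self, net: torch.nn.Module, optimizer, scheduler):
        super().__init__()
        # store optimizer/scheduler partials (and other init args) in self.hparams
        self.save_hyperparameters(logger=False, ignore=["net"])
        self.net = net
        self.criterion = torch.nn.CrossEntropyLoss()
        # CIFAR-10 has 10 classes
        self.train_acc = Accuracy(task="multiclass", num_classes=10)
        self.val_acc = Accuracy(task="multiclass", num_classes=10)
        self.test_acc = Accuracy(task="multiclass", num_classes=10)
        self.val_acc_best = MaxMetric()

    def forward(self, x):
        return self.net(x)

    def model_step(self, batch):
        x, y = batch
        logits = self.forward(x)
        loss = self.criterion(logits, y)
        preds = torch.argmax(logits, dim=1)
        return loss, preds, y

    def training_step(self, batch, batch_idx):
        loss, preds, targets = self.model_step(batch)
        self.train_acc(preds, targets)
        self.log("train/loss", loss, on_step=False, on_epoch=True, prog_bar=True)
        self.log("train/acc", self.train_acc, on_step=False, on_epoch=True, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        loss, preds, targets = self.model_step(batch)
        self.val_acc(preds, targets)
        self.log("val/loss", loss, on_step=False, on_epoch=True, prog_bar=True)
        self.log("val/acc", self.val_acc, on_step=False, on_epoch=True, prog_bar=True)

    def on_validation_epoch_end(self):
        self.val_acc_best(self.val_acc.compute())  # track the best validation accuracy so far
        self.log("val/acc_best", self.val_acc_best.compute(), sync_dist=True, prog_bar=True)

    def test_step(self, batch, batch_idx):
        loss, preds, targets = self.model_step(batch)
        self.test_acc(preds, targets)
        self.log("test/loss", loss, on_step=False, on_epoch=True, prog_bar=True)
        self.log("test/acc", self.test_acc, on_step=False, on_epoch=True, prog_bar=True)

    def configure_optimizers(self):
        # optimizer/scheduler arrive as functools.partial objects because of `_partial_: true`
        optimizer = self.hparams.optimizer(params=self.parameters())
        if self.hparams.scheduler is not None:
            scheduler = self.hparams.scheduler(optimizer=optimizer)
            return {
                "optimizer": optimizer,
                "lr_scheduler": {"scheduler": scheduler, "monitor": "val/loss", "interval": "epoch"},
            }
        return {"optimizer": optimizer}

Because optimizer and scheduler are declared with _partial_: true in the YAML, Hydra passes them in as partially applied constructors; configure_optimizers completes them with the module's parameters and the built optimizer, respectively.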
4. Training
Specify the config file used for training.
Modify train.py:
In the @hydra.main decorator, change config_name="mnist.yaml" to config_name="train_cifar10.yaml".
from typing import List, Optional, Tuple

import hydra
import lightning as L
import pyrootutils
import torch
from lightning import Callback, LightningDataModule, LightningModule, Trainer
from lightning.pytorch.loggers import Logger
from omegaconf import DictConfig

pyrootutils.setup_root(__file__, indicator=".project-root", pythonpath=True)
# ------------------------------------------------------------------------------------ #
# the setup_root above is equivalent to:
# - adding project root dir to PYTHONPATH
#       (so you don't need to force user to install project as a package)
#       (necessary before importing any local modules e.g. `from src import utils`)
# - setting up PROJECT_ROOT environment variable
#       (which is used as a base for paths in "configs/paths/default.yaml")
#       (this way all filepaths are the same no matter where you run the code)
# - loading environment variables from ".env" in root dir
#
# you can remove it if you:
# 1. either install project as a package or move entry files to project root dir
# 2. set `root_dir` to "." in "configs/paths/default.yaml"
#
# more info: https://github.com/ashleve/pyrootutils
# ------------------------------------------------------------------------------------ #

from src import utils

log = utils.get_pylogger(__name__)


@utils.task_wrapper
def train(cfg: DictConfig) -> Tuple[dict, dict]:
    """Trains the model. Can additionally evaluate on a testset, using best weights obtained during
    training.

    This method is wrapped in optional @task_wrapper decorator, that controls the behavior during
    failure. Useful for multiruns, saving info about the crash, etc.

    Args:
        cfg (DictConfig): Configuration composed by Hydra.

    Returns:
        Tuple[dict, dict]: Dict with metrics and dict with all instantiated objects.
    """
    # set seed for random number generators in pytorch, numpy and python.random
    if cfg.get("seed"):
        L.seed_everything(cfg.seed, workers=True)

    log.info(f"Instantiating datamodule <{cfg.data._target_}>")
    datamodule: LightningDataModule = hydra.utils.instantiate(cfg.data)

    log.info(f"Instantiating model <{cfg.model._target_}>")
    model: LightningModule = hydra.utils.instantiate(cfg.model)

    log.info("Instantiating callbacks...")
    callbacks: List[Callback] = utils.instantiate_callbacks(cfg.get("callbacks"))

    log.info("Instantiating loggers...")
    logger: List[Logger] = utils.instantiate_loggers(cfg.get("logger"))

    log.info(f"Instantiating trainer <{cfg.trainer._target_}>")
    trainer: Trainer = hydra.utils.instantiate(cfg.trainer, callbacks=callbacks, logger=logger)

    object_dict = {
        "cfg": cfg,
        "datamodule": datamodule,
        "model": model,
        "callbacks": callbacks,
        "logger": logger,
        "trainer": trainer,
    }

    if logger:
        log.info("Logging hyperparameters!")
        utils.log_hyperparameters(object_dict)

    if cfg.get("compile"):
        log.info("Compiling model!")
        model = torch.compile(model)

    if cfg.get("train"):
        log.info("Starting training!")
        trainer.fit(model=model, datamodule=datamodule, ckpt_path=cfg.get("ckpt_path"))

    train_metrics = trainer.callback_metrics

    if cfg.get("test"):
        log.info("Starting testing!")
        ckpt_path = trainer.checkpoint_callback.best_model_path
        if ckpt_path == "":
            log.warning("Best ckpt not found! Using current weights for testing...")
            ckpt_path = None
        trainer.test(model=model, datamodule=datamodule, ckpt_path=ckpt_path)
        log.info(f"Best ckpt path: {ckpt_path}")

    test_metrics = trainer.callback_metrics

    # merge train and test metrics
    metric_dict = {**train_metrics, **test_metrics}

    return metric_dict, object_dict


@hydra.main(version_base="1.3", config_path="../configs", config_name="train_cifar10.yaml")
def main(cfg: DictConfig) -> Optional[float]:
    # apply extra utilities
    # (e.g. ask for tags if none are provided in cfg, print cfg tree, etc.)
    utils.extras(cfg)

    # train the model
    metric_dict, _ = train(cfg)

    # safely retrieve metric value for hydra-based hyperparameter optimization
    metric_value = utils.get_metric_value(
        metric_dict=metric_dict, metric_name=cfg.get("optimized_metric")
    )

    # return optimized metric
    return metric_value


if __name__ == "__main__":
    main()
Modify configs/trainer/default.yaml:
set max_epochs to 50, or override it on the command line with trainer.max_epochs=50.
_target_: lightning.pytorch.trainer.Trainer

default_root_dir: ${paths.output_dir}

min_epochs: 3 # prevents early stopping
max_epochs: 50

accelerator: cpu
devices: 1

# mixed precision for extra speed-up
# precision: 16

# perform a validation loop every N training epochs
check_val_every_n_epoch: 1

# set True to ensure deterministic results
# makes training slower but gives more reproducibility than just setting seeds
deterministic: False
python src/train.py trainer=ddp trainer.max_epochs=50 logger=wandb # using wandb requires editing the wandb config under configs/logger
To use specific GPUs, prefix the command with CUDA_VISIBLE_DEVICES, e.g. CUDA_VISIBLE_DEVICES=0,1,2,3 python src/train.py trainer=ddp trainer.max_epochs=50
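A quick way to confirm the restriction took effect from inside Python:

# With CUDA_VISIBLE_DEVICES=0,1,2,3 set, PyTorch should only see those four GPUs.
import torch
print(torch.cuda.device_count())  # expected: 4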
II. Evaluation framework (multi-GPU evaluation)
The lightning-hydra-template framework also supports multi-GPU evaluation.
From: https://www.cnblogs.com/raiuny/p/17237433.html