1. What is the Global Attention Mechanism?
The global attention mechanism is a widely used technique in deep learning and plays an important role in LiDAR-based object detection. It computes the correlations between the elements of the input and uses them to decide dynamically which parts of the input the detector should focus on. The key idea is to weight the different parts of the input by their attention scores, which lets the model capture long-range spatial dependencies and also improves interpretability, since the attention weights reveal which regions the model looks at when recognizing objects. The mechanism strengthens the interaction between spatial and channel information; compared with local attention, global attention takes all of the information in the input into account, which improves the model's perception of objects and its detection accuracy. Its structure is shown in the figure below:
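To make the weighting idea concrete, here is a minimal, self-contained PyTorch sketch of global (non-local style) attention over a BEV feature map. It only illustrates the general idea; it is not the GAM module added later in this post, and the function name and tensor sizes are made up for the example.

import torch
import torch.nn.functional as F

def toy_global_attention(feat):
    # feat: BEV feature map of shape (B, C, H, W); every cell attends to every other cell.
    b, c, h, w = feat.shape
    tokens = feat.flatten(2).transpose(1, 2)             # (B, H*W, C): one token per BEV cell
    scores = tokens @ tokens.transpose(1, 2) / c ** 0.5  # (B, H*W, H*W): pairwise similarities
    weights = F.softmax(scores, dim=-1)                  # normalized attention over all positions
    attended = weights @ tokens                          # (B, H*W, C): globally mixed features
    return attended.transpose(1, 2).view(b, c, h, w)

print(toy_global_attention(torch.randn(2, 64, 16, 16)).shape)   # torch.Size([2, 64, 16, 16])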
2. For the combined-pooling improvement to the Pillar Feature Net and the code for adding the SimAM attention mechanism, see the following posts:
(4) Improving PointPillars in OpenPCDet with combined pooling: https://blog.csdn.net/A1828776499/article/details/143415007
(5) Adding the SimAM attention mechanism to PointPillars in OpenPCDet: https://mp.csdn.net/mp_blog/creation/editor/143440215
3. This post inserts the Global Attention Mechanism (GAM) between the Pillar Feature Net and the 2D CNN backbone.
The annotated Global Attention Mechanism (GAM) code is shown below:
# Import the required libraries
import torch
import torch.nn as nn


# Define the GAM_Attention class, inheriting from nn.Module
class GAM_Attention(nn.Module):
    # Initializer
    def __init__(self, in_channels, out_channels, rate=4):
        # Call the parent constructor
        super(GAM_Attention, self).__init__()

        # Channel attention sub-network
        self.channel_attention = nn.Sequential(
            # First linear layer: compress the channel dimension to in_channels / rate
            nn.Linear(in_channels, int(in_channels / rate)),
            # ReLU activation adds non-linearity
            nn.ReLU(inplace=True),
            # Second linear layer: map the channel dimension back to in_channels
            nn.Linear(int(in_channels / rate), in_channels)
        )

        # Spatial attention sub-network
        self.spatial_attention = nn.Sequential(
            # First 7x7 convolution: compress the channel dimension to in_channels / rate
            nn.Conv2d(in_channels, int(in_channels / rate), kernel_size=7, padding=3),
            # Batch normalization over the convolution output
            nn.BatchNorm2d(int(in_channels / rate)),
            # ReLU activation
            nn.ReLU(inplace=True),
            # Second 7x7 convolution: map the channel dimension to out_channels
            nn.Conv2d(int(in_channels / rate), out_channels, kernel_size=7, padding=3),
            # Batch normalization
            nn.BatchNorm2d(out_channels)
        )

    # Forward pass
    def forward(self, x):
        # Batch size, channels, height and width of the input tensor
        b, c, h, w = x.shape
        # Rearrange to (b, h*w, c) so the channel attention MLP acts on the channel dimension
        x_permute = x.permute(0, 2, 3, 1).view(b, -1, c)
        # Compute the channel attention weights and reshape back to (b, h, w, c)
        x_att_permute = self.channel_attention(x_permute).view(b, h, w, c)
        # Permute back to (b, c, h, w)
        x_channel_att = x_att_permute.permute(0, 3, 1, 2)
        # Apply the channel attention weights element-wise
        x = x * x_channel_att
        # Compute the spatial attention map and apply a sigmoid
        x_spatial_att = self.spatial_attention(x).sigmoid()
        # Apply the spatial attention weights element-wise
        out = x * x_spatial_att
        # Return the final output
        return out
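As a quick sanity check, the module can be run on a dummy pseudo-image. The channel count of 64 is typical for the PointPillars pseudo-image but is only illustrative here; note that, as written, out_channels must equal in_channels, because the output of the spatial branch is multiplied element-wise with the input.

gam = GAM_Attention(in_channels=64, out_channels=64)
dummy = torch.randn(2, 64, 32, 32)   # (batch, channels, height, width) BEV pseudo-image
print(gam(dummy).shape)              # torch.Size([2, 64, 32, 32]); the shape is unchanged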
4. The 2D CNN backbone is located in the project at /OpenPCDet/pcdet/models/backbones_2d/base_bev_backbone.py and defines the following classes (an example configuration follows the list below):
- BaseBEVBackbone: the basic bird's-eye-view (BEV) feature extraction network, which builds the convolution and upsampling stages.
  - Inputs: the model configuration model_cfg and the number of input channels input_channels.
  - Convolution stages: the number of layers, the strides and the filter counts are read from the config, and each stage is built in turn.
  - Upsampling stages: if an upsampling config is present, upsampling layers (implemented with transposed convolutions) are built to enlarge the feature maps.
  - forward: takes the input feature map, runs the convolution and upsampling stages, and concatenates the resulting feature maps as the output.
- BaseBEVBackboneV1: another variant of the basic feature extraction network.
  - Its structure is similar to BaseBEVBackbone, but it specifically consumes the x_conv4 and x_conv5 feature maps and is typically used for multi-scale feature extraction.
  - forward: extracts multi-scale features from x_conv4 and x_conv5 and concatenates them into the final feature map.
- BasicBlock: a basic convolution block with a residual connection.
  - Contains two convolution layers, batch normalization layers and ReLU activations.
  - Uses a residual connection to preserve the residual between the input and output features.
- BaseBEVResBackbone: a BEV feature extraction network with residual connections.
  - Built from BasicBlock modules, each of which contains a residual connection.
  - Supports upsampling and multi-scale feature extraction, similar to BaseBEVBackbone.
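For reference, here is a minimal sketch of how this backbone is configured. The values mirror the BACKBONE_2D block of cfgs/kitti_models/pointpillar.yaml as I recall it, so treat them as illustrative and check your own config for the authoritative numbers.

from easydict import EasyDict

backbone_cfg = EasyDict({
    'LAYER_NUMS': [3, 5, 5],                  # extra conv layers per downsampling stage
    'LAYER_STRIDES': [2, 2, 2],               # stride of the first conv in each stage
    'NUM_FILTERS': [64, 128, 256],            # output channels of each stage
    'UPSAMPLE_STRIDES': [1, 2, 4],            # transposed-conv strides that re-align the resolutions
    'NUM_UPSAMPLE_FILTERS': [128, 128, 128],  # channels of each upsampled branch
})
# BaseBEVBackbone(backbone_cfg, input_channels=64) then concatenates the three upsampled
# branches, giving num_bev_features = 128 * 3 = 384 output channels.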
5. The complete modified base_bev_backbone.py is shown below:
import numpy as np
import torch
import torch.nn as nn


# Global Attention Mechanism (GAM)
class GAM_Attention(nn.Module):
    def __init__(self, in_channels, out_channels, rate=4):
        super(GAM_Attention, self).__init__()
        self.channel_attention = nn.Sequential(
            nn.Linear(in_channels, int(in_channels / rate)),
            nn.ReLU(inplace=True),
            nn.Linear(int(in_channels / rate), in_channels)
        )
        self.spatial_attention = nn.Sequential(
            nn.Conv2d(in_channels, int(in_channels / rate), kernel_size=7, padding=3),
            nn.BatchNorm2d(int(in_channels / rate)),
            nn.ReLU(inplace=True),
            nn.Conv2d(int(in_channels / rate), out_channels, kernel_size=7, padding=3),
            nn.BatchNorm2d(out_channels)
        )

    def forward(self, x):
        b, c, h, w = x.shape
        x_permute = x.permute(0, 2, 3, 1).view(b, -1, c)
        x_att_permute = self.channel_attention(x_permute).view(b, h, w, c)
        x_channel_att = x_att_permute.permute(0, 3, 1, 2)
        x = x * x_channel_att                                 # apply channel attention weights
        x_spatial_att = self.spatial_attention(x).sigmoid()   # compute the spatial attention map
        out = x * x_spatial_att                               # apply spatial attention weights
        return out
# Basic BEV backbone network
class BaseBEVBackbone(nn.Module):
def __init__(self, model_cfg, input_channels):
super().__init__()
self.model_cfg = model_cfg
        # Read the convolution layer settings from the model config
if self.model_cfg.get('LAYER_NUMS', None) is not None:
assert len(self.model_cfg.LAYER_NUMS) == len(self.model_cfg.LAYER_STRIDES) == len(self.model_cfg.NUM_FILTERS)
layer_nums = self.model_cfg.LAYER_NUMS
layer_strides = self.model_cfg.LAYER_STRIDES
num_filters = self.model_cfg.NUM_FILTERS
else:
layer_nums = layer_strides = num_filters = []
if self.model_cfg.get('UPSAMPLE_STRIDES', None) is not None:
assert len(self.model_cfg.UPSAMPLE_STRIDES) == len(self.model_cfg.NUM_UPSAMPLE_FILTERS)
num_upsample_filters = self.model_cfg.NUM_UPSAMPLE_FILTERS
upsample_strides = self.model_cfg.UPSAMPLE_STRIDES
else:
upsample_strides = num_upsample_filters = []
num_levels = len(layer_nums)
c_in_list = [input_channels, *num_filters[:-1]]
self.blocks = nn.ModuleList()
self.deblocks = nn.ModuleList()
        # Initialize the global attention mechanism (this is the modification introduced in this post)
        self.gam = GAM_Attention(input_channels, input_channels)
for idx in range(num_levels):
cur_layers = [
nn.ZeroPad2d(1),
nn.Conv2d(
c_in_list[idx], num_filters[idx], kernel_size=3,
stride=layer_strides[idx], padding=0, bias=False
),
nn.BatchNorm2d(num_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
]
for k in range(layer_nums[idx]):
cur_layers.extend([
nn.Conv2d(num_filters[idx], num_filters[idx], kernel_size=3, padding=1, bias=False),
nn.BatchNorm2d(num_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
])
self.blocks.append(nn.Sequential(*cur_layers))
if len(upsample_strides) > 0:
stride = upsample_strides[idx]
if stride > 1 or (stride == 1 and not self.model_cfg.get('USE_CONV_FOR_NO_STRIDE', False)):
self.deblocks.append(nn.Sequential(
nn.ConvTranspose2d(
num_filters[idx], num_upsample_filters[idx],
upsample_strides[idx],
stride=upsample_strides[idx], bias=False
),
nn.BatchNorm2d(num_upsample_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
))
else:
stride = np.round(1 / stride).astype(np.int)
self.deblocks.append(nn.Sequential(
nn.Conv2d(
num_filters[idx], num_upsample_filters[idx],
stride,
stride=stride, bias=False
),
nn.BatchNorm2d(num_upsample_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
))
c_in = sum(num_upsample_filters)
if len(upsample_strides) > num_levels:
self.deblocks.append(nn.Sequential(
nn.ConvTranspose2d(c_in, c_in, upsample_strides[-1], stride=upsample_strides[-1], bias=False),
nn.BatchNorm2d(c_in, eps=1e-3, momentum=0.01),
nn.ReLU(),
))
self.num_bev_features = c_in
def forward(self, data_dict):
"""
Args:
data_dict:
spatial_features
Returns:
"""
spatial_features = data_dict['spatial_features']
        # Apply the GAM attention before the first downsampling stage
spatial_features = self.gam(spatial_features)
ups = []
ret_dict = {}
x = spatial_features
for i in range(len(self.blocks)):
x = self.blocks[i](x)
stride = int(spatial_features.shape[2] / x.shape[2])
ret_dict['spatial_features_%dx' % stride] = x
if len(self.deblocks) > 0:
ups.append(self.deblocks[i](x))
else:
ups.append(x)
if len(ups) > 1:
x = torch.cat(ups, dim=1)
elif len(ups) == 1:
x = ups[0]
if len(self.deblocks) > len(self.blocks):
x = self.deblocks[-1](x)
data_dict['spatial_features_2d'] = x
return data_dict
# Basic BEV backbone network V1
class BaseBEVBackboneV1(nn.Module):
def __init__(self, model_cfg, **kwargs):
super().__init__()
self.model_cfg = model_cfg
layer_nums = self.model_cfg.LAYER_NUMS
num_filters = self.model_cfg.NUM_FILTERS
assert len(layer_nums) == len(num_filters) == 2
num_upsample_filters = self.model_cfg.NUM_UPSAMPLE_FILTERS
upsample_strides = self.model_cfg.UPSAMPLE_STRIDES
assert len(num_upsample_filters) == len(upsample_strides)
num_levels = len(layer_nums)
self.blocks = nn.ModuleList()
self.deblocks = nn.ModuleList()
for idx in range(num_levels):
cur_layers = [
nn.ZeroPad2d(1),
nn.Conv2d(
num_filters[idx], num_filters[idx], kernel_size=3,
stride=1, padding=0, bias=False
),
nn.BatchNorm2d(num_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
]
for k in range(layer_nums[idx]):
cur_layers.extend([
nn.Conv2d(num_filters[idx], num_filters[idx], kernel_size=3, padding=1, bias=False),
nn.BatchNorm2d(num_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
])
self.blocks.append(nn.Sequential(*cur_layers))
if len(upsample_strides) > 0:
stride = upsample_strides[idx]
if stride >= 1:
self.deblocks.append(nn.Sequential(
nn.ConvTranspose2d(
num_filters[idx], num_upsample_filters[idx],
upsample_strides[idx],
stride=upsample_strides[idx], bias=False
),
nn.BatchNorm2d(num_upsample_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
))
else:
stride = np.round(1 / stride).astype(np.int)
self.deblocks.append(nn.Sequential(
nn.Conv2d(
num_filters[idx], num_upsample_filters[idx],
stride,
stride=stride, bias=False
),
nn.BatchNorm2d(num_upsample_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
))
c_in = sum(num_upsample_filters)
if len(upsample_strides) > num_levels:
self.deblocks.append(nn.Sequential(
nn.ConvTranspose2d(c_in, c_in, upsample_strides[-1], stride=upsample_strides[-1], bias=False),
nn.BatchNorm2d(c_in, eps=1e-3, momentum=0.01),
nn.ReLU(),
))
self.num_bev_features = c_in
def forward(self, data_dict):
"""
Args:
data_dict:
spatial_features
Returns:
"""
spatial_features = data_dict['multi_scale_2d_features']
x_conv4 = spatial_features['x_conv4']
x_conv5 = spatial_features['x_conv5']
ups = [self.deblocks[0](x_conv4)]
x = self.blocks[1](x_conv5)
ups.append(self.deblocks[1](x))
x = torch.cat(ups, dim=1)
x = self.blocks[0](x)
data_dict['spatial_features_2d'] = x
return data_dict
class BasicBlock(nn.Module):
expansion: int = 1
def __init__(
self,
inplanes: int,
planes: int,
stride: int = 1,
padding: int = 1,
downsample: bool = False,
) -> None:
super().__init__()
self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=3, stride=stride, padding=padding, bias=False)
self.bn1 = nn.BatchNorm2d(planes, eps=1e-3, momentum=0.01)
self.relu1 = nn.ReLU()
self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(planes, eps=1e-3, momentum=0.01)
self.relu2 = nn.ReLU()
self.downsample = downsample
if self.downsample:
self.downsample_layer = nn.Sequential(
nn.Conv2d(inplanes, planes, kernel_size=1, stride=stride, padding=0, bias=False),
nn.BatchNorm2d(planes, eps=1e-3, momentum=0.01)
)
self.stride = stride
def forward(self, x):
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu1(out)
out = self.conv2(out)
out = self.bn2(out)
if self.downsample:
identity = self.downsample_layer(x)
out += identity
out = self.relu2(out)
return out
class BaseBEVResBackbone(nn.Module):
def __init__(self, model_cfg, input_channels):
super().__init__()
self.model_cfg = model_cfg
if self.model_cfg.get('LAYER_NUMS', None) is not None:
assert len(self.model_cfg.LAYER_NUMS) == len(self.model_cfg.LAYER_STRIDES) == len(self.model_cfg.NUM_FILTERS)
layer_nums = self.model_cfg.LAYER_NUMS
layer_strides = self.model_cfg.LAYER_STRIDES
num_filters = self.model_cfg.NUM_FILTERS
else:
layer_nums = layer_strides = num_filters = []
if self.model_cfg.get('UPSAMPLE_STRIDES', None) is not None:
assert len(self.model_cfg.UPSAMPLE_STRIDES) == len(self.model_cfg.NUM_UPSAMPLE_FILTERS)
num_upsample_filters = self.model_cfg.NUM_UPSAMPLE_FILTERS
upsample_strides = self.model_cfg.UPSAMPLE_STRIDES
else:
upsample_strides = num_upsample_filters = []
num_levels = len(layer_nums)
c_in_list = [input_channels, *num_filters[:-1]]
self.blocks = nn.ModuleList()
self.deblocks = nn.ModuleList()
for idx in range(num_levels):
cur_layers = [
# nn.ZeroPad2d(1),
BasicBlock(c_in_list[idx], num_filters[idx], layer_strides[idx], 1, True)
]
for k in range(layer_nums[idx]):
cur_layers.extend([
BasicBlock(num_filters[idx], num_filters[idx])
])
self.blocks.append(nn.Sequential(*cur_layers))
if len(upsample_strides) > 0:
stride = upsample_strides[idx]
if stride >= 1:
self.deblocks.append(nn.Sequential(
nn.ConvTranspose2d(
num_filters[idx], num_upsample_filters[idx],
upsample_strides[idx],
stride=upsample_strides[idx], bias=False
),
nn.BatchNorm2d(num_upsample_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
))
else:
stride = np.round(1 / stride).astype(np.int)
self.deblocks.append(nn.Sequential(
nn.Conv2d(
num_filters[idx], num_upsample_filters[idx],
stride,
stride=stride, bias=False
),
nn.BatchNorm2d(num_upsample_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
))
c_in = sum(num_upsample_filters) if len(num_upsample_filters) > 0 else sum(num_filters)
if len(upsample_strides) > num_levels:
self.deblocks.append(nn.Sequential(
nn.ConvTranspose2d(c_in, c_in, upsample_strides[-1], stride=upsample_strides[-1], bias=False),
nn.BatchNorm2d(c_in, eps=1e-3, momentum=0.01),
nn.ReLU(),
))
self.num_bev_features = c_in
def forward(self, data_dict):
"""
Args:
data_dict:
spatial_features
Returns:
"""
spatial_features = data_dict['spatial_features']
ups = []
ret_dict = {}
x = spatial_features
for i in range(len(self.blocks)):
x = self.blocks[i](x)
stride = int(spatial_features.shape[2] / x.shape[2])
ret_dict['spatial_features_%dx' % stride] = x
if len(self.deblocks) > 0:
ups.append(self.deblocks[i](x))
else:
ups.append(x)
if len(ups) > 1:
x = torch.cat(ups, dim=1)
elif len(ups) == 1:
x = ups[0]
if len(self.deblocks) > len(self.blocks):
x = self.deblocks[-1](x)
data_dict['spatial_features_2d'] = x
return data_dict
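Before launching a full training run, the modified backbone can be smoke-tested on its own with a dummy config and a random pseudo-image. The config values and the BEV grid size below are illustrative (a real data_dict produced by the PointPillars pipeline contains more keys):

from easydict import EasyDict

cfg = EasyDict({
    'LAYER_NUMS': [3, 5, 5], 'LAYER_STRIDES': [2, 2, 2], 'NUM_FILTERS': [64, 128, 256],
    'UPSAMPLE_STRIDES': [1, 2, 4], 'NUM_UPSAMPLE_FILTERS': [128, 128, 128],
})
backbone = BaseBEVBackbone(cfg, input_channels=64)
data = {'spatial_features': torch.randn(1, 64, 128, 128)}   # dummy BEV pseudo-image
out = backbone(data)
print(out['spatial_features_2d'].shape)   # expected: torch.Size([1, 384, 64, 64])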
6. Run the following command to start training:
python train.py --cfg_file cfgs/kitti_models/pointpillar.yaml
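Optional arguments such as the batch size, the number of epochs and an extra tag for the output directory can be appended. The flag names below follow the argument parser of tools/train.py in the OpenPCDet version I am using; run python train.py --help to confirm them for yours:
python train.py --cfg_file cfgs/kitti_models/pointpillar.yaml --batch_size 4 --epochs 80 --extra_tag pillar_gam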
7. The training process looks as follows:
8. A dilated (atrous) convolution module may be added in a follow-up post.