MaskRCNN(Facebook官網(wǎng)Pytorch版本)
Resnet部分
首先來看有FPN的Resnet是如何搭建的,我們假設(shè)所使用的模型是ResnetTop5
class ResNet(nn.Module):
def __init__(self, cfg):
super(ResNet, self).__init__()
# If we want to use the cfg in forward(), then we should make a copy
# of it and store it for later use:
# self.cfg = cfg.clone()
# Translate string names to implementations
# stem_module = StemWithFixedBatchNorm,后面說明了,
# 這個模塊的意思是resnet的第一個卷積層所使用的batchnorm的四個參數(shù)都為常量
# stage_specs = ResNet50FPNStagesTo5
# 實際上,stage_specs是下面的這樣一個tuple
'''
(StageSpec(index=1, block_count=3, return_features=True),
StageSpec(index=2, block_count=4, return_features=True),
StageSpec(index=3, block_count=6, return_features=True),
StageSpec(index=4, block_count=3, return_features=True))
'''
# transformation_module = BottleneckWithFixedBatchNorm
stem_module = _STEM_MODULES[cfg.MODEL.RESNETS.STEM_FUNC]
stage_specs = _STAGE_SPECS[cfg.MODEL.BACKBONE.CONV_BODY]
# transformation_module = BottleneckWithFixedBatchNorm
transformation_module = _TRANSFORMATION_MODULES[cfg.MODEL.RESNETS.TRANS_FUNC]
# Construct the stem module
# 在這里的意思是resnet的第一個卷積層所使用的batchnorm的四個參數(shù)都為常量,
# 相當于batchnorm層什么操作都沒有做,只是乘了一個常數(shù)1
self.stem = stem_module(cfg)
# Constuct the specified ResNet stages
num_groups = cfg.MODEL.RESNETS.NUM_GROUPS
width_per_group = cfg.MODEL.RESNETS.WIDTH_PER_GROUP
in_channels = cfg.MODEL.RESNETS.STEM_OUT_CHANNELS
stage2_bottleneck_channels = num_groups * width_per_group
stage2_out_channels = cfg.MODEL.RESNETS.RES2_OUT_CHANNELS
self.stages = []
self.return_features = {}
for stage_spec in stage_specs:
name = "layer" + str(stage_spec.index)
# stage2_relative_factor = 1,2,4,8
stage2_relative_factor = 2 ** (stage_spec.index - 1)
# bottleneck_channels = 64, 128, 256, 512
bottleneck_channels = stage2_bottleneck_channels * stage2_relative_factor
# out_channels = 256, 512, 1024, 2048
out_channels = stage2_out_channels * stage2_relative_factor
# stage_with_dcn = (False, False, False, False)
stage_with_dcn = cfg.MODEL.RESNETS.STAGE_WITH_DCN[stage_spec.index -1]
module = _make_stage(
transformation_module,
in_channels,
bottleneck_channels,
out_channels,
stage_spec.block_count,
num_groups,
cfg.MODEL.RESNETS.STRIDE_IN_1X1,
first_stride=int(stage_spec.index > 1) + 1,
dcn_config={
"stage_with_dcn": stage_with_dcn,
"with_modulated_dcn": cfg.MODEL.RESNETS.WITH_MODULATED_DCN,
"deformable_groups": cfg.MODEL.RESNETS.DEFORMABLE_GROUPS,
}
)
in_channels = out_channels
self.add_module(name, module)
self.stages.append(name)
self.return_features[name] = stage_spec.return_features
# Optionally freeze (requires_grad=False) parts of the backbone
self._freeze_backbone(cfg.MODEL.BACKBONE.FREEZE_CONV_BODY_AT)
# 相當于將Resnet的第一個卷積層,第二個卷積層及其配套的batchnorm等層的參數(shù)freeze
def _freeze_backbone(self, freeze_at):
if freeze_at < 0:
return
for stage_index in range(freeze_at):
if stage_index == 0:
m = self.stem # stage 0 is the stem
else:
m = getattr(self, "layer" + str(stage_index))
for p in m.parameters():
p.requires_grad = False
def forward(self, x):
outputs = []
x = self.stem(x)
for stage_name in self.stages:
x = getattr(self, stage_name)(x)
if self.return_features[stage_name]:
outputs.append(x)
# 所返回的FPN是按照從大到小的feature map排列的
return outputs
上面所用到的_make_stage函數(shù),實際上是按3,4,6,3個block來生成Resnet的結(jié)構(gòu)
def _make_stage(
transformation_module,
in_channels,
bottleneck_channels,
out_channels,
block_count,
num_groups,
stride_in_1x1,
first_stride,
dilation=1,
dcn_config={}
):
blocks = []
stride = first_stride
for _ in range(block_count):
blocks.append(
transformation_module(
in_channels,
bottleneck_channels,
out_channels,
num_groups,
stride_in_1x1,
stride,
dilation=dilation,
dcn_config=dcn_config
)
)
# stride = 1在這里表示在block_count中,只有第一個block做了降采樣
stride = 1
in_channels = out_channels
return nn.Sequential(*blocks)
下面是Resnet的Bottleneck的具體實現(xiàn),在這里有一個疑問就是為何所有的Batchnorm都使用FrozenBatchNorm2d,這豈不意味著Batchnorm基本沒有起到什么作用嗎?在Facebook的Github上有人提出了這個疑問,作者是這樣解答的:
The reason why we use FrozenBatchNorm2d instead of BatchNorm2d is that the sizes of the batches are very small, which makes the batch statistics very poor and degrades performance.
Plus, when using multiple GPUs, the batch statistics are not accumulated from multiple devices, so that only a single GPU compute the statistics.
class Bottleneck(nn.Module):
def __init__(
self,
in_channels,
bottleneck_channels,
out_channels,
num_groups,
stride_in_1x1,
stride,
dilation,
norm_func,
dcn_config
):
super(Bottleneck, self).__init__()
self.downsample = None
if in_channels != out_channels:
down_stride = stride if dilation == 1 else 1
self.downsample = nn.Sequential(
Conv2d(
in_channels, out_channels,
kernel_size=1, stride=down_stride, bias=False
),
norm_func(out_channels),
)
for modules in [self.downsample,]:
for l in modules.modules():
if isinstance(l, Conv2d):
nn.init.kaiming_uniform_(l.weight, a=1)
if dilation > 1:
stride = 1 # reset to be 1
# The original MSRA ResNet models have stride in the first 1x1 conv
# The subsequent fb.torch.resnet and Caffe2 ResNe[X]t implementations have
# stride in the 3x3 conv
# 這里的意思是,在最開始提出的Resnet中,是將第一個1*1的卷積的步長為2來實現(xiàn)降采樣的
# 而在Facebook的實現(xiàn)中,是將第二個3*3的卷積的步長為2來實現(xiàn)降采樣的
stride_1x1, stride_3x3 = (stride, 1) if stride_in_1x1 else (1, stride)
self.conv1 = Conv2d(
in_channels,
bottleneck_channels,
kernel_size=1,
stride=stride_1x1,
bias=False,
)
self.bn1 = norm_func(bottleneck_channels)
# TODO: specify init for the above
# 在我們所使用的Resnet版本中,既沒有使用空洞卷積,也沒有使用可分離卷積
with_dcn = dcn_config.get("stage_with_dcn", False)
if with_dcn:
deformable_groups = dcn_config.get("deformable_groups", 1)
with_modulated_dcn = dcn_config.get("with_modulated_dcn", False)
self.conv2 = DFConv2d(
bottleneck_channels,
bottleneck_channels,
with_modulated_dcn=with_modulated_dcn,
kernel_size=3,
stride=stride_3x3,
groups=num_groups,
dilation=dilation,
deformable_groups=deformable_groups,
bias=False
)
else:
self.conv2 = Conv2d(
bottleneck_channels,
bottleneck_channels,
kernel_size=3,
stride=stride_3x3,
padding=dilation,
bias=False,
groups=num_groups,
dilation=dilation
)
nn.init.kaiming_uniform_(self.conv2.weight, a=1)
self.bn2 = norm_func(bottleneck_channels)
self.conv3 = Conv2d(
bottleneck_channels, out_channels, kernel_size=1, bias=False
)
self.bn3 = norm_func(out_channels)
for l in [self.conv1, self.conv3,]:
nn.init.kaiming_uniform_(l.weight, a=1)
def forward(self, x):
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = F.relu_(out)
out = self.conv2(out)
out = self.bn2(out)
out = F.relu_(out)
out0 = self.conv3(out)
out = self.bn3(out0)
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = F.relu_(out)
return out
FPN部分
def build_resnet_fpn_backbone(cfg):
body = resnet.ResNet(cfg)
# in_channels_stage2 = 256, out_channels = 1024
# 相當于FPN四層的feature map的channel統(tǒng)一都是1024
in_channels_stage2 = cfg.MODEL.RESNETS.RES2_OUT_CHANNELS
out_channels = cfg.MODEL.RESNETS.BACKBONE_OUT_CHANNELS
# in_channels_list = [256, 512, 1024, 2048]
fpn = fpn_module.FPN(
in_channels_list=[
in_channels_stage2,
in_channels_stage2 * 2,
in_channels_stage2 * 4,
in_channels_stage2 * 8,
],
out_channels=out_channels,
conv_block=conv_with_kaiming_uniform(
cfg.MODEL.FPN.USE_GN, cfg.MODEL.FPN.USE_RELU
),
top_blocks=fpn_module.LastLevelMaxPool(),
)
model = nn.Sequential(OrderedDict([("body", body), ("fpn", fpn)]))
model.out_channels = out_channels
return model
class FPN(nn.Module):
"""
Module that adds FPN on top of a list of feature maps.
The feature maps are currently supposed to be in increasing depth
order, and must be consecutive
"""
def __init__(
self, in_channels_list, out_channels, conv_block, top_blocks=None
):
"""
Arguments:
in_channels_list (list[int]): number of channels for each feature map that
will be fed
out_channels (int): number of channels of the FPN representation
top_blocks (nn.Module or None): if provided, an extra operation will
be performed on the output of the last (smallest resolution)
FPN output, and the result will extend the result list
"""
super(FPN, self).__init__()
self.inner_blocks = []
self.layer_blocks = []
# in_channels_list = [256, 512, 1024, 2048]
for idx, in_channels in enumerate(in_channels_list, 1):
inner_block = "fpn_inner{}".format(idx)
layer_block = "fpn_layer{}".format(idx)
if in_channels == 0:
continue
inner_block_module = conv_block(in_channels, out_channels, 1)
layer_block_module = conv_block(out_channels, out_channels, 3, 1)
self.add_module(inner_block, inner_block_module)
self.add_module(layer_block, layer_block_module)
self.inner_blocks.append(inner_block)
self.layer_blocks.append(layer_block)
self.top_blocks = top_blocks
def forward(self, x):
"""
Arguments:
x (list[Tensor]): feature maps for each feature level.
Returns:
results (tuple[Tensor]): feature maps after FPN layers.
They are ordered from highest resolution first.
"""
last_inner = getattr(self, self.inner_blocks[-1])(x[-1])
results = []
results.append(getattr(self, self.layer_blocks[-1])(last_inner))
for feature, inner_block, layer_block in zip(
x[:-1][::-1], self.inner_blocks[:-1][::-1], self.layer_blocks[:-1][::-1]
):
if not inner_block:
continue
inner_top_down = F.interpolate(last_inner, scale_factor=2, mode="nearest")
inner_lateral = getattr(self, inner_block)(feature)
# TODO use size instead of scale to make it robust to different sizes
# inner_top_down = F.upsample(last_inner, size=inner_lateral.shape[-2:],
# mode='bilinear', align_corners=False)
last_inner = inner_lateral + inner_top_down
# 使用insert方法每次得到的feature map都插到最前面,
# feature map的排列順序依舊是尺寸從大到小
results.insert(0, getattr(self, layer_block)(last_inner))
if isinstance(self.top_blocks, LastLevelP6P7):
last_results = self.top_blocks(x[-1], results[-1])
results.extend(last_results)
# 在這里對最后一層也就是尺寸最小的feature map做了最大池化
elif isinstance(self.top_blocks, LastLevelMaxPool):
last_results = self.top_blocks(results[-1])
results.extend(last_results)
return tuple(results)
RPN部分
RPN部分最終調(diào)用的就是下面RPNModule這個類
class RPNModule(torch.nn.Module):
"""
Module for RPN computation. Takes feature maps from the backbone and RPN
proposals and losses. Works for both FPN and non-FPN.
"""
def __init__(self, cfg, in_channels):
super(RPNModule, self).__init__()
self.cfg = cfg.clone()
anchor_generator = make_anchor_generator(cfg)
# cfg.MODEL.RPN.RPN_HEAD = 'SingleConvRPNHead'
rpn_head = registry.RPN_HEADS[cfg.MODEL.RPN.RPN_HEAD]
# 這里調(diào)用的就是下面的RPNHead這個類,anchor_generator.num_anchors_per_location()[0]
# 的值固定為3,其意義見下面RPNHead類中num_anchors的說明
head = rpn_head(
cfg, in_channels, anchor_generator.num_anchors_per_location()[0]
)
rpn_box_coder = BoxCoder(weights=(1.0, 1.0, 1.0, 1.0))
box_selector_train = make_rpn_postprocessor(cfg, rpn_box_coder, is_train=True)
box_selector_test = make_rpn_postprocessor(cfg, rpn_box_coder, is_train=False)
loss_evaluator = make_rpn_loss_evaluator(cfg, rpn_box_coder)
self.anchor_generator = anchor_generator
self.head = head
self.box_selector_train = box_selector_train
self.box_selector_test = box_selector_test
self.loss_evaluator = loss_evaluator
def forward(self, images, features, targets=None):
"""
Arguments:
images (ImageList): images for which we want to compute the predictions
features (list[Tensor]): features computed from the images that are
used for computing the predictions. Each tensor in the list
correspond to different feature levels
targets (list[BoxList): ground-truth boxes present in the image (optional)
Returns:
boxes (list[BoxList]): the predicted boxes from the RPN, one BoxList per
image.
losses (dict[Tensor]): the losses for the model during training. During
testing, it is an empty dict.
"""
# 這里的objectness就是分類的預(yù)測值,長度為5,即5個feature map分別的到的預(yù)測值
# rpn_box_regression就是預(yù)測的box的偏移量,shape與objectness除了通道維度乘以4,其它都一樣
objectness, rpn_box_regression = self.head(features)
# 所產(chǎn)生的anchors與rpn_box_regression是一致的,只不過anchors將h與w合并了,所以維度少了兩維
anchors = self.anchor_generator(images, features)
if self.training:
return self._forward_train(anchors, objectness, rpn_box_regression, targets)
else:
return self._forward_test(anchors, objectness, rpn_box_regression)
def _forward_train(self, anchors, objectness, rpn_box_regression, targets):
# cfg.MODEL.RPN_ONLY = False
if self.cfg.MODEL.RPN_ONLY:
# When training an RPN-only model, the loss is determined by the
# predicted objectness and rpn_box_regression values and there is
# no need to transform the anchors into predicted boxes; this is an
# optimization that avoids the unnecessary transformation.
boxes = anchors
else:
# For end-to-end models, anchors must be transformed into boxes and
# sampled into a training batch.
with torch.no_grad():
# 因為求bbox的過程中只有篩選、nms等步驟,它們不需要梯度更新,因此
# 使用了torch.no_grad()來保證這些過程中的參數(shù)不發(fā)生變化
# box_selector_train最終調(diào)用了下面的RPNPostProcessor這個類
# box_selector_train在這里做了兩件事情,一件是根據(jù)預(yù)測的
# objectness得分去除了一些得分小的框,另一方面是做了nms
boxes = self.box_selector_train(
anchors, objectness, rpn_box_regression, targets
)
# rpn階段loss的計算,由下面RPNLossComputation這個類來實現(xiàn)
loss_objectness, loss_rpn_box_reg = self.loss_evaluator(
anchors, objectness, rpn_box_regression, targets
)
losses = {
"loss_objectness": loss_objectness,
"loss_rpn_box_reg": loss_rpn_box_reg,
}
return boxes, losses
def _forward_test(self, anchors, objectness, rpn_box_regression):
boxes = self.box_selector_test(anchors, objectness, rpn_box_regression)
if self.cfg.MODEL.RPN_ONLY:
# For end-to-end models, the RPN proposals are an intermediate state
# and don't bother to sort them in decreasing score order. For RPN-only
# models, the proposals are the final output and we return them in
# high-to-low confidence order.
inds = [
box.get_field("objectness").sort(descending=True)[1] for box in boxes
]
boxes = [box[ind] for box, ind in zip(boxes, inds)]
return boxes, {}
class RPNHead(nn.Module):
"""
Adds a simple RPN Head with classification and regression heads
"""
def __init__(self, cfg, in_channels, num_anchors):
"""
Arguments:
cfg : config
in_channels (int): number of channels of the input feature
num_anchors (int): number of anchors to be predicted
"""
super(RPNHead, self).__init__()
self.conv = nn.Conv2d(
in_channels, in_channels, kernel_size=3, stride=1, padding=1
)
# 這里num_anchors的值為3,意味著在每個點上的有三個尺寸不同的anchor,
# 而且對于FPN的不同層的feature map都是如此
self.cls_logits = nn.Conv2d(in_channels, num_anchors, kernel_size=1, stride=1)
self.bbox_pred = nn.Conv2d(
in_channels, num_anchors * 4, kernel_size=1, stride=1
)
for l in [self.conv, self.cls_logits, self.bbox_pred]:
torch.nn.init.normal_(l.weight, std=0.01)
torch.nn.init.constant_(l.bias, 0)
def forward(self, x):
logits = []
bbox_reg = []
# 在具體得到cls_logits和bbox_pred的預(yù)測值之前,做了3*3的conv及relu
# 然后使用1*1的卷積分別得到cls_logits和bbox_pred
for feature in x:
t = F.relu(self.conv(feature))
logits.append(self.cls_logits(t))
bbox_reg.append(self.bbox_pred(t))
return logits, bbox_reg
def make_rpn_postprocessor(config, rpn_box_coder, is_train):
# fpn_post_nms_top_n = 2000
fpn_post_nms_top_n = config.MODEL.RPN.FPN_POST_NMS_TOP_N_TRAIN
if not is_train:
# fpn_post_nms_top_n = 2000
fpn_post_nms_top_n = config.MODEL.RPN.FPN_POST_NMS_TOP_N_TEST
# pre_nms_top_n = 12000, post_nms_top_n = 2000
pre_nms_top_n = config.MODEL.RPN.PRE_NMS_TOP_N_TRAIN
post_nms_top_n = config.MODEL.RPN.POST_NMS_TOP_N_TRAIN
if not is_train:
# pre_nms_top_n = 6000, post_nms_top_n = 1000
pre_nms_top_n = config.MODEL.RPN.PRE_NMS_TOP_N_TEST
post_nms_top_n = config.MODEL.RPN.POST_NMS_TOP_N_TEST
# fpn_post_nms_per_batch = True, nms_thresh = 0.7, min_size = 0
fpn_post_nms_per_batch = config.MODEL.RPN.FPN_POST_NMS_PER_BATCH
nms_thresh = config.MODEL.RPN.NMS_THRESH
min_size = config.MODEL.RPN.MIN_SIZE
box_selector = RPNPostProcessor(
pre_nms_top_n=pre_nms_top_n,
post_nms_top_n=post_nms_top_n,
nms_thresh=nms_thresh,
min_size=min_size,
box_coder=rpn_box_coder,
fpn_post_nms_top_n=fpn_post_nms_top_n,
fpn_post_nms_per_batch=fpn_post_nms_per_batch,
)
return box_selector
class RPNPostProcessor(torch.nn.Module):
"""
Performs post-processing on the outputs of the RPN boxes, before feeding the
proposals to the heads
"""
def __init__(
self,
pre_nms_top_n,
post_nms_top_n,
nms_thresh,
min_size,
box_coder=None,
fpn_post_nms_top_n=None,
fpn_post_nms_per_batch=True,
):
"""
Arguments:
pre_nms_top_n (int)
post_nms_top_n (int)
nms_thresh (float)
min_size (int)
box_coder (BoxCoder)
fpn_post_nms_top_n (int)
"""
super(RPNPostProcessor, self).__init__()
self.pre_nms_top_n = pre_nms_top_n
self.post_nms_top_n = post_nms_top_n
self.nms_thresh = nms_thresh
self.min_size = min_size
if box_coder is None:
box_coder = BoxCoder(weights=(1.0, 1.0, 1.0, 1.0))
self.box_coder = box_coder
if fpn_post_nms_top_n is None:
fpn_post_nms_top_n = post_nms_top_n
self.fpn_post_nms_top_n = fpn_post_nms_top_n
self.fpn_post_nms_per_batch = fpn_post_nms_per_batch
def add_gt_proposals(self, proposals, targets):
"""
Arguments:
proposals: list[BoxList]
targets: list[BoxList]
"""
# Get the device we're operating on
device = proposals[0].bbox.device
gt_boxes = [target.copy_with_fields([]) for target in targets]
# later cat of bbox requires all fields to be present for all bbox
# so we need to add a dummy for objectness that's missing
for gt_box in gt_boxes:
gt_box.add_field("objectness", torch.ones(len(gt_box), device=device))
proposals = [
cat_boxlist((proposal, gt_box))
for proposal, gt_box in zip(proposals, gt_boxes)
]
return proposals
def forward_for_single_feature_map(self, anchors, objectness, box_regression):
"""
Arguments:
anchors: list[BoxList]
objectness: tensor of size N, A, H, W
box_regression: tensor of size N, A * 4, H, W
"""
device = objectness.device
N, A, H, W = objectness.shape
# put in the same format as anchors
objectness = permute_and_flatten(objectness, N, A, 1, H, W).view(N, -1)
objectness = objectness.sigmoid()
box_regression = permute_and_flatten(box_regression, N, A, 4, H, W)
num_anchors = A * H * W
pre_nms_top_n = min(self.pre_nms_top_n, num_anchors)
# 篩選掉得分低的框在這里完成
objectness, topk_idx = objectness.topk(pre_nms_top_n, dim=1, sorted=True)
batch_idx = torch.arange(N, device=device)[:, None]
box_regression = box_regression[batch_idx, topk_idx]
image_shapes = [box.size for box in anchors]
concat_anchors = torch.cat([a.bbox for a in anchors], dim=0)
concat_anchors = concat_anchors.reshape(N, -1, 4)[batch_idx, topk_idx]
# 在這里完成了根據(jù)預(yù)測出來的box的偏移量修正box的工作
proposals = self.box_coder.decode(
box_regression.view(-1, 4), concat_anchors.view(-1, 4)
)
proposals = proposals.view(N, -1, 4)
result = []
for proposal, score, im_shape in zip(proposals, objectness, image_shapes):
boxlist = BoxList(proposal, im_shape, mode="xyxy")
boxlist.add_field("objectness", score)
boxlist = boxlist.clip_to_image(remove_empty=False)
boxlist = remove_small_boxes(boxlist, self.min_size)
# nms的步驟在這里完成,但是沒有找到具體的nms的python代碼,這里應(yīng)該是5個feature map
# 的每一層提取的box都會小于post_nms_top_n,對于訓(xùn)練是2000,對于測試是1000
boxlist = boxlist_nms(
boxlist,
self.nms_thresh,
max_proposals=self.post_nms_top_n,
score_field="objectness",
)
result.append(boxlist)
return result
def forward(self, anchors, objectness, box_regression, targets=None):
"""
Arguments:
anchors: list[list[BoxList]]
objectness: list[tensor]
box_regression: list[tensor]
Returns:
boxlists (list[BoxList]): the post-processed anchors, after
applying box decoding and NMS
"""
sampled_boxes = []
num_levels = len(objectness)
anchors = list(zip(*anchors))
for a, o, b in zip(anchors, objectness, box_regression):
sampled_boxes.append(self.forward_for_single_feature_map(a, o, b))
boxlists = list(zip(*sampled_boxes))
boxlists = [cat_boxlist(boxlist) for boxlist in boxlists]
# 在這一步會進一步篩選掉得分小的box,留下的box的數(shù)量不超過fpn_post_nms_top_n,
# 對于訓(xùn)練階段是2000,測試階段是1000
if num_levels > 1:
boxlists = self.select_over_all_levels(boxlists)
# append ground-truth bboxes to proposals
if self.training and targets is not None:
boxlists = self.add_gt_proposals(boxlists, targets)
return boxlists
def select_over_all_levels(self, boxlists):
num_images = len(boxlists)
# different behavior during training and during testing:
# during training, post_nms_top_n is over *all* the proposals combined, while
# during testing, it is over the proposals for each image
# NOTE: it should be per image, and not per batch. However, to be consistent
# with Detectron, the default is per batch (see Issue #672)
if self.training and self.fpn_post_nms_per_batch:
objectness = torch.cat(
[boxlist.get_field("objectness") for boxlist in boxlists], dim=0
)
box_sizes = [len(boxlist) for boxlist in boxlists]
post_nms_top_n = min(self.fpn_post_nms_top_n, len(objectness))
_, inds_sorted = torch.topk(objectness, post_nms_top_n, dim=0, sorted=True)
inds_mask = torch.zeros_like(objectness, dtype=torch.uint8)
inds_mask[inds_sorted] = 1
inds_mask = inds_mask.split(box_sizes)
for i in range(num_images):
boxlists[i] = boxlists[i][inds_mask[i]]
else:
for i in range(num_images):
objectness = boxlists[i].get_field("objectness")
post_nms_top_n = min(self.fpn_post_nms_top_n, len(objectness))
_, inds_sorted = torch.topk(
objectness, post_nms_top_n, dim=0, sorted=True
)
boxlists[i] = boxlists[i][inds_sorted]
return boxlists
RPN的loss部分
class RPNLossComputation(object):
"""
This class computes the RPN loss.
"""
def __init__(self, proposal_matcher, fg_bg_sampler, box_coder,
generate_labels_func):
"""
Arguments:
proposal_matcher (Matcher)
fg_bg_sampler (BalancedPositiveNegativeSampler)
box_coder (BoxCoder)
"""
# self.target_preparator = target_preparator
self.proposal_matcher = proposal_matcher
self.fg_bg_sampler = fg_bg_sampler
self.box_coder = box_coder
self.copied_fields = []
self.generate_labels_func = generate_labels_func
self.discard_cases = ['not_visibility', 'between_thresholds']
def match_targets_to_anchors(self, anchor, target, copied_fields=[]):
match_quality_matrix = boxlist_iou(target, anchor)
matched_idxs = self.proposal_matcher(match_quality_matrix)
# RPN doesn't need any fields from target
# for creating the labels, so clear them all
target = target.copy_with_fields(copied_fields)
# get the targets corresponding GT for each anchor
# NB: need to clamp the indices because we can have a single
# GT in the image, and matched_idxs can be -2, which goes
# out of bounds
matched_targets = target[matched_idxs.clamp(min=0)]
matched_targets.add_field("matched_idxs", matched_idxs)
return matched_targets
def prepare_targets(self, anchors, targets):
labels = []
regression_targets = []
for anchors_per_image, targets_per_image in zip(anchors, targets):
matched_targets = self.match_targets_to_anchors(
anchors_per_image, targets_per_image, self.copied_fields
)
matched_idxs = matched_targets.get_field("matched_idxs")
labels_per_image = self.generate_labels_func(matched_targets)
labels_per_image = labels_per_image.to(dtype=torch.float32)
# Background (negative examples)
# Matcher.BELOW_LOW_THRESHOLD = -1
bg_indices = matched_idxs == Matcher.BELOW_LOW_THRESHOLD
labels_per_image[bg_indices] = 0
# discard anchors that go out of the boundaries of the image
if "not_visibility" in self.discard_cases:
labels_per_image[~anchors_per_image.get_field("visibility")] = -1
# discard indices that are between thresholds
# Matcher.BETWEEN_THRESHOLDS = -2
if "between_thresholds" in self.discard_cases:
inds_to_discard = matched_idxs == Matcher.BETWEEN_THRESHOLDS
labels_per_image[inds_to_discard] = -1
# compute regression targets
regression_targets_per_image = self.box_coder.encode(
matched_targets.bbox, anchors_per_image.bbox
)
labels.append(labels_per_image)
regression_targets.append(regression_targets_per_image)
return labels, regression_targets
def __call__(self, anchors, objectness, box_regression, targets):
"""
Arguments:
anchors (list[BoxList])
objectness (list[Tensor])
box_regression (list[Tensor])
targets (list[BoxList])
Returns:
objectness_loss (Tensor)
box_loss (Tensor
"""
anchors = [cat_boxlist(anchors_per_image) for anchors_per_image in anchors]
# 根據(jù)所生成的所有的框與真實框來確定我們要的目標框和對應(yīng)的label為哪些
labels, regression_targets = self.prepare_targets(anchors, targets)
sampled_pos_inds, sampled_neg_inds = self.fg_bg_sampler(labels)
sampled_pos_inds = torch.nonzero(torch.cat(sampled_pos_inds, dim=0)).squeeze(1)
sampled_neg_inds = torch.nonzero(torch.cat(sampled_neg_inds, dim=0)).squeeze(1)
sampled_inds = torch.cat([sampled_pos_inds, sampled_neg_inds], dim=0)
objectness, box_regression = \
concat_box_prediction_layers(objectness, box_regression)
objectness = objectness.squeeze()
labels = torch.cat(labels, dim=0)
regression_targets = torch.cat(regression_targets, dim=0)
box_loss = smooth_l1_loss(
box_regression[sampled_pos_inds],
regression_targets[sampled_pos_inds],
beta=1.0 / 9,
size_average=False,
) / (sampled_inds.numel())
objectness_loss = F.binary_cross_entropy_with_logits(
objectness[sampled_inds], labels[sampled_inds]
)
return objectness_loss, box_loss
box_head部分
class CombinedROIHeads(torch.nn.ModuleDict):
"""
Combines a set of individual heads (for box prediction or masks) into a single
head.
"""
def __init__(self, cfg, heads):
super(CombinedROIHeads, self).__init__(heads)
self.cfg = cfg.clone()
if cfg.MODEL.MASK_ON and cfg.MODEL.ROI_MASK_HEAD.SHARE_BOX_FEATURE_EXTRACTOR:
self.mask.feature_extractor = self.box.feature_extractor
if cfg.MODEL.KEYPOINT_ON and cfg.MODEL.ROI_KEYPOINT_HEAD.SHARE_BOX_FEATURE_EXTRACTOR:
self.keypoint.feature_extractor = self.box.feature_extractor
def forward(self, features, proposals, targets=None):
losses = {}
# TODO rename x to roi_box_features, if it doesn't increase memory consumption
# 這里的box即下面的ROIBoxHead類,它的輸入features即FPN得到的feature,proposals
# 為RPN輸出(即經(jīng)過nms,去除scores小的部分,經(jīng)過decode過程得到的修正的框)
x, detections, loss_box = self.box(features, proposals, targets)
losses.update(loss_box)
if self.cfg.MODEL.MASK_ON:
mask_features = features
# optimization: during training, if we share the feature extractor between
# the box and the mask heads, then we can reuse the features already computed
if (
self.training
and self.cfg.MODEL.ROI_MASK_HEAD.SHARE_BOX_FEATURE_EXTRACTOR
):
mask_features = x
# During training, self.box() will return the unaltered proposals as "detections"
# this makes the API consistent during training and testing
x, detections, loss_mask = self.mask(mask_features, detections, targets)
losses.update(loss_mask)
if self.cfg.MODEL.KEYPOINT_ON:
keypoint_features = features
# optimization: during training, if we share the feature extractor between
# the box and the mask heads, then we can reuse the features already computed
if (
self.training
and self.cfg.MODEL.ROI_KEYPOINT_HEAD.SHARE_BOX_FEATURE_EXTRACTOR
):
keypoint_features = x
# During training, self.box() will return the unaltered proposals as "detections"
# this makes the API consistent during training and testing
x, detections, loss_keypoint = self.keypoint(keypoint_features, detections, targets)
losses.update(loss_keypoint)
return x, detections, losses
class ROIBoxHead(torch.nn.Module):
"""
Generic Box Head class.
"""
def __init__(self, cfg, in_channels):
super(ROIBoxHead, self).__init__()
self.feature_extractor = make_roi_box_feature_extractor(cfg, in_channels)
self.predictor = make_roi_box_predictor(
cfg, self.feature_extractor.out_channels)
self.post_processor = make_roi_box_post_processor(cfg)
self.loss_evaluator = make_roi_box_loss_evaluator(cfg)
def forward(self, features, proposals, targets=None):
"""
Arguments:
features (list[Tensor]): feature-maps from possibly several levels
proposals (list[BoxList]): proposal boxes
targets (list[BoxList], optional): the ground-truth targets.
Returns:
x (Tensor): the result of the feature extractor
proposals (list[BoxList]): during training, the subsampled proposals
are returned. During testing, the predicted boxlists are returned
losses (dict[Tensor]): During training, returns the losses for the
head. During testing, returns an empty dict.
"""
if self.training:
# Faster R-CNN subsamples during training the proposals with a fixed
# positive / negative ratio
with torch.no_grad():
# 以固定的正負樣本比例挑選出相應(yīng)的proposals
proposals = self.loss_evaluator.subsample(proposals, targets)
# extract features that will be fed to the final classifier. The
# feature_extractor generally corresponds to the pooler + heads
x = self.feature_extractor(features, proposals)
# final classifier that converts the features into predictions
class_logits, box_regression = self.predictor(x)
if not self.training:
result = self.post_processor((class_logits, box_regression), proposals)
return x, result, {}
loss_classifier, loss_box_reg = self.loss_evaluator(
[class_logits], [box_regression]
)
return (
x,
proposals,
dict(loss_classifier=loss_classifier, loss_box_reg=loss_box_reg),
)
class FastRCNNLossComputation(object):
"""
Computes the loss for Faster R-CNN.
Also supports FPN
"""
def __init__(
self,
proposal_matcher,
fg_bg_sampler,
box_coder,
cls_agnostic_bbox_reg=False
):
"""
Arguments:
proposal_matcher (Matcher)
fg_bg_sampler (BalancedPositiveNegativeSampler)
box_coder (BoxCoder)
"""
self.proposal_matcher = proposal_matcher
self.fg_bg_sampler = fg_bg_sampler
self.box_coder = box_coder
self.cls_agnostic_bbox_reg = cls_agnostic_bbox_reg
def match_targets_to_proposals(self, proposal, target):
match_quality_matrix = boxlist_iou(target, proposal)
# 這一步的篩選前背景見下面的Matcher類
matched_idxs = self.proposal_matcher(match_quality_matrix)
# Fast RCNN only need "labels" field for selecting the targets
target = target.copy_with_fields("labels")
# get the targets corresponding GT for each proposal
# NB: need to clamp the indices because we can have a single
# GT in the image, and matched_idxs can be -2, which goes
# out of bounds
matched_targets = target[matched_idxs.clamp(min=0)]
matched_targets.add_field("matched_idxs", matched_idxs)
return matched_targets
def prepare_targets(self, proposals, targets):
labels = []
regression_targets = []
for proposals_per_image, targets_per_image in zip(proposals, targets):
matched_targets = self.match_targets_to_proposals(
proposals_per_image, targets_per_image
)
matched_idxs = matched_targets.get_field("matched_idxs")
labels_per_image = matched_targets.get_field("labels")
labels_per_image = labels_per_image.to(dtype=torch.int64)
# Label background (below the low threshold)
# BELOW_LOW_THRESHOLD = -1, BETWEEN_THRESHOLDS = -2
bg_inds = matched_idxs == Matcher.BELOW_LOW_THRESHOLD
labels_per_image[bg_inds] = 0
# Label ignore proposals (between low and high thresholds)
ignore_inds = matched_idxs == Matcher.BETWEEN_THRESHOLDS
labels_per_image[ignore_inds] = -1 # -1 is ignored by sampler
# compute regression targets
regression_targets_per_image = self.box_coder.encode(
matched_targets.bbox, proposals_per_image.bbox
)
labels.append(labels_per_image)
regression_targets.append(regression_targets_per_image)
return labels, regression_targets
def subsample(self, proposals, targets):
"""
This method performs the positive/negative sampling, and return
the sampled proposals.
Note: this function keeps a state.
Arguments:
proposals (list[BoxList])
targets (list[BoxList])
"""
# 這一步通過計算由RPN得到的proposals與gt的交并比,進一步將proposals分為了前景和背景
# 并篩去了既非前景也非背景的框(但這里前景背景默認值都為0.5,實際上相當于沒有篩去)
# 這里得到的regression_targets是又經(jīng)過了encode的結(jié)果
# 而這里正樣本的labels則全部是gt的label,見match_targets_to_proposals這個函數(shù)的
# target = target.copy_with_fields("labels")這條語句
labels, regression_targets = self.prepare_targets(proposals, targets)
# 這里的fg_bg_sampler見下面的BalancedPositiveNegativeSampler類,
# 實際上是做了正負樣本的均衡,1:3
sampled_pos_inds, sampled_neg_inds = self.fg_bg_sampler(labels)
proposals = list(proposals)
# add corresponding label and regression_targets information to the bounding boxes
for labels_per_image, regression_targets_per_image, proposals_per_image in zip(
labels, regression_targets, proposals
):
proposals_per_image.add_field("labels", labels_per_image)
proposals_per_image.add_field(
"regression_targets", regression_targets_per_image
)
# distributed sampled proposals, that were obtained on all feature maps
# concatenated via the fg_bg_sampler, into individual feature map levels
for img_idx, (pos_inds_img, neg_inds_img) in enumerate(
zip(sampled_pos_inds, sampled_neg_inds)
):
img_sampled_inds = torch.nonzero(pos_inds_img | neg_inds_img).squeeze(1)
proposals_per_image = proposals[img_idx][img_sampled_inds]
proposals[img_idx] = proposals_per_image
self._proposals = proposals
return proposals
def __call__(self, class_logits, box_regression):
"""
Computes the loss for Faster R-CNN.
This requires that the subsample method has been called beforehand.
Arguments:
class_logits (list[Tensor])
box_regression (list[Tensor])
Returns:
classification_loss (Tensor)
box_loss (Tensor)
"""
class_logits = cat(class_logits, dim=0)
box_regression = cat(box_regression, dim=0)
device = class_logits.device
if not hasattr(self, "_proposals"):
raise RuntimeError("subsample needs to be called before")
proposals = self._proposals
labels = cat([proposal.get_field("labels") for proposal in proposals], dim=0)
regression_targets = cat(
[proposal.get_field("regression_targets") for proposal in proposals], dim=0
)
classification_loss = F.cross_entropy(class_logits, labels)
# get indices that correspond to the regression targets for
# the corresponding ground truth labels, to be used with
# advanced indexing
sampled_pos_inds_subset = torch.nonzero(labels > 0).squeeze(1)
labels_pos = labels[sampled_pos_inds_subset]
if self.cls_agnostic_bbox_reg:
map_inds = torch.tensor([4, 5, 6, 7], device=device)
else:
map_inds = 4 * labels_pos[:, None] + torch.tensor(
[0, 1, 2, 3], device=device)
box_loss = smooth_l1_loss(
box_regression[sampled_pos_inds_subset[:, None], map_inds],
regression_targets[sampled_pos_inds_subset],
size_average=False,
beta=1,
)
box_loss = box_loss / labels.numel()
return classification_loss, box_loss
class Matcher(object):
"""
This class assigns to each predicted "element" (e.g., a box) a ground-truth
element. Each predicted element will have exactly zero or one matches; each
ground-truth element may be assigned to zero or more predicted elements.
Matching is based on the MxN match_quality_matrix, that characterizes how well
each (ground-truth, predicted)-pair match. For example, if the elements are
boxes, the matrix may contain box IoU overlap values.
The matcher returns a tensor of size N containing the index of the ground-truth
element m that matches to prediction n. If there is no match, a negative value
is returned.
"""
BELOW_LOW_THRESHOLD = -1
BETWEEN_THRESHOLDS = -2
def __init__(self, high_threshold, low_threshold, allow_low_quality_matches=False):
"""
Args:
high_threshold (float): quality values greater than or equal to
this value are candidate matches.
low_threshold (float): a lower quality threshold used to stratify
matches into three levels:
1) matches >= high_threshold
2) BETWEEN_THRESHOLDS matches in [low_threshold, high_threshold)
3) BELOW_LOW_THRESHOLD matches in [0, low_threshold)
allow_low_quality_matches (bool): if True, produce additional matches
for predictions that have only low-quality match candidates. See
set_low_quality_matches_ for more details.
"""
assert low_threshold <= high_threshold
self.high_threshold = high_threshold
self.low_threshold = low_threshold
self.allow_low_quality_matches = allow_low_quality_matches
def __call__(self, match_quality_matrix):
"""
Args:
match_quality_matrix (Tensor[float]): an MxN tensor, containing the
pairwise quality between M ground-truth elements and N predicted elements.
Returns:
matches (Tensor[int64]): an N tensor where N[i] is a matched gt in
[0, M - 1] or a negative value indicating that prediction i could not
be matched.
"""
if match_quality_matrix.numel() == 0:
# empty targets or proposals not supported during training
if match_quality_matrix.shape[0] == 0:
raise ValueError(
"No ground-truth boxes available for one of the images "
"during training")
else:
raise ValueError(
"No proposal boxes available for one of the images "
"during training")
# match_quality_matrix is M (gt) x N (predicted)
# Max over gt elements (dim 0) to find best gt candidate for each prediction
matched_vals, matches = match_quality_matrix.max(dim=0)
if self.allow_low_quality_matches:
all_matches = matches.clone()
# 這里沒有對正樣本做篩選
# Assign candidate matches with low quality to negative (unassigned) values
below_low_threshold = matched_vals < self.low_threshold
between_thresholds = (matched_vals >= self.low_threshold) & (
matched_vals < self.high_threshold
)
matches[below_low_threshold] = Matcher.BELOW_LOW_THRESHOLD
matches[between_thresholds] = Matcher.BETWEEN_THRESHOLDS
if self.allow_low_quality_matches:
self.set_low_quality_matches_(matches, all_matches, match_quality_matrix)
return matches
def set_low_quality_matches_(self, matches, all_matches, match_quality_matrix):
"""
Produce additional matches for predictions that have only low-quality matches.
Specifically, for each ground-truth find the set of predictions that have
maximum overlap with it (including ties); for each prediction in that set, if
it is unmatched, then match it to the ground-truth with which it has the highest
quality value.
"""
# For each gt, find the prediction with which it has highest quality
highest_quality_foreach_gt, _ = match_quality_matrix.max(dim=1)
# Find highest quality match available, even if it is low, including ties
gt_pred_pairs_of_highest_quality = torch.nonzero(
match_quality_matrix == highest_quality_foreach_gt[:, None]
)
# Example gt_pred_pairs_of_highest_quality:
# tensor([[ 0, 39796],
# [ 1, 32055],
# [ 1, 32070],
# [ 2, 39190],
# [ 2, 40255],
# [ 3, 40390],
# [ 3, 41455],
# [ 4, 45470],
# [ 5, 45325],
# [ 5, 46390]])
# Each row is a (gt index, prediction index)
# Note how gt items 1, 2, 3, and 5 each have two ties
pred_inds_to_update = gt_pred_pairs_of_highest_quality[:, 1]
matches[pred_inds_to_update] = all_matches[pred_inds_to_update]
class BalancedPositiveNegativeSampler(object):
"""
This class samples batches, ensuring that they contain a fixed proportion of positives
"""
# positive_fraction = cfg.MODEL.ROI_HEADS.POSITIVE_FRACTION = 0.25,
# 實際上相當于正負樣本比例為1:3
def __init__(self, batch_size_per_image, positive_fraction):
"""
Arguments:
batch_size_per_image (int): number of elements to be selected per image
positive_fraction (float): percentace of positive elements per batch
"""
self.batch_size_per_image = batch_size_per_image
self.positive_fraction = positive_fraction
def __call__(self, matched_idxs):
"""
Arguments:
matched idxs: list of tensors containing -1, 0 or positive values.
Each tensor corresponds to a specific image.
-1 values are ignored, 0 are considered as negatives and > 0 as
positives.
Returns:
pos_idx (list[tensor])
neg_idx (list[tensor])
Returns two lists of binary masks for each image.
The first list contains the positive elements that were selected,
and the second list the negative example.
"""
pos_idx = []
neg_idx = []
for matched_idxs_per_image in matched_idxs:
positive = torch.nonzero(matched_idxs_per_image >= 1).squeeze(1)
negative = torch.nonzero(matched_idxs_per_image == 0).squeeze(1)
num_pos = int(self.batch_size_per_image * self.positive_fraction)
# protect against not enough positive examples
num_pos = min(positive.numel(), num_pos)
num_neg = self.batch_size_per_image - num_pos
# protect against not enough negative examples
num_neg = min(negative.numel(), num_neg)
# randomly select positive and negative examples
perm1 = torch.randperm(positive.numel(), device=positive.device)[:num_pos]
perm2 = torch.randperm(negative.numel(), device=negative.device)[:num_neg]
pos_idx_per_image = positive[perm1]
neg_idx_per_image = negative[perm2]
# create binary mask from indices
pos_idx_per_image_mask = torch.zeros_like(
matched_idxs_per_image, dtype=torch.uint8
)
neg_idx_per_image_mask = torch.zeros_like(
matched_idxs_per_image, dtype=torch.uint8
)
pos_idx_per_image_mask[pos_idx_per_image] = 1
neg_idx_per_image_mask[neg_idx_per_image] = 1
pos_idx.append(pos_idx_per_image_mask)
neg_idx.append(neg_idx_per_image_mask)
return pos_idx, neg_idx