Heatmap (CAM) Visualization with MMDetection
1. Grab the two files below (their full contents are posted in steps 8 and 9). Back up your environment first, in case something goes wrong later.
2. Put vis_cam.py into the demo/ directory.
3. Put det_cam_visualizer.py into the mmdet/utils/ directory.
4. Run the command for the CAM method you want (two further useful flags are shown right after these examples):
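After steps 2 and 3, the relevant part of your mmdetection checkout should look like this (only the two added files shown):
mmdetection/
├── demo/
│   └── vis_cam.py
└── mmdet/
    └── utils/
        └── det_cam_visualizer.py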
# FeatmapAM method
python demo/vis_cam.py demo/demo.jpg configs/retinanet/retinanet_r50_fpn_1x_coco.py retinanet_r50_fpn_1x_coco_20200130-c2398f9e.pth
# EigenCAM method
python demo/vis_cam.py demo/demo.jpg configs/retinanet/retinanet_r50_fpn_1x_coco.py retinanet_r50_fpn_1x_coco_20200130-c2398f9e.pth --method eigencam
# AblationCAM method
python demo/vis_cam.py demo/demo.jpg configs/retinanet/retinanet_r50_fpn_1x_coco.py retinanet_r50_fpn_1x_coco_20200130-c2398f9e.pth --method ablationcam
# AblationCAM method, saving the output image to a directory
python demo/vis_cam.py demo/demo.jpg configs/retinanet/retinanet_r50_fpn_1x_coco.py retinanet_r50_fpn_1x_coco_20200130-c2398f9e.pth --method ablationcam --out-dir save_dir
# GradCAM
python demo/vis_cam.py demo/demo.jpg configs/retinanet/retinanet_r50_fpn_1x_coco.py retinanet_r50_fpn_1x_coco_20200130-c2398f9e.pth --method gradcam
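The visualized layer defaults to backbone.layer3 (see --target-layers in vis_cam.py). You can first print every layer name with --preview-model, then point --target-layers at any attribute path on the built detector; the neck example below is only an illustration, so check the printed model for your own architecture:
# print all layers of the model, then exit
python demo/vis_cam.py demo/demo.jpg configs/retinanet/retinanet_r50_fpn_1x_coco.py retinanet_r50_fpn_1x_coco_20200130-c2398f9e.pth --preview-model
# EigenCAM on the FPN neck instead of the default backbone.layer3
python demo/vis_cam.py demo/demo.jpg configs/retinanet/retinanet_r50_fpn_1x_coco.py retinanet_r50_fpn_1x_coco_20200130-c2398f9e.pth --method eigencam --target-layers neck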
5. The tool currently supports RetinaNet, Faster R-CNN, Mask R-CNN, and YOLOX.
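For example, GradCAM on Faster R-CNN works the same way (the checkpoint filename below follows the MMDetection model zoo naming; substitute whatever checkpoint you actually downloaded):
# GradCAM method on Faster R-CNN
python demo/vis_cam.py demo/demo.jpg configs/faster_rcnn/faster_rcnn_r50_fpn_1x_coco.py faster_rcnn_r50_fpn_1x_coco_20200130-047c8118.pth --method gradcam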
6. On the first run you will be prompted to install the grad-cam package (imported as pytorch_grad_cam).
Run pip install grad-cam and wait for the installation to finish; after that you can visualize heatmaps directly.
7. On some Windows systems you may then hit a version error, because installing grad-cam occasionally upgrades your torch version. Note the torch version you had before installing, and roll back to it if that happens.
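A minimal recovery recipe, assuming you noted your versions beforehand (the version numbers below are placeholders; use your own):
# check which versions ended up installed
pip show torch torchvision
# roll back to the versions you had before, e.g.:
pip install torch==1.10.0 torchvision==0.11.1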
8. det_cam_visualizer.py
import bisect
import copy

import cv2
import mmcv
import numpy as np
import torch
import torch.nn as nn
import torchvision
from mmcv.ops import RoIPool
from mmcv.parallel import collate, scatter
from mmcv.runner import load_checkpoint

try:
    from pytorch_grad_cam import (AblationCAM, AblationLayer,
                                  ActivationsAndGradients)
    from pytorch_grad_cam.base_cam import BaseCAM
    from pytorch_grad_cam.utils.image import scale_cam_image, show_cam_on_image
    from pytorch_grad_cam.utils.svd_on_activations import get_2d_projection
except ImportError:
    raise ImportError('Please run `pip install "grad-cam"` to install '
                      '3rd party package pytorch_grad_cam.')

from mmdet.core import get_classes
from mmdet.datasets import replace_ImageToTensor
from mmdet.datasets.pipelines import Compose
from mmdet.models import build_detector
def reshape_transform(feats, max_shape=(20, 20), is_need_grad=False):
    """Reshape and aggregate feature maps when the input is a multi-layer
    feature map.

    Takes these tensors with different sizes, resizes them to a common shape,
    and concatenates them.
    """
    if len(max_shape) == 1:
        max_shape = max_shape * 2

    if isinstance(feats, torch.Tensor):
        feats = [feats]
    else:
        if is_need_grad:
            raise NotImplementedError('The `grad_base` method does not '
                                      'support output multi-activation layers')

    max_h = max([im.shape[-2] for im in feats])
    max_w = max([im.shape[-1] for im in feats])
    if -1 in max_shape:
        max_shape = (max_h, max_w)
    else:
        max_shape = (min(max_h, max_shape[0]), min(max_w, max_shape[1]))

    activations = []
    for feat in feats:
        activations.append(
            torch.nn.functional.interpolate(
                torch.abs(feat), max_shape, mode='bilinear'))

    activations = torch.cat(activations, axis=1)
    return activations
class DetCAMModel(nn.Module):
    """Wrap the mmdet model class to facilitate handling of non-tensor
    situations during inference."""

    def __init__(self, cfg, checkpoint, score_thr, device='cuda:0'):
        super().__init__()
        self.cfg = cfg
        self.device = device
        self.score_thr = score_thr
        self.checkpoint = checkpoint
        self.detector = self.build_detector()

        self.return_loss = False
        self.input_data = None
        self.img = None

    def build_detector(self):
        cfg = copy.deepcopy(self.cfg)

        detector = build_detector(
            cfg.model,
            train_cfg=cfg.get('train_cfg'),
            test_cfg=cfg.get('test_cfg'))

        if self.checkpoint is not None:
            checkpoint = load_checkpoint(
                detector, self.checkpoint, map_location='cpu')
            if 'CLASSES' in checkpoint.get('meta', {}):
                detector.CLASSES = checkpoint['meta']['CLASSES']
            else:
                import warnings
                warnings.simplefilter('once')
                warnings.warn('Class names are not saved in the checkpoint\'s '
                              'meta data, use COCO classes by default.')
                detector.CLASSES = get_classes('coco')

        detector.to(self.device)
        detector.eval()
        return detector
    def set_return_loss(self, return_loss):
        self.return_loss = return_loss

    def set_input_data(self, img, bboxes=None, labels=None):
        self.img = img
        cfg = copy.deepcopy(self.cfg)
        if self.return_loss:
            assert bboxes is not None
            assert labels is not None
            cfg.data.test.pipeline[0].type = 'LoadImageFromWebcam'
            cfg.data.test.pipeline = replace_ImageToTensor(
                cfg.data.test.pipeline)
            cfg.data.test.pipeline[1].transforms[-1] = dict(
                type='Collect', keys=['img', 'gt_bboxes', 'gt_labels'])
            test_pipeline = Compose(cfg.data.test.pipeline)
            # TODO: support mask
            data = dict(
                img=self.img,
                gt_bboxes=bboxes,
                # np.long was removed in recent NumPy releases; np.int64 is
                # the equivalent dtype
                gt_labels=labels.astype(np.int64),
                bbox_fields=['gt_bboxes'])
            data = test_pipeline(data)
            data = collate([data], samples_per_gpu=1)

            # just get the actual data from DataContainer
            data['img_metas'] = [
                img_metas.data[0][0] for img_metas in data['img_metas']
            ]
            data['img'] = [img.data[0] for img in data['img']]
            data['gt_bboxes'] = [
                gt_bboxes.data[0] for gt_bboxes in data['gt_bboxes']
            ]
            data['gt_labels'] = [
                gt_labels.data[0] for gt_labels in data['gt_labels']
            ]
            if next(self.detector.parameters()).is_cuda:
                # scatter to specified GPU
                data = scatter(data, [self.device])[0]

            data['img'] = data['img'][0]
            data['gt_bboxes'] = data['gt_bboxes'][0]
            data['gt_labels'] = data['gt_labels'][0]
        else:
            # set loading pipeline type
            cfg.data.test.pipeline[0].type = 'LoadImageFromWebcam'
            data = dict(img=self.img)
            cfg.data.test.pipeline = replace_ImageToTensor(
                cfg.data.test.pipeline)
            test_pipeline = Compose(cfg.data.test.pipeline)
            data = test_pipeline(data)
            data = collate([data], samples_per_gpu=1)

            # just get the actual data from DataContainer
            data['img_metas'] = [
                img_metas.data[0] for img_metas in data['img_metas']
            ]
            data['img'] = [img.data[0] for img in data['img']]

            if next(self.detector.parameters()).is_cuda:
                # scatter to specified GPU
                data = scatter(data, [self.device])[0]
            else:
                for m in self.detector.modules():
                    assert not isinstance(
                        m, RoIPool
                    ), 'CPU inference with RoIPool is not supported currently.'

        self.input_data = data
    def __call__(self, *args, **kwargs):
        assert self.input_data is not None

        if self.return_loss:
            loss = self.detector(return_loss=True, **self.input_data)
            return [loss]
        else:
            with torch.no_grad():
                results = self.detector(
                    return_loss=False, rescale=True, **self.input_data)[0]

                if isinstance(results, tuple):
                    bbox_result, segm_result = results
                    if isinstance(segm_result, tuple):
                        segm_result = segm_result[0]  # ms rcnn
                else:
                    bbox_result, segm_result = results, None

                bboxes = np.vstack(bbox_result)
                labels = [
                    np.full(bbox.shape[0], i, dtype=np.int32)
                    for i, bbox in enumerate(bbox_result)
                ]
                labels = np.concatenate(labels)

                segms = None
                if segm_result is not None and len(labels) > 0:  # non empty
                    segms = mmcv.concat_list(segm_result)
                    if isinstance(segms[0], torch.Tensor):
                        segms = torch.stack(
                            segms, dim=0).detach().cpu().numpy()
                    else:
                        segms = np.stack(segms, axis=0)

                if self.score_thr > 0:
                    assert bboxes is not None and bboxes.shape[1] == 5
                    scores = bboxes[:, -1]
                    inds = scores > self.score_thr
                    bboxes = bboxes[inds, :]
                    labels = labels[inds]
                    if segms is not None:
                        segms = segms[inds, ...]
                return [{'bboxes': bboxes, 'labels': labels, 'segms': segms}]
class DetAblationLayer(AblationLayer):

    def __init__(self):
        super(DetAblationLayer, self).__init__()
        self.activations = None

    def set_next_batch(self, input_batch_index, activations,
                       num_channels_to_ablate):
        """Extract the next batch member from activations, and repeat it
        num_channels_to_ablate times."""
        if isinstance(activations, torch.Tensor):
            return super(DetAblationLayer,
                         self).set_next_batch(input_batch_index, activations,
                                              num_channels_to_ablate)

        self.activations = []
        for activation in activations:
            activation = activation[
                input_batch_index, :, :, :].clone().unsqueeze(0)
            self.activations.append(
                activation.repeat(num_channels_to_ablate, 1, 1, 1))

    def __call__(self, x):
        """Go over the activation indices to be ablated, stored in
        self.indices.

        Map between every activation index to the tensor in the Ordered Dict
        from the FPN layer.
        """
        result = self.activations

        if isinstance(result, torch.Tensor):
            return super(DetAblationLayer, self).__call__(x)

        channel_cumsum = np.cumsum([r.shape[1] for r in result])
        num_channels_to_ablate = result[0].size(0)  # batch
        for i in range(num_channels_to_ablate):
            pyramid_layer = bisect.bisect_right(channel_cumsum,
                                                self.indices[i])
            if pyramid_layer > 0:
                index_in_pyramid_layer = self.indices[i] - channel_cumsum[
                    pyramid_layer - 1]
            else:
                index_in_pyramid_layer = self.indices[i]
            result[pyramid_layer][i, index_in_pyramid_layer, :, :] = -1000
        return result
class DetCAMVisualizer:
    """mmdet cam visualization class.

    Args:
        method: CAM method. Currently supports
            `ablationcam`, `eigencam` and `featmapam`.
        model (nn.Module): MMDet model.
        target_layers (list[torch.nn.Module]): The target layers
            you want to visualize.
        reshape_transform (Callable, optional): Function of Reshape
            and aggregate feature maps. Defaults to None.
    """

    def __init__(self,
                 method_class,
                 model,
                 target_layers,
                 reshape_transform=None,
                 is_need_grad=False,
                 extra_params=None):
        self.target_layers = target_layers
        self.reshape_transform = reshape_transform
        self.is_need_grad = is_need_grad

        if method_class.__name__ == 'AblationCAM':
            batch_size = extra_params.get('batch_size', 1)
            ratio_channels_to_ablate = extra_params.get(
                'ratio_channels_to_ablate', 1.)
            self.cam = AblationCAM(
                model,
                target_layers,
                use_cuda=True if 'cuda' in model.device else False,
                reshape_transform=reshape_transform,
                batch_size=batch_size,
                ablation_layer=extra_params['ablation_layer'],
                ratio_channels_to_ablate=ratio_channels_to_ablate)
        else:
            self.cam = method_class(
                model,
                target_layers,
                use_cuda=True if 'cuda' in model.device else False,
                reshape_transform=reshape_transform,
            )
            if self.is_need_grad:
                self.cam.activations_and_grads.release()

        self.classes = model.detector.CLASSES
        self.COLORS = np.random.uniform(0, 255, size=(len(self.classes), 3))

    def switch_activations_and_grads(self, model):
        self.cam.model = model

        if self.is_need_grad is True:
            self.cam.activations_and_grads = ActivationsAndGradients(
                model, self.target_layers, self.reshape_transform)
            self.is_need_grad = False
        else:
            self.cam.activations_and_grads.release()
            self.is_need_grad = True

    def __call__(self, img, targets, aug_smooth=False, eigen_smooth=False):
        img = torch.from_numpy(img)[None].permute(0, 3, 1, 2)
        return self.cam(img, targets, aug_smooth, eigen_smooth)[0, :]
    def show_cam(self,
                 image,
                 boxes,
                 labels,
                 grayscale_cam,
                 with_norm_in_bboxes=False):
        """Normalize the CAM to be in the range [0, 1] inside every bounding
        box, and zero outside of the bounding boxes."""
        if with_norm_in_bboxes is True:
            boxes = boxes.astype(np.int32)
            renormalized_cam = np.zeros(grayscale_cam.shape, dtype=np.float32)
            images = []
            for x1, y1, x2, y2 in boxes:
                img = renormalized_cam * 0
                img[y1:y2,
                    x1:x2] = scale_cam_image(grayscale_cam[y1:y2,
                                                           x1:x2].copy())
                images.append(img)

            renormalized_cam = np.max(np.float32(images), axis=0)
            renormalized_cam = scale_cam_image(renormalized_cam)
        else:
            renormalized_cam = grayscale_cam

        cam_image_renormalized = show_cam_on_image(
            image / 255, renormalized_cam, use_rgb=False)

        image_with_bounding_boxes = self._draw_boxes(boxes, labels,
                                                     cam_image_renormalized)
        return image_with_bounding_boxes

    def _draw_boxes(self, boxes, labels, image):
        for i, box in enumerate(boxes):
            label = labels[i]
            color = self.COLORS[label]
            cv2.rectangle(image, (int(box[0]), int(box[1])),
                          (int(box[2]), int(box[3])), color, 2)
            cv2.putText(
                image,
                self.classes[label], (int(box[0]), int(box[1] - 5)),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                color,
                1,
                lineType=cv2.LINE_AA)
        return image
class DetBoxScoreTarget:
    """For every original detected bounding box specified in "bboxes",
    assign a score on how the current bounding boxes match it,

        1. In Bbox IoU.
        2. In the classification score.
        3. In Mask IoU if ``segms`` exist.

    If there is not a large enough overlap, or the category changed,
    assign a score of 0.

    The total score is the sum of all the box scores.
    """

    def __init__(self,
                 bboxes,
                 labels,
                 segms=None,
                 match_iou_thr=0.5,
                 device='cuda:0'):
        assert len(bboxes) == len(labels)
        self.focal_bboxes = torch.from_numpy(bboxes).to(device=device)
        self.focal_labels = labels
        if segms is not None:
            assert len(bboxes) == len(segms)
            self.focal_segms = torch.from_numpy(segms).to(device=device)
        else:
            self.focal_segms = [None] * len(labels)
        self.match_iou_thr = match_iou_thr

        self.device = device

    def __call__(self, results):
        output = torch.tensor([0.], device=self.device)

        if 'loss_cls' in results:
            # grad_base_method
            for loss_key, loss_value in results.items():
                if 'loss' not in loss_key:
                    continue
                if isinstance(loss_value, list):
                    output += sum(loss_value)
                else:
                    output += loss_value
            return output
        else:
            # grad_free_method
            if len(results['bboxes']) == 0:
                return output

            pred_bboxes = torch.from_numpy(results['bboxes']).to(self.device)
            pred_labels = results['labels']
            pred_segms = results['segms']

            if pred_segms is not None:
                pred_segms = torch.from_numpy(pred_segms).to(self.device)

            for focal_box, focal_label, focal_segm in zip(
                    self.focal_bboxes, self.focal_labels, self.focal_segms):
                ious = torchvision.ops.box_iou(focal_box[None],
                                               pred_bboxes[..., :4])
                index = ious.argmax()
                if ious[0, index] > self.match_iou_thr and pred_labels[
                        index] == focal_label:
                    # TODO: Adaptive adjustment of weights based on algorithms
                    score = ious[0, index] + pred_bboxes[..., 4][index]
                    output = output + score

                    if focal_segm is not None and pred_segms is not None:
                        segms_score = (focal_segm *
                                       pred_segms[index]).sum() / (
                                           focal_segm.sum() +
                                           pred_segms[index].sum() + 1e-7)
                        output = output + segms_score
            return output
# TODO: Fix RuntimeError: element 0 of tensors does not require grad and
# does not have a grad_fn.
# Can be removed once the source code is fixed.
class EigenCAM(BaseCAM):

    def __init__(self,
                 model,
                 target_layers,
                 use_cuda=False,
                 reshape_transform=None):
        super(EigenCAM, self).__init__(
            model,
            target_layers,
            use_cuda,
            reshape_transform,
            uses_gradients=False)

    def get_cam_image(self, input_tensor, target_layer, target_category,
                      activations, grads, eigen_smooth):
        return get_2d_projection(activations)


class FeatmapAM(EigenCAM):
    """Visualize Feature Maps.

    Visualize the (B,C,H,W) feature map averaged over the channel dimension.
    """

    def __init__(self,
                 model,
                 target_layers,
                 use_cuda=False,
                 reshape_transform=None):
        super(FeatmapAM, self).__init__(model, target_layers, use_cuda,
                                        reshape_transform)

    def get_cam_image(self, input_tensor, target_layer, target_category,
                      activations, grads, eigen_smooth):
        return np.mean(activations, axis=1)
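To make reshape_transform above concrete, here is a minimal sketch with dummy tensors (shapes chosen arbitrarily): two FPN levels of different sizes are resized to a common 20x20 grid and concatenated along the channel dimension.
import torch
from mmdet.utils.det_cam_visualizer import reshape_transform

# two fake FPN levels with different spatial sizes
feats = [torch.randn(1, 256, 40, 40), torch.randn(1, 256, 20, 20)]
out = reshape_transform(feats, max_shape=(20, 20))
print(out.shape)  # torch.Size([1, 512, 20, 20]): resized and concatenated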
9. vis_cam.py
import argparse
import os.path
from functools import partial

import cv2
import mmcv
import numpy as np
from mmcv import Config, DictAction

from mmdet.utils.det_cam_visualizer import (DetAblationLayer,
                                            DetBoxScoreTarget, DetCAMModel,
                                            DetCAMVisualizer, EigenCAM,
                                            FeatmapAM, reshape_transform)

try:
    from pytorch_grad_cam import (AblationCAM, EigenGradCAM, GradCAM,
                                  GradCAMPlusPlus, LayerCAM, XGradCAM)
except ImportError:
    raise ImportError('Please run `pip install "grad-cam"` to install '
                      '3rd party package pytorch_grad_cam.')

GRAD_FREE_METHOD_MAP = {
    'ablationcam': AblationCAM,
    'eigencam': EigenCAM,
    # 'scorecam': ScoreCAM, # consumes too much memory
    'featmapam': FeatmapAM
}

GRAD_BASE_METHOD_MAP = {
    'gradcam': GradCAM,
    'gradcam++': GradCAMPlusPlus,
    'xgradcam': XGradCAM,
    'eigengradcam': EigenGradCAM,
    'layercam': LayerCAM
}

ALL_METHODS = list(GRAD_FREE_METHOD_MAP.keys() | GRAD_BASE_METHOD_MAP.keys())
def parse_args():
    parser = argparse.ArgumentParser(description='Visualize CAM')
    parser.add_argument('img', help='Image file')
    parser.add_argument('config', help='Config file')
    parser.add_argument('checkpoint', help='Checkpoint file')
    parser.add_argument(
        '--method',
        default='gradcam',
        help='Type of method to use, supports '
        f'{", ".join(ALL_METHODS)}.')
    parser.add_argument(
        '--target-layers',
        default=['backbone.layer3'],
        nargs='+',
        type=str,
        help='The target layers to get CAM, if not set, the tool will '
        'use backbone.layer3')
    parser.add_argument(
        '--preview-model',
        default=False,
        action='store_true',
        help='To preview all the model layers')
    parser.add_argument(
        '--device', default='cuda:0', help='Device used for inference')
    parser.add_argument(
        '--score-thr', type=float, default=0.3, help='Bbox score threshold')
    parser.add_argument(
        '--topk',
        type=int,
        default=10,
        help='Topk of the predicted results to visualize')
    parser.add_argument(
        '--max-shape',
        nargs='+',
        type=int,
        default=20,
        help='max shape. Its purpose is to save GPU memory. '
        'The activation map is scaled and then evaluated. '
        'If set to -1, it means no scaling.')
    parser.add_argument(
        '--no-norm-in-bbox',
        action='store_true',
        help='Do not normalize the CAM inside each bbox')
    parser.add_argument(
        '--aug-smooth',
        default=False,
        action='store_true',
        help='Whether to use test time augmentation, default not to use')
    parser.add_argument(
        '--eigen-smooth',
        default=False,
        action='store_true',
        help='Reduce noise by taking the first principal component of '
        '``cam_weights*activations``')
    parser.add_argument('--out-dir', default=None, help='dir to output file')

    # Only used by AblationCAM
    parser.add_argument(
        '--batch-size',
        type=int,
        default=1,
        help='batch size of inference for AblationCAM')
    parser.add_argument(
        '--ratio-channels-to-ablate',
        # a ratio must be a float; `type=int` would truncate CLI values
        type=float,
        default=0.5,
        help='Makes AblationCAM much faster. '
        'The parameter controls how many channels should be ablated')

    parser.add_argument(
        '--cfg-options',
        nargs='+',
        action=DictAction,
        help='override some settings in the used config, the key-value pair '
        'in xxx=yyy format will be merged into config file. If the value to '
        'be overwritten is a list, it should be like key="[a,b]" or key=a,b '
        'It also allows nested list/tuple values, e.g. key="[(a,b),(c,d)]" '
        'Note that the quotation marks are necessary and that no white space '
        'is allowed.')
    args = parser.parse_args()

    if args.method.lower() not in (GRAD_FREE_METHOD_MAP.keys()
                                   | GRAD_BASE_METHOD_MAP.keys()):
        raise ValueError(f'invalid CAM type {args.method},'
                         f' supports {", ".join(ALL_METHODS)}.')

    return args
def init_model_cam(args, cfg):
    model = DetCAMModel(
        cfg, args.checkpoint, args.score_thr, device=args.device)
    if args.preview_model:
        print(model.detector)
        print('\n Please remove `--preview-model` to get the CAM.')
        return

    target_layers = []
    for target_layer in args.target_layers:
        try:
            target_layers.append(eval(f'model.detector.{target_layer}'))
        except Exception as e:
            print(model.detector)
            raise RuntimeError('layer does not exist', e)

    extra_params = {
        'batch_size': args.batch_size,
        'ablation_layer': DetAblationLayer(),
        'ratio_channels_to_ablate': args.ratio_channels_to_ablate
    }

    if args.method in GRAD_BASE_METHOD_MAP:
        method_class = GRAD_BASE_METHOD_MAP[args.method]
        is_need_grad = True
        assert args.no_norm_in_bbox is False, 'If not norm in bbox, the ' \
                                              'visualization result ' \
                                              'may not be reasonable.'
    else:
        method_class = GRAD_FREE_METHOD_MAP[args.method]
        is_need_grad = False

    max_shape = args.max_shape
    if not isinstance(max_shape, list):
        max_shape = [args.max_shape]
    assert len(max_shape) == 1 or len(max_shape) == 2

    det_cam_visualizer = DetCAMVisualizer(
        method_class,
        model,
        target_layers,
        reshape_transform=partial(
            reshape_transform, max_shape=max_shape, is_need_grad=is_need_grad),
        is_need_grad=is_need_grad,
        extra_params=extra_params)
    return model, det_cam_visualizer
def main():
    args = parse_args()
    cfg = Config.fromfile(args.config)
    if args.cfg_options is not None:
        cfg.merge_from_dict(args.cfg_options)

    init_result = init_model_cam(args, cfg)
    if init_result is None:
        # `--preview-model` was given: the layers were printed, nothing to do
        return
    model, det_cam_visualizer = init_result

    images = args.img
    if not isinstance(images, list):
        images = [images]

    for image_path in images:
        image = cv2.imread(image_path)
        model.set_input_data(image)
        result = model()[0]

        bboxes = result['bboxes'][..., :4]
        scores = result['bboxes'][..., 4]
        labels = result['labels']
        segms = result['segms']
        assert bboxes is not None and len(bboxes) > 0
        if args.topk > 0:
            idxs = np.argsort(-scores)
            bboxes = bboxes[idxs[:args.topk]]
            labels = labels[idxs[:args.topk]]
            if segms is not None:
                segms = segms[idxs[:args.topk]]
        targets = [
            DetBoxScoreTarget(bboxes=bboxes, labels=labels, segms=segms)
        ]

        if args.method in GRAD_BASE_METHOD_MAP:
            model.set_return_loss(True)
            model.set_input_data(image, bboxes=bboxes, labels=labels)
            det_cam_visualizer.switch_activations_and_grads(model)

        grayscale_cam = det_cam_visualizer(
            image,
            targets=targets,
            aug_smooth=args.aug_smooth,
            eigen_smooth=args.eigen_smooth)

        image_with_bounding_boxes = det_cam_visualizer.show_cam(
            image, bboxes, labels, grayscale_cam, not args.no_norm_in_bbox)

        if args.out_dir:
            mmcv.mkdir_or_exist(args.out_dir)
            out_file = os.path.join(args.out_dir, os.path.basename(image_path))
            mmcv.imwrite(image_with_bounding_boxes, out_file)
        else:
            cv2.namedWindow(os.path.basename(image_path), 0)
            cv2.imshow(os.path.basename(image_path), image_with_bounding_boxes)
            cv2.waitKey(0)

        if args.method in GRAD_BASE_METHOD_MAP:
            model.set_return_loss(False)
            det_cam_visualizer.switch_activations_and_grads(model)


if __name__ == '__main__':
    main()
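If you would rather drive the pieces from Python than through the CLI, here is a minimal sketch that mirrors what main() does for a grad-free method, assuming a CUDA device and the RetinaNet config/checkpoint from step 4 (paths and the output filename are placeholders):
from functools import partial

import cv2
from mmcv import Config

from mmdet.utils.det_cam_visualizer import (DetBoxScoreTarget, DetCAMModel,
                                            DetCAMVisualizer, FeatmapAM,
                                            reshape_transform)

cfg = Config.fromfile('configs/retinanet/retinanet_r50_fpn_1x_coco.py')
model = DetCAMModel(cfg, 'retinanet_r50_fpn_1x_coco_20200130-c2398f9e.pth',
                    score_thr=0.3, device='cuda:0')
# FeatmapAM is grad-free, so no loss computation or backward pass is needed
visualizer = DetCAMVisualizer(
    FeatmapAM,
    model, [model.detector.backbone.layer3],
    reshape_transform=partial(
        reshape_transform, max_shape=[20, 20], is_need_grad=False))

image = cv2.imread('demo/demo.jpg')
model.set_input_data(image)
result = model()[0]
targets = [
    DetBoxScoreTarget(
        bboxes=result['bboxes'][:, :4],
        labels=result['labels'],
        segms=result['segms'])
]
grayscale_cam = visualizer(image, targets=targets)
cam_image = visualizer.show_cam(image, result['bboxes'][:, :4],
                                result['labels'], grayscale_cam, True)
cv2.imwrite('cam.jpg', cam_image)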
10. For a more detailed walkthrough, see the original author's write-up: