Source code for ares.attack.detection.utils

import os
import sys
import functools

import torch
import torch.distributed as dist
import torch.nn.functional as F
from torchvision.utils import draw_bounding_boxes
from torchvision.utils import save_image


def normalize(tensor, mean, std):
    """Normalize input tensor with given mean and std.

    Args:
        tensor (torch.Tensor): Float tensor image of shape (B, C, H, W) to be normalized.
        mean (torch.Tensor): Float tensor means of size (C, ) for each channel.
        std (torch.Tensor): Float tensor standard deviations of size (C, ) for each channel.
    """
    return (tensor - mean[None]) / std[None]

def denormalize(tensor, mean, std):
    """Denormalize input tensor with given mean and std.

    Args:
        tensor (torch.Tensor): Float tensor image of shape (B, C, H, W) to be denormalized.
        mean (torch.Tensor): Float tensor means of size (C, ) for each channel.
        std (torch.Tensor): Float tensor standard deviations of size (C, ) for each channel.
    """
    return tensor * std[None] + mean[None]

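# Illustrative usage sketch (not part of the original module): round-tripping a
# batch through normalize()/denormalize(). The ImageNet-style statistics are
# example values only; note the (C, 1, 1) view so that mean/std broadcast over
# a (B, C, H, W) batch.
def _demo_normalize_roundtrip():
    imgs = torch.rand(2, 3, 224, 224)
    mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
    normed = normalize(imgs, mean, std)
    restored = denormalize(normed, mean, std)
    assert torch.allclose(restored, imgs, atol=1e-6)
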
def is_distributed() -> bool:
    """Return True if the distributed environment has been initialized."""
    return dist.is_available() and dist.is_initialized()

def is_main_process(group=None) -> bool:
    """Whether the current rank of the given process group is 0.

    Note:
        In a non-distributed environment this always returns True.

    Args:
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used. Defaults to None.

    Returns:
        bool
    """
    if is_distributed():
        # handle low versions of torch like 1.5.0 which do not support
        # passing None for the group argument
        if group is None:
            group = dist.distributed_c10d._get_default_group()
        return dist.get_rank(group) == 0
    else:
        return True

def get_word_size(group=None):
    """Return the world size (number of processes) of the given process group."""
    if is_distributed():
        # handle low versions of torch like 1.5.0 which do not support
        # passing None for the group argument
        if group is None:
            group = dist.distributed_c10d._get_default_group()
        return dist.get_world_size(group)
    else:
        return 1

def main_only(func):
    """Decorate a function so that it is only executed in the main process.

    Args:
        func (callable): Function to be decorated.

    Returns:
        callable: The decorated function.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # in non-main processes the wrapped function is skipped and None is returned
        if is_main_process():
            return func(*args, **kwargs)

    return wrapper

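# Illustrative usage sketch (not part of the original module): a hypothetical
# logging helper decorated with main_only so that, under DDP, only rank 0
# prints to the console.
@main_only
def _demo_log(msg):
    print(msg)
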
@main_only
def mkdirs_if_not_exists(dir):
    """Make the directory if it does not exist."""
    if not os.path.exists(dir):
        os.makedirs(dir)

def save_patches_to_images(patches, save_dir, class_names, labels=None):
    """Save adversarial patches as image files.

    Args:
        patches (torch.Tensor): Adversarial patches with shape [N, C=3, H, W].
        save_dir (str): Path to save adversarial patches.
        class_names (list): Names of the classes corresponding to the patches.
        labels (torch.Tensor, optional): Labels of the patches. Defaults to None.
    """
    mkdirs_if_not_exists(save_dir)
    if labels is None:
        labels = torch.arange(len(class_names))
    for cls_name, label in zip(class_names, labels):
        patch = patches[label]
        file_name = cls_name + '.png'
        save_image(patch, os.path.join(save_dir, file_name))

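# Illustrative usage sketch (not part of the original module): saving one random
# "patch" per class to a hypothetical output directory. Real patches would come
# from a patch attack rather than torch.rand.
def _demo_save_patches():
    patches = torch.rand(3, 3, 64, 64)          # N=3 patches, RGB, 64x64
    class_names = ['person', 'car', 'bicycle']  # hypothetical class names
    save_patches_to_images(patches, './patch_vis', class_names)
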
def save_images(img_tensors, data_samples, save_dir, with_bboxes=True, width=5, scale=True):
    """Save images.

    Args:
        img_tensors (torch.Tensor): Image tensor with shape [N, C, H, W] and value range [0, 1].
        data_samples (list): List of mmdet.structures.DetDataSample.
        save_dir (str): Path to save images.
        with_bboxes (bool): Whether to draw predicted bbox rectangles on the saved images.
        width (int): Line width used to draw the rectangles.
        scale (bool): Whether to scale images back to their original size.
    """
    mkdirs_if_not_exists(save_dir)
    for img, data_sample in zip(img_tensors, data_samples):
        img_shape = data_sample.img_shape  # (H, W)
        img = img[:, :img_shape[0], :img_shape[1]] * 255
        img = img.int().to(torch.uint8)
        img_name = os.path.basename(data_sample.img_path)
        if with_bboxes:
            bboxes = data_sample.pred_instances.bboxes.clone()
            scale_w, scale_h = data_sample.scale_factor
            # bboxes are in (x1, y1, x2, y2) format: scale the x coordinates by
            # scale_w and the y coordinates by scale_h
            bboxes[:, 0::2] *= scale_w
            bboxes[:, 1::2] *= scale_h
            img = draw_bounding_boxes(img, bboxes, width=width)
        if scale:
            ori_shape = data_sample.ori_shape
            # bilinear interpolation requires a float tensor
            img = F.interpolate(img[None].float(), size=ori_shape, align_corners=True, mode='bilinear')[0]
        save_image(img / 255, os.path.join(save_dir, img_name))

class HiddenPrints:
    """Context manager that suppresses anything written to stdout (e.g. by print)."""

    def __enter__(self):
        self._original_stdout = sys.stdout
        sys.stdout = open(os.devnull, 'w')

    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.stdout.close()
        sys.stdout = self._original_stdout

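# Illustrative usage sketch (not part of the original module): silencing noisy
# print output, e.g. from model construction, while it runs inside the context.
def _demo_hidden_prints():
    with HiddenPrints():
        print('this is swallowed')
    print('this is shown')
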
def tv_loss(images, reduction='mean'):
    """Implementation of the total variation loss (L_tv) proposed in the paper
    "Fooling automated surveillance cameras: adversarial patches to attack
    person detection".

    Args:
        images (torch.Tensor): Image tensor with shape [N, C, H, W], where N, C, H
            and W are the batch size, number of channels, height and width.
        reduction (str): Supported reduction methods are 'mean', 'sum' and 'none'.

    Returns:
        torch.Tensor
    """
    if images.dim() == 3:
        images = images.unsqueeze(0)
    assert images.dim() == 4, 'Input tensor should be 4-dim, but got {}-dim'.format(images.dim())
    N, C, H, W = images.shape
    # differences between horizontally adjacent pixels (along the width axis)
    tv_column = torch.abs(images[..., 1:] - images[..., :-1] + 0.000001).view(N, -1)
    tv_column = torch.mean(tv_column, dim=1)
    # differences between vertically adjacent pixels (along the height axis)
    tv_row = torch.abs(images[:, :, 1:] - images[:, :, :-1] + 0.000001).view(N, -1)
    tv_row = torch.mean(tv_row, dim=1)
    tv_loss = tv_column + tv_row
    if reduction == 'mean':
        return tv_loss.mean()
    elif reduction == 'sum':
        return tv_loss.sum()
    elif reduction == 'none':
        return tv_loss
    else:
        raise ValueError(f"Unsupported reduction '{reduction}', expected 'mean', 'sum' or 'none'.")

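# Illustrative usage sketch (not part of the original module): a smooth
# (constant) patch has near-zero total variation, while a random patch has a
# much larger one, which is why minimizing tv_loss encourages smooth,
# printable adversarial patches.
def _demo_tv_loss():
    smooth = torch.ones(1, 3, 64, 64) * 0.5
    noisy = torch.rand(1, 3, 64, 64)
    assert tv_loss(smooth) < tv_loss(noisy)
    per_image = tv_loss(torch.rand(4, 3, 64, 64), reduction='none')
    assert per_image.shape == (4,)
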
def modify_test_pipeline(cfg):
    """The default test pipeline in mmdet is usually
    "LoadImageFromFile --> Resize --> LoadAnnotations --> PackDetInputs",
    in which the gt bboxes are not resized. To resize the bboxes together with
    the images, move "LoadAnnotations" before "Resize".
    """
    pipeline = cfg.test_dataloader.dataset.pipeline
    pop_idx = None
    for i, transform in enumerate(pipeline):
        if transform.type == 'LoadAnnotations':
            pop_idx = i
            break
    if pop_idx is not None:
        # move LoadAnnotations before Resize
        t = pipeline.pop(pop_idx)
        pipeline.insert(1, t)

def modify_train_pipeline(cfg):
    """Modify some dataset settings in the train dataloader to match those in the test dataloader."""
    modified_keys = ['data_root', 'ann_file', 'data_prefix']
    # the train dataset may be wrapped (e.g. by RepeatDataset), in which case the
    # real dataset lives at cfg.train_dataloader.dataset.dataset
    for key in modified_keys:
        if cfg.train_dataloader.dataset.get('dataset'):
            cfg.train_dataloader.dataset.dataset[key] = cfg.test_dataloader.dataset[key]
        else:
            cfg.train_dataloader.dataset[key] = cfg.test_dataloader.dataset[key]
    if cfg.train_dataloader.dataset.get('dataset'):
        cfg.train_dataloader.dataset.dataset.filter_cfg = dict(filter_empty_gt=True)
    else:
        cfg.train_dataloader.dataset.filter_cfg = dict(filter_empty_gt=True)

def build_optimizer(params, **kwargs):
    """Build the optimizer specified by kwargs['type'] with the arguments in kwargs['kwargs']."""
    # TODO: Add more optimizers.
    __factory__ = {'Adam': torch.optim.Adam, 'SGD': torch.optim.SGD}
    return __factory__[kwargs['type']](params, **kwargs['kwargs'])

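# Illustrative usage sketch (not part of the original module): build_optimizer
# expects the optimizer type under 'type' and its constructor arguments under
# 'kwargs', e.g. as parsed from a config file. The learning rate below is a
# placeholder value.
def _demo_build_optimizer():
    patch = torch.nn.Parameter(torch.rand(3, 64, 64))
    cfg = dict(type='Adam', kwargs=dict(lr=0.01))
    optimizer = build_optimizer([patch], **cfg)
    return optimizer
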
def all_reduce(tensor, reduction='sum'):
    """All-reduce a tensor in place across all GPUs when running distributed; no-op otherwise."""
    if not is_distributed():
        return
    op_factory = {'sum': dist.ReduceOp.SUM, 'avg': dist.ReduceOp.AVG}
    reduction = reduction.lower()
    assert reduction in op_factory, f'Expected reductions are sum and avg, but got {reduction} instead!'
    op = op_factory[reduction]
    dist.all_reduce(tensor, op)

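# Illustrative usage sketch (not part of the original module): averaging a
# per-rank metric under DDP. In a single-process run all_reduce() returns early,
# so the tensor is left unchanged.
def _demo_all_reduce():
    local_loss = torch.tensor([0.5])
    all_reduce(local_loss, reduction='avg')
    return local_loss
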
class EnableLossCal:
    """Context manager that lets detectors from mmdet compute losses in eval mode as in training mode."""

    def __init__(self, model: torch.nn.Module):
        self.model = model
        self.pre_training = self.model.training

    def __enter__(self):
        # only flip the top-level ``training`` attribute so that loss branches
        # are taken; submodules (e.g. BN, dropout) keep their current mode
        self.model.training = True

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.model.training = self.pre_training
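
# Illustrative usage sketch (not part of the original module): the intended use
# is with an mmdet detector, but any nn.Module shows the behaviour. Inside the
# context the top-level ``training`` flag is True so loss branches run, and the
# previous value is restored on exit.
def _demo_enable_loss_cal():
    model = torch.nn.Linear(4, 2)
    model.eval()
    with EnableLossCal(model):
        assert model.training is True
    assert model.training is False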