import itertools
import logging
import os.path as osp
import tempfile
from collections import OrderedDict
from typing import Dict, List, Optional, Sequence, Union
import ares
import torch
import numpy as np
from mmdet.datasets.api_wrappers import COCO, COCOeval
from mmdet.evaluation.metrics import CocoMetric
from mmdet.registry import METRICS
from mmengine.fileio import load
from terminaltables import AsciiTable
@METRICS.register_module(force=True)
class CocoMetric(CocoMetric):
    """Custom COCO evaluation metric.

    This is similar to ``mmdet.evaluation.metrics.CocoMetric`` (which it
    subclasses and deliberately shadows under the same name). In addition,
    it supports computing the metrics only for a kept subset of classes.

    Evaluate AR, AP, and mAP for detection tasks including proposal/box
    detection and instance segmentation. Please refer to
    https://cocodataset.org/#detection-eval for more details.

    Args:
        ann_file (str, optional): Path to the coco format annotation file.
            If not specified, ground truth annotations from the dataset will
            be converted to coco format. Defaults to None.
        metric (str | List[str]): Metrics to be evaluated. Valid metrics
            include 'bbox', 'segm', 'proposal', and 'proposal_fast'.
            Defaults to 'bbox'.
        classwise (bool): Whether to evaluate the metric class-wise.
            Defaults to False.
        specified_classes (Sequence[str]): Names of the classes to be
            evaluated. When non-empty, a per-category table is always
            produced and the six reported mAP statistics are averaged over
            these classes only. Defaults to ().
        proposal_nums (Sequence[int]): Numbers of proposals to be evaluated.
            Defaults to (100, 300, 1000).
        iou_thrs (float | List[float], optional): IoU threshold to compute AP
            and AR. If not specified, IoUs from 0.5 to 0.95 will be used.
            Defaults to None.
        metric_items (List[str], optional): Metric result names to be
            recorded in the evaluation result. Defaults to None.
        format_only (bool): Format the output results without performing
            evaluation. It is useful when you want to format the result
            to a specific format and submit it to the test server.
            Defaults to False.
        outfile_prefix (str, optional): The prefix of json files. It includes
            the file path and the prefix of filename, e.g., "a/b/prefix".
            If not specified, a temp file will be created. Defaults to None.
        file_client_args (dict, optional): Arguments to instantiate the
            corresponding backend in mmdet <= 3.0.0rc6. Defaults to None.
        backend_args (dict, optional): Arguments to instantiate the
            corresponding backend. Defaults to None.
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        prefix (str, optional): The prefix that will be added in the metric
            names to disambiguate homonymous metrics of different evaluators.
            If prefix is not provided in the argument, self.default_prefix
            will be used instead. Defaults to None.
        sort_categories (bool): Whether to sort categories in annotations.
            Only used for `Objects365V1Dataset`. Defaults to False.
    """

    default_prefix: Optional[str] = 'coco'

    def __init__(self,
                 ann_file: Optional[str] = None,
                 metric: Union[str, List[str]] = 'bbox',
                 classwise: bool = False,
                 specified_classes: Sequence[str] = (),
                 proposal_nums: Sequence[int] = (100, 300, 1000),
                 iou_thrs: Optional[Union[float, Sequence[float]]] = None,
                 metric_items: Optional[Sequence[str]] = None,
                 format_only: bool = False,
                 outfile_prefix: Optional[str] = None,
                 file_client_args: Optional[dict] = None,
                 backend_args: Optional[dict] = None,
                 collect_device: str = 'cpu',
                 prefix: Optional[str] = None,
                 sort_categories: bool = False) -> None:
        """Initialize the parent metric and remember the kept classes."""
        # Forward by keyword so that a reordering of the parent signature
        # cannot silently bind a value to the wrong parameter.
        super().__init__(
            ann_file=ann_file,
            metric=metric,
            classwise=classwise,
            proposal_nums=proposal_nums,
            iou_thrs=iou_thrs,
            metric_items=metric_items,
            format_only=format_only,
            outfile_prefix=outfile_prefix,
            file_client_args=file_client_args,
            backend_args=backend_args,
            collect_device=collect_device,
            prefix=prefix,
            sort_categories=sort_categories)
        self.specified_classes = specified_classes

    def compute_metrics(self, results: list) -> Dict[str, float]:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch. Each item
                is unpacked as a ``(ground_truth, prediction)`` pair.

        Returns:
            Dict[str, float]: The computed metrics. The keys are the names of
            the metrics, and the values are corresponding results.

        Raises:
            KeyError: If a requested metric has no result file or an entry
                of ``metric_items`` is not a supported name.
            ValueError: If ``specified_classes`` matches none of the
                evaluated categories.
        """
        logger = logging.getLogger(name=ares.__package_name__)

        # split gt and prediction list
        gts, preds = zip(*results)

        tmp_dir = None
        if self.outfile_prefix is None:
            tmp_dir = tempfile.TemporaryDirectory()
            outfile_prefix = osp.join(tmp_dir.name, 'results')
        else:
            outfile_prefix = self.outfile_prefix

        try:
            return self._evaluate_results(gts, preds, outfile_prefix, logger)
        finally:
            # Remove the temporary directory on every exit path, including
            # when evaluation raises.
            if tmp_dir is not None:
                tmp_dir.cleanup()

    def _evaluate_results(self, gts, preds, outfile_prefix,
                          logger) -> Dict[str, float]:
        """Dump predictions to json and run the evaluation for each metric."""
        if self._coco_api is None:
            # use converted gt json file to initialize coco api
            coco_json_path = self.gt_to_coco_json(
                gt_dicts=gts, outfile_prefix=outfile_prefix)
            self._coco_api = COCO(coco_json_path)

        # handle lazy init
        if self.cat_ids is None:
            self.cat_ids = self._coco_api.get_cat_ids(
                cat_names=self.dataset_meta['classes'])
        if self.img_ids is None:
            self.img_ids = self._coco_api.get_img_ids()

        # convert predictions to coco format and dump to json file
        result_files = self.results2json(preds, outfile_prefix)

        eval_results = OrderedDict()
        if self.format_only:
            logger.info('results are saved in '
                        f'{osp.dirname(outfile_prefix)}')
            return eval_results

        # mapping of metric item name -> index into cocoEval.stats
        coco_metric_names = {
            'mAP': 0,
            'mAP_50': 1,
            'mAP_75': 2,
            'mAP_s': 3,
            'mAP_m': 4,
            'mAP_l': 5,
            'AR@100': 6,
            'AR@300': 7,
            'AR@1000': 8,
            'AR_s@1000': 9,
            'AR_m@1000': 10,
            'AR_l@1000': 11
        }

        for metric in self.metrics:
            logger.info(f'Evaluating {metric}...')

            # TODO: May refactor fast_eval_recall to an independent metric?
            # fast eval recall
            if metric == 'proposal_fast':
                ar = self.fast_eval_recall(
                    preds, self.proposal_nums, self.iou_thrs, logger=logger)
                log_msg = []
                for i, num in enumerate(self.proposal_nums):
                    eval_results[f'AR@{num}'] = ar[i]
                    log_msg.append(f'\nAR@{num}\t{ar[i]:.4f}')
                log_msg = ''.join(log_msg)
                logger.info(log_msg)
                continue

            # evaluate proposal, bbox and segm
            iou_type = 'bbox' if metric == 'proposal' else metric
            if metric not in result_files:
                raise KeyError(f'{metric} is not in results')
            try:
                predictions = load(result_files[metric])
                if iou_type == 'segm':
                    # Refer to https://github.com/cocodataset/cocoapi/blob/master/PythonAPI/pycocotools/coco.py#L331  # noqa
                    # When evaluating mask AP, if the results contain bbox,
                    # cocoapi will use the box area instead of the mask area
                    # for calculating the instance area. Though the overall AP
                    # is not affected, this leads to different
                    # small/medium/large mask AP results.
                    for x in predictions:
                        x.pop('bbox')
                coco_dt = self._coco_api.loadRes(predictions)
            except IndexError:
                logger.info(
                    'The testing results of the whole dataset is empty.')
                break

            coco_eval = COCOeval(self._coco_api, coco_dt, iou_type)
            coco_eval.params.catIds = self.cat_ids
            coco_eval.params.imgIds = self.img_ids
            coco_eval.params.maxDets = list(self.proposal_nums)
            coco_eval.params.iouThrs = self.iou_thrs

            metric_items = self.metric_items
            if metric_items is not None:
                for metric_item in metric_items:
                    if metric_item not in coco_metric_names:
                        raise KeyError(
                            f'metric item "{metric_item}" is not supported')

            if metric == 'proposal':
                coco_eval.params.useCats = 0
                coco_eval.evaluate()
                coco_eval.accumulate()
                coco_eval.summarize()
                if metric_items is None:
                    metric_items = [
                        'AR@100', 'AR@300', 'AR@1000', 'AR_s@1000',
                        'AR_m@1000', 'AR_l@1000'
                    ]
                for item in metric_items:
                    val = float(
                        f'{coco_eval.stats[coco_metric_names[item]]:.3f}')
                    eval_results[item] = val
            else:
                coco_eval.evaluate()
                coco_eval.accumulate()
                # The all-class summary is only printed when no class subset
                # was requested; with a subset the first six stats are
                # replaced below by the subset averages.
                self.summarize(
                    coco_eval,
                    logger,
                    show_results=not self.specified_classes)

                if self.classwise or self.specified_classes:
                    subset_stats = self._classwise_results(
                        coco_eval, logger, eval_results)
                    if subset_stats is not None:
                        coco_eval.stats[:6] = subset_stats

                if metric_items is None:
                    metric_items = [
                        'mAP', 'mAP_50', 'mAP_75', 'mAP_s', 'mAP_m', 'mAP_l'
                    ]
                for metric_item in metric_items:
                    key = f'{metric}_{metric_item}'
                    val = coco_eval.stats[coco_metric_names[metric_item]]
                    eval_results[key] = float(f'{round(val, 3)}')

                ap = coco_eval.stats[:6]
                logger.info(f'{metric}_mAP_copypaste: {ap[0]:.3f} '
                            f'{ap[1]:.3f} {ap[2]:.3f} {ap[3]:.3f} '
                            f'{ap[4]:.3f} {ap[5]:.3f}')

        return eval_results

    @staticmethod
    def _mean_precision(precision):
        """Average the entries greater than -1; NaN if none remain."""
        valid = precision[precision > -1]
        if valid.size:
            return np.mean(valid)
        return float('nan')

    def _classwise_results(self, coco_eval, logger, eval_results):
        """Log a per-category AP table and record per-category precision.

        Adds ``'<category name>_precision'`` entries to ``eval_results``.

        Returns:
            np.ndarray | None: When ``specified_classes`` is set, the six
            values (mAP, mAP_50, mAP_75, mAP_s, mAP_m, mAP_l) averaged over
            the specified categories, ignoring NaN entries. Otherwise None.

        Raises:
            ValueError: If ``specified_classes`` is set but none of those
                classes is among the evaluated categories.
        """
        # Compute per-category AP
        # from https://github.com/facebookresearch/detectron2/
        precisions = coco_eval.eval['precision']
        # precision: (iou, recall, cls, area range, max dets)
        assert len(self.cat_ids) == precisions.shape[2]

        restrict = bool(self.specified_classes)
        if restrict:
            specified_cats = set(
                self._coco_api.get_cat_ids(cat_names=self.specified_classes))
            selected = [(idx, cat_id)
                        for idx, cat_id in enumerate(self.cat_ids)
                        if cat_id in specified_cats]
            if not selected:
                # Previously this surfaced later as a NameError/IndexError.
                raise ValueError(
                    f'None of the specified classes '
                    f'{tuple(self.specified_classes)} is among the '
                    f'evaluated categories.')
        else:
            selected = list(enumerate(self.cat_ids))

        stats = []
        results_per_category = []
        for idx, cat_id in selected:
            nm = self._coco_api.loadCats(cat_id)[0]
            # area range index 0: all area ranges
            # max dets index -1: typically 100 per image
            aps = [self._mean_precision(precisions[:, :, idx, 0, -1])]
            # indexes of IoU @50 and @75 (assumes the default IoU thresholds)
            aps.extend(
                self._mean_precision(precisions[iou, :, idx, 0, -1])
                for iou in (0, 5))
            # indexes of area of small, medium and large
            aps.extend(
                self._mean_precision(precisions[:, :, idx, area, -1])
                for area in (1, 2, 3))

            eval_results[f'{nm["name"]}_precision'] = round(aps[0], 3)
            row = [f'{nm["name"]}']
            row.extend(f'{round(ap, 3)}' for ap in aps)
            results_per_category.append(tuple(row))
            stats.append(aps)

        subset_stats = None
        if restrict:
            # Column-wise mean over the kept categories, skipping NaN cells.
            stats = torch.tensor(stats)
            averaged = []
            for j in range(stats.shape[1]):
                col = stats[:, j]
                col = col[col.isnan() == 0]
                averaged.append(col.mean().item())
            subset_stats = np.array(averaged)

        if results_per_category:
            headers = [
                'category', 'mAP', 'mAP_50', 'mAP_75', 'mAP_s', 'mAP_m',
                'mAP_l'
            ]
            table_data = [headers]
            table_data += results_per_category
            table = AsciiTable(table_data)
            logger.info('\n' + table.table, extra={'simple': True})

        return subset_stats

    def summarize(self, coco_eval, logger=None, show_results=True):
        """Compute and optionally display summary metrics.

        The computed statistics are written to ``coco_eval.stats``.

        Note this function can *only* be applied on the default parameter
        setting: the detection summary looks up ``maxDets == 100`` for the
        first statistic and indexes ``params.maxDets[0..2]`` for the others.

        Args:
            coco_eval: Evaluator on which ``accumulate()`` has already been
                run (its ``eval`` and ``params`` attributes are read).
            logger (logging.Logger, optional): Logger used to display the
                summary lines. If None, they are printed. Defaults to None.
            show_results (bool): Whether to display the summary lines.
                Defaults to True.

        Raises:
            Exception: If ``coco_eval.eval`` is empty.
            ValueError: If ``coco_eval.params.iouType`` is not one of
                'segm', 'bbox' or 'keypoints'.
        """

        def _summarize(ap=1, iouThr=None, areaRng='all', maxDets=100):
            p = coco_eval.params
            iStr = ' {:<18} {} @[ IoU={:<9} | area={:>6s} | maxDets={:>3d} ] = {:0.3f}'  # noqa
            titleStr = 'Average Precision' if ap == 1 else 'Average Recall'
            typeStr = '(AP)' if ap == 1 else '(AR)'
            iouStr = '{:0.2f}:{:0.2f}'.format(p.iouThrs[0], p.iouThrs[-1]) \
                if iouThr is None else '{:0.2f}'.format(iouThr)

            aind = [
                i for i, aRng in enumerate(p.areaRngLbl) if aRng == areaRng
            ]
            mind = [i for i, mDet in enumerate(p.maxDets) if mDet == maxDets]
            if ap == 1:
                # dimension of precision: [TxRxKxAxM]
                s = coco_eval.eval['precision']
                # IoU
                if iouThr is not None:
                    t = np.where(iouThr == p.iouThrs)[0]
                    s = s[t]
                s = s[:, :, :, aind, mind]
            else:
                # dimension of recall: [TxKxAxM]
                s = coco_eval.eval['recall']
                if iouThr is not None:
                    t = np.where(iouThr == p.iouThrs)[0]
                    s = s[t]
                s = s[:, :, aind, mind]
            # -1 marks entries with no valid measurement; report -1 when
            # nothing valid is left.
            if len(s[s > -1]) == 0:
                mean_s = -1
            else:
                mean_s = np.mean(s[s > -1])
            if show_results:
                line = iStr.format(titleStr, typeStr, iouStr, areaRng,
                                   maxDets, mean_s)
                if logger:
                    logger.info(line, extra={'simple': True})
                else:
                    print(line)
            return mean_s

        def _summarizeDets():
            max_dets = coco_eval.params.maxDets
            stats = np.zeros((12,))
            stats[0] = _summarize(1)
            stats[1] = _summarize(1, iouThr=.5, maxDets=max_dets[2])
            stats[2] = _summarize(1, iouThr=.75, maxDets=max_dets[2])
            stats[3] = _summarize(1, areaRng='small', maxDets=max_dets[2])
            stats[4] = _summarize(1, areaRng='medium', maxDets=max_dets[2])
            stats[5] = _summarize(1, areaRng='large', maxDets=max_dets[2])
            stats[6] = _summarize(0, maxDets=max_dets[0])
            stats[7] = _summarize(0, maxDets=max_dets[1])
            stats[8] = _summarize(0, maxDets=max_dets[2])
            stats[9] = _summarize(0, areaRng='small', maxDets=max_dets[2])
            stats[10] = _summarize(0, areaRng='medium', maxDets=max_dets[2])
            stats[11] = _summarize(0, areaRng='large', maxDets=max_dets[2])
            return stats

        def _summarizeKps():
            stats = np.zeros((10,))
            stats[0] = _summarize(1, maxDets=20)
            stats[1] = _summarize(1, maxDets=20, iouThr=.5)
            stats[2] = _summarize(1, maxDets=20, iouThr=.75)
            stats[3] = _summarize(1, maxDets=20, areaRng='medium')
            stats[4] = _summarize(1, maxDets=20, areaRng='large')
            stats[5] = _summarize(0, maxDets=20)
            stats[6] = _summarize(0, maxDets=20, iouThr=.5)
            stats[7] = _summarize(0, maxDets=20, iouThr=.75)
            stats[8] = _summarize(0, maxDets=20, areaRng='medium')
            stats[9] = _summarize(0, maxDets=20, areaRng='large')
            return stats

        if not coco_eval.eval:
            raise Exception('Please run accumulate() first')
        iouType = coco_eval.params.iouType
        if iouType == 'segm' or iouType == 'bbox':
            summarize = _summarizeDets
        elif iouType == 'keypoints':
            summarize = _summarizeKps
        else:
            # Previously fell through to an UnboundLocalError.
            raise ValueError(f'Unsupported iouType: {iouType!r}')
        coco_eval.stats = summarize()