Module data_handling.utils
Utility functions for data handling, such as: 1. Drawing annotations. 2. Getting stats. 3. Removing categories or creating a subset of a dataset for particular categories. 4. Merging datasets.
Expand source code
"""Utility functions for data handling, such as:
1. Drawing annotations.
2. Getting stats.
3. Removing categories or creating a subset of a dataset for particular categories.
4. Merging datasets."""
from pycocotools.coco import COCO
import json
import os
import cv2
from tqdm import tqdm
from .coco_assistant import coco_assistant
import random
import shutil
import colorsys
from pathlib import Path
def __generate_colors(num_classes):
    """Generate a reproducible list of distinct colors.

    Hues are spaced evenly around the HSV color wheel at full saturation and
    value, converted to integer triples scaled by 255, and then shuffled with
    a fixed seed so that adjacent class ids do not receive adjacent hues.

    Args:
        num_classes (int): Number of colors to generate.

    Returns:
        list: ``num_classes`` tuples of three ints, one tuple per class.
    """
    colors = []
    for index in range(num_classes):
        red, green, blue = colorsys.hsv_to_rgb(index / num_classes, 1., 1.)
        colors.append((int(red * 255), int(green * 255), int(blue * 255)))
    # Shuffle with a private generator. It yields the same order as seeding
    # the module-level generator with 10101, but leaves the caller's random
    # state untouched instead of reseeding it.
    random.Random(10101).shuffle(colors)
    return colors
def draw_annotations(image_dir, annotation_file, output_dir):
    """Draw bounding boxes and category labels on images from a COCO file.

    Every image listed in the annotation file is read from ``image_dir``.
    Each of its annotations is drawn as a rectangle with the category name
    just above the top-left corner, and the result is written to
    ``output_dir`` under the same file name.

    Args:
        image_dir (str): Path to input image directory
        annotation_file (str): Path to annotation file
        output_dir (str): Path to output image directory

    Raises:
        FileNotFoundError: If an image listed in the annotation file cannot
            be read from ``image_dir``.
    """
    font_scale = 1
    font = cv2.FONT_HERSHEY_DUPLEX
    os.makedirs(output_dir, exist_ok=True)

    coco = COCO(annotation_file)
    cat_ids = coco.getCatIds()
    category_names = {cat['id']: cat['name'] for cat in coco.loadCats(cat_ids)}
    # Colors are indexed directly by category id, so the palette has to reach
    # the largest id. ``default=-1`` yields an empty palette when the file
    # declares no categories instead of raising.
    colors_list = __generate_colors(max(cat_ids, default=-1) + 1)

    for img_id in tqdm(coco.getImgIds(), desc='Drawing on Images'):
        file_name = coco.loadImgs([img_id])[0]['file_name']
        source_path = os.path.join(image_dir, file_name)
        image = cv2.imread(source_path)
        if image is None:
            # cv2.imread reports failure by returning None; fail here with
            # the offending path rather than later inside a drawing call.
            raise FileNotFoundError("Could not read image: %s" % source_path)

        ann_ids = coco.getAnnIds(imgIds=[img_id], catIds=cat_ids)
        for annotation in coco.loadAnns(ann_ids):
            # The box is stored as [x, y, width, height]. Unpack it rather
            # than rewriting the list so the loaded annotation is unchanged.
            x, y, width, height = annotation['bbox'][:4]
            x0, y0 = int(x), int(y)
            x1, y1 = int(x + width), int(y + height)

            category_id = annotation['category_id']
            label = category_names[category_id]
            color = colors_list[category_id]

            cv2.rectangle(image, (x0, y0), (x1, y1), color, 2)
            _, baseline = cv2.getTextSize(label, font, font_scale, 1)
            cv2.putText(image, label, (x0, y0 - baseline), font, font_scale, color, 2)

        output_path = os.path.join(output_dir, file_name)
        # file_name may contain sub-directories; make sure the target exists.
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        cv2.imwrite(output_path, image)
def annotation_stats(annotation_file):
    """Print a per-category summary of a COCO annotation file.

    One row is printed per category with its name, its id, the number of
    annotation instances and the number of images containing that category.

    Args:
        annotation_file (str): coco json annotation file path
    """
    dataset = COCO(annotation_file)
    header = ("Category", "ID", "Instances", "Image Count")
    print("\n%-30s %-5s %-15s %s" % header)
    for category_id in dataset.getCatIds():
        name = dataset.cats[category_id]['name']
        instance_count = len(dataset.getAnnIds(catIds=[category_id]))
        image_count = len(dataset.getImgIds(catIds=[category_id]))
        row = (name, category_id, instance_count, image_count)
        print("%-30s %-5d ---> %-15d %d" % row)
def remove_categories(annotation_file, categories, output_annotation_file, keep_blank_images=True):
    """Remove the given categories and their annotations from a COCO file.

    Args:
        annotation_file (str): path to annotation file
        categories (list): list of category names to remove
        output_annotation_file (str): path to output annotation file
        keep_blank_images (bool): when False, images whose annotations all
            belonged to removed categories are dropped as well. Images that
            had no annotations to begin with are kept either way.
    """
    # Context manager so the input handle is closed as soon as it is parsed.
    with open(annotation_file) as infile:
        data = json.load(infile)

    # Sets give constant-time membership tests in the loops below.
    removed_ids = {cat['id'] for cat in data['categories'] if cat['name'] in categories}
    data['categories'] = [cat for cat in data['categories'] if cat['name'] not in categories]

    kept_annotations = []
    images_with_removed = set()
    images_with_kept = set()
    for annotation in data['annotations']:
        if annotation['category_id'] in removed_ids:
            images_with_removed.add(annotation['image_id'])
        else:
            images_with_kept.add(annotation['image_id'])
            kept_annotations.append(annotation)

    if not keep_blank_images:
        # Only images that lost annotations and have none left are dropped.
        blank_ids = images_with_removed - images_with_kept
        data['images'] = [image for image in data['images'] if image['id'] not in blank_ids]
    data['annotations'] = kept_annotations

    with open(output_annotation_file, 'w') as outfile:
        json.dump(data, outfile)
def category_subset(image_folder, annotation_file, input_cat, output_folder="categories", max_img_per_label=-1):
    """Create a sub-dataset containing only the requested categories.

    Images that contain at least one of the requested categories are copied
    to ``<output_folder>/images``. A reduced annotation file is written to
    ``<output_folder>/annotations/annotations.json``.

    Args:
        image_folder (str): Path to image folder
        annotation_file (str): Path to annotation file
        input_cat (list): List of category names to include
        output_folder (str): Path to output folder. Defaults to "categories"
        max_img_per_label (int): Maximum number of images per label. Defaults
            to -1, then all images are included. When a category has more
            images than this, a random sample of that size is taken.
    """
    resim_dir = Path(os.path.join(output_folder, "images"))
    resann_dir = Path(os.path.join(output_folder, "annotations"))
    os.makedirs(resim_dir, exist_ok=True)
    os.makedirs(resann_dir, exist_ok=True)
    dst_ann = resann_dir / "annotations.json"

    coco = COCO(annotation_file)
    cat_ids = [cat['id'] for cat in coco.loadCats(coco.getCatIds()) if cat['name'] in input_cat]
    category_list = coco.loadCats(ids=cat_ids)

    # Collect image ids across categories, each once, in first-seen order.
    img_id_list = []
    seen_ids = set()
    for cat_id in tqdm(cat_ids, desc='Category data selection'):
        img_ids = coco.getImgIds(catIds=[cat_id])
        if max_img_per_label > 0 and len(img_ids) > max_img_per_label:
            # Sample positions, then sort them so the picks keep their order.
            picked = sorted(random.sample(range(len(img_ids)), max_img_per_label))
            img_ids = [img_ids[i] for i in picked]
        for img_id in img_ids:
            if img_id not in seen_ids:
                seen_ids.add(img_id)
                img_id_list.append(img_id)

    images = coco.loadImgs(ids=img_id_list)
    if img_id_list:
        ann_ids = coco.getAnnIds(catIds=cat_ids, imgIds=img_id_list)
        annotation_list = coco.loadAnns(ann_ids)
    else:
        # pycocotools treats empty filters as "no filter", so getAnnIds would
        # return every annotation in the file. An empty selection must yield
        # an empty annotation list instead.
        annotation_list = []

    for image in tqdm(images, desc='Copying images'):
        source = os.path.join(image_folder, image['file_name'])
        destination = resim_dir / image['file_name']
        # file_name may contain sub-directories; make sure the target exists.
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(source, destination)

    # Re-read the raw file so every top-level key other than the three
    # replaced below is carried over unchanged.
    with open(annotation_file) as infile:
        coco_data = json.load(infile)
    coco_data['categories'] = category_list
    coco_data['annotations'] = annotation_list
    coco_data['images'] = images
    with open(dst_ann, 'w') as outfile:
        json.dump(coco_data, outfile)
def merge_datasets(image_folder, annotation_folder, output_folder="merged", merge_images=True, duplicate_frames=True):
    """Merge multiple datasets into a single dataset.

    Thin wrapper: builds a ``COCO_Assistant`` for the given folders and
    delegates to its ``merge`` method.

    Args:
        image_folder (str): Path to image folder
        annotation_folder (str): Path to annotation folder
        output_folder (str): Path to output folder. Defaults to "merged"
        merge_images (bool): Merge images. Defaults to True
        duplicate_frames (bool): Duplicate frames. Defaults to True
    """
    coco_assistant.COCO_Assistant(image_folder, annotation_folder).merge(
        output_folder, merge_images, duplicate_frames)
def merge_annotations(image_folder, annotation_folder, output_folder="merged", merge_images=True):
    """Merge multiple annotations of a single image into one file.

    Thin wrapper: builds a ``COCO_Assistant`` for the given folders and
    delegates to its ``merge_same`` method.

    Args:
        image_folder (str): Path to image folder
        annotation_folder (str): Path to annotation folder
        output_folder (str): Path to output folder. Defaults to "merged"
        merge_images (bool): Merge images. Defaults to True
    """
    coco_assistant.COCO_Assistant(image_folder, annotation_folder).merge_same(
        output_folder, merge_images)
if __name__ == "__main__":
    import sys

    # Minimal command-line entry point: print stats for one annotation file.
    # Exit with a usage message instead of an IndexError when the path is
    # missing.
    if len(sys.argv) < 2:
        sys.exit("usage: python -m data_handling.utils <annotation_file>")
    annotation_stats(sys.argv[1])
    # draw_annotations(sys.argv[1], sys.argv[2], sys.argv[3])
Functions
def annotation_stats(annotation_file)
-
Print stats like number of instances and number of images in that category
Args
annotation_file
:str
- coco json annotation file path
Expand source code
def annotation_stats(annotation_file):
    """Print a per-category summary of a COCO annotation file.

    One row is printed per category with its name, its id, the number of
    annotation instances and the number of images containing that category.

    Args:
        annotation_file (str): coco json annotation file path
    """
    dataset = COCO(annotation_file)
    header = ("Category", "ID", "Instances", "Image Count")
    print("\n%-30s %-5s %-15s %s" % header)
    for category_id in dataset.getCatIds():
        name = dataset.cats[category_id]['name']
        instance_count = len(dataset.getAnnIds(catIds=[category_id]))
        image_count = len(dataset.getImgIds(catIds=[category_id]))
        row = (name, category_id, instance_count, image_count)
        print("%-30s %-5d ---> %-15d %d" % row)
def category_subset(image_folder, annotation_file, input_cat, output_folder='categories', max_img_per_label=-1)
-
Subset the annotation file to only include the categories specified
Args
image_folder
:str
- Path to image folder
annotation_file
:str
- Path to annotation file
input_cat
:list
- List of categories to include
output_folder
:str
- Path to output folder. Defaults to "categories"
max_img_per_label
:int
- Maximum number of images per label. Defaults to -1, then all images are included.
Expand source code
def category_subset(image_folder, annotation_file, input_cat, output_folder="categories", max_img_per_label=-1):
    """Create a sub-dataset containing only the requested categories.

    Images that contain at least one of the requested categories are copied
    to ``<output_folder>/images``. A reduced annotation file is written to
    ``<output_folder>/annotations/annotations.json``.

    Args:
        image_folder (str): Path to image folder
        annotation_file (str): Path to annotation file
        input_cat (list): List of category names to include
        output_folder (str): Path to output folder. Defaults to "categories"
        max_img_per_label (int): Maximum number of images per label. Defaults
            to -1, then all images are included. When a category has more
            images than this, a random sample of that size is taken.
    """
    resim_dir = Path(os.path.join(output_folder, "images"))
    resann_dir = Path(os.path.join(output_folder, "annotations"))
    os.makedirs(resim_dir, exist_ok=True)
    os.makedirs(resann_dir, exist_ok=True)
    dst_ann = resann_dir / "annotations.json"

    coco = COCO(annotation_file)
    cat_ids = [cat['id'] for cat in coco.loadCats(coco.getCatIds()) if cat['name'] in input_cat]
    category_list = coco.loadCats(ids=cat_ids)

    # Collect image ids across categories, each once, in first-seen order.
    img_id_list = []
    seen_ids = set()
    for cat_id in tqdm(cat_ids, desc='Category data selection'):
        img_ids = coco.getImgIds(catIds=[cat_id])
        if max_img_per_label > 0 and len(img_ids) > max_img_per_label:
            # Sample positions, then sort them so the picks keep their order.
            picked = sorted(random.sample(range(len(img_ids)), max_img_per_label))
            img_ids = [img_ids[i] for i in picked]
        for img_id in img_ids:
            if img_id not in seen_ids:
                seen_ids.add(img_id)
                img_id_list.append(img_id)

    images = coco.loadImgs(ids=img_id_list)
    if img_id_list:
        ann_ids = coco.getAnnIds(catIds=cat_ids, imgIds=img_id_list)
        annotation_list = coco.loadAnns(ann_ids)
    else:
        # pycocotools treats empty filters as "no filter", so getAnnIds would
        # return every annotation in the file. An empty selection must yield
        # an empty annotation list instead.
        annotation_list = []

    for image in tqdm(images, desc='Copying images'):
        source = os.path.join(image_folder, image['file_name'])
        destination = resim_dir / image['file_name']
        # file_name may contain sub-directories; make sure the target exists.
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(source, destination)

    # Re-read the raw file so every top-level key other than the three
    # replaced below is carried over unchanged.
    with open(annotation_file) as infile:
        coco_data = json.load(infile)
    coco_data['categories'] = category_list
    coco_data['annotations'] = annotation_list
    coco_data['images'] = images
    with open(dst_ann, 'w') as outfile:
        json.dump(coco_data, outfile)
def draw_annotations(image_dir, annotation_file, output_dir)
-
Draw annotations on images with coco annotation file
Args
image_dir
:str
- Path to input image directory
annotation_file
:str
- Path to annotation file
output_dir
:str
- Path to output image directory
Expand source code
def draw_annotations(image_dir, annotation_file, output_dir):
    """Draw bounding boxes and category labels on images from a COCO file.

    Every image listed in the annotation file is read from ``image_dir``.
    Each of its annotations is drawn as a rectangle with the category name
    just above the top-left corner, and the result is written to
    ``output_dir`` under the same file name.

    Args:
        image_dir (str): Path to input image directory
        annotation_file (str): Path to annotation file
        output_dir (str): Path to output image directory

    Raises:
        FileNotFoundError: If an image listed in the annotation file cannot
            be read from ``image_dir``.
    """
    font_scale = 1
    font = cv2.FONT_HERSHEY_DUPLEX
    os.makedirs(output_dir, exist_ok=True)

    coco = COCO(annotation_file)
    cat_ids = coco.getCatIds()
    category_names = {cat['id']: cat['name'] for cat in coco.loadCats(cat_ids)}
    # Colors are indexed directly by category id, so the palette has to reach
    # the largest id. ``default=-1`` yields an empty palette when the file
    # declares no categories instead of raising.
    colors_list = __generate_colors(max(cat_ids, default=-1) + 1)

    for img_id in tqdm(coco.getImgIds(), desc='Drawing on Images'):
        file_name = coco.loadImgs([img_id])[0]['file_name']
        source_path = os.path.join(image_dir, file_name)
        image = cv2.imread(source_path)
        if image is None:
            # cv2.imread reports failure by returning None; fail here with
            # the offending path rather than later inside a drawing call.
            raise FileNotFoundError("Could not read image: %s" % source_path)

        ann_ids = coco.getAnnIds(imgIds=[img_id], catIds=cat_ids)
        for annotation in coco.loadAnns(ann_ids):
            # The box is stored as [x, y, width, height]. Unpack it rather
            # than rewriting the list so the loaded annotation is unchanged.
            x, y, width, height = annotation['bbox'][:4]
            x0, y0 = int(x), int(y)
            x1, y1 = int(x + width), int(y + height)

            category_id = annotation['category_id']
            label = category_names[category_id]
            color = colors_list[category_id]

            cv2.rectangle(image, (x0, y0), (x1, y1), color, 2)
            _, baseline = cv2.getTextSize(label, font, font_scale, 1)
            cv2.putText(image, label, (x0, y0 - baseline), font, font_scale, color, 2)

        output_path = os.path.join(output_dir, file_name)
        # file_name may contain sub-directories; make sure the target exists.
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        cv2.imwrite(output_path, image)
def merge_annotations(image_folder, annotation_folder, output_folder='merged', merge_images=True)
-
Merge multiple annotations of a single image into one file.
Args
image_folder
:str
- Path to image folder
annotation_folder
:str
- Path to annotation folder
output_folder
:str
- Path to output folder. Defaults to "merged"
merge_images
:bool
- Merge images. Defaults to True
Expand source code
def merge_annotations(image_folder, annotation_folder, output_folder="merged", merge_images=True):
    """Merge multiple annotations of a single image into one file.

    Thin wrapper: builds a ``COCO_Assistant`` for the given folders and
    delegates to its ``merge_same`` method.

    Args:
        image_folder (str): Path to image folder
        annotation_folder (str): Path to annotation folder
        output_folder (str): Path to output folder. Defaults to "merged"
        merge_images (bool): Merge images. Defaults to True
    """
    coco_assistant.COCO_Assistant(image_folder, annotation_folder).merge_same(
        output_folder, merge_images)
def merge_datasets(image_folder, annotation_folder, output_folder='merged', merge_images=True, duplicate_frames=True)
-
Merge multiple datasets into a single dataset
Args
image_folder
:str
- Path to image folder
annotation_folder
:str
- Path to annotation folder
output_folder
:str
- Path to output folder. Defaults to "merged"
merge_images
:bool
- Merge images. Defaults to True
duplicate_frames
:bool
- Duplicate frames. Defaults to True
Expand source code
def merge_datasets(image_folder, annotation_folder, output_folder="merged", merge_images=True, duplicate_frames=True):
    """Merge multiple datasets into a single dataset.

    Thin wrapper: builds a ``COCO_Assistant`` for the given folders and
    delegates to its ``merge`` method.

    Args:
        image_folder (str): Path to image folder
        annotation_folder (str): Path to annotation folder
        output_folder (str): Path to output folder. Defaults to "merged"
        merge_images (bool): Merge images. Defaults to True
        duplicate_frames (bool): Duplicate frames. Defaults to True
    """
    coco_assistant.COCO_Assistant(image_folder, annotation_folder).merge(
        output_folder, merge_images, duplicate_frames)
def remove_categories(annotation_file, categories, output_annotation_file, keep_blank_images=True)
-
Remove the categories from the annotation file
Args
annotation_file
:str
- path to annotation file
categories
:list
- list of categories to remove
output_annotation_file
:str
- path to output annotation file
keep_blank_images
:bool
- keep images with no annotations
Expand source code
def remove_categories(annotation_file, categories, output_annotation_file, keep_blank_images=True):
    """Remove the given categories and their annotations from a COCO file.

    Args:
        annotation_file (str): path to annotation file
        categories (list): list of category names to remove
        output_annotation_file (str): path to output annotation file
        keep_blank_images (bool): when False, images whose annotations all
            belonged to removed categories are dropped as well. Images that
            had no annotations to begin with are kept either way.
    """
    # Context manager so the input handle is closed as soon as it is parsed.
    with open(annotation_file) as infile:
        data = json.load(infile)

    # Sets give constant-time membership tests in the loops below.
    removed_ids = {cat['id'] for cat in data['categories'] if cat['name'] in categories}
    data['categories'] = [cat for cat in data['categories'] if cat['name'] not in categories]

    kept_annotations = []
    images_with_removed = set()
    images_with_kept = set()
    for annotation in data['annotations']:
        if annotation['category_id'] in removed_ids:
            images_with_removed.add(annotation['image_id'])
        else:
            images_with_kept.add(annotation['image_id'])
            kept_annotations.append(annotation)

    if not keep_blank_images:
        # Only images that lost annotations and have none left are dropped.
        blank_ids = images_with_removed - images_with_kept
        data['images'] = [image for image in data['images'] if image['id'] not in blank_ids]
    data['annotations'] = kept_annotations

    with open(output_annotation_file, 'w') as outfile:
        json.dump(data, outfile)