# Source code for optical.converter.base

"""
__author__: HashTagML
license: MIT
Created: Tuesday, 30th March 2021
"""
# TODO: needs a better solution for handling TFRecords

import os
from typing import Optional, Union
from pathlib import Path
import altair as alt
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
from skmultilearn.model_selection import iterative_train_test_split

from .converter import (
    convert_coco,
    convert_csv,
    convert_pascal,
    convert_sagemaker,
    convert_yolo,
    convert_tfrecord,
    convert_createml,
    convert_simple_json,
)
from .utils import filter_split_category, ifnone, find_splits

# Turn off pandas' chained-assignment check for the whole process.
pd.options.mode.chained_assignment = None

# Probe for the optional TensorFlow dependency; ``_TF_INSTALLED`` records
# whether the import succeeded.
try:
    import tensorflow as tf  # noqa: F401
except ImportError:
    _TF_INSTALLED = False
else:
    _TF_INSTALLED = True


class FormatSpec:
    """The base class to represent all annotation formats.

    Holds the dataset root, the master annotation dataframe and the format
    name, and provides helpers to inspect, split and convert the annotations.
    """

    def __init__(
        self,
        root: Optional[Union[str, os.PathLike]] = None,
        has_split: Optional[bool] = False,
        df: Optional[pd.DataFrame] = None,
        format: Optional[str] = None,
    ):
        """Initialise the spec.

        Args:
            root (Optional[Union[str, os.PathLike]]): root directory of the dataset. Defaults to None.
            has_split (Optional[bool]): stored as ``_has_image_split``. Defaults to False.
            df (Optional[pd.DataFrame]): master annotation dataframe. Defaults to None.
            format (Optional[str]): name of the annotation format. When None, the ``format`` property
                falls back to the last component of the defining module's name. Defaults to None.
        """
        # ``Path(None)`` raises TypeError, so honour the ``None`` default explicitly.
        self.root = Path(root) if root is not None else None
        self._has_image_split = has_split
        self.master_df = df
        self._format = format
        self._splits = None

    # Deliberately not an ``@abstractmethod``: an abstract class could not be
    # instantiated from within ``split``, which builds a plain ``FormatSpec``.
    def _resolve_dataframe(self):
        pass

    def __str__(self):
        # ``_splits`` is None until ``_find_splits`` has run (for instance on the
        # object returned by ``split``); render that case as an empty list.
        splits = self.splits if self.splits is not None else []
        return f"{self.format.upper()}[root:{self.root}, splits:[{', '.join(splits)}]]"

    def __repr__(self):
        return self.format

    @property
    def format(self):
        """Name of the annotation format; defaults to the last part of the module name."""
        if self._format is None:
            return self.__module__.split(".")[-1]
        return self._format

    @property
    def splits(self):
        """Splits discovered by ``_find_splits``, or None when it has not been run."""
        return self._splits

    def _find_splits(self):
        # NOTE(review): ``_image_dir`` and ``_annotation_dir`` are not set in this class;
        # presumably subclasses define them before calling this - confirm.
        splits, has_image_split = find_splits(self._image_dir, self._annotation_dir, self.format)
        self._has_image_split = has_image_split
        self._splits = splits

    def bbox_stats(self, split: Optional[str] = None, category: Optional[str] = None) -> pd.DataFrame:
        """computes bbox descriptive stats e.g., mean, std etc.

        Args:
            split (Optional[str]): split of the dataset e.g., ``train``, ``valid`` etc. Defaults to None.
            category (Optional[str]): category to filter out. Defaults to None.

        Returns:
            pd.DataFrame: stats of the bounding boxes
        """
        df = filter_split_category(self.master_df, split, category)
        return df[["x_min", "y_min", "width", "height"]].describe()

    def show_distribution(self) -> "alt.Chart":
        """Plots distribution of labels in different splits of the dataset"""
        df = self.master_df[["split", "category", "image_id"]].copy()
        counts = df.groupby(["split", "category"])["image_id"].size().rename("count")
        # Divide by the per-split totals so each bar is a fraction of its own split.
        distribution = pd.DataFrame(counts / counts.groupby(level=0).sum()).reset_index()
        return (
            alt.Chart(distribution)
            .mark_bar(cornerRadiusTopLeft=5, cornerRadiusTopRight=5)
            .encode(x="category:O", y="count:Q", color="category", column="split")
        )

    def bbox_scatter(
        self, split: Optional[str] = None, category: Optional[str] = None, limit: int = 1000
    ) -> "alt.Chart":
        """plots scatter of width and height of bounding boxes

        Args:
            split (Optional[str]): split of the dataset e.g., ``train``, ``valid`` etc. Defaults to None.
            category (Optional[str]): category to filter out. Defaults to None.
            limit (int, optional): number of samples to plot; capped at the number of
                available rows and at 5000. Defaults to 1000.
        """
        df = filter_split_category(self.master_df, split, category).drop("image_path", axis=1)
        limit = min(limit, len(df), 5000)
        df = df.sample(n=limit, replace=False, random_state=42)
        return alt.Chart(df).mark_circle(size=30).encode(x="width", y="height", color="category")

    def describe(self) -> pd.DataFrame:
        """shows basic data distribution in different split

        Returns:
            pd.DataFrame: one row per split with the columns ``split``, ``images``
            (unique image ids), ``annotations`` (row count) and ``categories``
            (unique categories).
        """
        summary = (
            self.master_df.groupby(["split"])
            .agg({"image_id": ["nunique", "size"], "category": "nunique"})
            .reset_index()
        )
        # Flatten the two-level column index, e.g. ("image_id", "size") -> "image_id_size".
        summary.columns = ["_".join(levels) for levels in summary.columns]
        return summary.rename(
            columns={
                "image_id_nunique": "images",
                "image_id_size": "annotations",
                "category_nunique": "categories",
                "split_": "split",
            }
        )

    def split(self, test_size: float = 0.2, stratified: bool = False, random_state: int = 42):
        """splits the dataset into train and validation sets

        Args:
            test_size (float, optional): Fraction of total images to be kept for validation. Defaults to 0.2.
            stratified (bool, optional): Whether to stratify the split. Defaults to False.
            random_state (int, optional): random state for the split. Only passed to the
                non-stratified split; the stratified path does not use it. Defaults to 42.

        Returns:
            FormatSpec: Returns an instance of `FormatSpec` class
        """
        label_df = self.master_df.copy()

        if stratified:
            # One row per image holding the list of distinct class ids it contains.
            class_df = label_df[["image_id", "class_id"]].drop_duplicates()
            gdf = class_df.groupby("image_id")["class_id"].agg(lambda x: x.tolist()).reset_index()

            # Multi-hot encode the class lists to get the label matrix for stratification.
            mlb = MultiLabelBinarizer()
            encoded = mlb.fit_transform(gdf.class_id)
            label_names = [f"class_{x}" for x in mlb.classes_]
            gdf = pd.concat([gdf, pd.DataFrame(data=encoded, columns=label_names)], axis=1)
            gdf = gdf.drop(["class_id"], axis=1)

            train_images, _, test_images, _ = iterative_train_test_split(
                gdf[["image_id"]].values, gdf[label_names].values, test_size=test_size
            )
            train_images = train_images.ravel()
            test_images = test_images.ravel()
        else:
            image_ids = label_df.image_id.unique()
            train_images, test_images = train_test_split(image_ids, test_size=test_size, random_state=random_state)

        # ``assign`` returns new frames, so nothing is written through a filtered slice.
        train_df = label_df.loc[label_df["image_id"].isin(train_images.tolist())].assign(split="train")
        test_df = label_df.loc[label_df["image_id"].isin(test_images.tolist())].assign(split="valid")
        master_df = pd.concat([train_df, test_df], ignore_index=True)
        return FormatSpec(self.root, True, master_df, format=self.format)

    def save(
        self, output_dir: Optional[Union[str, os.PathLike]], export_to: Optional[str] = None, copy_images: bool = True
    ):
        """Just another api for convert. Similar to export

        Args:
            output_dir (Optional[Union[str, os.PathLike]]): forwarded to ``convert``.
            export_to (Optional[str]): target format. It is resolved through
                ``ifnone(export_to, self.format)`` - presumably the current format is
                used when this is None. Defaults to None.
            copy_images (bool, optional): forwarded to ``convert``. Defaults to True.
        """
        export_to = ifnone(export_to, self.format)
        return self.convert(export_to, output_dir=output_dir, copy_images=copy_images)

    def convert(
        self,
        to: str,
        output_dir: Optional[str] = None,
        save_under: Optional[str] = None,
        copy_images: bool = False,
        **kwargs,
    ):
        """Converts the annotations to another format.

        Args:
            to (str): target format (case-insensitive). One of ``yolo``, ``coco``, ``pascal``,
                ``csv``, ``sagemaker``, ``createml``, ``simple_json`` or ``tfrecord``.
            output_dir (Optional[str]): forwarded unchanged to the converter. Defaults to None.
            save_under (Optional[str]): forwarded unchanged to the converter. Defaults to None.
            copy_images (bool, optional): forwarded unchanged to the converter. Defaults to False.
            **kwargs: extra keyword arguments, forwarded only to the ``sagemaker`` converter.

        Returns:
            Whatever the selected converter function returns.

        Raises:
            ImportError: if ``to`` is ``tfrecord`` and TensorFlow is not installed.
            NotImplementedError: if ``to`` is not a supported format.
        """
        target = to.lower()
        common = dict(output_dir=output_dir, save_under=save_under, copy_images=copy_images)

        # These two formats take extra arguments, so they are handled separately.
        if target == "tfrecord":
            if not _TF_INSTALLED:
                raise ImportError("Please Install Tensorflow for tfrecord support")
            return convert_tfrecord(self.master_df, self.root, has_image_split=self._has_image_split, **common)
        if target == "sagemaker":
            return convert_sagemaker(self.master_df, self.root, **common, **kwargs)

        if target == "yolo":
            converter = convert_yolo
        elif target == "coco":
            converter = convert_coco
        elif target == "pascal":
            converter = convert_pascal
        elif target == "csv":
            converter = convert_csv
        elif target == "createml":
            converter = convert_createml
        elif target == "simple_json":
            converter = convert_simple_json
        else:
            raise NotImplementedError(f"Conversion to '{to}' format is not supported")
        return converter(self.master_df, self.root, **common)