Data Producers#

DNIKit provides several data Producers that load data and dispatch batches. All pipelines must start with a data Producer. (Batches are only pulled through a pipeline when an Introspector’s .introspect() method is called.)
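
For illustration, here is a minimal sketch of that lazy evaluation, assuming a dataset Producer and a loaded DNIKit model already exist (the response name "fc1" is hypothetical):

from dnikit.base import pipeline
from dnikit.introspectors import Familiarity

# `dataset` is a Producer and `model` a loaded DNIKit model (both assumed to exist).
# Composing the pipeline does not read any data yet.
response_producer = pipeline(dataset, model(requested_responses=["fc1"]))

# Batches are pulled through the pipeline only at this point.
familiarity = Familiarity.introspect(response_producer)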

Here are the main out-of-the-box data loaders, with links to their API documentation for more information.

Example Datasets#

dnikit_tensorflow has several example datasets, which wrap off-the-shelf datasets from the Keras library. They are loaded via the TFDatasetExamples class:

from dnikit_tensorflow import TFDatasetExamples

cifar10 = TFDatasetExamples.CIFAR10()

Available datasets are: CIFAR10, CIFAR100, MNIST, and FashionMNIST.

All example datasets are TrainTestSplitProducers with attach_metadata and max_samples initialization parameters, and shuffle and subset methods. For instance, the following code loads MNIST, then creates a subset of 100 fives drawn only from the test set:

from dnikit.base import Batch
from dnikit_tensorflow import TFDatasetExamples

mnist = TFDatasetExamples.MNIST(attach_metadata=True)
mnist_100_test_fives = mnist.subset(labels=[5], datasets=["test"], max_samples=100)

# Inspect the batches generated by mnist_100_test_fives:
for batch in mnist_100_test_fives(batch_size=10):
    imgs = batch.fields["samples"]  # ndarray in shape (batch.batch_size, 28, 28, 1)
    labels = batch.metadata[Batch.StdKeys.LABELS]["label"]  # will be all 5's
    dataset_ids = batch.metadata[Batch.StdKeys.LABELS]["dataset"]  # will be all 1's
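
The shuffle method mentioned above is used the same way. The sketch below assumes it takes no arguments and returns a new, shuffled producer; check the TrainTestSplitProducer API for the exact signature:

# Shuffle the full MNIST producer before subsetting (signature assumed)
mnist_shuffled = mnist.shuffle()
mnist_100_random = mnist_shuffled.subset(datasets=["test"], max_samples=100)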

In addition, CIFAR10, CIFAR100, and FashionMNIST accept string labels in the subset method. For example, to load all foxes from CIFAR100:

cifar100 = TFDatasetExamples.CIFAR100(label_mode='fine')
foxes = cifar100.subset(labels=["fox"])

To check which string labels an example dataset supports, the built-in Keras dataset wrappers provide a str_to_label_idx() method.
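
For instance, a sketch that assumes the method maps a class name to its integer index:

# Look up the integer class index for the string label "fox" (assumed to return an int)
fox_idx = cifar100.str_to_label_idx("fox")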

Data loaders#

ImageProducer#

DNIKit provides a helper producer, ImageProducer, to load all images from a local directory. By default, it will do a recursive search through all subdirectories. For example, if the MNIST dataset is stored locally:

from dnikit.base import ImageProducer

mnist_dataset = ImageProducer('path/to/mnist/directory')

class dnikit.base.ImageProducer(directory, *, extensions=None, recursive=True, field='images')

ImageProducer is a data Producer that streams images loaded from the filesystem.

The images are loaded in NHWC format, with C=1 for grayscale images, C=3 (RGB) for color images, and C=4 (RGBA) for images with transparency.

Note

OpenCV is used to load images, so this class supports every format that OpenCV supports. Check the OpenCV docs for the full list of supported formats.

Warning

All images must have the same height, width and number of channels (HWC). Otherwise batch creation in __call__() will fail.

Parameters:
  • directory (Path) – root directory where images are located. As with other Producers, images are only loaded into memory when batches are requested.

  • extensions (None | str | Collection[str]) – [keyword arg, optional] one, several, or no extensions used to discover images when traversing the filesystem. If no extensions are provided, a default list is used (which includes jpeg, jpg, png, bmp, and tiff).

  • recursive (bool) – [keyword arg, optional] if True, all subdirectories of directory will be traversed, otherwise only directory will be used to discover images (defaults to True).

  • field (str) – [keyword arg, optional] the key under which the images will be stored in the resulting data Batch (defaults to “images”).
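
For example, a sketch using these keyword arguments (the directory path is illustrative):

from dnikit.base import ImageProducer

# Discover only PNG files directly inside the directory (no recursion),
# and store them under the "samples" field of each Batch
png_producer = ImageProducer(
    'path/to/images',
    extensions='png',
    recursive=False,
    field='samples',
)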

TrainTestSplitProducer#

class dnikit.base.TrainTestSplitProducer(split_dataset, attach_metadata=True, max_samples=-1)

Produce Batches from a train/test split of the form:

(x_train, y_train), (x_test, y_test)

where all variables are NumPy arrays, the x arrays contain features, and the y arrays contain labels.

For instance, for MNIST, the features array x_train might have shape (60000, 28, 28, 1), with the corresponding labels array y_train of shape (60000, 1).

Only one of x_train, x_test can be empty (size 0 NumPy array).

Note

This format is the direct output of calling load_data() on a tf.keras.dataset. One can initialize a dataset from tf.keras.datasets simply by writing:

TrainTestSplitProducer(tf.keras.datasets.cifar10.load_data())

Parameters:
  • split_dataset (tuple) – train/test split of the form ((x_train, y_train), (x_test, y_test)) described above.

  • attach_metadata (bool) – [optional] if True, label and dataset metadata are attached to each produced Batch (defaults to True).

  • max_samples (int) – [optional] maximum number of samples to produce; -1 means produce all samples (defaults to -1).
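
Putting these parameters together, a minimal sketch with illustrative in-memory arrays:

import numpy as np
from dnikit.base import TrainTestSplitProducer

# Illustrative arrays: 80 train / 20 test samples of 28x28 grayscale images
x_train = np.random.rand(80, 28, 28, 1).astype(np.float32)
y_train = np.random.randint(0, 10, size=(80, 1))
x_test = np.random.rand(20, 28, 28, 1).astype(np.float32)
y_test = np.random.randint(0, 10, size=(20, 1))

producer = TrainTestSplitProducer(
    split_dataset=((x_train, y_train), (x_test, y_test)),
    attach_metadata=True,   # attach label/dataset metadata to each Batch
    max_samples=100,        # -1 (the default) would produce all samples
)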

Writing a custom Producer#

To teach DNIKit how to load data as batches, it’s possible to create a custom producer by subclassing Producer. Here is an example of creating a custom Producer for the CIFAR-10 dataset.

DNIKit operates on datasets in batches so that it can handle large-scale datasets without loading everything into memory at once. For each batch, it’s also possible to attach metadata, such as unique identifiers and labels, which support introspector analyses and visualization features:

  • Identifier: unique identifier for each data sample (in this case, the path to the file)

  • Label: a dict of (key, value) pairs holding one or more labels for each data sample

    • E.g. "class" label: airplane, automobile, etc.

    • E.g. "dataset" label: train vs. test

Follow the comments in the Cifar10Producer code block below to learn how to create a custom producer.

import os
from pathlib import Path
import typing as t

import numpy as np
import cv2
from keras.datasets import cifar10

from dnikit.base import Producer, Batch

class Cifar10Producer(Producer):

    def __init__(self, data_path: str, max_data: int = -1) -> None:

        # Where data will be written to be packaged up with Dataset Report
        self.data_path = data_path

        # Max data samples to pull from. This is helpful for local debugging.
        self.max_data = max_data

        # Load entire CIFAR10 dataset into memory
        (x_train, y_train), (x_test, y_test) = cifar10.load_data()

        # Concatenate the train and test into one array, as well as the train/test labels, and the class labels
        self.dataset = np.concatenate((x_train, x_test))
        self.dataset_labels = ['train']*len(x_train) + ['test']*len(x_test)
        self.class_labels = np.squeeze(np.concatenate((y_train, y_test)))
        self.class_to_name = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

        # Resolve max_data here (a plain class never calls __post_init__);
        # a non-positive value means "use the entire dataset"
        if self.max_data <= 0:
            self.max_data = len(self.dataset)

    def _class_path(self, index: int) -> str:
        return f"{self.dataset_labels[index]}/{self.class_to_name[int(self.class_labels[index])]}"

    def _write_images_to_disk(self, ii: int, jj: int) -> None:
        for idx in range(ii, jj):
            base_path = os.path.join(self.data_path, self._class_path(idx))
            Path(base_path).mkdir(exist_ok=True, parents=True)
            filename = os.path.join(base_path, f"image{idx}.png")
            # Write to disk after converting to BGR format, used by opencv
            cv2.imwrite(filename, cv2.cvtColor(self.dataset[idx, ...], cv2.COLOR_RGB2BGR))

    def __call__(self, batch_size: int) -> t.Iterable[Batch]:
        """The important function... yield a batch of data from the downloaded dataset"""

        # Iteratively loop over the data samples and yield them in batches
        for ii in range(0, self.max_data, batch_size):
            jj = min(ii+batch_size, self.max_data)

            # Optional step, write data locally since it was loaded from keras
            self._write_images_to_disk(ii, jj)

            # Create batch from data already in memory
            builder = Batch.Builder(
                fields={"images": self.dataset[ii:jj, ...]}
            )

            # Use pathname as the identifier for each data sample, excluding base data directory
            builder.metadata[Batch.StdKeys.IDENTIFIER] = [
                os.path.join(self._class_path(idx), f"image{idx}.png")
                for idx in range(ii, jj)
            ]
            # Add class and dataset labels
            builder.metadata[Batch.StdKeys.LABELS] = {
                "class": [self.class_to_name[int(lbl_idx)] for lbl_idx in self.class_labels[ii:jj]],
                "dataset": self.dataset_labels[ii:jj]
            }

            yield builder.make_batch()

To use this custom Producer, it can be instantiated like so:

cifar10_producer = Cifar10Producer(
    # Where to store the data to disk. For exporting a standalone report,
    # this should be a local (relative) path to the current working directory.
    data_path='./cifar/',

    # This "max data" param is purely for running a notebook quickly
    #    Remove this param to run on the whole dataset
    max_data=1000
)
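
As a quick sanity check, batches can be pulled directly from the producer. This sketch assumes CIFAR-10's 32x32 RGB images and the metadata attached above:

from dnikit.base import Batch

# Pull a single batch and inspect its fields and metadata
first_batch = next(iter(cifar10_producer(batch_size=16)))
print(first_batch.fields["images"].shape)                        # (16, 32, 32, 3)
print(first_batch.metadata[Batch.StdKeys.LABELS]["class"][:4])   # e.g. ['frog', 'truck', ...]
print(first_batch.metadata[Batch.StdKeys.IDENTIFIER][0])         # e.g. 'train/frog/image0.png'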

Producer of Model Responses#

As noted, a Producer can also produce model responses to feed directly into an Introspector.

import typing as t

import numpy as np

from dnikit.base import Producer, Batch
from dnikit.introspectors import Familiarity

def function_to_run_model_inference_on_batch(data: np.ndarray, response_name: str) -> np.ndarray:
    # run the model on `data` and extract the `response_name` responses
    # ...
    return responses_for_input_data

class MyModelResponseProducer(Producer):

    def __init__(self, datafolder: str, response_name: str) -> None:
        self.datafolder = datafolder
        self.response_name = response_name

    def __call__(self, batch_size: int) -> t.Iterable[Batch]:
        # `n_data_samples` is a placeholder for the total number of stored data samples
        for ii in range(0, n_data_samples, batch_size):

            current_data = ...  # read the next `batch_size` data samples from self.datafolder
            responses = function_to_run_model_inference_on_batch(current_data, self.response_name)

            yield Batch({self.response_name: responses})

my_response_producer = MyModelResponseProducer(datafolder, 'response1')
familiarity = Familiarity.introspect(my_response_producer)
...

This is a good option when model responses have already been generated and saved, when the model is in a format that DNIKit does not currently support (e.g., JAX), or when the model is hosted in the cloud and responses are fetched asynchronously.
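
For the first case, here is a minimal sketch of a Producer that streams responses already saved to disk; the one-file-per-sample .npy layout and the field name "response1" are hypothetical:

import typing as t
from pathlib import Path

import numpy as np

from dnikit.base import Batch, Producer
from dnikit.introspectors import Familiarity

class SavedResponseProducer(Producer):
    """Streams responses previously saved as one .npy file per sample (hypothetical layout)."""

    def __init__(self, response_dir: str, response_name: str) -> None:
        self.files = sorted(Path(response_dir).glob("*.npy"))
        self.response_name = response_name

    def __call__(self, batch_size: int) -> t.Iterable[Batch]:
        for ii in range(0, len(self.files), batch_size):
            chunk = self.files[ii:ii + batch_size]
            responses = np.stack([np.load(f) for f in chunk])
            yield Batch({self.response_name: responses})

saved_producer = SavedResponseProducer('path/to/saved/responses', 'response1')
familiarity = Familiarity.introspect(saved_producer)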