Data Producers#
DNIKit provides several data Producers that load data and dispatch batches. All pipelines must start with a data Producer. (Batches are only pulled through a pipeline when an Introspector's .introspect() method is called.) Here are most of the available out-of-the-box data loaders, with links to their API documentation for more information.
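Producers are callables that yield batches on demand; the sketch below pulls one batch directly from the CIFAR10 example dataset introduced in the next section (an Introspector's .introspect() call drives the same iteration internally). The "samples" field name follows the MNIST example shown later on this page.
from dnikit_tensorflow import TFDatasetExamples

cifar10 = TFDatasetExamples.CIFAR10()

# Pull batches explicitly; .introspect() performs this same iteration internally
for batch in cifar10(batch_size=32):
    images = batch.fields["samples"]  # assumed field name, as in the MNIST example below
    break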
Example Datasets#
dnikit_tensorflow has several example datasets, which wrap off-the-shelf datasets from the Keras library. They are loaded via TFDatasetExamples:
from dnikit_tensorflow import TFDatasetExamples
cifar10 = TFDatasetExamples.CIFAR10()
Available datasets are: CIFAR10, CIFAR100, MNIST, and FashionMNIST.
All example datasets are TrainTestSplitProducers with attach_metadata and max_samples initialization parameters, and shuffle and subset methods. For instance, the following code loads MNIST, then creates a subset of 100 fives drawn only from the test set:
from dnikit.base import Batch

mnist = TFDatasetExamples.MNIST(attach_metadata=True)
mnist_100_test_fives = mnist.subset(labels=[5], datasets=["test"], max_samples=100)

# Inspect the batches generated by mnist_100_test_fives:
for batch in mnist_100_test_fives(batch_size=10):
    imgs = batch.fields["samples"]  # ndarray of shape (batch.batch_size, 28, 28, 1)
    labels = batch.metadata[Batch.StdKeys.LABELS]["label"]  # will be all 5's
    dataset_ids = batch.metadata[Batch.StdKeys.LABELS]["dataset"]  # will be all 1's (test)
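The shuffle method listed above is not exercised in this example. Assuming it can be called without arguments (its exact signature is not documented here), usage might look like:
# Assumption: shuffle() can be called with no arguments; check the API reference for the exact signature
mnist.shuffle()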
In addition, CIFAR10, CIFAR100, and FashionMNIST accept string labels in the subset method, e.g. to load all foxes from CIFAR100:
cifar100 = TFDatasetExamples.CIFAR100(label_mode='fine')
foxes = cifar100.subset(labels=["fox"])
To check which string labels an example dataset supports, each built-in dataset provides a str_to_label_idx() method.
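For example, assuming str_to_label_idx() takes a label string and returns the corresponding integer index:
# Assumed usage: map the string label to its integer index, then subset by that index
fox_idx = cifar100.str_to_label_idx("fox")
foxes_by_idx = cifar100.subset(labels=[fox_idx])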
Data loaders#
ImageProducer#
DNIKit provides a helper producer, ImageProducer, to load all images from a local directory. By default, it will do a recursive search through all subdirectories. For example, if the MNIST dataset is stored locally:
from dnikit.base import ImageProducer
mnist_dataset = ImageProducer('path/to/mnist/directory')
- class dnikit.base.ImageProducer(directory, *, extensions=None, recursive=True, field='images')
  ImageProducer is a data Producer that streams images loaded from the filesystem. The images are loaded in NHWC format, with C=1 for grayscale images, C=3 (RGB) for color images, and C=4 (RGBA) for images with transparency.
  Note
  OpenCV is used to load images, so this class supports every format that library supports. Check the OpenCV docs for supported formats.
  Warning
  All images must have the same height, width, and number of channels (HWC). Otherwise, batch creation in __call__() will fail.
  - Parameters:
    - directory (Path) – root directory where images are located. As with other Producers, images are only loaded into memory when batches are requested.
    - extensions (None | str | Collection[str]) – [keyword arg, optional] one, many, or no extensions to be used to discover images when traversing the filesystem. If no extensions are provided, a default list is used (which includes jpeg, jpg, png, bmp, and tiff).
    - recursive (bool) – [keyword arg, optional] if True, all subdirectories of directory will be traversed; otherwise only directory will be used to discover images (defaults to True).
    - field (str) – [keyword arg, optional] the key under which the images will be stored in the resulting data Batch (defaults to "images").
  - Raises:
    - NotADirectoryError – if directory is not a directory.
    - DNIKitException – if no images are found in the given directory.
    - DNIKitException – if OpenCV is not available.
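As a sketch of the keyword arguments documented above (the directory path is hypothetical):
from dnikit.base import ImageProducer

# Discover only PNG files directly inside the directory (no recursion),
# and store the images under the "samples" field of each Batch
mnist_pngs = ImageProducer(
    'path/to/mnist/directory',
    extensions='png',
    recursive=False,
    field='samples',
)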
TrainTestSplitProducer#
- class dnikit.base.TrainTestSplitProducer(split_dataset, attach_metadata=True, max_samples=-1)
  Produce Batches from a train/test split of the form (x_train, y_train), (x_test, y_test), where the variables are numpy arrays, the x arrays represent features, and the y arrays represent labels. For instance, for MNIST, the features array x_train might have shape (60000, 28, 28, 1), with a corresponding labels array y_train of shape (60000, 1). Only one of x_train, x_test may be empty (a size-0 NumPy array).
  Note
  This format is the direct output of calling load_data() on a tf.keras.dataset. One can initialize a dataset from tf.keras.datasets simply by writing: TrainTestSplitProducer(tf.keras.datasets.cifar10.load_data())
  - Parameters:
    - split_dataset (Tuple[Tuple[ndarray, ndarray], Tuple[ndarray, ndarray]]) – see split_dataset
    - attach_metadata (bool) – [optional] see attach_metadata
    - max_samples (int) – [optional] see max_samples
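Combining the Note above with these parameters, a producer can be constructed directly from a Keras dataset; a sketch (the sample cap of 5000 is arbitrary):
import tensorflow as tf
from dnikit.base import TrainTestSplitProducer

# Wrap the Keras CIFAR-10 split, attach train/test and label metadata,
# and cap the number of samples the producer will yield
cifar10_split_producer = TrainTestSplitProducer(
    tf.keras.datasets.cifar10.load_data(),
    attach_metadata=True,
    max_samples=5000,
)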
Writing a custom Producer#
To teach DNIKit how to load data as batches, it's possible to create a custom producer by subclassing Producer.
Here is an example of creating a custom Producer for the
CIFAR-10 dataset.
DNIKit operates on datasets in batches, so that it can handle large-scale datasets without loading everything into memory at once. For each batch, it's also possible to attach metadata, such as unique identifiers and labels, which support introspector analysis and visualization features:
- Identifier: a unique identifier for each data sample (in this case, the path to the file)
- Label: a dict of (key, value) pairs with any number of labels for each data sample
  - E.g. "class" label: airplane, automobile, etc.
  - E.g. "dataset" label: train vs. test
Follow the comments in the Cifar10Producer
code block below to learn
how to create a custom producer.
import os
from pathlib import Path
import typing as t
import numpy as np
import cv2
from keras.datasets import cifar10
from dnikit.base import Producer, Batch
class Cifar10Producer(Producer):
    def __init__(self, data_path: str, max_data: int = -1) -> None:
        # Where data will be written to be packaged up with the Dataset Report
        self.data_path = data_path

        # Max data samples to pull from. This is helpful for local debugging.
        self.max_data = max_data

        # Load entire CIFAR10 dataset into memory
        (x_train, y_train), (x_test, y_test) = cifar10.load_data()

        # Concatenate the train and test data into one array, as well as the train/test labels and the class labels
        self.dataset = np.concatenate((x_train, x_test))
        self.dataset_labels = ['train'] * len(x_train) + ['test'] * len(x_test)
        self.class_labels = np.squeeze(np.concatenate((y_train, y_test)))
        self.class_to_name = ['airplane', 'automobile', 'bird', 'cat', 'deer',
                              'dog', 'frog', 'horse', 'ship', 'truck']

        # A max_data value of -1 (the default) means "use the whole dataset"
        if self.max_data <= 0:
            self.max_data = len(self.dataset)

    def _class_path(self, index: int) -> str:
        return f"{self.dataset_labels[index]}/{self.class_to_name[int(self.class_labels[index])]}"

    def _write_images_to_disk(self, ii: int, jj: int) -> None:
        for idx in range(ii, jj):
            base_path = os.path.join(self.data_path, self._class_path(idx))
            Path(base_path).mkdir(exist_ok=True, parents=True)
            filename = os.path.join(base_path, f"image{idx}.png")
            # Write to disk after converting to BGR format, used by opencv
            cv2.imwrite(filename, cv2.cvtColor(self.dataset[idx, ...], cv2.COLOR_RGB2BGR))

    def __call__(self, batch_size: int) -> t.Iterable[Batch]:
        """The important function... yield a batch of data from the downloaded dataset"""
        # Iteratively loop over the data samples and yield them in batches
        for ii in range(0, self.max_data, batch_size):
            jj = min(ii + batch_size, self.max_data)

            # Optional step: write data locally, since it was loaded from keras
            self._write_images_to_disk(ii, jj)

            # Create batch from data already in memory
            builder = Batch.Builder(
                fields={"images": self.dataset[ii:jj, ...]}
            )

            # Use the pathname as the identifier for each data sample, excluding the base data directory
            builder.metadata[Batch.StdKeys.IDENTIFIER] = [
                os.path.join(self._class_path(idx), f"image{idx}.png")
                for idx in range(ii, jj)
            ]

            # Add class and dataset labels
            builder.metadata[Batch.StdKeys.LABELS] = {
                "class": [self.class_to_name[int(lbl_idx)] for lbl_idx in self.class_labels[ii:jj]],
                "dataset": self.dataset_labels[ii:jj]
            }

            yield builder.make_batch()
This custom Producer can then be instantiated like so:
cifar10_producer = Cifar10Producer(
    # Where to store the data to disk. For exporting a standalone report,
    # this should be a local (relative) path to the current working directory.
    data_path='./cifar/',

    # This "max data" param is purely for running a notebook quickly.
    # Remove this param to run on the whole dataset.
    max_data=1000
)
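As a quick sanity check, a single batch can be pulled from this producer to inspect its fields and metadata (a sketch; the printed shape reflects CIFAR-10's 32x32 RGB images):
# Pull one batch of 64 samples from the custom producer
first_batch = next(iter(cifar10_producer(batch_size=64)))
print(first_batch.fields["images"].shape)                       # (64, 32, 32, 3)
print(first_batch.metadata[Batch.StdKeys.IDENTIFIER][:3])       # relative image paths
print(first_batch.metadata[Batch.StdKeys.LABELS]["class"][:3])  # class name strings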
Producer of Model Responses#
As noted, a Producer can also produce model responses to feed directly into an Introspector.
import typing as t

import numpy as np

from dnikit.base import Producer, Batch
from dnikit.introspectors import Familiarity


def function_to_run_model_inference_on_batch(data: np.ndarray, response_name: str) -> np.ndarray:
    # Extract `response_name` for the data when running it through the model
    responses_for_input_data = ...  # run model inference here
    return responses_for_input_data


class MyModelResponseProducer(Producer):
    def __init__(self, datafolder: str, response_name: str) -> None:
        self.datafolder = datafolder
        self.response_name = response_name

    def __call__(self, batch_size: int) -> t.Iterable[Batch]:
        n_data_samples = ...  # total number of data samples in self.datafolder
        for ii in range(0, n_data_samples, batch_size):
            current_data = ...  # read the next `batch_size` data samples
            responses = function_to_run_model_inference_on_batch(current_data, self.response_name)
            yield Batch({self.response_name: responses})


my_response_producer = MyModelResponseProducer(datafolder, 'response1')
familiarity = Familiarity.introspect(my_response_producer)
...
This is a good option if model responses have already been generated and saved, if the model is in a format that DNIKit does not currently support (e.g., JAX), or if the model is hosted in the cloud and responses will be fetched asynchronously.
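For instance, if responses were previously computed and saved to disk, a small Producer can stream them back into DNIKit. A sketch, assuming a hypothetical layout of one .npy file per saved batch of responses:
from pathlib import Path
import typing as t

import numpy as np

from dnikit.base import Batch, Producer


class SavedResponseProducer(Producer):
    """Sketch: stream previously saved model responses (one .npy file per saved batch)."""

    def __init__(self, response_dir: str, response_name: str) -> None:
        self.response_files = sorted(Path(response_dir).glob("*.npy"))
        self.response_name = response_name

    def __call__(self, batch_size: int) -> t.Iterable[Batch]:
        for response_file in self.response_files:
            responses = np.load(response_file)
            # Re-batch to the requested size before yielding
            for ii in range(0, len(responses), batch_size):
                yield Batch({self.response_name: responses[ii:ii + batch_size]})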