Batch Processors#

DNIKit provides a variety of Processors for operating on batches. For instance, a processor might resize the images in a batch, perform data augmentation, remove batch fields, attach metadata, or rename labels. These processors are chained together in pipelines, acting as PipelineStages. Note that Processors always come after a data Producer, which is what generates the batches in the first place.
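
A brief sketch of how a couple of processors might be chained after a producer (the field names are hypothetical, and pipeline is assumed to be the pipeline() function in dnikit.base):

from dnikit.base import pipeline
from dnikit.processors import FieldRemover, FieldRenamer

producer = ...  # any dnikit Producer that generates batches

# Each processor becomes a PipelineStage that runs after the producer.
pipelined_producer = pipeline(
    producer,
    FieldRenamer(mapping={"img": "image"}),   # hypothetical field names
    FieldRemover(fields="debug_info"),
)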

Here are most of the available batch processors and data loaders, with links to their API documentation for more information.

Batch filtering and concatenation#

Composer for filtering#

class dnikit.processors.Composer(filter)[source]

Apply a filter function to all batches, i.e. computing filter(batch) for each batch in the pipeline.

Parameters:

filter (Callable[[Batch], Batch | None]) – The filter function to apply to every batch in the pipeline. The filter should take a single Batch as input and return a transformed batch (e.g. a subset) or None (to produce an empty batch).
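
A minimal sketch of a Composer filter, assuming Batch exposes a batch_size property (the threshold below is arbitrary):

from typing import Optional

from dnikit.base import Batch
from dnikit.processors import Composer

def keep_large_batches(batch: Batch) -> Optional[Batch]:
    # Pass the batch through unchanged, or return None (producing an
    # empty batch) when it contains fewer than 4 elements.
    return batch if batch.batch_size >= 4 else None

composer = Composer(filter=keep_large_batches)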

Concatenator for merging fields#

class dnikit.processors.Concatenator(dim, output_field, fields)[source]

This PipelineStage concatenates two or more fields in the Batch and produces a new field with the given output_field name.

Example

If fields M and N have dimensions BxM1xZ and BxN1xZ and are concatenated along dimension 1, the result is a new field of size Bx(M1+N1)xZ.

Parameters:
  • dim (int) – the dimension along which the fields are concatenated.

  • output_field (str) – the name of the new, concatenated field.

  • fields (Collection[str]) – the names of the fields to concatenate.
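
A minimal usage sketch (the field names are hypothetical):

from dnikit.processors import Concatenator

# Merge hypothetical fields "M" (BxM1xZ) and "N" (BxN1xZ) along dimension 1
# into a single new field "merged" of shape Bx(M1+N1)xZ.
concatenator = Concatenator(dim=1, output_field="merged", fields=["M", "N"])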

Renaming fields and metadata#

FieldRenamer#

class dnikit.processors.FieldRenamer(mapping)[source]

A PipelineStage that renames fields in a Batch.

Parameters:

mapping (Mapping[str, str]) – a dictionary (or similar) whose keys are the old field names and values are the new field names.
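
A minimal usage sketch (the field names are hypothetical):

from dnikit.processors import FieldRenamer

# Rename field "logits" to "scores" and field "img" to "image".
renamer = FieldRenamer(mapping={"logits": "scores", "img": "image"})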

MetadataRenamer#

class dnikit.processors.MetadataRenamer(mapping, *, meta_keys=None)[source]

A PipelineStage that renames some metadata fields in a Batch. This only works with metadata that has key type Batch.DictMetaKey.

Parameters:
  • mapping (Mapping[str, str]) – a dictionary (or similar) whose keys are the old metadata field names and values are the new metadata field names.

  • meta_keys (None | DictMetaKey | Collection[DictMetaKey]) – [keyword arg, optional] either a single instance or an iterable of metadata keys of type Batch.DictMetaKey whose key-fields will be renamed. If None (the default case), all key-fields for all metadata keys will be renamed.

Note

MetadataRenamer only works with Batch.DictMetaKey (which has entries that can be renamed).
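
A minimal usage sketch (the key-field names are hypothetical):

from dnikit.processors import MetadataRenamer

# Rename the key-field "label" to "class_label" for all DictMetaKey
# metadata attached to each batch.
meta_renamer = MetadataRenamer(mapping={"label": "class_label"})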

Removing fields and metadata#

FieldRemover#

class dnikit.processors.FieldRemover(*, fields, keep=False)[source]

A PipelineStage that removes some fields from a Batch.

Parameters:
  • fields (str | Collection[str]) – [keyword arg] a single field name, or an iterable of field names, to be removed.

  • keep (bool) – [keyword arg, optional] if True, the fields given in fields will be kept and all others will be removed
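
A minimal usage sketch (the field name is hypothetical):

from dnikit.processors import FieldRemover

# Keep only the "features" field and drop all other fields.
field_remover = FieldRemover(fields="features", keep=True)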

MetadataRemover#

class dnikit.processors.MetadataRemover(*, meta_keys=None, keys=None, keep=False)[source]

A PipelineStage that removes some metadata from a Batch.

Parameters:
  • meta_keys (None | MetaKey | DictMetaKey | Collection[MetaKey | DictMetaKey]) – [keyword arg, optional] either a single instance or an iterable of Batch.MetaKey / Batch.DictMetaKey that may be removed. If None (the default case), this processor will operate on all metadata keys.

  • keys (Any) – [keyword arg, optional] key-field(s) within the metadata to be removed. Metadata stored under a Batch.DictMetaKey is a mapping from str key-fields to data; this argument specifies which str key-fields will be removed from the batch’s metadata. If None (the default case), this processor will operate on all key-fields of metadata stored under a Batch.DictMetaKey.

  • keep (bool) – [keyword arg, optional] if True, the selected meta_keys and keys instead specify what to keep, and all other metadata will be removed.
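
A minimal usage sketch, assuming Batch.StdKeys.IDENTIFIER (referenced in the Cacher documentation below) is attached to the batches:

from dnikit.base import Batch
from dnikit.processors import MetadataRemover

# Keep only the standard identifier metadata and drop all other metadata.
meta_remover = MetadataRemover(meta_keys=Batch.StdKeys.IDENTIFIER, keep=True)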

SnapshotRemover#

class dnikit.processors.SnapshotRemover(snapshots=None, keep=False)[source]

A PipelineStage that removes snapshots from a Batch. If used with no arguments, this will remove all snapshots.

Parameters:
  • snapshots (None | str | Collection[str]) – [optional] a single snapshot name, or an iterable of snapshot names, to be removed. If None (the default case), all snapshots will be removed.

  • keep (bool) – [optional] if True, the given snapshots will be kept and all others will be removed.

General data transforms#

MeanStdNormalizer#

class dnikit.processors.MeanStdNormalizer(*, mean, std, fields=None)[source]

A Processor that standardizes a field of a Batch by subtracting the mean and dividing by the standard deviation.

More precisely, if x is the data to be processed, the following processing is applied: (x - mean) / std.

Parameters:
  • mean (float) – [keyword arg] The mean to be applied

  • std (float) – [keyword arg] The standard deviation to be applied

  • fields (None | str | Collection[str]) – [keyword arg, optional] a single field name, or an iterable of field names, to be processed. If fields param is None, then all fields will be processed.
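
A minimal usage sketch (the field name, mean, and std are hypothetical):

from dnikit.processors import MeanStdNormalizer

# Standardize the "embeddings" field: (x - 0.5) / 0.25
normalizer = MeanStdNormalizer(mean=0.5, std=0.25, fields="embeddings")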

Pooler (Max Pooling)#

class dnikit.processors.Pooler(*, dim, method, fields=None)[source]

A Processor that pools the axes of a data field from a Batch with a specific method.

Parameters:
  • dim (int | Collection[int]) – [keyword arg] The dimension (one or many) to be pooled; e.g., spatial pooling is generally (1, 2).

  • method (Method) – [keyword arg] Pooling method. See Pooler.Method for full list of options.

  • fields (None | str | Collection[str]) – [keyword arg, optional] a single field name, or an iterable of field names, to be pooled. If the fields param is None, then all the fields in the batch will be pooled.
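
A minimal usage sketch, assuming MAX is one of the options in Pooler.Method:

from dnikit.processors import Pooler

# Max-pool the spatial dimensions (1, 2) of every field in the batch.
pooler = Pooler(dim=(1, 2), method=Pooler.Method.MAX)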

Transposer#

class dnikit.processors.Transposer(*, dim, fields=None)[source]

A Processor that transposes dimensions in a data field from a Batch. This processor will reorder the dimensions of the data as specified in the dim param.

Example

To reorder between NCHW and NHWC layouts, specify the desired permutation, e.g. Transposer(dim=[0, 3, 1, 2]); the inverse reordering uses dim=[0, 2, 3, 1].

Parameters:
  • dim (Sequence[int]) – [keyword arg] the new order of the dimensions. It is illegal to reorder the 0th dimension.

  • fields (None | str | Collection[str]) – [keyword arg, optional] a single field name, or an iterable of field names, to be transposed. If fields param is None, then all fields will be transposed.

Raises:

ValueError – if input specifies reordering the 0th dimension
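
A minimal usage sketch (the field name is hypothetical):

from dnikit.processors import Transposer

# Swap the last two dimensions of the "attention_maps" field,
# e.g. turning BxMxN data into BxNxM data.
transposer = Transposer(dim=[0, 2, 1], fields="attention_maps")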

Image operations#

ImageResizer to resize images#

class dnikit.processors.ImageResizer(*, pixel_format, size, fields=None)[source]

An ImageResizer uses OpenCV to resize images. It converts responses with the structure BxHxWxC (see ImageFormat for alternatives) to a new HxW size. Aspect ratio is not honored – the new image will be exactly the size given. Resizing uses the default OpenCV interpolation, INTER_LINEAR.

Parameters:
  • pixel_format (ImageFormat) – [keyword arg] the layout of the pixel data, see ImageFormat

  • size (Tuple[int, int]) – [keyword arg] the size to scale to, (width, height)

  • fields (None | str | Collection[str]) – [keyword arg, optional] a single field name, or an iterable of field names, to be processed. If fields param is None, then all fields will be resized.

Raises:

DNIKitException – if OpenCV is not installed.

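A minimal usage sketch, assuming ImageFormat is importable from dnikit.processors:

from dnikit.processors import ImageFormat, ImageResizer

# Resize every image field to 224x224 (width, height), HWC pixel layout.
resizer = ImageResizer(pixel_format=ImageFormat.HWC, size=(224, 224))
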
ImageRotationProcessor to rotate images#

class dnikit.processors.ImageRotationProcessor(angle=0.0, pixel_format=ImageFormat.HWC, *, cval=(0, 0, 0), fields=None)[source]

Processor that rotates images in a data field from a Batch. Accepts BxCHW and BxHWC images with non-normalized values (between 0 and 255).

Parameters:
  • angle (float) – [optional] angle (in degrees) of image rotation; positive values mean counter-clockwise rotation

  • pixel_format (ImageFormat) – [optional] the layout of the pixel data, see ImageFormat

  • cval (Tuple[int, int, int]) – [keyword arg, optional] RGB color value to fill areas outside image; defaults to (0, 0, 0) (black)

  • fields (None | str | Collection[str]) – [keyword arg, optional] a single field name, or an iterable of field names, to be processed. If fields param is None, then all fields will be processed.

Raises:

DNIKitException – if OpenCV is not installed.
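
A minimal usage sketch:

from dnikit.processors import ImageRotationProcessor

# Rotate images 15 degrees counter-clockwise, filling the exposed
# corners with black.
rotator = ImageRotationProcessor(angle=15.0, cval=(0, 0, 0))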

Augmentations#

ImageGaussianBlurProcessor#

class dnikit.processors.ImageGaussianBlurProcessor(sigma=0.0, *, fields=None)[source]

Processor that blurs images in a data field from a Batch. Accepts BxCHW and BxHWC images with non-normalized values (between 0 and 255).

Parameters:
  • sigma (float) – [optional] blur filter size; recommended values between 0 and 3, but values beyond this range are acceptable.

  • fields (None | str | Collection[str]) – [keyword arg, optional] a single field name, or an iterable of field names, to be processed. If fields param is None, then all fields will be processed.

Raises:

DNIKitException – if OpenCV is not installed.

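A minimal usage sketch:

from dnikit.processors import ImageGaussianBlurProcessor

# Apply a Gaussian blur with sigma=2.0 to every image field.
blurrer = ImageGaussianBlurProcessor(sigma=2.0)
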
ImageGammaContrastProcessor#

class dnikit.processors.ImageGammaContrastProcessor(gamma=1.0, *, fields=None)[source]

Processor that gamma-corrects images in a data field from a Batch. Accepts BxCHW and BxHWC images with non-normalized values (between 0 and 255). An image I is adjusted using the formula (I/255)^gamma * 255.

Parameters:
  • gamma (float) – [optional] gamma exponent used for the contrast adjustment

  • fields (None | str | Collection[str]) – [keyword arg, optional] a single field name, or an iterable of field names, to be processed. If fields param is None, then all fields will be processed.

Raises:

DNIKitException – if OpenCV is not installed.
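
A minimal usage sketch:

from dnikit.processors import ImageGammaContrastProcessor

# Apply gamma correction with gamma=0.8, i.e. (I/255)^0.8 * 255, to every
# image field; gamma values below 1 brighten the image.
contrast = ImageGammaContrastProcessor(gamma=0.8)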

Utility processors#

Cacher to cache responses from pipelines#

class dnikit.processors.Cacher(storage_path=None)[source]

Cacher is a PipelineStage that will cache to disk the batches produced by the previous Producer in a pipeline created with pipeline().

The first time a pipeline with a Cacher is executed, Cacher stores the batches to disk. Every time the pipeline is called after that, batches are read directly from disk, without redoing any computation in the previous stages.

Note that batches may be quite large and this may require a large portion of available disk space. Be mindful when using Cacher.

If the data from the producer does not have Batch.StdKeys.IDENTIFIER, this class will assign a numeric identifier. This cannot be used across calls to Cacher but will be consistent for all uses of the pipelined_producer.

Example

# Imports assumed: pipeline from dnikit.base, Cacher from dnikit.processors
from dnikit.base import pipeline
from dnikit.processors import Cacher

producer = ... # create a valid dnikit Producer
processor = ... # create a valid dnikit Processor
cacher = Cacher()

# Pipeline everything
pipelined_producer = pipeline(producer, processor, cacher)

# No results have been cached
cacher.cached  # returns False

# Trigger pipeline
batches = list(pipelined_producer(batch_size=32)) # producer and processor are invoked.

# Results have been cached
cacher.cached  # returns True

# Trigger pipeline again (fast, because batch_size has the same value as before)
list(pipelined_producer(batch_size=32))  # producer and processor are NOT invoked

# Trigger pipeline once more (slower, because batch_size is different from first time)
list(pipelined_producer(batch_size=48))  # producer and processor are NOT invoked

The typical use-case for this class is to cache the results of expensive computation (such as inference and post-processing) to avoid re-doing said computation more than once.

Note

Just as with Model and Processor, no computation (or in this case, caching) will be executed until the pipeline is triggered.

See also

dnikit.base.multi_introspect() which allows several introspectors to use the same batches without storing them in the file-system. multi_introspect() may be a better option for very large datasets.

Warning

Cacher has the ability to resize batches if batches of different sizes are requested (see example). However, doing so is relatively computationally expensive since it involves concatenating and splitting batches. Therefore it’s recommended to use this feature sparingly.

Warning

Unlike other PipelineStages, Cacher will raise a DNIKitException if it is used with more than one pipeline. This is to avoid reading batches generated from another pipeline with different characteristics.

Parameters:

storage_path (Path | None) – [optional] If set, Cacher will store batches in storage_path, otherwise it will create a random temporary directory.