Processors API#

class dnikit.processors.Processor(func, *, fields=None)[source]#

Bases: PipelineStage

Class to apply transformations to the fields of Batch.

All other processors in DNIKit should inherit from this class. Note that this is not an abstract base class: a valid custom processor can be instantiated by simply passing a function, as shown in the following example:

Example

import numpy as np

def to_db_func(data: np.ndarray) -> np.ndarray:
    # Convert amplitude values to decibels relative to ref_value
    ref_value = 1e-5
    return 20 * np.log10(data / ref_value)

processor = Processor(to_db_func)
# processor can now be used with a pipeline.
Parameters:
  • func (Callable[[ndarray], ndarray]) – transformation to be applied to selected fields.

  • fields (None | str | Collection[str]) – [keyword arg, optional] a single field, or an iterable of fields, to be processed. If fields is None, then all fields will be processed.

class dnikit.processors.Cacher(storage_path=None)[source]#

Bases: PipelineStage

Cacher is a PipelineStage that will cache to disk the batches produced by the previous Producer in a pipeline created with pipeline().

The first time a pipeline with a Cacher is executed, Cacher stores the batches to disk. Every subsequent time the pipeline is called, batches are read directly from disk, without re-computing any of the previous stages.

Note that batches may be quite large and this may require a large portion of available disk space. Be mindful when using Cacher.

If the data from the producer does not have Batch.StdKeys.IDENTIFIER, this class will assign a numeric identifier. These identifiers are not stable across different Cacher instances, but they will be consistent across all uses of the pipelined_producer.

Example

producer = ... # create a valid dnikit Producer
processor = ... # create a valid dnikit Processor
cacher = Cacher()

# Pipeline everything
pipelined_producer = pipeline(producer, processor, cacher)

# No results have been cached
cacher.cached  # returns False

# Trigger pipeline
batches = list(pipelined_producer(batch_size=32)) # producer and processor are invoked.

# Results have been cached
cacher.cached  # returns True

# Trigger pipeline again (fast, because batch_size has the same value as before)
list(pipelined_producer(batch_size=32))  # producer and processor are NOT invoked

# Trigger pipeline once more (slower: batch_size differs from the first run, so cached batches must be resized)
list(pipelined_producer(batch_size=48))  # producer and processor are NOT invoked

The typical use-case for this class is to cache the results of expensive computation (such as inference and post-processing) to avoid performing that computation more than once.

Note

Just as with Model and Processor, no computation (or in this case, caching) will be executed until the pipeline is triggered.

See also

dnikit.base.multi_introspect() which allows several introspectors to use the same batches without storing them in the file-system. multi_introspect() may be a better option for very large datasets.

Warning

Cacher has the ability to resize batches if batches of different sizes are requested (see example). However, doing so is relatively computationally expensive since it involves concatenating and splitting batches. Therefore it’s recommended to use this feature sparingly.

Warning

Unlike other PipelineStage subclasses, Cacher will raise a DNIKitException if it is used with more than one pipeline. This avoids reading batches that were generated by another pipeline with different characteristics.

Parameters:

storage_path (Path | None) – [optional] If set, Cacher will store batches in storage_path; otherwise it will create a random temporary directory.

as_producer()[source]#

Get a CachedProducer which loads the batches stored by this Cacher.

Raises:

DNIKitException – if called before caching has been completed.

Return type:

CachedProducer
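
For example, once caching has completed, the cached batches can seed a new pipeline; a minimal sketch (another_processor is hypothetical):

cached_producer = cacher.as_producer()
new_pipeline = pipeline(cached_producer, another_processor)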

property cached: bool#

True if all the batches have already been cached.

static clear(storage_path=None)[source]#

Clears files produced by Cacher and CachedProducer.

Parameters:

storage_path (Path | None) – if None (the default), the function will clear all dnikit caches under the system's temporary directory. Otherwise it will clear all dnikit caches under the specified directory.

Raises:

NotADirectoryError – if storage_path is not a valid directory.

Return type:

None

Warning

Make sure to only call this function once pipelines are no longer needed (or before pipelines are used at all). Otherwise, a cache that is already in use may be destroyed!
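
A minimal sketch of clearing caches once all pipelines are finished (the explicit directory is illustrative):

from pathlib import Path

Cacher.clear()  # clear all dnikit caches under the system's temporary directory
Cacher.clear(storage_path=Path("/tmp/my_dnikit_cache"))  # or under a specific directory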

property storage_path: Path#

The (absolute) path where the batches are being cached.

class dnikit.processors.Composer(filter)[source]#

Bases: PipelineStage

Apply a filter function to every batch, producing filter(b) for each batch b.

Parameters:

filter (Callable[[Batch], Batch | None]) – The filter function to apply to every batch in the pipeline. The filter should take a single Batch as input and return a transformed batch (e.g. a subset) or None (to produce an empty batch).
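
A minimal sketch of a hand-written filter, assuming Batch exposes its length via a batch_size property (the size threshold is illustrative):

def keep_small_batches(batch: Batch) -> Batch | None:
    # Pass small batches through unchanged; drop the rest entirely
    return batch if batch.batch_size <= 64 else None

composer = Composer(keep_small_batches)
# composer can now be used in a pipeline, like any other PipelineStage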

classmethod from_dict_metadata(metadata_key, label_dimension, label)[source]#

Initialize a Composer to filter Batches by restrictions on their metadata, as accessed with a DictMetaKey, e.g., Batch.StdKeys.LABELS.

Parameters:
  • metadata_key (DictMetaKey) – metadata key to filter on, e.g., Batch.StdKeys.LABELS

  • label_dimension – key-field within the metadata (e.g., a label dimension such as "class") to inspect

  • label – label value(s) that an element must match in order to be kept

Returns:

Composer that filters Batches by dict metadata criteria

Return type:

Composer

classmethod from_element_filter(elem_filter)[source]#

Initialize a Composer that filters batch data based on element-wise filter criteria

Parameters:

elem_filter (Callable[[ElementType], bool]) – element-wise validation function applied to each element of a Batch. Returns True if the element should be kept, False otherwise.

Returns:

Composer that filters batches to only elements that meet filter criteria

Return type:

Composer
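
A minimal sketch, assuming each element exposes its metadata via element.metadata and carries Batch.StdKeys.LABELS with a "class" key-field (all names illustrative):

def is_dog(element) -> bool:
    # Keep only elements whose "class" label is "dog"
    return element.metadata[Batch.StdKeys.LABELS]["class"] == "dog"

composer = Composer.from_element_filter(is_dog)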

class dnikit.processors.Concatenator(dim, output_field, fields)[source]#

Bases: PipelineStage

This PipelineStage concatenates two or more fields in the Batch and produces a new field with the given output_field name.

Example

If fields M and N have dimensions BxM1xZ and BxN1xZ and are concatenated along dimension 1, the result is a new field of size Bx(M1+N1)xZ.

Parameters:
  • dim (int) – the dimension to concatenate along

  • output_field (str) – name of the new field to hold the result

  • fields (Sequence[str]) – a sequence of fields to concatenate, in order

dim: int#

the dimension to concatenate along

fields: Sequence[str]#

a sequence of fields to concatenate, in order

output_field: str#

name of the new field (layer name) to hold the result
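
Continuing the example above, a minimal sketch concatenating two hypothetical fields "M" and "N" into a new field "MN":

concatenator = Concatenator(dim=1, output_field="MN", fields=["M", "N"])
pipelined_producer = pipeline(producer, concatenator)  # producer must emit fields "M" and "N"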

class dnikit.processors.FieldRemover(*, fields, keep=False)[source]#

Bases: PipelineStage

A PipelineStage that removes some fields from a Batch.

Parameters:
  • fields (str | Collection[str]) – [keyword arg] a single field name, or an iterable of field names, to be removed.

  • keep (bool) – [keyword arg, optional] if True, the fields input specifies the fields to keep, and all others will be removed

class dnikit.processors.FieldRenamer(mapping)[source]#

Bases: PipelineStage

A PipelineStage that renames some fields in a Batch.

Parameters:

mapping (Mapping[str, str]) – a dictionary (or similar) whose keys are the old field names and values are the new field names.
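
A minimal sketch combining FieldRemover and FieldRenamer (the field names are illustrative):

remover = FieldRemover(fields="logits", keep=True)  # keep only the "logits" field
renamer = FieldRenamer({"logits": "scores"})        # rename "logits" to "scores"
pipelined_producer = pipeline(producer, remover, renamer)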

class dnikit.processors.Flattener(order='C', fields=None)[source]#

Bases: Processor

A Processor that collapses arrays of shape BxN1xN2x... into BxN.

Parameters:
  • order (str) –

    [optional] {C, F, A, K}:

    C (default) means to flatten in row-major (C-style) order.

    F means to flatten in column-major (Fortran-style) order.

    A means to flatten in column-major order if it is Fortran contiguous in memory, row-major order otherwise.

    K means to flatten in the order the elements occur in memory.

  • fields (None | str | Collection[str]) – [optional] a single field name, or an iterable of field names, to be flattened. If the fields param is None, then all the fields in the batch will be flattened.

Raises:

ValueError – if order param is not one of {C, F, A, K}
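
A minimal usage sketch, assuming the producer emits a field of shape (32, 7, 7, 512) (the shape is illustrative):

flattener = Flattener()  # row-major (C-style) flattening of all fields
pipelined_producer = pipeline(producer, flattener)
# a field of shape (32, 7, 7, 512) comes out as (32, 25088)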

class dnikit.processors.ImageGammaContrastProcessor(gamma=1.0, *, fields=None)[source]#

Bases: Processor

Processor that gamma-corrects images in a data field from a Batch. BxCHW and BxHWC images are accepted, with non-normalized values (between 0 and 255). Each image I is contrast-adjusted using the formula (I/255)^gamma * 255.

Parameters:
  • gamma (float) – [optional] gamma exponent used in the contrast adjustment

  • fields (None | str | Collection[str]) – [keyword arg, optional] a single field name, or an iterable of field names, to be processed. If fields param is None, then all fields will be processed.

Raises:

DNIKitException – if OpenCV is not installed.
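
As a worked instance of the formula: with gamma=0.5, a pixel value of 64 maps to (64/255)^0.5 * 255 ≈ 128, brightening dark regions, while gamma > 1 darkens them. A minimal usage sketch (the field name "images" is illustrative):

processor = ImageGammaContrastProcessor(gamma=0.5, fields="images")
pipelined_producer = pipeline(producer, processor)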

class dnikit.processors.ImageGaussianBlurProcessor(sigma=0.0, *, fields=None)[source]#

Bases: Processor

Processor that blurs images in a data field from a Batch. BxCHW and BxHWC images are accepted, with non-normalized values (between 0 and 255).

Parameters:
  • sigma (float) – [optional] standard deviation of the Gaussian blur; recommended values are between 0 and 3, but values beyond this range are acceptable.

  • fields (None | str | Collection[str]) – [keyword arg, optional] a single field name, or an iterable of field names, to be processed. If fields param is None, then all fields will be processed.

Raises:

DNIKitException – if OpenCV is not installed.
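
A minimal usage sketch applying a mild blur to a hypothetical "images" field:

blur = ImageGaussianBlurProcessor(sigma=1.5, fields="images")
pipelined_producer = pipeline(producer, blur)
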
class dnikit.processors.ImageResizer(*, pixel_format, size, fields=None)[source]#

Bases: Processor

ImageResizer uses OpenCV to resize images. It converts responses with the structure BxHxWxC (see ImageFormat for alternatives) to a new HxW value. This does not honor aspect ratio: the new image will be exactly the size given. Resizing uses the default OpenCV interpolation, INTER_LINEAR.

Parameters:
  • pixel_format (ImageFormat) – [keyword arg] the layout of the pixel data, see ImageFormat

  • size (Tuple[int, int]) – [keyword arg] the size to scale to, (width, height)

  • fields (None | str | Collection[str]) – [keyword arg, optional] a single field name, or an iterable of field names, to be processed. If fields param is None, then all fields will be resized.

Raises:

DNIKitException – if OpenCV is not installed.
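
A minimal usage sketch resizing a hypothetical "images" field of HWC-formatted images to 224x224:

resizer = ImageResizer(pixel_format=ImageFormat.HWC, size=(224, 224), fields="images")
pipelined_producer = pipeline(producer, resizer)
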
class dnikit.processors.ImageRotationProcessor(angle=0.0, pixel_format=ImageFormat.HWC, *, cval=(0, 0, 0), fields=None)[source]#

Bases: Processor

Processor that performs in-plane image rotation on data in a data field from a Batch. BxCHW and BxHWC images are accepted, with non-normalized values (between 0 and 255).

Parameters:
  • angle (float) – [optional] angle (in degrees) of image rotation; positive values mean counter-clockwise rotation

  • pixel_format (ImageFormat) – [optional] the layout of the pixel data, see ImageFormat

  • cval (Tuple[int, int, int]) – [keyword arg, optional] RGB color value to fill areas outside image; defaults to (0, 0, 0) (black)

  • fields (None | str | Collection[str]) – [keyword arg, optional] a single field name, or an iterable of field names, to be processed. If fields param is None, then all fields will be processed.

Raises:

DNIKitException – if OpenCV is not installed.

class dnikit.processors.MeanStdNormalizer(*, mean, std, fields=None)[source]#

Bases: Processor

A Processor that standardizes a field of a Batch by subtracting the mean and adjusting the standard deviation to 1.

More precisely, if x is the data to be processed, the following processing is applied: (x - mean) / std.

Parameters:
  • mean (float) – [keyword arg] The mean to be applied

  • std (float) – [keyword arg] The standard deviation to be applied

  • fields (None | str | Collection[str]) – [keyword arg, optional] a single field name, or an iterable of field names, to be processed. If fields param is None, then all fields will be processed.
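
A minimal usage sketch with illustrative statistics:

# Map non-normalized pixel values from [0, 255] roughly onto [-1, 1]
normalizer = MeanStdNormalizer(mean=127.5, std=127.5)
pipelined_producer = pipeline(producer, normalizer)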

class dnikit.processors.MetadataRemover(*, meta_keys=None, keys=None, keep=False)[source]#

Bases: PipelineStage

A PipelineStage that removes some metadata from a Batch.

Parameters:
  • meta_keys (None | MetaKey | DictMetaKey | Collection[MetaKey | DictMetaKey]) – [keyword arg, optional] either a single instance or an iterable of Batch.MetaKey / Batch.DictMetaKey that may be removed. If None (the default case), this processor will operate on all metadata keys.

  • keys (Any) – [keyword arg, optional] key(s) within the metadata to remove. Metadata stored under a Batch.DictMetaKey maps str key-fields to data; this argument selects the str key-field(s) to remove from the batch's metadata. If None (the default), this processor operates on all key-fields of metadata stored under a Batch.DictMetaKey.

  • keep (bool) – [keyword arg, optional] if True, the selected meta_keys and keys now specify what to keep, and all other data will be removed.
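
A minimal usage sketch that keeps only the labels metadata (assuming batches carry Batch.StdKeys.LABELS):

# Remove all metadata except Batch.StdKeys.LABELS
remover = MetadataRemover(meta_keys=Batch.StdKeys.LABELS, keep=True)
pipelined_producer = pipeline(producer, remover)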

class dnikit.processors.MetadataRenamer(mapping, *, meta_keys=None)[source]#

Bases: PipelineStage

A PipelineStage that renames some metadata fields in a Batch. This only works with metadata that has key type Batch.DictMetaKey.

Parameters:
  • mapping (Mapping[str, str]) – a dictionary (or similar) whose keys are the old metadata field names and values are the new metadata field names.

  • meta_keys (None | DictMetaKey | Collection[DictMetaKey]) – [keyword arg, optional] either a single instance or an iterable of metadata keys of type Batch.DictMetaKey whose key-fields will be renamed. If None (the default case), all key-fields for all metadata keys will be renamed.

Note

MetadataRenamer only works with Batch.DictMetaKey (which has entries that can be renamed).

class dnikit.processors.PipelineDebugger(label='', first_only=True, dump_fields=False, fields=None)[source]#

Bases: PipelineStage

A PipelineStage that can be used to inspect batches in a pipeline.

Parameters:
  • label (str) – [optional] label to display in the output

  • first_only (bool) – [optional] if True, show the first batch only

  • dump_fields (bool) – [optional] if True, print the contents of the fields

  • fields (None | str | Collection[str]) – [optional] fields of interest; None (the default) means all fields

static dump(batch, label='', dump_fields=False, fields=None)[source]#

Utility method to produce a dump of a Batch or a Batch.Builder.

Parameters:
  • batch (Batch | Batch.Builder) – the batch (or batch builder) to dump

  • label (str) – [optional] label to display

  • dump_fields (bool) – [optional] if True, print the contents of the fields

  • fields (None | str | Collection[str]) – [optional] fields of interest; None (the default) means all fields

Return type:

str

dump_fields: bool = False#

If True, print the contents of the fields.

fields: None | str | Collection[str] = None#

List of fields of interest. Default is None which means all. See dump_fields

first_only: bool = True#

Show the first batch only.

label: str = ''#

Optional label to display.
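
A minimal usage sketch that inspects the first batch flowing between two stages (the label is illustrative):

debugger = PipelineDebugger(label="after preprocessing", dump_fields=True)
pipelined_producer = pipeline(producer, processor, debugger)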

class dnikit.processors.Pooler(*, dim, method, fields=None)[source]#

Bases: Processor

A Processor that pools the axes of a data field from a Batch with a specific method.

Parameters:
  • dim (int | Collection[int]) – [keyword arg] The dimension (one or many) to be pooled, e.g., spatial pooling is generally (1, 2).

  • method (Method) – [keyword arg] Pooling method. See Pooler.Method for full list of options.

  • fields (None | str | Collection[str]) – [keyword arg, optional] a single field name, or an iterable of field names, to be pooled. If the fields param is None, then all the fields in the batch will be pooled.

class Method(value)[source]#

Bases: Enum

The pooling methods supported by Pooler.

AVERAGE = 3#
MAX = 1#
SUM = 2#
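
A minimal usage sketch: spatial max-pooling of BxHxWxC data over its H and W axes:

pooler = Pooler(dim=(1, 2), method=Pooler.Method.MAX)
pipelined_producer = pipeline(producer, pooler)
# a field of shape (32, 7, 7, 512) is pooled down to (32, 512)
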
class dnikit.processors.SnapshotRemover(snapshots=None, keep=False)[source]#

Bases: PipelineStage

A PipelineStage that removes snapshots from a Batch. If used with no arguments, this will remove all snapshots.

Parameters:
  • snapshots (None | str | Collection[str]) – [optional] a single snapshot name, or an iterable of snapshot names, to keep/remove. If None (the default), all snapshots are removed.

  • keep (bool) – [optional] if True, the listed snapshots are kept and all others are removed

keep: bool = False#

If True, the listed snapshots are kept, else the snapshots will be removed.

snapshots: None | str | Collection[str] = None#

List of snapshots to keep/remove.

class dnikit.processors.SnapshotSaver(save='snapshot', fields=None, keep=True)[source]#

Bases: PipelineStage

A PipelineStage that attaches the current Batch as the snapshot.

Parameters:
  • save (str) – [optional] key under which to save the current state of the batches

  • fields (None | str | Collection[str]) – [optional] fields to include in (or omit from) the saved snapshot, or None for all

  • keep (bool) – [optional] if True, the fields list gives the fields to keep; if False, the ones to omit

fields: None | str | Collection[str] = None#

Optional list of fields to include/remove in the saved snapshot or None for all.

keep: bool = True#

If True, the fields list are the fields to keep, if False, the ones to omit.

save: str = 'snapshot'#

The key under which the current state of the batches is saved.
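
A minimal usage sketch that snapshots batches before pooling so later stages can still access the unpooled data (the key "raw" is illustrative):

saver = SnapshotSaver(save="raw")
pooler = Pooler(dim=(1, 2), method=Pooler.Method.AVERAGE)
pipelined_producer = pipeline(producer, saver, pooler)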

class dnikit.processors.Transposer(*, dim, fields=None)[source]#

Bases: Processor

A Processor that transposes dimensions in a data field from a Batch. This processor will reorder the dimensions of the data as specified in the dim param.

Example

To reorder NHWC to NCHW, specify Transposer(dim=[0, 3, 1, 2]); to reorder NCHW to NHWC, specify Transposer(dim=[0, 2, 3, 1]).

Parameters:
  • dim (Sequence[int]) – [keyword arg] the new order of the dimensions. It is illegal to reorder the 0th dimension.

  • fields (None | str | Collection[str]) – [keyword arg, optional] a single field name, or an iterable of field names, to be transposed. If fields param is None, then all fields will be transposed.

Raises:

ValueError – if input specifies reordering the 0th dimension
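
A minimal usage sketch converting BxHxWxC activations to BxCxHxW:

transposer = Transposer(dim=(0, 3, 1, 2))  # NHWC -> NCHW
pipelined_producer = pipeline(producer, transposer)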
