returnn.datasets.postprocessing

Provides PostprocessingDataset.

class returnn.datasets.postprocessing.PostprocessingDataset(dataset: Dict[str, Any], map_seq: Callable | None = None, map_seq_stream: Callable | None = None, map_outputs: Dict[str, Any] | None = None, **kwargs)[source]

A dataset that allows for generic post-processing of data from another dataset, using a function either on the level of single segments or across multiple segments via an iterator.

This allows integrating various data augmentation techniques such as Mixup, SpecAugment or speed perturbation into the data loading pipeline.

The integration into the data loading pipeline makes it easy to distribute the data processing work across multiple CPU cores using MultiProcDataset and in turn frees the GPU from data preprocessing tasks.

Example usage:

from returnn.tensor.dim import Dim, DimTypes

time_dim = Dim(None, kind=DimTypes.Spatial)
new_data_dim = Dim(128)

train = {
    "class": "PostprocessingDataset",
    "dataset": {
        "class": "HDFDataset",
        "files": ["/path/to/data.hdf"],
    },
    # one of them, but not both:
    # (data: TensorDict, *, rng: numpy.random.RandomState, **kwargs) -> TensorDict
    "map_seq": map_seq,
    # (iter: Iterator[TensorDict], *, rng: numpy.random.RandomState, **kwargs) -> Iterator[TensorDict]
    "map_seq_stream": map_seqs,
    # only required when data shapes change wrt. the wrapped dataset:
    "map_outputs": {
        "data": {"dims": [time_dim, new_data_dim]},
    },
}
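
For illustration, a minimal map_seq-style function might look as follows. This is only a sketch: the feature key "data" and the random rescaling are assumptions standing in for real augmentation logic such as SpecAugment.

import numpy
from returnn.tensor import TensorDict

def map_seq(data: TensorDict, *, rng: numpy.random.RandomState, **kwargs) -> TensorDict:
    # Hypothetical augmentation: rescale the features by a random factor.
    # Assumes the wrapped dataset provides features under the key "data".
    feat = data.data["data"]
    raw = feat.raw_tensor
    # keep the original dtype so the declared data layout stays valid
    feat.raw_tensor = (raw * rng.uniform(0.9, 1.1)).astype(raw.dtype)
    return data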

The dataset itself does not support its own seq ordering and relies on the wrapped dataset for seq ordering instead. Specifying a seq_ordering other than default results in an error.

However, the module provides LaplaceOrdering, an iterator that implements the common laplace:.NUM_SEQS_PER_BIN variant of seq ordering and that any custom map_seq_stream-style postprocessing iterator can be composed with.

Like this:

from returnn.datasets.postprocessing import LaplaceOrdering, Sequential

def my_map_seq_stream(iterator, **kwargs):
    # custom multi-segment postprocessing logic
    ...

train = {
    "class": "PostprocessingDataset",
    # ...
    "map_seq_stream": Sequential(
        my_map_seq_stream,
        LaplaceOrdering(num_seqs_per_bin=1000),
    ),
}
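
For instance, my_map_seq_stream could drop overly long segments before the Laplace ordering is applied. The following sketch assumes features under the key "data" with time as the first axis; the length limit is arbitrary.

from typing import Iterator

import numpy
from returnn.tensor import TensorDict

def my_map_seq_stream(
    iterator: Iterator[TensorDict], *, rng: numpy.random.RandomState, **kwargs
) -> Iterator[TensorDict]:
    # Hypothetical filter: skip segments longer than a fixed limit.
    for tensor_dict in iterator:
        if tensor_dict.data["data"].raw_tensor.shape[0] <= 3000:
            yield tensor_dict
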
Parameters:
  • dataset – inner dataset to be post-processed

  • map_seq – post-processor function operating on the single-segment level. Signature: (data: TensorDict, *, rng: numpy.random.RandomState, **kwargs) -> TensorDict. To avoid confusion about the order in which the processing functions are applied to the data, only one of map_seq and map_seq_stream can be specified at a time. To ensure forwards compatibility, the function must accept **kwargs as its last parameter; this is enforced by passing randomly named parameters at runtime.

  • map_seq_stream

    post-processor function operating on multiple segments via an iterator. Allows merging multiple segments into one, or generating multiple output segments from one input segment. Signature:

    (iter: Iterator[TensorDict], *, rng: numpy.random.RandomState, **kwargs) -> Iterator[TensorDict]

    To avoid confusion about the order in which the processing functions are applied to the data, only one of map_seq and map_seq_stream can be specified at a time. To ensure forwards compatibility, the function must accept **kwargs as its last parameter; this is enforced by passing randomly named parameters at runtime.

  • map_outputs – Type and axis specification of the outputs of the mapping functions, analogous to extern_data and model_outputs. In the common case where the data shapes do not change, this value can be left unspecified; the dataset then assumes the same data layout as returned by the wrapped dataset. Example: map_outputs={"data": {"dim": 42}}

  • kwargs – see CachedDataset2, Dataset

init_seq_order(epoch: int | None = None, seq_list: List[str] | None = None, seq_order: List[int] | None = None)[source]
Parameters:
  • epoch

  • seq_list

  • seq_order

Returns:

whether the order changed (True is always safe to return)

get_current_seq_order()[source]
Returns:

current seq order of wrapped dataset, if map_seq_stream is not used

get_data_keys()[source]
Returns:

available data keys

get_data_dtype(key)[source]
Returns:

dtype of data entry key

supports_sharding() → bool[source]
Returns:

whether this dataset supports sharding

class returnn.datasets.postprocessing.LaplaceOrdering(num_seqs_per_bin: int, length_key: str = 'data')[source]

Iterator compatible with PostprocessingDataset’s map_seq_stream applying laplace sequence ordering based on the number of segments per bin.

To be composed with any custom data postprocessing logic via Sequential.

Parameters:
  • num_seqs_per_bin – number of segments in a single laplace bin.

  • length_key – data key to determine the segment length from for ordering.
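
Since LaplaceOrdering is itself a callable mapping an iterator to an iterator, an instance can plausibly also serve as map_seq_stream on its own when no further postprocessing is needed, e.g.:

from returnn.datasets.postprocessing import LaplaceOrdering

train = {
    "class": "PostprocessingDataset",
    "dataset": {
        "class": "HDFDataset",
        "files": ["/path/to/data.hdf"],
    },
    # applies laplace-style length ordering within bins of 1000 segments
    "map_seq_stream": LaplaceOrdering(num_seqs_per_bin=1000),
}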

class returnn.datasets.postprocessing.Sequential(*postprocessing_funcs: Callable)[source]

Callable that composes multiple postprocessing functions into one by sequential application, i.e. Sequential(f, g)(x) = (g ∘ f)(x) = g(f(x)).

Can either compose map_seq-style single-segment processor functions or map_seq_stream-style iterators operating on multiple segments. Just make sure not to mix both styles.

Parameters:

postprocessing_funcs – Postprocessing functions to compose.
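
For example, two hypothetical map_seq-style processors can be chained as follows; Sequential applies them left to right:

from returnn.datasets.postprocessing import Sequential

# hypothetical single-segment processors, both with the map_seq signature
def add_noise(data, *, rng, **kwargs):
    ...

def normalize(data, *, rng, **kwargs):
    ...

train = {
    "class": "PostprocessingDataset",
    # ...
    # normalize runs on the output of add_noise
    "map_seq": Sequential(add_noise, normalize),
}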