returnn.datasets.postprocessing
Provides PostprocessingDataset.
- class returnn.datasets.postprocessing.PostprocessingDataset(*, dataset: Dict[str, Any], map_seq: Callable | None = None, map_seq_stream: Callable | None = None, map_outputs: Dict[str, Any] | None = None, map_seq_stream_preserves_num_seqs: bool | None = None, buf_size: int = 1, num_workers: int = 0, **kwargs)
A dataset that allows for generic online post-processing of data from another dataset using a function on the segment level and on the level of multiple segments via an iterator.
This allows integrating various data augmentation techniques such as Mixup, SpecAugment or speed perturbation into the data loading pipeline.
The integration into the data loading pipeline makes it easy to distribute the data processing work across multiple CPU cores and in turn frees the GPU from data preprocessing tasks.
Multiprocessing can either be done using MultiProcDataset or by setting num_workers > 0 on this class.
The latter only applies parallelism to the post-processing functions themselves, and does not duplicate the underlying dataset once per worker. This is often fast enough and has the advantage of lower memory consumption.
Example usage:
    from returnn.tensor.dim import Dim, DimTypes

    time_dim = Dim(None, kind=DimTypes.Spatial)
    new_data_dim = Dim(128)

    train = {
        "class": "PostprocessingDataset",
        "dataset": {
            "class": "HDFDataset",
            "files": ["/path/to/data.hdf"],
        },
        # one of them, but not both:
        # (data: TensorDict, *, rng: numpy.random.RandomState, **kwargs) -> TensorDict
        "map_seq": map_seq,
        # (iter: Iterator[TensorDict], *, rng: numpy.random.RandomState, **kwargs) -> Iterator[TensorDict]
        "map_seq_stream": map_seqs,
        # only required when data shapes change wrt. the wrapped dataset:
        "map_outputs": {
            "data": {"dims": [time_dim, new_data_dim]},
        },
    }
The postprocessor functions operate on TensorDict objects, which have entries for all data keys in the underlying dataset.
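As a rough illustration of a map_seq-style function, the sketch below truncates one data stream to a maximum length. It assumes that entries are reachable as `tdict.data[key]` and expose their array as `raw_tensor`; the function name, the key `"data"` and the constant `MAX_LEN` are hypothetical and chosen only for this example.

```python
from typing import Any

MAX_LEN = 1000  # hypothetical limit, in frames


def truncate_seq(tdict: Any, *, rng: Any = None, **kwargs) -> Any:
    """map_seq-style sketch: cut the "data" stream to at most MAX_LEN frames."""
    tensor = tdict.data["data"]  # assumed access pattern: tdict.data[key]
    tensor.raw_tensor = tensor.raw_tensor[:MAX_LEN]  # slice along the first (time) axis
    # All other entries, including meta entries such as seq_tag, pass through unchanged.
    return tdict
```

The function accepts `**kwargs` as its last argument, matching the signature requirement for postprocessing functions, and returns the same tensor dict so that entries it does not know about are preserved.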
There may also be additional “meta” entries in the tensor dicts, like complete_frac, seq_idx and seq_tag. These should be copied over in a manner that is reasonable for the use case at hand and that ensures forwards compatibility as well as reasonably possible.
The dataset itself does not support its own seq ordering and relies on the wrapped dataset for seq ordering instead. Specifying a seq_ordering other than default results in an error.
However, we provide an iterator, LaplaceOrdering, that implements the common laplace:.NUM_SEQS_PER_BIN variant of seq ordering. Any custom map_seq_stream-style postprocessing iterator can be composed with it to implement the ordering, like this:
    from returnn.datasets.postprocessing import LaplaceOrdering, Sequential

    def my_map_seq_stream(iterator):
        ...

    train = {
        "class": "PostprocessingDataset",
        # ...
        "map_seq_stream": Sequential(
            my_map_seq_stream,
            LaplaceOrdering(num_seqs_per_bin=1000),
        ),
    }
- Parameters:
dataset – inner dataset to be post-processed
map_seq – post processor function operating on the single-segment level. Signature: (data: TensorDict, *, rng: numpy.random.RandomState, **kwargs) -> TensorDict. To avoid confusion about the order in which the processing functions are applied to the data, only one of map_seq and map_seq_stream can be specified at a time. To ensure forwards compatibility, the function must accept **kwargs as its last argument. This is enforced by passing randomly named parameters at runtime.
map_seq_stream – post processor function operating on the multiple-segment level via an iterator. Allows merging multiple segments into one, or generating multiple output segments from one input segment. Signature: (iter: Iterator[TensorDict], *, rng: numpy.random.RandomState, **kwargs) -> Iterator[TensorDict]. To avoid confusion about the order in which the processing functions are applied to the data, only one of map_seq and map_seq_stream can be specified at a time. To ensure forwards compatibility, the function must accept **kwargs as its last argument. This is enforced by passing randomly named parameters at runtime.
map_outputs – Type and axis specification of the outputs of the mapping functions, like extern_data and model_outputs. To simplify the common case when no shapes change, this value can be left unspecified. The dataset then assumes the same data layout as returned by the wrapped dataset. Example: map_outputs={"data": {"dim": 42}}
map_seq_stream_preserves_num_seqs – whether the function in map_seq_stream preserves the number of sequences, i.e. for every input sequence there is exactly one output sequence.
buf_size – Buffer size for each worker, number of seqs to prefetch. Must be > 0.
num_workers – If > 0, configures the number of worker processes to use for data postprocessing. Only the postprocessing is distributed across subprocesses; the underlying dataset is instantiated only once. This usually has lower memory consumption than using MultiProcDataset.
kwargs – see CachedDataset2, Dataset
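To make the map_seq_stream signature more concrete, here is a minimal sketch of an iterator-level function that drops overly long sequences. It is written against the documented signature only; the function name, the key `"data"`, the constant `MAX_LEN` and the access pattern `tdict.data[key].raw_tensor` are assumptions made for illustration.

```python
from typing import Any, Iterator

MAX_LEN = 2000  # hypothetical length limit, in frames


def drop_long_seqs(seqs: Iterator[Any], *, rng: Any = None, **kwargs) -> Iterator[Any]:
    """map_seq_stream-style sketch: skip sequences whose "data" entry exceeds MAX_LEN."""
    for tdict in seqs:
        # Length of the first (time) axis of the assumed "data" entry.
        if len(tdict.data["data"].raw_tensor) <= MAX_LEN:
            yield tdict
```

Because such a filter can emit fewer sequences than it receives, it does not preserve the number of sequences, so map_seq_stream_preserves_num_seqs should presumably not be set to True for it.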
- init_seq_order(epoch: int | None = None, seq_list: List[str] | None = None, seq_order: List[int] | None = None)
- Parameters:
epoch
seq_list
seq_order
- Returns:
whether the order changed (True is always safe to return)
- class returnn.datasets.postprocessing.LaplaceOrdering(num_seqs_per_bin: int, length_key: str = 'data')
Iterator compatible with PostprocessingDataset’s map_seq_stream, applying laplace sequence ordering based on the number of segments per bin.
To be composed with any custom data postprocessing logic via Sequential.
- Parameters:
num_seqs_per_bin – number of segments in a single laplace bin.
length_key – data key to determine the segment length from for ordering.
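The idea behind bin-wise laplace ordering can be sketched in plain Python. This is a simplified illustration and not the library's implementation: it assumes that each bin of num_seqs_per_bin items is sorted by length and that the sort direction alternates from bin to bin. The function name and the `get_len` parameter are hypothetical; the real class reads lengths from the entry named by length_key.

```python
from itertools import islice
from typing import Any, Callable, Iterable, Iterator


def laplace_order_sketch(
    seqs: Iterable[Any], num_seqs_per_bin: int, get_len: Callable[[Any], int] = len
) -> Iterator[Any]:
    """Sort consecutive bins by length, alternating ascending and descending order."""
    it = iter(seqs)
    descending = False
    while True:
        # Buffer the next bin; the final bin may be smaller than num_seqs_per_bin.
        bin_ = list(islice(it, num_seqs_per_bin))
        if not bin_:
            return
        bin_.sort(key=get_len, reverse=descending)
        yield from bin_
        descending = not descending  # flip direction for the next bin


ordered = list(laplace_order_sketch(["aaa", "a", "aa", "b", "bbbb", "bb"], 3))
# ordered == ["a", "aa", "aaa", "bbbb", "bb", "b"]
```

Alternating the direction keeps neighbouring sequences similar in length across bin boundaries, which tends to reduce padding within a batch.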
- class returnn.datasets.postprocessing.Sequential(*postprocessing_funcs: Callable)
Callable that composes multiple postprocessing functions into one by sequential application, i.e. Sequential(f, g)(x) = (g ∘ f)(x) = g(f(x)).
Can either compose map_seq-style single-segment processor functions or map_seq_stream-style iterators operating on multiple segments. Just make sure not to mix both styles.
- Parameters:
postprocessing_funcs – Postprocessing functions to compose.
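The composition rule Sequential(f, g)(x) = g(f(x)) can be illustrated with a few lines of plain Python. The helper below is a hypothetical stand-in, not the library's code; it assumes that keyword arguments such as rng are simply forwarded to every composed function.

```python
from typing import Any, Callable


def compose_sketch(*funcs: Callable[..., Any]) -> Callable[..., Any]:
    """Return a callable that applies funcs left to right, forwarding keyword arguments."""

    def composed(arg: Any, **kwargs) -> Any:
        for func in funcs:
            arg = func(arg, **kwargs)
        return arg

    return composed


def add_one(x, **kwargs):
    return x + 1


def double(x, **kwargs):
    return x * 2


result = compose_sketch(add_one, double)(3)  # double(add_one(3)) == 8
```

The same pattern works for map_seq_stream-style functions, where `arg` is an iterator and each stage wraps the iterator produced by the previous one.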