returnn.tf.data_pipeline
TensorFlow data pipeline¶
This module covers the code that handles data loading, preprocessing, chunking and batching in TensorFlow, i.e. the TensorFlow data pipeline starting from the Dataset.
Some related documents:
https://github.com/rwth-i6/returnn/issues/292 (new dataset pipeline)
https://www.tensorflow.org/guide/datasets
https://www.tensorflow.org/versions/r1.12/api_guides/python/threading_and_queues
https://www.tensorflow.org/guide/performance/overview
https://www.tensorflow.org/guide/performance/datasets
https://github.com/tensorflow/docs/tree/r1.10/site/en/api_docs/python/tf/contrib/data
https://github.com/tensorflow/tensorflow/issues/4535
https://github.com/tensorflow/tensorflow/blob/master/tensorflow/contrib/data/README.md
https://stackoverflow.com/questions/41187745/tensorflow-how-can-i-evaluate-a-validation-data-queue
Data shuffling¶
The sequence shuffling is implemented as part of the Dataset, although we could also use a tf.RandomShuffleQueue on sequence level for training.
Chunk shuffling can be done with another tf.RandomShuffleQueue for training.
Frame shuffling only makes sense for non-recurrent networks, only if any windowing was already done beforehand, and only in training. In that case, we could do chunking with chunk size 1 and chunk step 1 (or chunk size = context window size), and then frame shuffling is just chunk shuffling, so we do not need any separate frame shuffling logic.
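As an illustration, chunk shuffling in training could look roughly like this with a TF1-style queue (a minimal sketch, not RETURNN code; the fixed chunk_size and feature_dim are assumptions for the example):

import tensorflow as tf

tf1 = tf.compat.v1
tf1.disable_eager_execution()
chunk_size, feature_dim = 50, 40

# Queue which holds single chunks and dequeues them in randomized order (training only).
queue = tf1.RandomShuffleQueue(
    capacity=1000, min_after_dequeue=500,
    dtypes=[tf.float32], shapes=[(chunk_size, feature_dim)])

chunk_placeholder = tf1.placeholder(tf.float32, (chunk_size, feature_dim))
enqueue_op = queue.enqueue(chunk_placeholder)  # run by a producer thread, once per chunk
shuffled_chunk = queue.dequeue()               # consumed by the training graph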
Generic pipeline¶
We initialize the Dataset for some epoch. The Dataset could shuffle the sequence order based on the epoch. Then we iterate over the sequences/entries of the dataset (seq_idx), starting with seq_idx = 0 and checking Dataset.is_less_than_num_seqs. We get the data for each data key (e.g. “data” or “classes”) as Numpy arrays of any shape. They don’t need to have the same number of time frames, although in the common case of a frame-wise alignment of class labels, they do. In any case, we basically have a dict[str,numpy.ndarray], the seq_idx and also a seq_tag.
We could implement another sequence shuffling with tf.RandomShuffleQueue in training.
We do chunking of the data of each sequence, i.e. selecting chunks of chunk size frames, iterating over the frames of a sequence with stride = chunk step. While doing chunking, we could add more context around each chunk (zero padding at the borders) if needed, e.g. if we use windowing or convolution with padding=”valid”, such that the output of the net will match that of the targets. However, this depends on where the data is used in the network; maybe it is used at multiple points?
We can do chunk shuffling with another tf.RandomShuffleQueue in training.
We build up batches from the chunks.
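Put together, the iteration and chunking steps could be sketched in plain Python/Numpy roughly as follows (a simplified illustration, not the actual implementation; it assumes the usual RETURNN Dataset methods init_seq_order, is_less_than_num_seqs, load_seqs, get_data and get_tag, and frame-wise aligned data keys):

def iterate_chunks(dataset, epoch, chunk_size=50, chunk_step=25):
    dataset.init_seq_order(epoch=epoch)  # may shuffle the sequence order for this epoch
    seq_idx = 0
    while dataset.is_less_than_num_seqs(seq_idx):
        dataset.load_seqs(seq_idx, seq_idx + 1)
        data = {key: dataset.get_data(seq_idx, key) for key in ("data", "classes")}
        seq_tag = dataset.get_tag(seq_idx)
        num_frames = data["data"].shape[0]
        # Chunking: windows of chunk_size frames, iterating over the sequence with stride chunk_step.
        # This assumes all data keys are frame-wise aligned; otherwise they need separate handling.
        for start in range(0, max(num_frames - chunk_size + 1, 1), chunk_step):
            chunk = {key: value[start:start + chunk_size] for key, value in data.items()}
            yield seq_idx, seq_tag, chunk
        seq_idx += 1

Chunk shuffling and batching would then happen on top of such an iterator.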
First the simple method via feed_dict and placeholders¶
This is implemented in FeedDictDataProvider.
The input data (and optionally the targets) can be represented with tf.compat.v1.placeholder and fed via feed_dict to tf.compat.v1.Session.run, which does one train/eval/forward step. In this case, any preprocessing such as chunking and batching must be done beforehand via Numpy. This was the initial implementation and is also the standard implementation for the Theano backend.
This is not optimal because tf.compat.v1.Session.run first has to copy the data from CPU to GPU and only then can do whatever it is supposed to do (e.g. one train step). Copying and computation thus run serially, although they could run in parallel with some of the other methods discussed below. Also, the preprocessing could involve more complex operations which would be slow in Python + Numpy, and chunk shuffling is more difficult to implement and slower compared to a pure TF solution.
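For illustration, the feed_dict approach boils down to something like this (a simplified sketch with a toy network, not the actual FeedDictDataProvider code; the batch is prepared beforehand in Numpy):

import numpy
import tensorflow as tf

tf1 = tf.compat.v1
tf1.disable_eager_execution()

x = tf1.placeholder(tf.float32, shape=(None, None, 40), name="data")  # (batch, time, feature)
w = tf1.get_variable("w", shape=(40, 1))
loss = tf.reduce_mean(tf.square(tf.tensordot(x, w, axes=[[2], [0]])))  # toy "network"
train_op = tf1.train.AdamOptimizer().minimize(loss)

with tf1.Session() as session:
    session.run(tf1.global_variables_initializer())
    batch = numpy.random.randn(8, 100, 40).astype("float32")  # chunking/batching done in Numpy beforehand
    # session.run first copies the batch to the device, then runs the train step, i.e. serially.
    session.run(train_op, feed_dict={x: batch})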
Implementation via the new tf.data API¶
Define def dataset_pipeline(context: InputContext) -> tf.data.Dataset in your config. See DatasetDataProvider.
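A minimal dataset_pipeline function in the config might look like this (a sketch based on the InputContext methods documented below; further tf.data transformations can be added depending on the setup):

def dataset_pipeline(context):
    """
    :param InputContext context:
    :rtype: tensorflow.data.Dataset
    """
    dataset = context.get_returnn_dataset()               # RETURNN Dataset wrapped in a tf.data.Dataset
    dataset = context.padded_batch_dataset(dataset)       # batching with padding
    dataset = context.map_producer_to_consumer(dataset)   # producer -> consumer (e.g. for distributed TF)
    dataset = context.prefetch_to_consumer_device(dataset)
    return dataset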
Example use case¶
Conv net training. For every sequence, we take a window around every frame for context. The frames of a window must belong together; no unnecessary zero padding should be introduced by chunking. Thus windowing must be done before chunking, or additional zero padding must be added before chunking. Formulated differently: we do chunking with step 1, and the output for a chunk is a single frame. This also means that the windowing cannot be part of the network, because the network only sees chunks there; or the windowing only makes sense with padding="valid", otherwise we would get far too much zero padding at the border of every chunk. The same is true for convolution, pooling and others, i.e. padding in time should always be in "valid" mode.
If we feed in a whole sequence, we must return the whole sequence, in recog, forwarding, search or eval. With padding="valid", the output has fewer time frames, exactly context-size fewer frames. Conv should use padding="valid" anyway to save computation time, and only explicitly pad zeros where needed. In recog, the input format is (batch, time + context_size, …), i.e. zero-padded by context_size additional frames. So we have context_size as an additional global option for the network (it could be set explicitly in the config, or calculated automatically at network construction).
When chunking in such a case, the chunks should also carry such zero padding so that training matches recog. So, basically, a preprocessing step before chunking, in both training and recog, is to add zero padding of size context_size to the input; then we get the output of the expected size.
It depends on whether the full network is recurrent or not.
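The described preprocessing step, adding zero padding of size context_size before chunking so that training matches recog with padding="valid", could be sketched like this (illustrative only; the symmetric left/right split of the context is an assumption):

import numpy

def add_context_padding(features, context_size):
    """
    :param numpy.ndarray features: shape (time, feature_dim)
    :param int context_size: total number of extra frames consumed by the "valid" convolutions/windowing
    :rtype: numpy.ndarray
    """
    left = context_size // 2
    right = context_size - left
    return numpy.pad(features, ((left, right), (0, 0)), mode="constant")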
- class returnn.tf.data_pipeline.DataProviderBase(*, extern_data: TensorDict, data_keys: str | None = None)[source]¶
Base class which wraps up the data provider logic. See derived classes.
- Parameters:
extern_data
data_keys
- have_more_data(session)[source]¶
It is supposed to return True as long as we want to continue with the current epoch in the current dataset (train or eval). This is called from the same thread which runs the main computation graph (e.g. train steps).
- Parameters:
session (tf.compat.v1.Session)
- Returns:
whether the next session.run() can run in the current epoch & dataset
- Return type:
bool
- get_feed_dict(single_threaded=False)[source]¶
Gets the feed dict for TF session run(). Note that this will block if there is nothing in the queue. The queue gets filled by the other thread, via self.thread_main().
- Parameters:
single_threaded (bool) – whether to not use the queue
- Returns:
We dequeue one batch from the queue and provide the data for all placeholders of our external data. Additionally, there can be some meta information.
- Return type:
dict[tf.Tensor,tf.Tensor],dict[str]
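A typical consumer loop over this interface looks roughly as follows (a sketch only; data_provider, session and train_op are assumed to be set up already, and the real loop in the engine does more, e.g. error handling and reporting):

while data_provider.have_more_data(session):
    feed_dict, meta = data_provider.get_feed_dict()
    session.run(train_op, feed_dict=feed_dict)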
- class returnn.tf.data_pipeline.FeedDictDataProvider(*, dataset: Dataset, batches: BatchSetGenerator, enforce_min_len1: bool = False, capacity: int = 10, batch_slice: slice | None = None, **kwargs)[source]¶
This class will fill all the placeholders used for training or forwarding or evaluation etc. of a returnn.tf.network.TFNetwork.
It will run a background thread which reads the data from a dataset and puts it into a queue when you call start_threads().
In principle, this class can also be used independently of TensorFlow.
- Parameters:
dataset
batches
enforce_min_len1
capacity
batch_slice – select a subset of the batches
extern_data
data_keys
- get_next_batch(*, consider_batch_slice: bool = False) Dict[str, ndarray] | None [source]¶
This assumes that we have more data, i.e. self.batches.has_more().
The code is basically independent of TF now. self.extern_data can be any TensorDict.
- Parameters:
consider_batch_slice
- Returns:
batch-data-value-dict or None. If consider_batch_slice is not set, this will never be None.
- have_more_data(session)[source]¶
- Parameters:
session (tf.compat.v1.Session|None)
- Return type:
bool
- Returns:
once we went through the epoch and finished reading, this will return False.
If this returns True, you can definitely read another item from the queue. Threading safety: this assumes that there is no other consumer thread for the queue.
- get_feed_dict(single_threaded=False)[source]¶
Gets the feed dict for TF session run(). Note that this will block if there is nothing in the queue. The queue gets filled by the other thread, via self.thread_main().
- Parameters:
single_threaded (bool) – whether to not use the queue
- Returns:
we dequeue one batch from the queue and provide it for all placeholders of our external data, and additionally return some meta information.
- Return type:
(dict[tf.Tensor,numpy.ndarray],dict[str])
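Usage is roughly like this (a sketch; dataset, batches, extern_data, session and train_op are assumed to exist already, and the start_threads()/stop_threads() calls follow the docstring above, so their exact signatures should be checked against the code):

from returnn.tf.data_pipeline import FeedDictDataProvider

data_provider = FeedDictDataProvider(
    extern_data=extern_data,  # TensorDict describing the network inputs
    dataset=dataset,          # RETURNN Dataset
    batches=batches)          # BatchSetGenerator
data_provider.start_threads(session)  # background thread starts filling the queue
try:
    while data_provider.have_more_data(session):
        feed_dict, meta = data_provider.get_feed_dict()
        session.run(train_op, feed_dict=feed_dict)
finally:
    data_provider.stop_threads()  # clean-up, assumed from the base class interface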
- class returnn.tf.data_pipeline.InputContext(parent, extern_data, config, dataset_name, returnn_dataset, iterator)[source]¶
This object will be passed to the dataset pipeline function (dataset_pipeline in the config) and provides all relevant information, functions, dataset transformations.
The initial design of this class was discussed here: https://github.com/rwth-i6/returnn/issues/292
- Parameters:
parent (DatasetDataProvider)
extern_data (ExternData)
config (returnn.config.Config)
dataset_name (str) – e.g. “train” or “dev”
returnn_dataset (Dataset)
iterator (tensorflow.data.Iterator)
- get_returnn_dataset(**kwargs)[source]¶
- Returns:
The RETURNN Dataset instances wrapped in a tf.data.Dataset. Note that in distributed TF, this dataset would only be used in the dataset loader worker. However, in all cases this will return some dataset. You are not allowed to read from it, though. A follow-up call to map_producer_to_consumer() will take care of this logic.
- Return type:
tensorflow.data.Dataset
- get_default_max_seqs()[source]¶
- Returns:
batch size in number of seqs, used e.g. for padded_batch
- Return type:
int
- padded_batch_dataset(dataset, drop_remainder=False)[source]¶
- Parameters:
dataset (tensorflow.data.Dataset)
drop_remainder (bool) – if True, we would have a static batch size
- Return type:
tensorflow.data.Dataset
- map_producer_to_consumer(dataset)[source]¶
- Parameters:
dataset (tensorflow.data.Dataset)
- Return type:
tensorflow.data.Dataset
- prefetch_to_consumer_device(dataset)[source]¶
This must be called on the consumer (trainer) worker, i.e. after map_producer_to_consumer().
- Parameters:
dataset (tensorflow.data.Dataset)
- Return type:
tensorflow.data.Dataset
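These methods are meant to be composed inside the dataset_pipeline config function. Extending the minimal pipeline shown above, an extra transformation can be inserted on the producer side before batching, for example additional shuffling (a sketch; the buffer size is arbitrary, and prefetch_to_consumer_device() must stay last, after map_producer_to_consumer()):

def dataset_pipeline(context):
    dataset = context.get_returnn_dataset()
    dataset = dataset.shuffle(buffer_size=1000)             # extra shuffling on the producer side
    dataset = context.padded_batch_dataset(dataset)
    dataset = context.map_producer_to_consumer(dataset)
    dataset = context.prefetch_to_consumer_device(dataset)  # on the consumer (trainer) worker
    return dataset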
- class returnn.tf.data_pipeline.DatasetDataProvider(extern_data, config, datasets=None)[source]¶
Use a tf.data.Dataset as input. This will be used if dataset_pipeline is set in the config. See the discussion about the new dataset pipeline (https://github.com/rwth-i6/returnn/issues/292).
Note that this also has state: the currently active dataset.
- Parameters:
extern_data (ExternData)
datasets (list[str]|dict[str,Dataset|None]|None) – e.g. [“train”, “dev”]
config (returnn.config.Config)
- start_threads(session)[source]¶
Start background threads.
Currently this will not actually start any background threads; all background threads of tf.data are started automatically when needed.
However, this will initialize the TF dataset iterator.
- Parameters:
session (tf.compat.v1.Session)
- have_more_data(session)[source]¶
- Parameters:
session (tf.compat.v1.Session)
- Returns:
whether the next session.run() can run in the current epoch & dataset
- Return type:
bool
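When dataset_pipeline is set in the config, the engine drives this provider internally. Conceptually, the consumer-side loop looks like this (a sketch; extern_data, config, session and train_op are assumed to exist, and selecting the currently active dataset is omitted here):

from returnn.tf.data_pipeline import DatasetDataProvider

data_provider = DatasetDataProvider(extern_data=extern_data, config=config, datasets=["train", "dev"])
data_provider.start_threads(session)  # initializes the TF dataset iterator
while data_provider.have_more_data(session):
    session.run(train_op)  # input comes directly from the tf.data iterator, no feed_dict needed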