returnn.tf.data_pipeline#

TensorFlow data pipeline#

This module covers all code related to data loading, preprocessing, chunking and batching in TensorFlow, i.e. the TensorFlow data pipeline from the Dataset.

Some related documents:

  • https://github.com/rwth-i6/returnn/issues/292 (new dataset pipeline)

  • https://www.tensorflow.org/guide/datasets

  • https://www.tensorflow.org/versions/r1.12/api_guides/python/threading_and_queues

  • https://www.tensorflow.org/guide/performance/overview

  • https://www.tensorflow.org/guide/performance/datasets

  • https://github.com/tensorflow/docs/tree/r1.10/site/en/api_docs/python/tf/contrib/data

  • https://github.com/tensorflow/tensorflow/issues/4535

  • https://github.com/tensorflow/tensorflow/blob/master/tensorflow/contrib/data/README.md

  • https://stackoverflow.com/questions/41187745/tensorflow-how-can-i-evaluate-a-validation-data-queue

Data shuffling#

  1. The sequence shuffling is implemented as part of the Dataset, although we could also use a tf.RandomShuffleQueue on sequence level for training.

  2. Chunk shuffling can be done with another tf.RandomShuffleQueue for training; see the sketch after this list.

  3. Frame shuffling only makes sense for non-recurrent networks, only if we already applied windowing beforehand, and only in training. In that case, we could do chunking with chunk size 1 and chunk step 1, or maybe chunk size = context window size, and then frame shuffling is just chunk shuffling, so we do not need any separate frame shuffling logic.
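
A minimal sketch of chunk shuffling via such a queue (item 2), assuming TF1-style graph mode; the shapes, capacities and names are illustrative:

    import tensorflow as tf

    tf.compat.v1.disable_eager_execution()  # TF1-style graph mode

    # Illustrative: chunks of 50 frames with 40-dim features.
    chunk_in = tf.compat.v1.placeholder(tf.float32, shape=(50, 40))

    # Dequeues elements in random order once at least min_after_dequeue
    # elements are buffered.
    queue = tf.queue.RandomShuffleQueue(
        capacity=1000, min_after_dequeue=500,
        dtypes=[tf.float32], shapes=[(50, 40)])

    enqueue_op = queue.enqueue([chunk_in])  # producer side, fed via feed_dict
    shuffled_chunk = queue.dequeue()        # consumer side, random element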

Generic pipeline#

  1. We initialize the Dataset for some epoch. The Dataset could shuffle the sequence order based on the epoch. Then we iterate over the sequences/entries of the dataset (seq_idx), starting with seq_idx = 0, checking Dataset.is_less_than_num_seqs. We get the data for each data key (e.g. “data” or “classes”) as Numpy arrays with any shape. They don’t need to have the same number of time frames, although in the common case where we have a frame-wise alignment of class labels, they would have. In any case, we basically have dict[str,numpy.ndarray], the seq_idx and also a seq_tag.

  2. We could implement another sequence shuffling with tf.RandomShuffleQueue in training.

  3. We do chunking of the data of each sequence, i.e. selecting chunks of chunk size frames, iterating over the frames of a sequence with stride = chunk step (see the sketch after this list). While chunking, we could add more context around each chunk (zero padding at the borders) if needed, e.g. if we use windowing or convolution with padding="valid", such that the output of the net will match that of the targets. However, this depends on where the data is used in the network; maybe it is used at multiple points?

  4. We can do chunk shuffling with another tf.RandomShuffleQueue in training.

  5. We build up batches from the chunks.
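
To make the chunking step (3) concrete, here is a minimal Numpy sketch; it is illustrative and not the actual RETURNN implementation, and whether the last chunk gets zero-padded is a design choice:

    import numpy as np

    def chunk_sequence(seq, chunk_size, chunk_step):
        """Yield chunks of chunk_size frames from seq (shape (time, ...)),
        advancing by chunk_step frames, zero-padding the last chunk."""
        n_frames = seq.shape[0]
        for start in range(0, n_frames, chunk_step):
            chunk = seq[start:start + chunk_size]
            if chunk.shape[0] < chunk_size:  # zero-pad at the sequence end
                pad = np.zeros((chunk_size - chunk.shape[0],) + seq.shape[1:], dtype=seq.dtype)
                chunk = np.concatenate([chunk, pad], axis=0)
            yield chunk
            if start + chunk_size >= n_frames:  # already covered the end
                break

    chunks = list(chunk_sequence(np.zeros((120, 40)), chunk_size=50, chunk_step=50))
    assert len(chunks) == 3 and all(c.shape == (50, 40) for c in chunks)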

First the simple method via feed_dict and placeholders#

This is implemented in FeedDictDataProvider.

The input data (and optionally the targets) can be represented with tf.compat.v1.placeholder and fed via feed_dict to tf.compat.v1.Session.run, which does one train/eval/forward step. In this case, any preprocessing such as chunking and batching must be done beforehand via Numpy. This was the initial implementation and is also the standard implementation for the Theano backend.

This is not optimal because tf.compat.v1.Session.run first has to copy the data from CPU to GPU and only then can do its actual work (e.g. one train step). Copying and computation thus run serially, although they could overlap with the other methods discussed below. Also, the preprocessing could involve more complex operations which would be slow in Python + Numpy, and chunk shuffling is harder to implement and slower compared to a pure TF solution.
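
For illustration, the feed_dict pattern in isolation; this is a sketch where the loss is a stand-in for a real network and the zero arrays stand in for preprocessed Numpy batches:

    import numpy as np
    import tensorflow as tf

    tf.compat.v1.disable_eager_execution()

    # Placeholders standing in for the extern data ("data", "classes").
    x = tf.compat.v1.placeholder(tf.float32, shape=(None, None, 40), name="data")
    y = tf.compat.v1.placeholder(tf.int32, shape=(None, None), name="classes")
    loss = tf.reduce_mean(x)  # stand-in for a real network and loss

    with tf.compat.v1.Session() as session:
        # Chunking and batching happened beforehand in Numpy.
        feed = {x: np.zeros((8, 50, 40), "float32"), y: np.zeros((8, 50), "int32")}
        session.run(loss, feed_dict=feed)  # copies to device, then computes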

Implementation via the new tf.data API#

Define def dataset_pipeline(context: InputContext) -> tf.data.Dataset in your config. See DatasetDataProvider.
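
A sketch of such a function, composed from the InputContext methods documented below; the exact composition is up to the config:

    def dataset_pipeline(context):
        """
        :param InputContext context:
        :rtype: tensorflow.data.Dataset
        """
        dataset = context.get_returnn_dataset()  # RETURNN Dataset as tf.data.Dataset
        dataset = context.padded_batch_dataset(dataset)  # batch + pad to common length
        dataset = context.map_producer_to_consumer(dataset)  # distributed-TF handling
        dataset = context.prefetch_to_consumer_device(dataset)  # overlap with compute
        return dataset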

Example use case#

Conv net training. For every sequence, we take a window around every frame for context. The windows must belong together; chunking should not introduce unnecessary zero padding. Thus, windowing must be done before chunking, or additional zero padding must be added before chunking. Formulated differently: we chunk with step 1, and the output for a chunk is a single frame.

This also means that the windowing cannot be part of the network, because the network only sees chunks there; or the windowing only makes sense with padding="valid", otherwise we would get far too much zero padding at the border of every chunk. The same is true for convolution, pooling and others, i.e. padding in time should always be in "valid" mode. If we feed in a whole sequence, the network must return the whole sequence, in recog, forwarding, search or eval. With padding="valid", the output has fewer time frames, exactly context_size fewer frames. Conv should use padding="valid" anyway to save computation time, and only explicitly pad zeros where needed.

In recog, the input format is (batch, time + context_size, …), i.e. zero-padded by context_size additional frames. So we have context_size as an additional global option for the network (which could be set explicitly in the config, or calculated automatically at construction). When chunking for such a case, the chunks should carry the same zero padding so that training matches recog. So, basically, a preprocessing step before chunking, in both training and recog, is to add zero padding of size context_size to the input; then we get the output of the expected size.

It depends on whether the full network is recurrent or not.
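
A small Numpy sketch of the padding arithmetic described above; context_size and the shapes are illustrative:

    import numpy as np

    context_size = 4           # total frames consumed by the "valid" ops
    seq = np.zeros((100, 40))  # (time, feature)

    # Preprocessing step before chunking/forwarding: pad by context_size so
    # that a padding="valid" network returns one output frame per input frame.
    left = context_size // 2
    padded = np.pad(seq, [(left, context_size - left), (0, 0)])
    assert padded.shape[0] - context_size == seq.shape[0]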

class returnn.tf.data_pipeline.DataProviderBase(*, extern_data: TensorDict, data_keys: str | None = None)[source]#

Base class which wraps up the data provider logic. See derived classes.

Parameters:
  • extern_data

  • data_keys

start_threads(session)[source]#

Start threads.

Parameters:

session (tf.compat.v1.Session) –

stop_threads()[source]#

Stop threads.

have_more_data(session)[source]#

It is supposed to return True as long as we want to continue with the current epoch in the current dataset (train or eval). This is called from the same thread which runs the main computation graph (e.g. train steps).

Parameters:

session (tf.compat.v1.Session) –

Returns:

whether the next session.run() can run in the current epoch & dataset

Return type:

bool

get_feed_dict(single_threaded=False)[source]#

Gets the feed dict for TF session run(). Note that this will block if there is nothing in the queue. The queue gets filled by the other thread, via self.thread_main().

Parameters:

single_threaded (bool) – whether to not use the queue

Returns:

We dequeue one batch from the queue and provide the data for all placeholders of our external data. Additionally, there can be some meta information.

Return type:

dict[tf.Tensor,tf.Tensor],dict[str]

have_reached_end()[source]#
Returns:

whether the current dataset says that we reached the end.

Return type:

bool

get_dataset_name()[source]#
Returns:

current dataset name, e.g. “train” or “dev”

Return type:

str

get_complete_frac()[source]#
Returns:

by how much we are through the current dataset, number between 0 and 1, for visual feedback

Return type:

float

class returnn.tf.data_pipeline.FeedDictDataProvider(*, dataset: Dataset, batches: BatchSetGenerator, enforce_min_len1: bool = False, capacity: int = 10, batch_slice: slice | None = None, **kwargs)[source]#

This class will fill all the placeholders used for training or forwarding or evaluation etc. of a returnn.tf.network.TFNetwork.

It will run a background thread which reads the data from a dataset and puts it into a queue when you call start_threads().

In principle, this class can also be used independently of TensorFlow.

Parameters:
  • dataset

  • batches

  • enforce_min_len1

  • capacity

  • batch_slice – select a subset of the batches

  • extern_data

  • data_keys
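
A typical consumption loop, sketched from the methods documented on this page; extern_data, train_dataset, batch_set_generator, train_op and session are assumed to exist:

    provider = FeedDictDataProvider(
        extern_data=extern_data,      # TensorDict describing the placeholders
        dataset=train_dataset,        # RETURNN Dataset
        batches=batch_set_generator)  # BatchSetGenerator
    provider.start_threads(session)   # background thread fills the queue
    try:
        while provider.have_more_data(session):
            feed_dict, meta = provider.get_feed_dict()
            session.run(train_op, feed_dict=feed_dict)
    finally:
        provider.stop_threads()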

start_threads(session)[source]#

Start the thread.

Parameters:

session (tf.compat.v1.Session) –

stop_threads()[source]#

Stop the thread.

get_next_batch(*, consider_batch_slice: bool = False) → Dict[str, ndarray] | None[source]#

This assumes that we have more data, i.e. self.batches.has_more().

The code is basically independent of TF now. self.extern_data can be any TensorDict.

Parameters:

consider_batch_slice

Returns:

batch-data-value-dict or None. If not consider_batch_slice, this will never be None.

have_more_data(session)[source]#
Parameters:

session (tf.compat.v1.Session|None) –

Return type:

bool

Returns:

once we went through the epoch and finished reading, this will return False

If this returns True, you can definitely read another item from the queue. Threading safety: This assumes that there is no other consumer thread for the queue.

get_feed_dict(single_threaded=False)[source]#

Gets the feed dict for TF session run(). Note that this will block if there is nothing in the queue. The queue gets filled by the other thread, via self.thread_main().

Parameters:

single_threaded (bool) – whether to not use the queue

Returns:

we dequeue one batch from the queue and provide it for all placeholders of our external data, and additionally return some meta information.

Return type:

(dict[tf.Tensor,numpy.ndarray],dict[str])

get_dataset_name()[source]#
Return type:

str

have_reached_end()[source]#
Return type:

bool

get_complete_frac()[source]#
Return type:

float

class returnn.tf.data_pipeline.InputContext(parent, extern_data, config, dataset_name, returnn_dataset, iterator)[source]#

This object will be passed to the dataset pipeline function (dataset_pipeline in the config) and provides all relevant information, functions, dataset transformations.

The initial design of this class was discussed here: https://github.com/rwth-i6/returnn/issues/292

Parameters:
  • parent

  • extern_data

  • config

  • dataset_name

  • returnn_dataset

  • iterator

get_returnn_dataset(**kwargs)[source]#
Returns:

The RETURNN Dataset instance wrapped in a tf.data.Dataset. Note that in distributed TF, this dataset would only be used in the dataset loader worker. However, in all cases this will return some dataset. You are not allowed to read from it, though. A follow-up call to map_producer_to_consumer() will take care of this logic.

Return type:

tensorflow.data.Dataset

get_default_max_seqs()[source]#
Returns:

batch size in number of seqs, used e.g. for padded_batch

Return type:

int

padded_batch_dataset(dataset, drop_remainder=False)[source]#
Parameters:
  • dataset (tensorflow.data.Dataset) –

  • drop_remainder (bool) – if True, we would have a static batch size

Return type:

tensorflow.data.Dataset

map_producer_to_consumer(dataset)[source]#
Parameters:

dataset (tensorflow.data.Dataset) –

Return type:

tensorflow.data.Dataset

get_consumer_device()[source]#
Returns:

e.g. “/device:GPU:0”

Return type:

str

prefetch_to_consumer_device(dataset)[source]#

This must be called on the consumer (trainer) worker, i.e. after map_producer_to_consumer().

Parameters:

dataset (tensorflow.data.Dataset) –

Return type:

tensorflow.data.Dataset

get_dataset_name()[source]#
Returns:

e.g. “train” or “dev”

Return type:

str

make_iterator_initializer(iterator)[source]#
Parameters:

iterator (tensorflow.data.Iterator) –

Return type:

tf.Operation

class returnn.tf.data_pipeline.DatasetDataProvider(extern_data, config, datasets=None)[source]#

Use a tf.data.Dataset as input. This will be used if dataset_pipeline is set in the config. See the discussion about the new dataset pipeline (https://github.com/rwth-i6/returnn/issues/292).

Note that this also has state: the currently active dataset.

Parameters:
  • extern_data

  • config

  • datasets

set_current_dataset(dataset_name)[source]#
Parameters:

dataset_name (str) –

start_threads(session)[source]#

Start background threads.

Currently this will not actually start the background threads. All/any background threads of tf.data are started automatically when needed.

However, this will initialize the TF dataset iterator.

Parameters:

session (tf.compat.v1.Session) –

stop_threads()[source]#

Stop background threads (e.g. prefetching). (Currently a no-op.)

have_more_data(session)[source]#
Parameters:

session (tf.compat.v1.Session) –

Returns:

whether the next session.run() can run in the current epoch & dataset

Return type:

bool

get_feed_dict(single_threaded=False)[source]#
Parameters:

single_threaded (bool) – whether to not use the queue (arg name is slightly misleading)

Returns:

batch,meta

Return type:

dict[tf.Tensor,tf.Tensor],dict[str]

have_reached_end()[source]#
Return type:

bool

get_dataset_name()[source]#
Returns:

current dataset name, e.g. “train” or “dev”

Return type:

str

get_complete_frac()[source]#
Returns:

by how much we are through the current dataset, number between 0 and 1, for visual feedback

Return type:

float