# TFDataPipeline

## TensorFlow data pipeline

This module covers all code related to data loading, preprocessing, chunking, batching and similar tasks in TensorFlow, i.e. the TensorFlow data pipeline starting from the Dataset.

Some related documents:

### Data shuffling

1. The sequence shuffling is implemented as part of the Dataset, although we could also use a tf.RandomShuffleQueue on sequence level for training.
2. Chunk shuffling can be done with another tf.RandomShuffleQueue for training.
3. Frame shuffling only makes sense for non-recurrent networks, only if windowing has already been applied beforehand, and only in training. In that case, we could do chunking with chunk size 1 and chunk step 1, or maybe with chunk size = context window size. Frame shuffling is then just chunk shuffling, so we do not need any separate frame shuffling logic.
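
To illustrate the last point, here is a minimal pure-Python sketch of a bounded shuffle buffer. It only loosely imitates what a `tf.RandomShuffleQueue` would do and is not the module's implementation; the name `shuffle_buffer` and its parameters are assumptions made for this example. Frames are wrapped as chunks of size 1, so shuffling chunks shuffles frames.

```python
import random


def shuffle_buffer(items, buffer_size, seed=1):
    """Yield all items in randomized order using a bounded buffer.

    Hypothetical helper: once more than `buffer_size` items are buffered,
    one randomly chosen item is released for every new incoming item.
    """
    rng = random.Random(seed)
    buf = []
    for item in items:
        buf.append(item)
        if len(buf) > buffer_size:
            # Move a random element to the end and release it.
            idx = rng.randrange(len(buf))
            buf[idx], buf[-1] = buf[-1], buf[idx]
            yield buf.pop()
    # Input exhausted: release the remaining items in random order.
    rng.shuffle(buf)
    yield from buf


# Frame shuffling expressed as chunk shuffling with chunk size 1.
frames = list(range(10))
chunks = [[frame] for frame in frames]
shuffled = list(shuffle_buffer(chunks, buffer_size=4))
```

A larger `buffer_size` gives a more thorough shuffle at the cost of more buffered data.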

### Generic pipeline

1. We initialize the Dataset for some epoch. The Dataset could shuffle the sequence order based on the epoch. Then we iterate over the sequences/entries of the dataset (seq_idx), starting with seq_idx = 0, checking Dataset.is_less_than_num_seqs. We get the data for each data key (e.g. “data” or “classes”) as Numpy arrays with any shape. They don’t need to have the same number of time frames, although in the common case where we have a frame-wise alignment of class labels, they would have. In any case, we basically have dict[str,numpy.ndarray], the seq_idx and also a seq_tag.
2. We could implement another sequence shuffling with tf.RandomShuffleQueue in training.
3. We do chunking of the data of each sequence, i.e. selecting chunks of chunk size frames, iterating over the frames of a sequence with stride = chunk step. While doing chunking, we could add more context around each chunk (zero padding at the borders) if needed, e.g. if we use windowing or convolution with padding="valid", so that the output of the net matches the targets in length. However, this depends on where the data is used in the network; maybe it is used at multiple points?
4. We can do chunk shuffling with another tf.RandomShuffleQueue in training.
5. We build up batches from the chunks.
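
The chunking step can be sketched in plain Python as follows. This is an illustration under assumptions, not the module's code: the helper names `chunk_offsets` and `extract_chunk` are hypothetical, each frame is a single float for brevity, and the choice to stop once a chunk reaches the end of the sequence is one possible convention.

```python
def chunk_offsets(num_frames, chunk_size, chunk_step):
    """Return (start, end) frame offsets for all chunks of one sequence."""
    offsets = []
    start = 0
    while start < num_frames:
        end = min(start + chunk_size, num_frames)
        offsets.append((start, end))
        if end == num_frames:
            break  # assumed convention: stop once the sequence end is covered
        start += chunk_step
    return offsets


def extract_chunk(frames, start, end, context=0, pad_value=0.0):
    """Cut frames[start:end] plus `context` frames on each side.

    Positions outside the sequence are filled with `pad_value`
    (zero padding at the borders).
    """
    return [
        frames[t] if 0 <= t < len(frames) else pad_value
        for t in range(start - context, end + context)
    ]


frames = [float(i) for i in range(1, 11)]  # a sequence with 10 frames
offsets = chunk_offsets(len(frames), chunk_size=4, chunk_step=2)
first_chunk = extract_chunk(frames, *offsets[0], context=1)
```

With `context=1`, every chunk is two frames longer than `chunk_size`, which is what a window of size 3 with "valid" padding would consume.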

### First the simple method via feed_dict and placeholders

The input data (and optionally the targets) can be represented with tf.placeholder and fed via feed_dict to tf.Session.run, which does one train/eval/forward step. In this case, any preprocessing such as chunking and batching must be done beforehand via Numpy. This was the initial implementation and is also the standard implementation for the Theano backend.

This is not optimal because tf.Session.run first has to copy the data from CPU to GPU and only then can do whatever it is supposed to do (e.g. one train step). So the copying and the calculation are done serially, although they could be done in parallel with some other method which we will discuss below. Also, the preprocessing could involve some more complex operations which could be slow with Python + Numpy. Finally, chunk shuffling is more difficult to implement and would be slower compared to a pure TF solution.
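
The idea of overlapping data preparation with computation can be sketched without TensorFlow. The following is a minimal illustration using a background thread and a bounded queue; the names `prefetch` and `load_batches` are hypothetical, and error handling in the producer thread is omitted for brevity.

```python
import queue
import threading


def prefetch(batches, capacity=2):
    """Yield items from `batches`, producing them in a background thread.

    While the consumer works on one item, the producer can already
    prepare up to `capacity` further items.
    """
    q = queue.Queue(maxsize=capacity)
    end = object()  # marks the end of the input

    def producer():
        for batch in batches:
            q.put(batch)  # blocks while the queue is full
        q.put(end)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = q.get()  # blocks while the queue is empty
        if item is end:
            return
        yield item


def load_batches(num_batches):
    """Stand-in for slow loading and preprocessing."""
    for i in range(num_batches):
        yield [i] * 3


# The "train step" here is just a sum over the batch.
results = [sum(batch) for batch in prefetch(load_batches(4))]
```

The consumer loop looks the same as a serial loop; only the wrapping in `prefetch` changes where the preparation work runs.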

### Pipeline implementation

1. One thread which goes over the Dataset. No need for different training/eval queue, no random-shuffle-queue, seq-shuffling is done by the Dataset. Here we can also handle the logic to add the context_size padding to the input. Maybe use Dataset.iterate_seqs which gets us the offsets for each chunk. We can then just add the context_size to each. After that, chunking can be done (can be done in the same thread just at the final step).
2. Another thread TFBatchingQueue, which collects seqs or chunks and prepares batches.

It depends on whether the full network is recurrent or not.

class TFDataPipeline.PipeBase
have_data_for_dequeue()
Returns: if we can dequeue from us now without blocking
Return type: bool
have_incoming_data(dep_pipe_connector)
Parameters: dep_pipe_connector (PipeConnectorBase) – will queue data to us
Returns: whether we have now or in the future data ready for dequeue
Return type: bool
class TFDataPipeline.PipeConnectorBase
is_running()

E.g. for a pipe_in/pipe_out model: If the pipe_in has data, we increase our counter by 1, then dequeue from pipe_in, do something and queue to pipe_out, and only then decrease the counter again. Thus, if we return False, we have ensured that the pipe_out already has the data, or there is no data anymore. If we return True, we will ensure that we will push more data to pipe_out at some point.

Returns: counter > 0
Return type: bool
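
The counter protocol described above can be sketched as follows. This is a hypothetical stand-in built on `queue.Queue`, not the actual `PipeConnectorBase`; the class name `CountingConnector` and its `step` method are assumptions, and it assumes a single consumer of `pipe_in`.

```python
import queue
import threading


class CountingConnector:
    """Moves items from pipe_in to pipe_out and counts items in flight."""

    def __init__(self, pipe_in, pipe_out, func):
        self.pipe_in = pipe_in
        self.pipe_out = pipe_out
        self.func = func
        self._counter = 0
        self._lock = threading.Lock()

    def step(self):
        """Process one item if available; return whether one was processed."""
        with self._lock:
            if self.pipe_in.empty():
                return False
            self._counter += 1  # from now on we promise output
        try:
            item = self.pipe_in.get()
            self.pipe_out.put(self.func(item))
        finally:
            # Only decrease after the result has been queued to pipe_out.
            with self._lock:
                self._counter -= 1
        return True

    def is_running(self):
        with self._lock:
            return self._counter > 0


pipe_in, pipe_out = queue.Queue(), queue.Queue()
observed = []


def double(x):
    # While an item is in flight, the connector reports that it is running.
    observed.append(connector.is_running())
    return 2 * x


connector = CountingConnector(pipe_in, pipe_out, double)
for value in (1, 2):
    pipe_in.put(value)
while connector.step():
    pass
outputs = [pipe_out.get() for _ in range(pipe_out.qsize())]
```

Because the counter is decreased only after the put to `pipe_out`, a False result from `is_running()` means no item is stuck between the two pipes.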
class TFDataPipeline.DatasetReader(extern_data, dataset, coord, feed_callback, with_seq_tag=False, with_seq_idx=False, with_epoch_end=False)

Reads from Dataset into a queue.

Parameters: extern_data (ExternData); dataset (Dataset); coord (tf.train.Coordinator); feed_callback ((dict[str,numpy.ndarray|str|int])->None); with_seq_tag (bool); with_seq_idx (bool); with_epoch_end (bool)
SpecialKeys = ('seq_tag', 'seq_idx', 'epoch_end')
get_dtype_for_key(key)
Parameters: key (str)
Return type: str
get_shape_for_key(key)
Parameters: key (str)
Returns: shape without batch-dim
Return type: tuple[int | None]
get_queue_kwargs()
loop()
is_running()
class TFDataPipeline.MakePlaceholders(data_keys, extern_data, with_batch)
Parameters: data_keys (list[str]); extern_data (ExternData); with_batch (bool)
data_placeholders()
feed_dict(d)
Parameters: d (dict[str,numpy.ndarray|str|int])
Returns: keys replaced by placeholders
Return type: dict[tf.placeholder,numpy.ndarray|str|int]
class TFDataPipeline.TFDataQueues(extern_data, capacity=100, seed=1, with_batch=False, enqueue_data=None)

Generic queues which differ between train/eval queues.

Parameters: extern_data (ExternData) – this specifies the data keys; capacity (int); seed (int); with_batch (bool) – whether we have the batch-dim in input/output; enqueue_data (dict[str,tf.Tensor]) – if provided, will be the input
make_dequeue_op()
have_more(tf_session)
Parameters: tf_session (tf.Session)
have_data_for_dequeue()
one_more_enqueue_is_enough()
enqueue(tf_session, data=None)
Parameters: tf_session (tf.Session); data (dict[str,numpy.ndarray]|None) – needed iff self.with_feed_input
class TFDataPipeline.TFChunkingQueueRunner(extern_data, make_dequeue_op, target_queue, chunk_size=None, chunk_step=None, context_window=None, source_has_epoch_end_signal=False)

Implements chunking in pure TF. I.e. we get full sequences of varying lengths as input (from a queue), and we go over it with stride = chunk step, and extract a window of chunk size at each position, which we feed into the target queue. Optionally, for each chunk, we can add more frames (context window) around the chunk.

Parameters: extern_data (ExternData); make_dequeue_op (()->dict[str,tf.Tensor]); target_queue (tf.QueueBase); chunk_size (int|None); chunk_step (int|None); context_window (int|NumbersDict|None); source_has_epoch_end_signal (bool)
is_running()
class TFDataPipeline.TFBatchingQueue(data_queues, batch_size, max_seqs, capacity=10)

Wrapper around tf.PaddingFIFOQueue with more control. Gets in data via TFDataQueues without batch-dim, and adds the batch-dim, according to batch_size and max_seqs. Output can be accessed via self.output_as_extern_data. This will represent the final output used by the network, controlled by QueueDataProvider.

Parameters: data_queues (TFDataQueues); batch_size (int); max_seqs (int); capacity (int)
loop(tf_session, coord)
Parameters: tf_session (tf.Session); coord (tf.train.Coordinator)
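
How batches might be built from variable-length chunks can be sketched in plain Python. This is not the `TFBatchingQueue` implementation. It assumes, which the text above does not spell out, that `batch_size` limits the padded number of frames per batch (number of sequences times longest sequence) and that `max_seqs` limits the number of sequences per batch. The names `pad_batch` and `make_batches` are hypothetical.

```python
def pad_batch(seqs, pad_value=0.0):
    """Pad sequences to a common length; return (padded data, original lengths)."""
    max_len = max(len(seq) for seq in seqs)
    data = [list(seq) + [pad_value] * (max_len - len(seq)) for seq in seqs]
    seq_lens = [len(seq) for seq in seqs]
    return data, seq_lens


def make_batches(seqs, batch_size, max_seqs, pad_value=0.0):
    """Greedily group sequences into zero-padded batches.

    Assumed semantics: a batch holds at most `max_seqs` sequences and at most
    `batch_size` frames after padding. A single sequence longer than
    `batch_size` still forms a batch on its own.
    """
    batches = []
    current = []
    for seq in seqs:
        candidate = current + [seq]
        padded_frames = len(candidate) * max(len(s) for s in candidate)
        if current and (len(candidate) > max_seqs or padded_frames > batch_size):
            # Adding this sequence would exceed a limit: close the batch.
            batches.append(pad_batch(current, pad_value))
            current = [seq]
        else:
            current = candidate
    if current:
        batches.append(pad_batch(current, pad_value))
    return batches


chunks = [[1.0, 2.0, 3.0], [4.0, 5.0], [6.0], [7.0, 8.0, 9.0, 10.0]]
batches = make_batches(chunks, batch_size=6, max_seqs=2)
```

Returning the original lengths alongside the padded data lets later stages mask out the padding.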
class TFDataPipeline.QueueOutput
get_data()
Return type: dict[str,tf.Tensor]
have_data()
class TFDataPipeline.CpuToDefaultDevStage(input_data, names, dtypes, extern_data, data_keys)
Parameters: input_data (dict[str,tf.Tensor]); names (list[str]) – data_keys + extra info; dtypes (list[tf.DType|str]) – corresponds to names; extern_data (ExternData); data_keys (list[str])
loop(parent, coord, session)
Parameters: parent (QueueDataProvider); coord (tf.train.Coordinator); session (tf.Session)
class TFDataPipeline.DataProviderBase(extern_data, data_keys)

Base class which wraps up the logic in this class. See derived classes.

Parameters: extern_data (ExternData); data_keys (set(str)|None)
start_threads()
stop_threads()
have_more_data(session)

It is supposed to return True as long as we want to continue with the current epoch in the current dataset (train or eval). This is called from the same thread which runs the main computation graph (e.g. train steps).

Parameters: session (tf.Session)
Returns: whether the next session.run() can run in the current epoch & dataset
Return type: bool
get_feed_dict(single_threaded=False)

Gets the feed dict for TF session run(). Note that this will block if there is nothing in the queue. The queue gets filled by the other thread, via self.thread_main().

Parameters: single_threaded (bool) – whether to not use the queue
Returns: we dequeue one batch from the queue and provide it for all placeholders of our external data
Return type: dict[tf.Tensor,tf.Tensor]
have_reached_end()
Returns: whether the current dataset says that we reached the end.
Return type: bool
get_dataset_name()
Returns: current dataset name, e.g. “train” or “dev”
Return type: str
get_complete_frac()
Returns: by how much we are through the current dataset, number between 0 and 1, for visual feedback
Return type: float
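
The contract of these methods suggests a consumer loop of the following shape. This is a sketch with a hypothetical stand-in class, `ListDataProvider`, which only mirrors the method names of `DataProviderBase` and serves precomputed batches from a list; the real providers return a dict keyed by tensors and use a TF session. `run_epoch` is likewise a made-up helper.

```python
class ListDataProvider:
    """Hypothetical stand-in mirroring the DataProviderBase method names."""

    def __init__(self, batches, dataset_name="train"):
        self._batches = list(batches)
        self._pos = 0
        self._dataset_name = dataset_name

    def have_more_data(self, session=None):
        # True as long as another step can run in the current epoch.
        return self._pos < len(self._batches)

    def get_feed_dict(self, single_threaded=False):
        # Assumption for this sketch: keys are plain strings, not tensors.
        batch = self._batches[self._pos]
        self._pos += 1
        return {"data": batch}

    def have_reached_end(self):
        return self._pos >= len(self._batches)

    def get_dataset_name(self):
        return self._dataset_name

    def get_complete_frac(self):
        if not self._batches:
            return 1.0
        return self._pos / len(self._batches)


def run_epoch(provider, step_func):
    """Call step_func on each feed dict until the provider has no more data."""
    results = []
    while provider.have_more_data(session=None):
        feed = provider.get_feed_dict()
        results.append(step_func(feed))
    return results


provider = ListDataProvider([[1, 2], [3]])
step_results = run_epoch(provider, lambda feed: sum(feed["data"]))
```

In the real setting, `step_func` would correspond to one `session.run()` call with the returned feed dict.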
class TFDataPipeline.FeedDictDataProvider(tf_session, dataset, batches, capacity=10, tf_queue=None, **kwargs)

This class will fill all the placeholders used for training or forwarding or evaluation etc. of a TFNetwork.Network. It will run a background thread which reads the data from a dataset and puts it into a queue.

Parameters: tf_session (tf.Session|tf.InteractiveSession); dataset (Dataset); batches (BatchSetGenerator); extern_data (ExternData); data_keys (set(str)|None); capacity (int); tf_queue (TFDataQueues|None)
start_threads()
stop_threads()
get_next_batch()
thread_main()
have_more_data(session)
Returns: when we go through an epoch and finished reading, this will return False

If this returns True, you can definitely read another item from the queue. Thread safety: This assumes that there is no other consumer thread for the queue.

get_feed_dict(single_threaded=False)

Gets the feed dict for TF session run(). Note that this will block if there is nothing in the queue. The queue gets filled by the other thread, via self.thread_main().

Parameters: single_threaded (bool) – whether to not use the queue
Returns: we dequeue one batch from the queue and provide it for all placeholders of our external data
Return type: dict[tf.Tensor,numpy.ndarray]
get_dataset_name()
have_reached_end()
get_complete_frac()
class TFDataPipeline.QueueDataProvider(shuffle_train_seqs=False, **kwargs)

This class is supposed to encapsulate all the logic of this module and to be used by the TF engine. It gets the train and dev dataset instances.

High-level (not differentiating between train/eval) queues:

1. sequence queue (filled by the data from Dataset)
2. chunk queue (filled by chunking, and maybe context window)
3. batch queue (constructed batches from the chunks)
4. staging area (e.g. copy to GPU)

Creates the queues and connector instances (which will be the queue runners). Thus this will be created in the current TF graph, and you need to create a new instance of this class for a new TF graph. This is also only intended to be recreated when we create a new TF graph, so all other things must be created while it exists.
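
The chain of sequence queue, chunk queue and batch queue can be sketched with Python threads and `queue.Queue`. This only illustrates the structure and is not how `QueueDataProvider` is implemented: the staging area is left out, batching here simply groups a fixed number of chunks, and all names (`END`, `iter_queue`, `start_stage`, `chunking`, `batching`, `run_pipeline`) are hypothetical.

```python
import queue
import threading

END = object()  # end-of-epoch marker passed through all queues


def iter_queue(q):
    """Iterate over the items of a queue until the END marker arrives."""
    while True:
        item = q.get()
        if item is END:
            return
        yield item


def start_stage(transform, q_in, q_out):
    """Run `transform` (iterator -> iterator) between two queues in a thread."""
    def loop():
        for out in transform(iter_queue(q_in)):
            q_out.put(out)
        q_out.put(END)  # forward the end-of-epoch signal

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread


def chunking(seqs, chunk_size=3, chunk_step=3):
    """Split each sequence into chunks."""
    for seq in seqs:
        for start in range(0, len(seq), chunk_step):
            yield seq[start:start + chunk_size]


def batching(chunks, max_seqs=2):
    """Group up to `max_seqs` chunks into one batch (no padding here)."""
    batch = []
    for chunk in chunks:
        batch.append(chunk)
        if len(batch) == max_seqs:
            yield batch
            batch = []
    if batch:
        yield batch


def run_pipeline(dataset):
    seq_queue = queue.Queue(maxsize=10)
    chunk_queue = queue.Queue(maxsize=10)
    batch_queue = queue.Queue(maxsize=10)

    def read():
        # Reader thread: fills the sequence queue from the dataset.
        for seq in dataset:
            seq_queue.put(seq)
        seq_queue.put(END)

    threading.Thread(target=read, daemon=True).start()
    start_stage(chunking, seq_queue, chunk_queue)
    start_stage(batching, chunk_queue, batch_queue)
    return list(iter_queue(batch_queue))


batches = run_pipeline([[1, 2, 3, 4, 5], [6, 7]])
```

Each stage only knows its input and output queue, so stages can be added or removed without touching the others.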

get_feed_dict(single_threaded=False)
have_more_data(session)

It is supposed to return True as long as we want to continue with the current epoch in the current dataset (train or eval). We want to continue if we still can do a next self.final_stage.dequeue op with the current dataset. This is called from the same thread which runs the main computation graph (e.g. train steps), as well as from the final stage thread.

Parameters: session (tf.Session)
Returns: whether the next session.run() can run in the current epoch & dataset
Return type: bool
have_reached_end()
get_complete_frac()
init_dataset(session, dataset, is_train_dataset)
Parameters: session (tf.Session); dataset (Dataset); is_train_dataset (bool)
get_dataset_name()
get_threads()
start_threads()
stop_threads()