returnn.tf.distributed#

This is for distributed TensorFlow support. For an overview of the terminology, the concepts and the technology, see here.

Distributed TensorFlow covers multiple levels of functionality:

  • Low level: TF server and client (TF session) connecting to the server via gRPC. ClusterSpec describes the collection of all servers.

  • High level via strategies (tf.distribute.Strategy). See the official guide. This can be used by the Keras API, by the Estimator API, and by a custom training loop.

  • Concepts and terminology.

RETURNN could potentially support all of this, although it will try to be very explicit about it. Currently we do not use the high-level strategy concept but only the low-level functionality. This implementation was originally discussed here.
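As a minimal illustration of the low-level concepts (not RETURNN-specific; hosts/ports are placeholders, and a real setup would list one address per process):

    import tensorflow as tf

    tf.compat.v1.disable_eager_execution()  # use the low-level graph/session API

    # The ClusterSpec describes the collection of all servers.
    # Here just a single local worker for illustration.
    cluster = tf.train.ClusterSpec({"worker": ["localhost:2222"]})

    # Each process starts its own TF server (gRPC) for its task.
    server = tf.distribute.Server(cluster, job_name="worker", task_index=0)

    # A client (TF session) connects to a server via its gRPC target.
    with tf.compat.v1.Session(server.target) as session:
        print(session.run(tf.constant("hello from the cluster")))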

This is also related to Horovod. Horovod and distributed TF are orthogonal to each other. They can both be mixed, or used independently.

This is also related to the dataset pipeline TFDataPipeline.

class returnn.tf.distributed.MPIClusterResolver[source]#

ClusterResolver for MPI. Distributed TF is in general totally independent of MPI. We only use MPI here to figure out the ClusterSpec; after that is set up, MPI will not be used anymore. TF itself will not make use of MPI; all communication is handled via gRPC. (Horovod would use MPI, but that is another topic.)

If you use Sun Grid Engine (SGE) / Oracle Grid Engine with the parallel environment (PE) feature (doc) (i.e. the SGE job was started e.g. via qsub -pe mpi 8), then you might run your subprocesses (slots) via mpirun, e.g. like:

mpirun -np 8 -mca pml ob1 -mca btl ^openib python returnn/rnn.py ...

Open MPI knows about SGE and will correctly start subprocesses (for each PE slot) (potentially remotely). From the Open MPI doc:

Open MPI will automatically detect when it is running inside SGE and will just “do the Right Thing.” Specifically, if you execute an mpirun command in a SGE job, it will automatically use the SGE mechanisms to launch and kill processes. There is no need to specify what nodes to run on - Open MPI will obtain this information directly from SGE and default to a number of processes equal to the slot count specified.

SGE provides the PE_HOSTFILE env var, which points to a file listing all hosts and the number of slots per host. This is available for the SGE main job process, i.e. where mpirun is run, but it might not be available in the subprocesses (slots) which are started by mpirun, potentially remotely.

Within such an MPI process, the only reliable way to get information about the other peer processes is to use MPI functions. A straightforward way to do this is via the mpi4py module. mpi4py can be mixed with Horovod, so this is a sensible choice.

This is somewhat similar to SlurmClusterResolver. Also related: the MPIClusterResolver PR, https://github.com/Peidong-Wang/Distributed-TensorFlow-Using-MPI/ and https://stackoverflow.com/questions/10912793/how-are-mpi-processes-started.
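The underlying idea can be sketched roughly as follows (a simplified illustration, not the actual implementation; the hostname and port scheme are assumptions): each MPI process announces the address of its own TF server, the addresses of all peers are gathered via mpi4py, and the result forms the ClusterSpec.

    import socket
    from mpi4py import MPI
    import tensorflow as tf

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()

    # Each MPI process announces an address for its own TF server.
    # The port scheme here is an arbitrary assumption for this sketch.
    my_address = "%s:%i" % (socket.gethostname(), 20000 + rank)

    # Gather the addresses of all peer processes. After this point,
    # MPI is not needed anymore; all further communication goes via gRPC.
    all_addresses = comm.allgather(my_address)

    cluster = tf.train.ClusterSpec({"worker": all_addresses})
    server = tf.distribute.Server(cluster, job_name="worker", task_index=rank)

Such a script would be started via mpirun, e.g. mpirun -np 8 python sketch.py.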

cluster_spec()[source]#

Retrieve the current state of the cluster and return a ClusterSpec.

Returns:

A ClusterSpec representing the state of the cluster at the moment this function is called.

Implementors of this function must take care in ensuring that the ClusterSpec returned is up-to-date at the time of calling this function. This usually means retrieving the information from the underlying cluster management system every time this function is invoked and reconstructing a cluster_spec, rather than attempting to cache anything.
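For example, inside an MPI job started via mpirun (as above), the resolved spec could be inspected like this (a sketch; the no-argument construction of MPIClusterResolver is an assumption):

    from returnn.tf.distributed import MPIClusterResolver

    resolver = MPIClusterResolver()  # assumption: no constructor arguments needed
    spec = resolver.cluster_spec()
    # The job name and addresses depend on the actual setup,
    # e.g. something like {"worker": ["node01:20000", "node02:20001", ...]}.
    print(spec.as_dict())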

master(task_type=None, task_id=None, rpc_layer=None)[source]#

Retrieves the name or URL of the session master.

Args:

task_type: (Optional) The type of the TensorFlow task of the master.
task_id: (Optional) The index of the TensorFlow task of the master.
rpc_layer: (Optional) The RPC protocol for the given cluster.

Returns:

The name or URL of the session master.

Implementors of this function must take care in ensuring that the master returned is up-to-date at the time of calling this function. This usually means retrieving the master every time this function is invoked.
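Continuing the sketch above: in between-graph replication, the returned target is what the local session connects to.

    import tensorflow as tf

    # resolver as constructed in the sketch above
    target = resolver.master()
    with tf.compat.v1.Session(target) as session:
        pass  # build the local graph first, then run (parts of) it against this server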

num_accelerators(task_type=None, task_id=None, config_proto=None)[source]#

Returns the number of accelerator cores per worker.

This returns the number of accelerator cores (such as GPUs and TPUs) available per worker.

Optionally, callers can specify task_type and task_id if they want to target a specific TensorFlow process when querying the number of accelerators. This is to support heterogeneous environments, where the number of accelerator cores per host differs.

Args:

task_type: (Optional) The type of the TensorFlow task of the machine we want to query.
task_id: (Optional) The index of the TensorFlow task of the machine we want to query.
config_proto: (Optional) Configuration for starting a new session to query how many accelerator cores it has.

Returns:

A map of accelerator types to number of cores.
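The returned map is keyed by accelerator type, so e.g. the GPU count per worker can be read off like this (continuing the sketch above):

    # Continuing the sketch above.
    accelerators = resolver.num_accelerators()  # e.g. {"GPU": 1}
    num_gpus = accelerators.get("GPU", 0)
    print("accelerator cores per worker:", num_gpus)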

class returnn.tf.distributed.LocalOnlyClusterResolver[source]#

Cluster resolver for one local instance.

cluster_spec()[source]#
Return type:

ClusterSpec

master(task_type=None, task_id=None, rpc_layer=None)[source]#

Retrieves the name or URL of the session master.

Args:

task_type: (Optional) The type of the TensorFlow task of the master.
task_id: (Optional) The index of the TensorFlow task of the master.
rpc_layer: (Optional) The RPC protocol for the given cluster.

Returns:

The name or URL of the session master.

Implementors of this function must take care in ensuring that the master returned is up-to-date at the time of calling this function. This usually means retrieving the master every time this function is invoked.

class returnn.tf.distributed.ReturnnDefaultStrategy[source]#

RETURNN default strategy.

class returnn.tf.distributed.ReturnnDefaultStrategyExtended(container_strategy)[source]#

RETURNN default strategy extended.

returnn.tf.distributed.init_distributed_tf(config)[source]#

This is called early in startup of RETURNN.

Parameters:

config (Config) –
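A hedged sketch of the startup wiring; the exact config option(s) which enable distributed TF are defined by this function, and the "distributed_tf" key used below is an assumption for illustration only:

    from returnn.config import Config
    from returnn.tf import distributed

    config = Config()
    config.set("distributed_tf", {})  # option name is an assumption in this sketch

    distributed.init_distributed_tf(config)
    print(distributed.is_enabled())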

returnn.tf.distributed.is_enabled()[source]#
Return type:

bool

returnn.tf.distributed.get_session_target()[source]#

This would be called if you have a local custom graph in the current process (replica) and want to execute parts of it. This is e.g. the case for between-graph replication. After creating the graph, you would create a session which connects to the server returned by this function.

Returns:

URL of the TF server, where the local session should connect to

Return type:

str
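Putting it together for between-graph replication: build the local graph, then create the session against this target. A minimal sketch:

    import tensorflow as tf
    from returnn.tf import distributed

    if distributed.is_enabled():
        target = distributed.get_session_target()  # URL of the TF server, e.g. "grpc://..."
    else:
        target = ""  # empty target: purely local in-process session

    with tf.compat.v1.Session(target) as session:
        pass  # run (parts of) the locally built graph via this session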