returnn.datasets.distrib_files

DistributeFilesDataset

https://github.com/rwth-i6/returnn/issues/1519

class returnn.datasets.distrib_files.DistributeFilesDataset(*, files: List[str | Tuple[str | Tuple[FileTree, ...] | Dict[Any, FileTree] | List[FileTree], ...] | Dict[Any, str | Tuple[FileTree, ...] | Dict[Any, FileTree] | List[FileTree]] | List[str | Tuple[FileTree, ...] | Dict[Any, FileTree] | List[FileTree]]] | PathLike | Callable[[], List[str | Tuple[str | Tuple[FileTree, ...] | Dict[Any, FileTree] | List[FileTree], ...] | Dict[Any, str | Tuple[FileTree, ...] | Dict[Any, FileTree] | List[FileTree]] | List[str | Tuple[FileTree, ...] | Dict[Any, FileTree] | List[FileTree]]]], get_sub_epoch_dataset: Callable[[List[str | Tuple[str | Tuple[FileTree, ...] | Dict[Any, FileTree] | List[FileTree], ...] | Dict[Any, str | Tuple[FileTree, ...] | Dict[Any, FileTree] | List[FileTree]] | List[str | Tuple[FileTree, ...] | Dict[Any, FileTree] | List[FileTree]]]], Dict[str, Any]], preload_next_n_sub_epochs: int = 1, buffer_size: int = 1, distrib_shard_files: bool = False, sharding_fix: bool | None = None, _meta_info_cache: Dict[str, Any] | None = None, _distrib_info: Dict[str, int] | None = None, **kwargs)[source]

Dataset that distributes files over sub epochs and then creates a sub dataset for every sub epoch from a given (random) subset of the files. The sub dataset is user-defined via the function get_sub_epoch_dataset. Thus, this dataset wraps the sub datasets.

It is conceptually very similar to ConcatDataset in the sense that it concatenates all the sub datasets together to form one larger dataset.

This scheme allows shuffling over the files, which makes shuffling over a large dataset much more efficient, at the cost of no longer shuffling over the full dataset in every sub epoch. Instead, the quality of the shuffle depends on the number of files the dataset is split into: the more files per sub epoch, the better.

Additionally, this scheme allows prefetching and caching the files needed next, e.g. copying them from an NFS to local disk, when the local disk cannot store the whole dataset, and/or when the local disk allows for faster access. For that, the user can use returnn.util.file_cache.CachedFile inside the dict returned from get_sub_epoch_dataset.

It was also designed with multi-GPU training in mind, where each GPU should get a set of files to work on, and the sub epochs across the GPUs would not be exactly of the same length. Whenever one GPU finishes its sub epoch, all remaining data on the other GPUs is dropped. First, our scheme tries to make the lengths of the sub epochs as equal as possible after random shuffling by using the size of the files as a proxy for the length of their contents. Further, due to the random shuffling, there should not be any bias in the data distribution. Specifically, we do not want some data to be visited more often than other data (at least the expected number of visits should be the same).
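The idea of shuffling the files and then balancing sub epochs by file size can be sketched as follows. This is a minimal illustration under stated assumptions, not RETURNN's actual implementation: the function name split_files_into_sub_epochs, the seed handling, the cut rule, and the file names and sizes are all hypothetical.

```python
import random
from typing import Dict, List, Sequence


def split_files_into_sub_epochs(
  files: Sequence[str], file_sizes: Dict[str, int], num_sub_epochs: int, seed: int
) -> List[List[str]]:
  """Shuffle the files, then cut the shuffled list into contiguous chunks of similar total size."""
  assert 0 < num_sub_epochs <= len(files)
  shuffled = list(files)
  random.Random(seed).shuffle(shuffled)  # deterministic for a given seed

  total = sum(file_sizes[f] for f in shuffled)
  chunks: List[List[str]] = [[] for _ in range(num_sub_epochs)]
  idx = 0  # index of the chunk currently being filled
  consumed = 0  # total size of all files assigned so far
  for i, f in enumerate(shuffled):
    chunks[idx].append(f)
    consumed += file_sizes[f]
    files_left = len(shuffled) - (i + 1)
    chunks_left = num_sub_epochs - (idx + 1)
    # Move on once this chunk reaches its share of the total size,
    # or when every remaining chunk needs exactly one of the remaining files.
    reached_boundary = consumed * num_sub_epochs >= total * (idx + 1)
    if chunks_left > 0 and (reached_boundary or files_left == chunks_left):
      idx += 1
  return chunks


# Hypothetical file names and sizes (in bytes), for illustration only.
sizes = {f"file_{i}.hdf": 100 + 10 * i for i in range(10)}
sub_epochs = split_files_into_sub_epochs(list(sizes), sizes, num_sub_epochs=3, seed=1)
for i, chunk in enumerate(sub_epochs):
  print(i, sum(sizes[f] for f in chunk), chunk)
```

Every file is assigned to exactly one sub epoch and no sub epoch is empty. Because file size is only a proxy for the number of sequences, the resulting sub epochs are expected to be similar in length, not identical.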

In case the dataset grows so large that it is unreasonable to expect one worker to ever see all the data, this dataset can also shard the file list on a per-worker basis before distributing the files across sub epochs. This behavior can be configured by setting the property "distrib_shard_files": True. The dataset attempts to split the files as evenly as possible based on the file size.
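One way to picture size-based sharding is a greedy heuristic that always gives the next-largest file to the currently smallest shard. The sketch below uses that heuristic for illustration only; it is an assumption, and RETURNN may split the file list differently. The function name shard_files_by_size and the file names and sizes are hypothetical.

```python
import heapq
from typing import Dict, List


def shard_files_by_size(file_sizes: Dict[str, int], num_shards: int) -> List[List[str]]:
  """Greedy largest-first assignment of files to shards of similar total size."""
  assert num_shards > 0
  # Min-heap of (total size so far, shard index).
  heap = [(0, i) for i in range(num_shards)]
  heapq.heapify(heap)
  shards: List[List[str]] = [[] for _ in range(num_shards)]
  # Largest files first; ties are broken by name to keep the result deterministic.
  for name in sorted(file_sizes, key=lambda n: (-file_sizes[n], n)):
    total, idx = heapq.heappop(heap)
    shards[idx].append(name)
    heapq.heappush(heap, (total + file_sizes[name], idx))
  return shards


# Hypothetical file names and sizes (in bytes), for illustration only.
sizes = {"a.hdf": 40, "b.hdf": 30, "c.hdf": 20, "d.hdf": 10}
shards = shard_files_by_size(sizes, num_shards=2)
print(shards)
```

In this picture, the worker with rank r would then only ever work on shards[r] and distribute those files over its sub epochs.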

Example usage:

from typing import Any, Dict, List

def get_sub_epoch_dataset(files_subepoch: List[str]) -> Dict[str, Any]:
  from returnn.util.file_cache import CachedFile
  return {
    "class": "HDFDataset",
    "files": [CachedFile(fn) for fn in files_subepoch],
  }

train = {
  "class": "DistributeFilesDataset",
  "files": [
    "/nfs/big_data_1.hdf",
    ...
  ],  # M files
  "get_sub_epoch_dataset": get_sub_epoch_dataset,
  "partition_epoch": P,  # P << M
}

Instead of a plain list of strings, you can also provide a list of any nested structures to keep multimodal data together:

from typing import Any, Dict, List, Tuple

def get_sub_epoch_dataset(files_subepoch: List[Tuple[str, str]]) -> Dict[str, Any]:
  from returnn.util.file_cache import CachedFile

  alignments, features = tuple(zip(*files_subepoch)) # transpose

  return {
    "class": "MetaDataset",
    "data_map": {"classes": ("alignments", "data"), "data": ("features", "data")},
    "datasets": {
      "alignments": {
        "class": "HDFDataset",
        # wrap in CachedFile, as in the first example, so the files get cached locally
        "files": [CachedFile(fn) for fn in alignments],
        "seq_ordering": "random",
      },
      "features": {
        "class": "HDFDataset",
        "files": [CachedFile(fn) for fn in features],
      },
    },
    "seq_order_control_dataset": "alignments",
  }

train = {
  "class": "DistributeFilesDataset",
  "files": [
    ("/nfs/alignment_1.hdf", "/nfs/features_1.hdf"),
    ...
  ],  # M entries
  "get_sub_epoch_dataset": get_sub_epoch_dataset,
  "partition_epoch": P,  # P << M
}

In this case the file sizes for sub epoch distribution are summed up per list entry by iterating over the structure leaves.
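Summing the sizes over the leaves of one entry can be sketched like this. It is a minimal illustration, not the library's own code: the helper names iter_leaves and entry_size are hypothetical, and the size lookup is passed in as a parameter so that the demo runs without real files (with real files, os.path.getsize would be the natural choice).

```python
import os
from typing import Any, Callable, Iterator


def iter_leaves(tree: Any) -> Iterator[Any]:
  """Yield all leaves (here: file names) of a nested tuple/list/dict structure."""
  if isinstance(tree, dict):
    for value in tree.values():
      yield from iter_leaves(value)
  elif isinstance(tree, (list, tuple)):
    for value in tree:
      yield from iter_leaves(value)
  else:
    yield tree


def entry_size(entry: Any, get_size: Callable[[str], int] = os.path.getsize) -> int:
  """Total size of one list entry, summed over all files it contains."""
  return sum(get_size(leaf) for leaf in iter_leaves(entry))


# Hypothetical sizes (in bytes) standing in for real files on disk.
fake_sizes = {"/nfs/alignment_1.hdf": 10, "/nfs/features_1.hdf": 90}
pair = ("/nfs/alignment_1.hdf", "/nfs/features_1.hdf")
nested = {"ali": "/nfs/alignment_1.hdf", "feat": ["/nfs/features_1.hdf"]}
print(entry_size(pair, fake_sizes.__getitem__))
print(entry_size(nested, fake_sizes.__getitem__))
```

Both entries contain the same two files, so both calls give the same total, regardless of how the files are nested.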

For some discussion, see https://github.com/rwth-i6/returnn/issues/1519 and https://github.com/rwth-i6/returnn/issues/1524.

Parameters:
  • files – the files to shuffle over, can also be a list of arbitrarily nested Python objects to keep associated heterogeneous data together. When the list grows too large to be serialized into a RETURNN config, the list of files can also be specified as a path to a .txt file containing one file per line, or a Python file containing the repr of a list of arbitrarily nested Python objects, or a JSON file containing a list of arbitrarily nested (JSON) objects. It can also be a callable which returns such a list.

  • get_sub_epoch_dataset – callable which returns a dataset dict for a given subset of files

  • preload_next_n_sub_epochs – how many sub epoch datasets to preload

  • buffer_size – buffer size for each worker, number of seqs to prefetch

  • distrib_shard_files – shard the data across worker processes in distributed training scenarios

  • sharding_fix – whether the sub-epoch dataset must NOT shard its seq order again when this dataset is sharded (any _num_shards > 1, e.g. via distrib_shard_files). Without the fix, the sub-epoch dataset inherits _num_shards/_shard_index and shards again, on top of the file-level sharding, so every rank consumes only 1/num_shards of its own file shard – usually NOT what you want (silent data loss), see https://github.com/rwth-i6/returnn/issues/1738. None (default): fixed behavior iff behavior_version >= 26 (else legacy, with a warning). True: fixed behavior, independent of the behavior version. Use this to be safe: an older RETURNN without this option fails loudly instead of silently doing the wrong thing. False: keep the legacy behavior and silence the warning.

  • _meta_info_cache – for internal use

  • _distrib_info – for internal use
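As a hedged sketch of how several of these parameters might be combined, the config below passes files as a zero-argument callable that reads the list from a JSON file, and enables file-level sharding together with sharding_fix=True. The helper load_file_list, the temporary JSON file, the second file name, and the partition_epoch value are placeholders chosen for this example, not values taken from RETURNN.

```python
import json
import os
import tempfile
from typing import Any, Dict, List

# Hypothetical file list, written to a temporary JSON file for this demo.
# In practice the list would be too large to inline into the config.
_file_list = ["/nfs/big_data_1.hdf", "/nfs/big_data_2.hdf"]
_file_list_path = os.path.join(tempfile.mkdtemp(), "train_files.json")
with open(_file_list_path, "w", encoding="utf-8") as f:
  json.dump(_file_list, f)


def load_file_list() -> List[str]:
  """Zero-argument callable returning the list of files (hypothetical helper)."""
  with open(_file_list_path, encoding="utf-8") as f:
    return json.load(f)


def get_sub_epoch_dataset(files_subepoch: List[str]) -> Dict[str, Any]:
  from returnn.util.file_cache import CachedFile

  return {
    "class": "HDFDataset",
    "files": [CachedFile(fn) for fn in files_subepoch],
  }


train = {
  "class": "DistributeFilesDataset",
  "files": load_file_list,  # callable instead of an inline list
  "get_sub_epoch_dataset": get_sub_epoch_dataset,
  "partition_epoch": 1,  # placeholder value
  "distrib_shard_files": True,  # shard the file list per worker
  "sharding_fix": True,  # do not shard the sequences a second time
}
```

Setting sharding_fix explicitly, instead of relying on the behavior version, follows the recommendation in the parameter description: an older RETURNN that does not know the option then fails loudly.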

initialize()[source]

init

supports_sharding() → bool[source]
Returns:

whether the dataset supports sharding based on the seq_order

init_seq_order(epoch: int | None = None, seq_list=None, seq_order=None) → bool[source]
Parameters:
  • epoch

  • seq_list

  • seq_order

Returns:

whether the order changed (True is always safe to return)

have_seqs() → bool[source]

have seqs

finish_epoch(*, free_resources: bool = False)[source]

finish epoch

get_data_keys() → List[str][source]

data keys

get_all_tags() → List[str][source]

get all tags

get_total_num_seqs(*, fast: bool = False) → int[source]

get total num seqs