classy.data.data_modules

Functions

load_coordinates

def load_coordinates(
    coordinates_path: str,
    task: str,
) -> TrainCoordinates

Computes the train coordinates of a training process.

Args

coordinates_path: a path to one of the following:
  • a file containing the training coordinates (check the documentation for more info)
  • a single file containing the whole dataset, to be split into train / dev
  • a directory containing two (three) files for train, validation (and test)
task: one of the tasks supported by classy (e.g. sentence-pair)

Returns

train_coordinates (TrainCoordinates): an object containing all the information on the datasets involved in the training.
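
For illustration, a minimal calling sketch (the path and task name below are hypothetical; see the classy documentation for the accepted file layouts)::

    from classy.data.data_modules import load_coordinates

    # a directory holding the train / validation (and optionally test) files
    train_coordinates = load_coordinates("data/sentence-pair/", task="sentence-pair")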

path_if_exists

def path_if_exists(
    path: str,
    data_driver: DataDriver,
) -> Optional[str]

Classes

ClassyDataModule

class ClassyDataModule(pytorch_lightning.core.datamodule.LightningDataModule)

A DataModule standardizes the training, val, test splits, data preparation and transforms. The main advantage is consistent data splits, data preparation and transforms across models.

Example::

    class MyDataModule(LightningDataModule):
        def __init__(self):
            super().__init__()

        def prepare_data(self):
            # download, split, etc...
            # only called on 1 GPU/TPU in distributed
            ...

        def setup(self, stage):
            # make assignments here (val/train/test split)
            # called on every process in DDP
            ...

        def train_dataloader(self):
            train_split = Dataset(...)
            return DataLoader(train_split)

        def val_dataloader(self):
            val_split = Dataset(...)
            return DataLoader(val_split)

        def test_dataloader(self):
            test_split = Dataset(...)
            return DataLoader(test_split)

        def teardown(self):
            # clean up after fit or test
            # called on every process in DDP
            ...

A DataModule implements 6 key methods:

  • prepare_data (things to do on 1 GPU/TPU not on every GPU/TPU in distributed mode).
  • setup (things to do on every accelerator in distributed mode).
  • train_dataloader the training dataloader.
  • val_dataloader the val dataloader(s).
  • test_dataloader the test dataloader(s).
  • teardown (things to do on every accelerator in distributed mode when finished)

This allows you to share a full dataset without explaining how to download, split, transform, and process the data.
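
As a sketch of how such a datamodule is typically consumed (MyDataModule is the class from the example above; MyModel stands in for any LightningModule)::

    import pytorch_lightning as pl

    dm = MyDataModule()
    model = MyModel()  # hypothetical LightningModule

    trainer = pl.Trainer(max_epochs=3)
    trainer.fit(model, datamodule=dm)   # runs prepare_data, setup and the train/val dataloaders
    trainer.test(model, datamodule=dm)  # runs setup and test_dataloader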

__init__

def __init__(
    task: str,
    dataset_path: str,
    dataset: omegaconf.dictconfig.DictConfig,
    validation_dataset: Optional[omegaconf.dictconfig.DictConfig] = None,
    test_dataset: Optional[omegaconf.dictconfig.DictConfig] = None,
    validation_split_size: Optional[float] = None,
    test_split_size: Optional[float] = None,
    max_nontrain_split_size: Optional[int] = None,
    shuffle_dataset: bool = True,
    external_vocabulary_path: Optional[str] = None,
)
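
A hypothetical construction sketch based only on the signature above (the dataset config target, paths, and split semantics are assumptions; in practice classy typically builds this object from its own configuration)::

    from omegaconf import OmegaConf
    from classy.data.data_modules import ClassyDataModule

    # hypothetical dataset config; the actual keys depend on the classy dataset being used
    dataset_conf = OmegaConf.create({"_target_": "classy.data.dataset.SomeDataset.from_file"})

    data_module = ClassyDataModule(
        task="sentence-pair",
        dataset_path="data/sentence-pair/",  # a file or directory, as for load_coordinates
        dataset=dataset_conf,
        validation_split_size=0.1,           # assumed: fraction of data held out for validation
        shuffle_dataset=True,
    )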

build_vocabulary

def build_vocabulary(
    self,
) -> None

get_examples

def get_examples(
    self,
    n: int,
) -> Tuple[str, List[~T]]

prepare_data

def prepare_data(
    self,
) -> None

Use this to download and prepare data.

Warning: DO NOT set state to the model (use setup instead), since this is NOT called on every GPU in DDP/TPU.

Example::

    def prepare_data(self):
        # good
        download_data()
        tokenize()
        etc()

        # bad
        self.split = data_split
        self.some_state = some_other_state()

In DDP prepare_data can be called in two ways (using Trainer(prepare_data_per_node)):

  1. Once per node. This is the default and is only called on LOCAL_RANK=0.
  2. Once in total. Only called on GLOBAL_RANK=0.

Example::

    # DEFAULT
    # called once per node on LOCAL_RANK=0 of that node
    Trainer(prepare_data_per_node=True)

    # call on GLOBAL_RANK=0 (great for shared file systems)
    Trainer(prepare_data_per_node=False)

Note

Setting prepare_data_per_node with the trainer flag is deprecated and will be removed in v1.7.0. Please set prepare_data_per_node in LightningDataModule or LightningModule directly instead.
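
A minimal sketch of what this note suggests, assuming a pytorch_lightning version in which the attribute is read from the datamodule::

    class MyDataModule(LightningDataModule):
        def __init__(self):
            super().__init__()
            # replaces Trainer(prepare_data_per_node=False):
            # run prepare_data once in total, on GLOBAL_RANK=0
            self.prepare_data_per_node = False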

This is called before requesting the dataloaders:

.. code-block:: python

    model.prepare_data()
    initialize_distributed()
    model.setup(stage)
    model.train_dataloader()
    model.val_dataloader()
    model.test_dataloader()

setup

def setup(
    self,
    stage: Optional[str] = None,
) -> None

Called at the beginning of fit (train + validate), validate, test, and predict. This is a good hook when you need to build models dynamically or adjust something about them. This hook is called on every process when using DDP.

Args

stage: either 'fit', 'validate', 'test', or 'predict'

Example::

    class LitModel(...):
        def __init__(self):
            self.l1 = None

        def prepare_data(self):
            download_data()
            tokenize()

            # don't do this
            self.something = else

        def setup(self, stage):
            data = load_data(...)
            self.l1 = nn.Linear(28, data.num_classes)

test_dataloader

def test_dataloader(
    self,
) -> Union[torch.utils.data.dataloader.DataLoader, List[torch.utils.data.dataloader.DataLoader], Dict[str, torch.utils.data.dataloader.DataLoader]]

Implement one or multiple PyTorch DataLoaders for testing.

The dataloader you return will not be reloaded unless you set :paramref:~pytorch_lightning.trainer.Trainer.reload_dataloaders_every_n_epochs to a positive integer.
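
For instance, to rebuild the dataloaders at every epoch, one could pass that flag to the Trainer (a sketch; the flag name is taken from the sentence above)::

    from pytorch_lightning import Trainer

    # re-create the dataloaders every epoch instead of reusing the cached ones
    trainer = Trainer(reload_dataloaders_every_n_epochs=1)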

For data processing use the following pattern:

  • download in prepare_data
  • process and split in setup

However, the above are only necessary for distributed processing.

Warning: do not assign state in prepare_data.

These hooks are called in the following order:

  • :meth:~pytorch_lightning.trainer.Trainer.fit
  • :meth:prepare_data
  • :meth:setup
  • :meth:train_dataloader
  • :meth:val_dataloader
  • :meth:test_dataloader
Note

Lightning adds the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.

Return

A :class:torch.utils.data.DataLoader or a sequence of them specifying testing samples.

Example::

    def test_dataloader(self):
        transform = transforms.Compose([transforms.ToTensor(),
                                        transforms.Normalize((0.5,), (1.0,))])
        dataset = MNIST(root='/path/to/mnist/', train=False,
                        transform=transform, download=True)
        loader = torch.utils.data.DataLoader(
            dataset=dataset,
            batch_size=self.batch_size,
            shuffle=False
        )
        return loader

    # can also return multiple dataloaders
    def test_dataloader(self):
        return [loader_a, loader_b, ..., loader_n]

Note

If you don't need a test dataset and a :meth:test_step, you don't need to implement this method.

Note

In the case where you return multiple test dataloaders, the :meth:test_step will have an argument dataloader_idx which matches the order here.
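
As a sketch of how the corresponding step might consume that index (the loss helper is hypothetical)::

    def test_step(self, batch, batch_idx, dataloader_idx=0):
        # dataloader_idx identifies which of the returned test dataloaders produced this batch
        x, y = batch
        loss = self.compute_loss(x, y)  # hypothetical helper
        self.log(f"test_loss/dataloader_{dataloader_idx}", loss)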

train_dataloader

def train_dataloader(
    self,
) -> Union[torch.utils.data.dataloader.DataLoader, List[torch.utils.data.dataloader.DataLoader], Dict[str, torch.utils.data.dataloader.DataLoader]]

Implement one or more PyTorch DataLoaders for training.

Return

A collection of :class:torch.utils.data.DataLoader specifying training samples. In the case of multiple dataloaders, please see the page on multiple training dataloaders in the PyTorch Lightning documentation.

The dataloader you return will not be reloaded unless you set :paramref:~pytorch_lightning.trainer.Trainer.reload_dataloaders_every_n_epochs to a positive integer.

For data processing use the following pattern:

  • download in prepare_data
  • process and split in setup

However, the above are only necessary for distributed processing.

Warning: do not assign state in prepare_data.

These hooks are called in the following order:

  • :meth:~pytorch_lightning.trainer.Trainer.fit
  • :meth:prepare_data
  • :meth:setup
  • :meth:train_dataloader
Note

Lightning adds the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.

Example::

    # single dataloader
    def train_dataloader(self):
        transform = transforms.Compose([transforms.ToTensor(),
                                        transforms.Normalize((0.5,), (1.0,))])
        dataset = MNIST(root='/path/to/mnist/', train=True,
                        transform=transform, download=True)
        loader = torch.utils.data.DataLoader(
            dataset=dataset,
            batch_size=self.batch_size,
            shuffle=True
        )
        return loader

    # multiple dataloaders, return as list
    def train_dataloader(self):
        mnist = MNIST(...)
        cifar = CIFAR(...)
        mnist_loader = torch.utils.data.DataLoader(
            dataset=mnist, batch_size=self.batch_size, shuffle=True
        )
        cifar_loader = torch.utils.data.DataLoader(
            dataset=cifar, batch_size=self.batch_size, shuffle=True
        )
        # each batch will be a list of tensors: [batch_mnist, batch_cifar]
        return [mnist_loader, cifar_loader]

    # multiple dataloaders, return as dict
    def train_dataloader(self):
        mnist = MNIST(...)
        cifar = CIFAR(...)
        mnist_loader = torch.utils.data.DataLoader(
            dataset=mnist, batch_size=self.batch_size, shuffle=True
        )
        cifar_loader = torch.utils.data.DataLoader(
            dataset=cifar, batch_size=self.batch_size, shuffle=True
        )
        # each batch will be a dict of tensors: {'mnist': batch_mnist, 'cifar': batch_cifar}
        return {'mnist': mnist_loader, 'cifar': cifar_loader}

val_dataloader

def val_dataloader(
    self,
) -> Union[torch.utils.data.dataloader.DataLoader, List[torch.utils.data.dataloader.DataLoader], Dict[str, torch.utils.data.dataloader.DataLoader]]

Implement one or multiple PyTorch DataLoaders for validation.

The dataloader you return will not be reloaded unless you set :paramref:~pytorch_lightning.trainer.Trainer.reload_dataloaders_every_n_epochs to a positive integer.

It's recommended that all data downloads and preparation happen in :meth:prepare_data.

These hooks are called in the following order:

  • :meth:~pytorch_lightning.trainer.Trainer.fit
  • :meth:prepare_data
  • :meth:train_dataloader
  • :meth:val_dataloader
  • :meth:test_dataloader
Note

Lightning adds the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.

Return

A :class:torch.utils.data.DataLoader or a sequence of them specifying validation samples.

Examples::

    def val_dataloader(self):
        transform = transforms.Compose([transforms.ToTensor(),
                                        transforms.Normalize((0.5,), (1.0,))])
        dataset = MNIST(root='/path/to/mnist/', train=False,
                        transform=transform, download=True)
        loader = torch.utils.data.DataLoader(
            dataset=dataset,
            batch_size=self.batch_size,
            shuffle=False
        )
        return loader

    # can also return multiple dataloaders
    def val_dataloader(self):
        return [loader_a, loader_b, ..., loader_n]

Note

If you don't need a validation dataset and a :meth:validation_step, you don't need to implement this method.

Note

In the case where you return multiple validation dataloaders, the :meth:validation_step will have an argument dataloader_idx which matches the order here.

TrainCoordinates

class TrainCoordinates()

TrainCoordinates(
    main_file_extension: str,
    main_data_driver: classy.data.data_drivers.DataDriver,
    train_bundle: Dict[str, classy.data.data_drivers.DataDriver],
    validation_bundle: Optional[Dict[str, classy.data.data_drivers.DataDriver]],
    test_bundle: Optional[Dict[str, classy.data.data_drivers.DataDriver]],
)

__init__

def __init__(
    main_file_extension: str,
    main_data_driver: DataDriver,
    train_bundle: Dict[str, DataDriver],
    validation_bundle: Optional[Dict[str, DataDriver]],
    test_bundle: Optional[Dict[str, DataDriver]],
)
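
A small inspection sketch, assuming the TrainCoordinates instance returned by load_coordinates above (the meaning of the bundle keys is not specified here, so they are only printed)::

    print(train_coordinates.main_file_extension)

    for key, data_driver in train_coordinates.train_bundle.items():
        print(key, type(data_driver).__name__)

    if train_coordinates.validation_bundle is None:
        # presumably the dev set will be derived from the training data
        print("no explicit validation bundle")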