PyTorch : Parallélisme de données multi-GPU et multi-nœuds

Cette page explique comment distribuer un modèle neuronal artificiel implémenté dans un code PyTorch, selon la méthode du parallélisme de données.

Nous documentons ici la solution intégrée DistributedDataParallel, qui est la plus performante selon la documentation PyTorch. Il s'agit d'un parallélisme multi-process qui fonctionne aussi bien en mono-nœud qu'en multi-nœuds.

Configuration multi-process avec SLURM

Pour le multi-nœuds, il est nécessaire d'utiliser le multi-processing géré par SLURM (exécution via la commande SLURM srun). Pour le mono-nœud il est possible d'utiliser, comme la documentation PyTorch l'indique, torch.multiprocessing.spawn. Cependant il est possible et plus pratique d'utiliser le multi-processing SLURM dans tous les cas, en mono-nœud ou en multi-nœuds. C'est ce que nous documentons dans cette page.

Dans SLURM, lorsqu'on lance un script avec la commande srun, le script est automatiquement distribué sur toutes les tâches prédéfinies. Par exemple, si nous réservons 4 nœuds octo-GPU en demandant 3 GPU par nœud, nous obtenons :

  • 4 nœuds, indexés de 0 à 3
  • 3 GPU/nœud indexés de 0 à 2 sur chaque nœud,
  • 4 x 3 = 12 processus au total permettant d'exécuter 12 tâches avec les rangs de 0 à 11

Mutlti-process en SLURM

Illustration d'une réservation SLURM de 4 nœuds et 3 GPU par nœud, soit 12 processes.
Les communications collectives inter-nœuds sont gérées par la librairie NCCL.

Voici des exemples d'en-têtes de script SLURM pour Jean-Zay :

  • pour une réservation de N nœuds quadri-GPU via la partition gpu par défaut :
    #SBATCH --nodes=N            # nombre total de noeuds (N à définir)
    #SBATCH --ntasks-per-node=4  # nombre de tache par noeud (ici 4 taches soit 1 tache par GPU)
    #SBATCH --gres=gpu:4         # nombre de GPU reserve par noeud (ici 4 soit tous les GPU)
    #SBATCH --cpus-per-task=10   # nombre de coeurs par tache (donc 4x10 = 40 coeurs soit tous les coeurs)
    #SBATCH --hint=nomultithread


    Remarque : ici, les nœuds sont réservés en exclusivité. En particulier, cela nous donne accès à toute la mémoire de chaque nœud.

  • pour une réservation de N nœuds octo-GPU via la partition gpu_p2 :
    #SBATCH --partition=gpu_p2
    #SBATCH --nodes=N            # nombre total de noeuds (N à définir)
    #SBATCH --ntasks-per-node=8  # nombre de tache par noeud (ici 8 taches soit 1 tache par GPU)
    #SBATCH --gres=gpu:8         # nombre de GPU reserve par noeud (ici 8 soit tous les GPU)
    #SBATCH --cpus-per-task=3    # nombre de coeurs par tache (donc 8x3 = 24 coeurs soit tous les coeurs)
    #SBATCH --hint=nomultithread


    Remarque : ici, les nœuds sont réservés en exclusivité. En particulier, cela nous donne accès à toute la mémoire de chaque nœud.

Implémentation de la solution DistributedDataParallel

Pour implémenter la solution DistributedDataParallel en PyTorch, il faut:

  1. Définir les variables d'environnement liées au nœud maître.
    • MASTER_ADD, l'adresse IP ou le hostname du nœud correspondant à la tâche 0 (le premier de la liste des nœuds). Si on est en mono-nœud, la valeur localhost suffit.
    • MASTER_PORT, un numéro de port aléatoire. Pour éviter les conflits et par convention on utilisera un numéro de port compris entre 10001 et 20000 (par exemple 12345).
    • Sur Jean Zay, une librairie développée par l'IDRIS a été incluse dans les modules Pytorch pour définir automatiquement les variables MASTER_ADD et MASTER_PORT. Il suffit de l'importer dans votre script :
       import idr_torch 

      Cette commande suffit à créer les variables. Pour votre information, voici ce que contient le script appelé :

      idr_torch.py
      #!/usr/bin/env python
      # coding: utf-8
       
      import os
      import hostlist
       
      # get SLURM variables
      rank = int(os.environ['SLURM_PROCID'])
      local_rank = int(os.environ['SLURM_LOCALID'])
      size = int(os.environ['SLURM_NTASKS'])
      cpus_per_task = int(os.environ['SLURM_CPUS_PER_TASK'])
       
      # get node list from slurm
      hostnames = hostlist.expand_hostlist(os.environ['SLURM_JOB_NODELIST'])
       
      # get IDs of reserved GPU
      gpu_ids = os.environ['SLURM_STEP_GPUS'].split(",")
       
      # define MASTER_ADD & MASTER_PORT
      os.environ['MASTER_ADDR'] = hostnames[0]
      os.environ['MASTER_PORT'] = str(12345 + int(min(gpu_ids))) # to avoid port conflict on the same node

      Remarque : Le module idr_torch récupérant les valeurs de variables d'environnement, vous pouvez les réutiliser dans votre script en appelant idr_torch.rank, idr_torch.local_rank, idr_torch.size et/ou idr_torch.cpus_per_task.

  2. Initialiser le process group (i.e. le nombre de processus, le protocole de communications collectives ou backend, …). Les backends possibles sont NCCL, GLOO et MPI. NCCL est doublement conseillé pour la performance et la garantie de bon fonctionnement sur Jean Zay.
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP
     
    dist.init_process_group(backend='nccl', 
                            init_method='env://', 
                            world_size=idr_torch.size, 
                            rank=idr_torch.rank)
  3. Envoyer le modèle sur le GPU. Notez que local_rank (numérotation 0, 1, 2, … pour chaque nœud) sert d'identifiant GPU.
    torch.cuda.set_device(idr_torch.local_rank)
    gpu = torch.device("cuda")
    model = model.to(gpu)
  4. Transformer le modèle en modèle distribué associé à un GPU.
    ddp_model = DDP(model, device_ids=[idr_torch.local_rank])
  5. Envoyer les sous-batches et les labels vers le GPU dédié, lors de l'apprentissage.
    for (images, labels) in train_loader:
        images = images.to(gpu, non_blocking=True)
        labels = labels.to(gpu, non_blocking=True)

    Remarque : ici, l'option non_blocking=True est nécessaire si le DataLoader utilise la fonctionnalité pin memory pour accélérer le chargement des entrées.

    Le code ci-dessous illustre l'utilisation du DataLoader avec un sampler adapté au parallélisme de données.

    batch_size = args.batch_size 
    batch_size_per_gpu = batch_size // idr_torch.size
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss()  
    optimizer = torch.optim.SGD(ddp_model.parameters(), 1e-4)
     
    # Data loading code
    train_dataset = torchvision.datasets.MNIST(root=os.environ['DSDIR'],
                                                   train=True,
                                                   transform=transforms.ToTensor(),
                                                   download=False)
     
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
                                                                        num_replicas=idr_torch.size,
                                                                        rank=idr_torch.rank)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                                   batch_size=batch_size_per_gpu,
                                                   shuffle=False,
                                                   num_workers=0,
                                                   pin_memory=True,
                                                   sampler=train_sampler)

Sauvegarde et chargement de checkpoints

Il est possible de mettre en place des checkpoints lors d'un apprentissage distribué sur des GPU.

Sauvegarde

Le modèle étant répliqué sur chaque GPU, la sauvegarde de checkpoints peut être réalisée par un seul GPU pour limiter les opérations d'écriture. Par convention, on sollicite le GPU de rang 0 :

if idr_torch.rank == 0:
    torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

Ainsi, le checkpoint contiendra l'information issue du GPU de rang 0 qui est alors sauvegardée dans un format spécifique aux modèles distribués.

Chargement

Au début d'un apprentissage, le chargement d'un checkpoint est d'abord opéré par le CPU, puis l'information est envoyée sur le GPU.

Par défaut et par convention, cet envoi se fait vers l'emplacement mémoire qui a été utilisé lors de l'étape de sauvegarde : avec notre exemple, seul le GPU 0 chargerait le modèle en mémoire.

Pour que l'information soit communiquée à l'ensemble des GPUs, il faut utiliser l'argument map_location de la fonction de chargement torch.load pour rediriger le stockage en mémoire.

Dans l'exemple ci-dessous, l'argument map_location ordonne une redirection du stockage en mémoire vers le GPU de rang local. Cette fonction étant appelée par l'ensemble des GPU, chaque GPU charge bien le checkpoint dans sa propre mémoire :

map_location = {'cuda:%d' % 0: 'cuda:%d' % idr_torch.local_rank} # remap storage from GPU 0 to local GPU 
ddp_model.load_state_dict(torch.load(CHECKPOINT_PATH), map_location=map_location)) # load checkpoint

Remarque : si, comme dans le tutoriel PyTorch, un checkpoint est chargé juste après une sauvegarde, il est nécessaire d'appeler la méthode dist.barrier() avant le chargement. L'appel à dist.barrier() permet de synchroniser les GPUs, garantissant ainsi que la sauvegarde du checkpoint par le GPU de rang 0 est bien achevée avant que les autres GPUs tentent de le charger.

Validation distribuée

L'étape de validation exécutée après chaque epoch ou après un nombre fixé d'itérations d'apprentissage peut se distribuer sur tous les GPU engagés dans l'apprentissage du modèle. Lorsque le parallélisme de données est utilisé et que l'ensemble de données de validation est conséquent, cette solution de validation distribuée sur les GPU semble être la plus efficace et la plus rapide.

Ici, l'enjeu est de calculer les métriques (loss, accuracy, etc…) par batch et par GPU, puis de les pondérer et de les moyenner sur l'ensemble des données de validation.

Pour cela, il faut:

  1. charger les données de validation de la même manière que les données d'apprentissage, mais sans les transformations aléatoires comme la data augmentation ou le shuffling (voir la documentation sur le chargement de bases de données en PyTorch) :
    # validation dataset loading (imagenet for example)                
    val_dataset = torchvision.datasets.ImageNet(root=root,split='val',transform=val_transform)
     
    # define distributed sampler for validation                                    
    val_sampler = torch.utils.data.distributed.DistributedSampler(val_dataset,
                                                                  num_replicas=idr_torch.size,
                                                                  rank=idr_torch.rank,
                                                                  shuffle=False)
     
    # define dataloader for validation                                                              
    val_loader = torch.utils.data.DataLoader(dataset=val_dataset,
                                             batch_size=batch_size_per_gpu,                    
                                             shuffle=False,
                                             num_workers=4,
                                             pin_memory=True,
                                             sampler=val_sampler,
                                             prefetch_factor=2)
  2. basculer du mode “apprentissage” au mode “validation” pour désactiver certaines fonctionnalités propres à l'entraînement qui sont coûteuse et ici inutiles :
    • model.eval() pour basculer le modèle en mode “validation” et désactiver la gestion des dropout, des batchnorm, etc.
    • with torch.no_grad() pour ignorer le calcul du gradient
    • optionnellement, with autocast() pour utiliser l'AMP (mixed precision)
  3. évaluer le modèle et calculer la métrique par batch de la manière habituelle (ici, nous prenons l'exemple du calcul de la loss, cela sera la même chose pour d'autres métriques) :
    • outputs = model(val_images) suivi de loss = criterion(outputs, val_labels)
  4. pondérer et cumuler la métrique par GPU :
    • val_loss += loss * val_images.size(0) / N avec val_images.size(0) la taille du batch et N la taille globale du dataset de validation. Sachant que les batches n'ont pas nécessairement la même taille (dernier batch parfois plus petit), il est préférable d'utiliser ici la valeur val_images.size(0).
  5. sommer les moyennes pondérées de la métrique sur l'ensemble des GPU :
    • dist.all_reduce(val_loss, op=dist.ReduceOp.SUM) pour sommer les valeurs de la métrique calculées par GPU et communiquer le résultat à l'ensemble des GPU. Cette opération entraîne des communications inter-GPU.

Exemple après chargement des données de validation :

model.eval()                          # switch into validation mode
val_loss = torch.Tensor([0.]).to(gpu) # initialize val_loss value
N = len(val_dataset)                  # get validation dataset length
 
for val_images, val_labels in val_loader:              # loop over validation batches
 
   val_images = val_images.to(gpu, non_blocking=True)  # transfer images and labels to GPUs
   val_labels = val_labels.to(gpu, non_blocking=True) 
 
    with torch.no_grad():                          # deactivate gradient computation
        with autocast():                           # activate AMP
	    outputs = model(val_images)            # evaluate model
  	    loss = criterion(outputs, val_labels)  # compute loss
 
    val_loss += loss * val_images.size(0) / N      # cumulate weighted mean per GPU
 
dist.all_reduce(val_loss, op=dist.ReduceOp.SUM)          # sum weighted means and broadcast value to each GPU
 
model.train() # switch again into training mode

Exemple d'application

Exécution multi-GPU, multi-nœuds avec "DistributedDataParallel"

Un exemple se trouve dans $DSDIR/examples_IA/Torch_parallel/Example_DataParallelism_Pytorch.ipynb sur Jean-Zay, il utilise la base de données MNIST et un réseau dense simple. L'exemple est un Notebook qui permet de créer un script d'exécution.

Vous pouvez aussi télécharger le notebook en cliquant sur ce lien.

Il est à copier sur votre espace personnel (idéalement sur votre $WORK).

$ cp $DSDIR/examples_IA/Torch_parallel/Example_DataParallelism_PyTorch.ipynb $WORK

Vous devez ensuite exécuter le Notebook à partir d'une machine frontale de Jean Zay en chargeant préalablement un module PyTorch. Par exemple:

$ module load pytorch-gpu/py3/1.7.0
$ idrlab --notebook-dir=$WORK

Documentation et sources

Annexes

Sur Jean Zay, pour un model resnet 101, en fixant une taille de mini batch fixe (la taille globale du batch augmente avec le nombre de GPU impliqués), on obtient les vitesses d'apprentissage suivantes qui croîssent avec le nombre de GPU impliqués. Le protocole de communication NCCL est toujours plus performant que GLOO. La communication entre Octo-GPU apparaît plus lente qu'entre quadri-GPU.

Comparaison Gloo vs NCCL

Througputs en fonction du backend de communication lors du parallélisme de modèle en Pytorch

Pour NCCL, voici les temps moyens d'une itération de batch pour un certain nombre de GPU impliqué dans la distribution. Les différences de temps correspondent au temps de synchronisation entre GPU.

 Temps iteration d'apprentissage

Temps moyens d'une itération d'apprentissage lors du parallélisme de modèle en Pytorch

⇐ Revenir à la page sur l'apprentissage distribué