Horovod : Parallélisme de données multi-GPU et multi-nœuds

Horovod est une brique logicielle permettant le parallélisme de données pour TensorFlow, Keras, PyTorch, et Apache MXNet. L'objectif d'Horovod est de rendre le code performant et facile à implémenter.

Dans les exemples de la communauté IA, Horovod est souvent utilisé avec Tensorflow pour facilter l'implémentation du parallélisme de données.

Horovod s'appuie sur les librairies de communication MPI et NCCL, ainsi que sur le réseau d'interconnexion Intel Omni-Path (OPA), pour échanger les données entre les équipements (nœuds, GPU, CPU).

Stack Horovod

Description de la stack Horovod.

Nous documentons ici les solutions Horovod sur Jean Zay en TensorFlow ou en Pytorch. Il s'agit d'un parallélisme multi-process qui fonctionne aussi bien en mono-nœud qu'en multi-nœuds.

Configuration multi-process avec SLURM

Un des avantages d'Horovod est qu'aucune configuration manuelle n'est nécessaire dans la description de l'environnement distribué. Horovod récupère directement les informations relatives aux GPU, aux machines disponibles ainsi qu'aux protocoles de communication, à partir de l'environnement machine. En particulier, la solution Horovod assure la portabilité de votre code.

Dans SLURM, lorsqu'on lance un script avec la commande srun, le script est automatiquement distribué sur toutes les tâches prédéfinies. Par exemple, si nous réservons 4 nœuds octo-GPU en demandant 3 GPU par nœud, nous obtenons :

  • 4 nœuds, indexés de 0 à 3
  • 3 GPU/nœud indexés de 0 à 2 sur chaque nœud,
  • 4 x 3 = 12 processus au total permettant d'exécuter 12 tâches avec les rangs de 0 à 11

Mutlti-process en SLURM

Illustration d'une réservation SLURM de 4 nœuds et 3 GPU par nœud, soit 12 processes.
Les communications collectives inter-nœuds sont gérées par la librairie NCCL.

Pour exécuter un code distribué avec Horovod sous l'environnement SLURM, il faut réserver une tâche par GPU impliqué dans le parallélisme de données.

Voici des exemples d'en-têtes de script SLURM permettant d'effectuer :

  • une réservation de N nœuds quadri-GPU via la partition gpu par défaut :
    #SBATCH --nodes=N            # nombre total de nœuds (N à définir)
    #SBATCH --ntasks-per-node=4  # nombre de tache par nœud (ici 4 soit 1 tache par GPU réservé)
    #SBATCH --gres=gpu:4         # nombre de GPU réservé par nœud (ici 4 soit tous les GPU d'un nœud)
    #SBATCH --cpus-per-task=10   # nœuds de cœurs par tache (ici on réserve 4x10 = 40 cœurs par nœud)
    #SBATCH --hint=nomultithread
     
    # execute script
    srun python script.py


    Remarque : ici, les nœuds sont réservés en exclusivité puisque nous réservons la totalité des cœurs et des GPUs. En particulier, cela nous donne accès à toute la mémoire de chaque nœud.

  • une réservation de N nœuds octo-GPU via la partition gpu_p2 :
    #SBATCH --partition=gpu_p2   # doit être spécifiée pour être utilisée
    #SBATCH --nodes=N            # nombre total de nœuds (N à définir)
    #SBATCH --ntasks-per-node=8  # nombre de tache par nœud (ici 8 soit 1 tache par GPU réservé)
    #SBATCH --gres=gpu:8         # nombre de GPU reserve par noeud (ici 8 soit tous les GPU d'un noeud)
    #SBATCH --cpus-per-task=3    # noeuds de coeurs par tache (ici on reserve 8x3 = 24 coeurs par noeud)
    #SBATCH --hint=nomultithread
     
    # execute script
    srun python script.py


    Remarque : ici, les nœuds sont réservés en exclusivité puisque nous réservons la totalité des cœurs et des GPUs. En particulier, cela nous donne accès à toute la mémoire de chaque nœud.

Implémentation de la solution Horovod

Il est possible d'utiliser Horovod dans les configurations logicielles suivantes :

Nous vous renvoyons vers la documentation Horovod pour les détails d'implémentation relatifs à chaque configuration logicielle.

Dans tous les cas, le code diffère légèrement mais les étapes de développement restent identiques. Il faut :

  1. Importer et initialiser Horovod.
  2. Rattacher chaque GPU à un processus distinct.
  3. Augmenter le learning rate proportionnellement au nombre de GPU pour compenser l'augmentation de la taille du batch.
  4. Distribuer l'optimiseur.
  5. S'assurer que les variables sont correctement répliquées sur tous les GPU en début d'apprentissage (et après l'initialisation du modèle).
  6. Sauvegarder des checkpoints uniquement à partir du processus de rang 0.

Par exemple, la solution Horovod implémentée dans un code TensorFlow 2 + Keras est illustrée dans le script suivant :

import tensorflow as tf
import horovod.tensorflow.keras as hvd
 
# Initialize Horovod
hvd.init()
 
# Pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
 
# Build model and dataset
dataset = ...
model = ...
opt = tf.optimizers.Adam(0.001 * hvd.size())
 
# Horovod: add Horovod DistributedOptimizer.
opt = hvd.DistributedOptimizer(opt)
 
# Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
# uses hvd.DistributedOptimizer() to compute gradients.
mnist_model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
                    optimizer=opt,
                    metrics=['accuracy'],
                    experimental_run_tf_function=False)
 
callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
 
# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
if hvd.rank() == 0:
    callbacks.append(keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))
 
model.fit(dataset,
          steps_per_epoch=500 // hvd.size(),
          callbacks=callbacks,
          epochs=24,
          verbose=1 if hvd.rank() == 0 else 0)

Validation distribuée

L'étape de validation exécutée après chaque epoch ou après un nombre fixé d'itérations d'apprentissage peut se distribuer sur tous les GPU engagés dans l'apprentissage du modèle. Lorsque le parallélisme de données est utilisé et que l'ensemble de données de validation est conséquent, cette solution de validation distribuée sur les GPU semble être la plus efficace et la plus rapide.

Ici, l'enjeu est de calculer les métriques (loss, accuracy, etc…) par batch et par GPU, puis de les pondérer et de les moyenner sur l'ensemble des données de validation.

Pour cela, il faut:

  1. charger les données de validation de la même manière que les données d'apprentissage, mais sans les transformations aléatoires comme la data augmentation ou le shuffling (voir la documentation sur le chargement de bases de données en TensorFlow) :
    # validation dataset loading (imagenet for example)                  
    val_dataset = tf.data.TFRecordDataset(glob.glob(os.environ['DSDIR']+'/imagenet/validation*'))
    # define distributed sharding for validation 
    val_dataset = val_dataset.shard(hvd.size(), hvd.rank())
    # define dataloader for validation
    val_dataset = val_dataset.map(tf_parse_eval, 
                num_parallel_calls=idr_tf.cpus_per_task).batch(batch_size).prefetch(buffer_size=5)
  2. basculer du mode “apprentissage” au mode “validation” pour désactiver certaines fonctionnalités propres à l'entraînement qui sont coûteuse et ici inutiles :
    • model(images, training=False) lors de l'évaluation du modèle, pour basculer le modèle en mode “validation” et désactiver la gestion des dropout, des batchnorm, etc.
    • Sans tf.GradientTape() pour ignorer le calcul du gradient
  3. évaluer le modèle et calculer la métrique par batch de la manière habituelle (ici, nous prenons l'exemple du calcul de la loss, cela sera la même chose pour d'autres métriques) :
    • logits_ = model(images, training=False) suivi de loss_value = loss(labels, logits_)
  4. pondérer et cumuler la métrique par GPU :
    • val_loss.update_state(loss_value, sample_weight=images.shape[0]) avec images.shape[0]) la taille du batch. Sachant que les batches n'ont pas nécessairement la même taille (dernier batch parfois plus petit), il est préférable d'utiliser ici la valeur images.shape[0].
  5. Moyenner les moyennes pondérées de la métrique sur l'ensemble des GPU :
    • hvd.allreduce(val_loss.result()) pour moyenner les valeurs de la métrique calculées par GPU et communiquer le résultat à l'ensemble des GPU. Cette opération entraîne des communications inter-GPU. Horovod utilise la moyenne comme opération allReduce par défaut.

Exemple après chargement des données de validation :

loss = tf.losses.SparseCategoricalCrossentropy() # Define loss function
 
val_loss = tf.keras.metrics.Mean()               # Define Keras metrics 
 
@tf.function                                     # Define validation step function
def eval_step(images, labels):
    logits_ = model(images, training=False)      # evaluate model - switch into validation mode     
    loss_value = loss(labels, logits_)           # compute loss
    val_loss.update_state(loss_value, sample_weight=images.shape[0]) # cumulate weighted mean per GP        
 
val_loss.reset_states()                          # initialize val_loss value
 
for val_images, val_labels in val_loader:
    eval_step(val_images, val_labels)            # Call eval_step function
 
val_loss_dist = hvd.allreduce(val_loss.result()) # Average weighted means and broadcast value to each GPU

Exemple d'application

Un exemple se trouve dans $DSDIR/examples_IA/Horovod_parallel/Example_DataParallelism_Horovod_Tensorflow.ipynb sur Jean-Zay, il utilise la base de données MNIST et un réseau dense simple. L'exemple est un Notebook qui permet de créer un script d'exécution.

Vous pouvez aussi télécharger le notebook en cliquant sur ce lien.

Il est à copier sur votre espace personnel (idéalement sur votre $WORK).

$ cp $DSDIR/examples_IA/Horovod_parallel/Example_DataParallelism_Horovod_Tensorflow.ipynb $WORK

Vous devez ensuite exécuter le Notebook à partir d'une machine frontale de Jean Zay en chargeant préalablement un module TensorFlow. Par exemple:

$ module load tensorflow-gpu/py3/2.3.1
$ idrlab --notebook-dir=$WORK

Documentation et sources