
<p> <center> <a href="../Start_Here.ipynb">Home Page</a> </center> </p>

 
<div>
    <span style="float: left; width: 33%; text-align: left;"><a href="2.1.System-Topology.ipynb">Previous Notebook</a></span>
    <span style="float: left; width: 33%; text-align: center;">
        <a href="1.Introduction-to-Distributed-Deep-Learning.ipynb">1</a>
        <a href="2.1.System-Topology.ipynb">2</a>
        <a >3</a>
        <a href="4.Convergence.ipynb">4</a>
    </span>
    <span style="float: left; width: 33%; text-align: right;"><a href="4.Convergence.ipynb">Next Notebook</a></span>
</div>

# Introduction to Distributed Deep Learning - Part 3

**Contents of this notebook:**

- [Hands-on with Distributed training](#Hands-on-with-Distributed-training)
    - [Tensorflow - Keras](#Tensorflow---Keras)
    - [Horovod](#Horovod)

**By the End of this Notebook you will:**

- Implement distributed deep learning training using Tensorflow / Keras.
- Learn the concepts behind Horovod and implement them.

# Hands-on with Distributed training

In this notebook, we focus on **training** deep neural networks on **multiple GPUs** using **data parallelism**. We will use the following frameworks for demonstration:

- [Tensorflow - Keras](#Tensorflow---Keras)
- [Horovod](#Horovod)

In both cases, we use **single-host, multiple-device** implementations, although notes on **multi-host, multiple-device** training are included where relevant.

You can use either or both of the methods described below; we recommend trying both.

## Tensorflow - Keras


Tensorflow uses the `tf.distribute.Strategy` API to distribute training across multiple GPUs, multiple machines or TPUs. With this API, you can distribute your existing models and training code with minimal code changes. `tf.distribute.Strategy` can be used with a high-level API such as **Keras**, and can also be used to distribute custom training loops.

Tensorflow strategies can be briefly classified along two axes:

- **Synchronous vs asynchronous** training: 
    - In sync training, all workers train over different slices of input data in sync and aggregate gradients at each step. 
    - In async training, all workers are independently training over the input data and updating variables asynchronously. 



- **Hardware platform**: You may want to scale your training onto multiple GPUs on one machine, or multiple machines in a network (with 0 or more GPUs each), or on Cloud TPUs.
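
To make the synchronous case concrete, here is a minimal pure-Python sketch (not TensorFlow code) of one synchronous data-parallel step for a one-parameter model `y = w * x` with a mean-squared-error loss. The toy data, the worker counts and the learning rate are illustrative assumptions. Each simulated worker computes the gradient on its own slice of the batch, the local gradients are averaged (the job an all-reduce does on real hardware), and every worker applies the same update.

```python
# Toy data for y = 3x; purely illustrative.
xs = [1.0, 2.0, 3.0, 4.0]
ys = [3.0, 6.0, 9.0, 12.0]


def grad_mse(w, x_slice, y_slice):
    """Gradient of mean((w*x - y)^2) with respect to w over one data slice."""
    n = len(x_slice)
    return sum(2 * (w * x - y) * x for x, y in zip(x_slice, y_slice)) / n


def sync_step(w, num_workers, lr=0.01):
    """One synchronous data-parallel step with equal-sized slices."""
    size = len(xs) // num_workers
    # Each worker sees a different slice of the same global batch.
    local_grads = [
        grad_mse(w, xs[i * size:(i + 1) * size], ys[i * size:(i + 1) * size])
        for i in range(num_workers)
    ]
    # Averaging the local gradients stands in for an all-reduce.
    avg_grad = sum(local_grads) / num_workers
    # Every worker applies this identical update, so replicas stay in sync.
    return w - lr * avg_grad, avg_grad


new_w, avg_grad = sync_step(0.0, num_workers=2)
print(avg_grad, grad_mse(0.0, xs, ys))  # -45.0 -45.0
```

With equal-sized slices and a mean loss, the averaged gradient equals the full-batch gradient, which is why synchronous data-parallel training behaves like single-device training with a larger batch.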

To cover these training modes and hardware platforms, Tensorflow provides six strategies. Here is a short description of each:

- **MirroredStrategy**
    - `tf.distribute.MirroredStrategy` supports synchronous distributed training on multiple GPUs on one machine. It creates one replica per GPU device. Each variable in the model is mirrored across all the replicas. Together, these variables form a single conceptual variable called MirroredVariable. These variables are kept in sync with each other by applying identical updates.
- **MultiWorkerMirroredStrategy**
    - `tf.distribute.MultiWorkerMirroredStrategy` is very similar to MirroredStrategy. It implements synchronous distributed training across multiple workers, each with potentially multiple GPUs. Similar to `tf.distribute.MirroredStrategy`, it creates copies of all variables in the model on each device across all workers.
    - **Note** : For multi-worker training, you need to set up the `TF_CONFIG` environment variable for each binary running in your cluster. The `TF_CONFIG` environment variable is a JSON string which specifies what tasks constitute a cluster, their addresses and each task's role in the cluster.
- **TPUStrategy**
    - `tf.distribute.TPUStrategy` lets you run your TensorFlow training on Tensor Processing Units (TPUs).
- **CentralStorageStrategy**
    - `tf.distribute.experimental.CentralStorageStrategy` does synchronous training as well. Variables are not mirrored, instead they are placed on the CPU and operations are replicated across all local GPUs. If there is only one GPU, all variables and operations will be placed on that GPU.
- **OneDeviceStrategy**
    - `tf.distribute.OneDeviceStrategy` is a strategy to place all variables and computation on a single specified device.
- **ParameterServerStrategy**
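    - `tf.distribute.experimental.ParameterServerStrategy` implements parameter server training, a common data-parallel method to scale up model training on multiple machines. A parameter server training cluster consists of workers and parameter servers. Variables are created on parameter servers and are read and updated by workers in each step. By default, workers read and update these variables independently, without synchronising with each other, which is why this style is often called asynchronous training.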
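
As a sketch of the `TF_CONFIG` variable mentioned for `MultiWorkerMirroredStrategy`, the snippet below builds the JSON for a hypothetical two-worker cluster. The host names and ports are placeholders, not values from this notebook. Every worker sets the same `cluster` section but its own `task` index, and the variable must be set before the strategy is created.

```python
import json
import os

# Hypothetical two-machine cluster; replace hosts/ports with your own.
cluster = {"worker": ["host1:12345", "host2:23456"]}


def make_tf_config(task_index):
    """Return the TF_CONFIG JSON string for the worker with this index."""
    return json.dumps({
        "cluster": cluster,
        "task": {"type": "worker", "index": task_index},
    })


# On the first machine (worker 0), before calling
# tf.distribute.MultiWorkerMirroredStrategy():
os.environ["TF_CONFIG"] = make_tf_config(0)
print(os.environ["TF_CONFIG"])
```

The second machine would run the same code with `make_tf_config(1)`.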


Tensorflow uses [NCCL](https://developer.nvidia.com/nccl) by default for communication between GPUs. Refer to the [System Topology Notebook](2.1.System-Topology.ipynb) to learn more about NCCL.

Now that we've given an overview of Tensorflow and the available functionality, let us try out an example using **MirroredStrategy**.

We will build a small CNN and train it on the **MNIST dataset**.

```python
# Import necessary libraries
import os
import sys
import time

# Select the GPUs to use for training. Set this before TensorFlow
# initialises the GPUs so that it reliably takes effect.
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3,4,5,6,7"

import tensorflow_datasets as tfds
import tensorflow as tf
from tensorflow import keras

# Print the TensorFlow version
print(tf.__version__)

# Download the MNIST dataset and load it from TensorFlow Datasets
# (https://www.tensorflow.org/datasets). This returns a dataset in `tf.data` format.
# Setting `with_info=True` also returns metadata for the entire dataset, saved here in `info`.
# Among other things, this metadata includes the number of train and test examples.
datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)

mnist_train, mnist_test = datasets['train'], datasets['test']
```

Let us now define a distribution strategy.

```python
# Define the distribution strategy.
# Create a `MirroredStrategy` object. It handles distribution and provides a
# context manager (`strategy.scope()`) to build your model inside.
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
```

```python
# Set up the input pipeline.
# When training with multiple GPUs, you can use the extra computing power effectively
# by increasing the batch size. In general, use the largest batch size that fits in
# GPU memory, and tune the learning rate accordingly.
num_train_examples = info.splits['train'].num_examples
num_test_examples = info.splits['test'].num_examples

BUFFER_SIZE = 10000

# Batch size per GPU (replica); the global batch is this times the number of replicas.
BATCH_SIZE_PER_REPLICA = 512
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync


# Feature scaling: convert pixel values from [0, 255] to [0, 1].
def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label


# Apply the scaling to the training and test data, then cache, shuffle and batch the training data.
train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)
```
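
The global-batch arithmetic can be checked by hand. This sketch assumes all 8 GPUs listed in `CUDA_VISIBLE_DEVICES` are visible (so `num_replicas_in_sync` is 8) and uses the 60,000-image MNIST training split:

```python
import math

num_train_examples = 60_000      # size of the MNIST training split
batch_size_per_replica = 512
num_replicas = 8                 # assumed value of strategy.num_replicas_in_sync

# Each step consumes one global batch, split evenly across the replicas.
global_batch = batch_size_per_replica * num_replicas
# .batch() keeps the final partial batch by default, so the step count rounds up.
steps_per_epoch = math.ceil(num_train_examples / global_batch)
print(global_batch, steps_per_epoch)  # 4096 15
```

The 15 steps match the `15/15` progress bar in the DGX-1 output of the training run.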

Let us now create and compile the model. Note that the model must be created and compiled inside `strategy.scope()`.

Let us also define a callback to measure the throughput we obtain.

**Note: We use the same model as in the scaling-efficiency experiment in Notebook 1, so that we can calculate the scaling efficiency from the available data.**

```python
with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, [3, 3], activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(64, [3, 3], activation='relu'),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
        tf.keras.layers.Dropout(0.25),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.5),
        tf.keras.layers.Dense(10, activation='softmax')])

    # The final layer already applies softmax, so the loss receives probabilities,
    # not logits (from_logits=False is the default).
    # LossScaleOptimizer adds dynamic loss scaling, which mainly matters when a
    # mixed-precision (float16) policy is enabled.
    model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  optimizer=tf.keras.mixed_precision.LossScaleOptimizer(tf.keras.optimizers.Adam()),
                  metrics=['accuracy'])
```

```python
# Define the callbacks.
# Callback that prints the epoch time and the throughput (images/sec) at the end of each epoch.
class Throughput(tf.keras.callbacks.Callback):
    def __init__(self, total_images=0):
        super().__init__()
        self.total_images = total_images

    def on_epoch_begin(self, epoch, logs=None):
        self.epoch_start_time = time.time()

    def on_epoch_end(self, epoch, logs=None):
        epoch_time = time.time() - self.epoch_start_time
        print('Epoch time : {}'.format(epoch_time))
        images_per_sec = round(self.total_images / epoch_time, 2)
        print('Images/sec: {}'.format(images_per_sec))


# Now train the model in the usual way: call `fit` on the model and pass in the dataset
# created above. This step is the same whether or not you distribute the training.
model.fit(train_dataset, epochs=8, callbacks=[Throughput(total_images=num_train_examples)])
```

## Result

Output of running the cell above on a DGX-1:

```bash
Epoch 1/8
15/15 [==============================] - 23s 72ms/step - loss: 1.2977 - accuracy: 0.6140
Epoch time : 23.148940801620483
Images/sec: 2591.91
Epoch 2/8
15/15 [==============================] - 0s 19ms/step - loss: 0.4460 - accuracy: 0.8667
Epoch time : 0.31006669998168945
Images/sec: 193506.75
Epoch 3/8
15/15 [==============================] - 0s 16ms/step - loss: 0.2840 - accuracy: 0.9169
Epoch time : 0.2699730396270752
Images/sec: 222244.41
Epoch 4/8
15/15 [==============================] - 0s 18ms/step - loss: 0.1961 - accuracy: 0.9426
Epoch time : 0.2857208251953125
Images/sec: 209995.19
Epoch 5/8
15/15 [==============================] - 0s 17ms/step - loss: 0.1482 - accuracy: 0.9579
Epoch time : 0.27691197395324707
Images/sec: 216675.35
Epoch 6/8
15/15 [==============================] - 0s 19ms/step - loss: 0.1178 - accuracy: 0.9658
Epoch time : 0.3074929714202881
Images/sec: 195126.41
Epoch 7/8
15/15 [==============================] - 0s 26ms/step - loss: 0.0999 - accuracy: 0.9699
Epoch time : 0.43077993392944336
Images/sec: 139282.25
Epoch 8/8
15/15 [==============================] - 0s 17ms/step - loss: 0.0851 - accuracy: 0.9748
Epoch time : 0.2854182720184326
Images/sec: 210217.8
```
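
The first epoch is much slower because it includes one-off costs such as building the training graph and filling the `.cache()`, so it is usually left out when quoting throughput. The sketch below recomputes the callback's `Images/sec` from the logged times of epochs 2-8 (copied from the output above and rounded to four decimals) and summarises them:

```python
import statistics

# Epoch times (seconds) for epochs 2-8, rounded from the DGX-1 log above.
epoch_times = [0.3101, 0.2700, 0.2857, 0.2769, 0.3075, 0.4308, 0.2854]
total_images = 60_000  # the value the Throughput callback divides by

# Same formula as the Throughput callback: images per epoch / epoch time.
images_per_sec = [total_images / t for t in epoch_times]

mean_throughput = sum(images_per_sec) / len(images_per_sec)
# The median is less sensitive to a single slow epoch (epoch 7 here).
median_throughput = statistics.median(images_per_sec)
print(f"mean: {mean_throughput:,.0f} images/sec, median: {median_throughput:,.0f} images/sec")
```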

Restart the kernel (or run the following cell, which does so) to free up GPU memory before proceeding to the next sections.

```python
# Shut down and restart the kernel to release GPU memory.
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
```

## Horovod

Horovod is a distributed deep learning training framework. It is available for TensorFlow, Keras, PyTorch, and Apache MXNet.

Horovod is an open-source tool initially developed by Uber to support their need for faster deep learning model training across many engineering teams. It is part of a growing ecosystem of approaches to distributed training, including, for example, Distributed TensorFlow. Uber decided to develop a solution that utilised MPI for distributed process communication, and the NVIDIA Collective Communications Library (NCCL) for its highly optimised implementation of reductions across distributed processes and nodes. The resulting Horovod package delivers on its promise to scale deep learning model training across multiple GPUs and multiple nodes, with only minor code modification and intuitive debugging.


#### Horovod's MPI Roots

Horovod's connection to MPI runs deep, and for programmers familiar with MPI, much of what you write to distribute model training with Horovod will feel familiar. For those new to MPI, a brief discussion of the conventions and considerations involved in distributing processes with Horovod (or MPI) is worthwhile.

Horovod, like MPI, strictly follows the Single-Program Multiple-Data (SPMD) paradigm: the instruction flow of all processes is implemented in the same file/program. Because multiple processes execute the code in parallel, we have to guard against race conditions and take care to synchronise the participating processes. Horovod assigns a unique numerical ID, or rank (an MPI concept), to each process executing the program, and this rank can be accessed programmatically. As you will see below, by checking a process's rank in the code we can take steps such as:

- Pin that process to its own exclusive GPU.
- Utilise a single rank for broadcasting values that need to be used uniformly by all ranks.
- Utilise a single rank for collecting and/or reducing values produced by all ranks.
- Utilise a single rank for logging or writing to disk.
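
The SPMD pattern can be imitated without MPI or GPUs. In the pure-Python sketch below, threads stand in for processes: every simulated rank runs the same `program` function, rank 0 broadcasts a starting value, each rank computes on its own slice of the data, and only rank 0 reduces the results and logs. The shared dictionary and `threading.Barrier` are illustrative assumptions that play the role of Horovod's collective operations; they are not part of Horovod's API.

```python
import threading

SIZE = 4                          # number of simulated ranks
data = list(range(1, 17))         # toy data, split across ranks
shared = {}                       # stands in for the communication layer
barrier = threading.Barrier(SIZE)
results = [None] * SIZE


def program(rank, size):
    # Broadcast: only rank 0 chooses the value every rank will use.
    if rank == 0:
        shared["scale"] = 10
    barrier.wait()                # every rank waits until the broadcast is done
    scale = shared["scale"]

    # Each rank works on its own slice of the data.
    results[rank] = sum(x * scale for x in data[rank::size])
    barrier.wait()                # synchronise before the reduction

    # Reduce and log on a single rank only.
    if rank == 0:
        shared["total"] = sum(results)
        print(f"rank 0: total = {shared['total']}")


threads = [threading.Thread(target=program, args=(r, SIZE)) for r in range(SIZE)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# prints "rank 0: total = 1360"
```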


To use Horovod with Tensorflow, we need to make the following modifications:

1. Initialise Horovod  

```python
hvd.init()
```

2. Pin each GPU to a single process.

```python
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
```

3. Wrap the optimiser in Horovod's distributed optimiser. The distributed optimiser delegates gradient computation to the original optimiser, averages gradients across processes using `all-reduce` or `all-gather`, and then applies those averaged gradients.

```python
# `opt` is the original optimiser, e.g. tf.optimizers.Adam()
opt = hvd.DistributedOptimizer(opt)
```


4. Broadcast the initial variable states from rank 0 to all other processes.

This ensures that all workers are initialised consistently, whether training starts from random weights or is restored from a checkpoint.

This can be done with the `hvd.broadcast_variables` method after the models and optimisers have been initialised. When training with Keras `model.fit()`, the `hvd.callbacks.BroadcastGlobalVariablesCallback(0)` callback performs this broadcast, as shown below.

5. Modify your code to save checkpoints only on worker 0 to prevent other workers from corrupting them.
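
Steps 3 and 4 work together: averaging gradients keeps the replicas identical only if they start from identical weights. This minimal pure-Python sketch uses two hypothetical replicas (plain lists of floats, not Horovod objects) with made-up local gradients to show why rank 0's initial weights are broadcast first:

```python
import random


def apply_update(weights, grad, lr=0.1):
    """Plain SGD step; every replica applies the same averaged gradient."""
    return [w - lr * g for w, g in zip(weights, grad)]


# Two replicas initialised independently, as happens with random initialisation.
rng0, rng1 = random.Random(0), random.Random(1)
replicas = [[rng0.random() for _ in range(3)], [rng1.random() for _ in range(3)]]

# Made-up local gradients; averaging them plays the role of the all-reduce.
local_grads = [[0.2, -0.4, 0.6], [0.4, -0.2, 0.0]]
avg_grad = [sum(gs) / len(gs) for gs in zip(*local_grads)]

# Without a broadcast, identical updates leave the replicas different.
no_bcast = [apply_update(w, avg_grad) for w in replicas]
print(no_bcast[0] == no_bcast[1])  # False

# Broadcast: every replica copies rank 0's weights before training starts.
replicas = [list(replicas[0]) for _ in replicas]
with_bcast = [apply_update(w, avg_grad) for w in replicas]
print(with_bcast[0] == with_bcast[1])  # True
```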


Let us now go over these modifications using a small example.

**Note that the code below runs as a single process and uses only one GPU. It is meant for explanation; how to launch it on multiple GPUs is described at the end of this section.**

```python
# Import necessary libraries
import tensorflow_datasets as tfds
import tensorflow as tf
from tensorflow import keras
import time
import os
import sys
# Import Horovod's Keras bindings
import horovod.tensorflow.keras as hvd

# 1. Horovod: initialise Horovod.
hvd.init()
```

With the typical setup of one GPU per process, we make only the GPU at index `hvd.local_rank()` visible to each process. The first process on a server is allocated the first GPU, the second process the second GPU, and so forth.
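
For intuition, the sketch below prints which host and GPU each global rank would use for a job such as `horovodrun -np 8 -H server1:4,server2:4`. It assumes ranks are placed on hosts in contiguous blocks (ranks 0-3 on the first host, 4-7 on the second), which is a common default but depends on the launcher; in a real job you would simply read `hvd.rank()` and `hvd.local_rank()`. The `placement` helper is hypothetical.

```python
# Hypothetical layout: 2 hosts with 4 GPUs each (like -H server1:4,server2:4).
hosts = ["server1", "server2"]
gpus_per_host = 4


def placement(rank):
    """Return (host, local_rank) for a global rank, assuming block placement."""
    host = hosts[rank // gpus_per_host]
    local_rank = rank % gpus_per_host   # index into that host's GPU list
    return host, local_rank


for rank in range(len(hosts) * gpus_per_host):
    host, local_rank = placement(rank)
    # Each process would then make only gpus[local_rank] visible.
    print(f"rank {rank}: {host}, GPU {local_rank}")
```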

```python
# 2. Horovod: pin the GPU used by this process to its local rank (one GPU per process).
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# Load the dataset. Each rank uses its own file name so that concurrent
# downloads do not overwrite each other.
(mnist_images, mnist_labels), _ = tf.keras.datasets.mnist.load_data(path='mnist-%d.npz' % hvd.rank())

dataset = tf.data.Dataset.from_tensor_slices(
    (tf.cast(mnist_images[..., tf.newaxis] / 255.0, tf.float32),
     tf.cast(mnist_labels, tf.int64))
)
# Repeat indefinitely, shuffle and batch the data.
BATCH_SIZE_PER_REPLCIA = 1024
dataset = dataset.repeat().shuffle(10000).batch(BATCH_SIZE_PER_REPLCIA)
```

Each process now has its own copy of the model. We then wrap the optimizer with `hvd.DistributedOptimizer`.

```python
# Build the model
mnist_model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, [3, 3], activation='relu'),
    tf.keras.layers.Conv2D(64, [3, 3], activation='relu'),
    tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
    tf.keras.layers.Dropout(0.25),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.5),
    tf.keras.layers.Dense(10, activation='softmax')
])

# Use the Adam optimizer for training
opt = tf.optimizers.Adam()

# 3. Horovod: wrap the optimizer with DistributedOptimizer.
opt = hvd.DistributedOptimizer(opt, backward_passes_per_step=1, average_aggregated_gradients=True)

# Horovod: specify `experimental_run_tf_function=False` to ensure TensorFlow
# uses hvd.DistributedOptimizer() to compute gradients.
mnist_model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
                    optimizer=opt,
                    metrics=['accuracy'],
                    experimental_run_tf_function=False)
```


#### Callbacks 

We define the following callbacks:

1. `hvd.callbacks.BroadcastGlobalVariablesCallback(0)`: broadcasts the initial variable states from rank 0 to all other processes. This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.

2. `hvd.callbacks.MetricAverageCallback()`: averages metrics across all workers at the end of every epoch. Because workers process different data (for example, when each worker validates on only part of the dataset), their metric values differ; averaging them improves metric quality and reduces variance.

3. `Throughput()`: the same callback used in the Tensorflow version, reporting the throughput in `Images/sec`. Here it prints only on rank 0.
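
Conceptually, `MetricAverageCallback` replaces each worker's epoch-end metrics with their mean across workers (Horovod performs this with an all-reduce). A pure-Python sketch with made-up per-worker values and a hypothetical `average_metrics` helper:

```python
# Hypothetical epoch-end metrics reported by three workers.
worker_logs = [
    {"loss": 0.30, "accuracy": 0.90},
    {"loss": 0.36, "accuracy": 0.88},
    {"loss": 0.33, "accuracy": 0.89},
]


def average_metrics(logs_per_worker):
    """Average every metric across workers, key by key."""
    keys = logs_per_worker[0].keys()
    n = len(logs_per_worker)
    return {k: sum(logs[k] for logs in logs_per_worker) / n for k in keys}


averaged = average_metrics(worker_logs)
print(averaged)
```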

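We also set the `verbose` parameter of `model.fit()` so that only one worker prints progress, avoiding duplicate output from every worker. Finally, we fit the dataset. Because the dataset repeats indefinitely (`dataset.repeat()`), we must set `steps_per_epoch` so that Keras knows where an epoch ends; dividing the number of training images by the global batch size (`BATCH_SIZE_PER_REPLCIA * hvd.size()`) keeps one epoch at roughly one pass over the data.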

```python
class Throughput(tf.keras.callbacks.Callback):
    def __init__(self, total_images=0):
        super().__init__()
        self.total_images = total_images

    def on_epoch_begin(self, epoch, logs=None):
        self.epoch_start_time = time.time()

    def on_epoch_end(self, epoch, logs=None):
        # Only rank 0 reports, so the output is not repeated by every process.
        if hvd.rank() == 0:
            epoch_time = time.time() - self.epoch_start_time
            print('Epoch time : {}'.format(epoch_time))
            images_per_sec = round(self.total_images / epoch_time, 2)
            print('Images/sec: {}'.format(images_per_sec))


callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    # Horovod: average metrics among workers at the end of every epoch.
    hvd.callbacks.MetricAverageCallback(),
    # Callback to measure throughput
    Throughput(total_images=len(mnist_labels)),
]

# Horovod: print progress on worker 0 only.
verbose = 1 if hvd.rank() == 0 else 0

# Train the model.
# Horovod: adjust the number of steps per epoch to the number of GPUs.
mnist_model.fit(dataset,
                steps_per_epoch=len(mnist_labels) // (BATCH_SIZE_PER_REPLCIA * hvd.size()),
                callbacks=callbacks,
                epochs=8,
                verbose=verbose)
```
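
A quick check of this `steps_per_epoch` formula for the 60,000 MNIST training images and a few process counts (plain Python, no Horovod needed). The 8-process case with 2,048 images per process assumes that `--batch-size` in `cnn_fmnist.py` is the per-process batch size, which this notebook does not show:

```python
num_images = 60_000


def steps_per_epoch(batch_per_replica, num_processes):
    # Integer division, as in the fit() call: any remainder is skipped.
    return num_images // (batch_per_replica * num_processes)


print(steps_per_epoch(1024, 1))  # 58, matching the 1-GPU log
print(steps_per_epoch(1024, 8))  # 7
print(steps_per_epoch(2048, 8))  # 3, matching the 8-GPU log
```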

Restart the kernel (or run the following cell, which does so) to free up GPU memory before proceeding to the next sections.

```python
# Shut down and restart the kernel to release GPU memory.
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
```

As mentioned earlier, the cells above run as a single process and therefore do not use multiple GPUs. To launch multiple processes, we use the `horovodrun` command, which in turn invokes the [mpirun](https://www.open-mpi.org/doc/v4.0/man1/mpirun.1.php) command with suitable settings.

To run on a machine with 4 GPUs:

```bash
$ horovodrun -np 4 python train.py
```

To run on 4 machines with 4 GPUs each:

```bash
$ horovodrun -np 16 -H server1:4,server2:4,server3:4,server4:4 python train.py
```

Let us now run the training script on multiple GPUs by setting the `-np` flag, which sets the number of copies (processes) of the program to launch.

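```python
# IPython shell escape: launch 8 Horovod processes running the training script.
# --oversubscribe lets Open MPI start more processes than it detects slots for;
# TF_CPP_MIN_LOG_LEVEL=3 and `2> /dev/null` suppress log output written to stderr.
!TF_CPP_MIN_LOG_LEVEL=3 horovodrun -np 8 --mpi-args="--oversubscribe" python3 ../source_code/N3/cnn_fmnist.py --batch-size=2048 2> /dev/null
```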

### Result and scaling efficiency


Let us now look at the results obtained and calculate the scaling efficiency.

Output of running the commands on a DGX-1:

- Horovod (1 GPU):

```bash
Epoch 1/8
58/58 [==============================] - 4s 24ms/step - loss: 0.5677 - accuracy: 0.8261
Epoch time : 4.478050947189331
Images/sec: 13398.69
Epoch 2/8
58/58 [==============================] - 1s 16ms/step - loss: 0.1593 - accuracy: 0.9531
Epoch time : 0.9169270992279053
Images/sec: 65435.95
Epoch 3/8
58/58 [==============================] - 1s 16ms/step - loss: 0.1059 - accuracy: 0.9694
Epoch time : 0.9156594276428223
Images/sec: 65526.55
Epoch 4/8
58/58 [==============================] - 1s 16ms/step - loss: 0.0840 - accuracy: 0.9750
Epoch time : 0.9237456321716309
Images/sec: 64952.95
Epoch 5/8
58/58 [==============================] - 1s 16ms/step - loss: 0.0709 - accuracy: 0.9791
Epoch time : 0.906282901763916
Images/sec: 66204.49
Epoch 6/8
58/58 [==============================] - 1s 16ms/step - loss: 0.0601 - accuracy: 0.9819
Epoch time : 0.9177072048187256
Images/sec: 65380.33
Epoch 7/8
58/58 [==============================] - 1s 16ms/step - loss: 0.0546 - accuracy: 0.9834
Epoch time : 0.906665563583374
Images/sec: 66176.55
Epoch 8/8
58/58 [==============================] - 1s 16ms/step - loss: 0.0487 - accuracy: 0.9851
Epoch time : 0.9043354988098145
Images/sec: 66347.06
```

- Horovod (8 GPUs):

```bash
[1,0]<stdout>:Epoch 1/8
[1,0]<stdout>:3/3 - 7s - loss: 2.1803 - accuracy: 0.2632
[1,0]<stdout>:Epoch time : 7.058096170425415
[1,0]<stdout>:Images/sec: 8500.88
[1,0]<stdout>:Epoch 2/8
[1,0]<stdout>:3/3 - 0s - loss: 1.5363 - accuracy: 0.5832
[1,0]<stdout>:Epoch time : 0.1487276554107666
[1,0]<stdout>:Images/sec: 403421.94
[1,0]<stdout>:Epoch 3/8
[1,0]<stdout>:3/3 - 0s - loss: 0.9254 - accuracy: 0.7147
[1,0]<stdout>:Epoch time : 0.1277930736541748
[1,0]<stdout>:Images/sec: 469509.01
[1,0]<stdout>:Epoch 4/8
[1,0]<stdout>:3/3 - 0s - loss: 0.7013 - accuracy: 0.7752
[1,0]<stdout>:Epoch time : 0.11966228485107422
[1,0]<stdout>:Images/sec: 501411.12
[1,0]<stdout>:Epoch 5/8
[1,0]<stdout>:3/3 - 0s - loss: 0.5648 - accuracy: 0.8245
[1,0]<stdout>:Epoch time : 0.11635589599609375
[1,0]<stdout>:Images/sec: 515659.3
[1,0]<stdout>:Epoch 6/8
[1,0]<stdout>:3/3 - 0s - loss: 0.4633 - accuracy: 0.8568
[1,0]<stdout>:Epoch time : 0.11620521545410156
[1,0]<stdout>:Images/sec: 516327.94
[1,0]<stdout>:Epoch 7/8
[1,0]<stdout>:3/3 - 0s - loss: 0.4102 - accuracy: 0.8748
[1,0]<stdout>:Epoch time : 0.11935830116271973
[1,0]<stdout>:Images/sec: 502688.12
[1,0]<stdout>:Epoch 8/8
[1,0]<stdout>:3/3 - 0s - loss: 0.3582 - accuracy: 0.8957
[1,0]<stdout>:Epoch time : 0.12322497367858887
[1,0]<stdout>:Images/sec: 486914.29
```

#### Scaling efficiency

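|#GPUs|Images/sec|Scaling efficiency|
|-|-|-|
|1|66176|NA|
|8|486914|~92%|

We have achieved a **~7.4x** improvement in throughput (486914 / 66176 ≈ 7.36). Scaling efficiency is the measured speedup divided by the ideal speedup, i.e. the throughput with N GPUs divided by N times the single-GPU throughput: 486914 / (8 × 66176) ≈ 0.92, an impressive **~92%**.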
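
The sketch below recomputes the efficiency from the epoch-8 timings in the logs (rounded). It also shows a second, more conservative estimate that rests on an assumption about `cnn_fmnist.py`: if, like the notebook's `Throughput` callback, it divides all 60,000 images by the epoch time, the 8-GPU figure counts images that were never processed, because 3 steps × 2,048 images × 8 GPUs = 49,152 images per epoch (and the single-GPU run processes 58 × 1,024 = 59,392). The two runs also use different per-GPU batch sizes (1,024 vs 2,048), so treat both numbers as rough.

```python
# Epoch-8 times in seconds from the logs above (rounded).
t_1gpu = 0.9043
t_8gpu = 0.1232
n_gpus = 8


def scaling_efficiency(thr_n, thr_1, n):
    """Throughput with n GPUs divided by n times the single-GPU throughput."""
    return thr_n / (n * thr_1)


# As reported: the whole 60,000-image training set divided by the epoch time.
reported = scaling_efficiency(60_000 / t_8gpu, 60_000 / t_1gpu, n_gpus)

# Counting only the images actually processed per epoch (assumption above).
processed_1 = 58 * 1024           # steps_per_epoch * batch size, 1 GPU
processed_8 = 3 * 2048 * n_gpus   # steps_per_epoch * batch size * GPUs
conservative = scaling_efficiency(processed_8 / t_8gpu, processed_1 / t_1gpu, n_gpus)

print(f"reported: {reported:.2f}, conservative: {conservative:.2f}")
# reported: 0.92, conservative: 0.76
```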

However, a closer look at the results shows a difference in convergence. After 8 epochs, the single-GPU run reaches a **loss of 0.0487 and accuracy of 0.9851**, whereas the 8-GPU run reaches only a **loss of 0.3582 and accuracy of 0.8957**.

**This slower convergence is commonly observed when training with larger batch sizes: when we scale across GPUs, the global batch size is the per-GPU batch size multiplied by the number of GPUs, so each epoch contains fewer weight updates (3 steps per epoch in the 8-GPU run versus 58 in the single-GPU run). We discuss the reasons for this, and some techniques for faster convergence, in the upcoming notebook.**

***

## Licensing

This material is released by OpenACC-Standard.org, in collaboration with NVIDIA Corporation, under the Creative Commons Attribution 4.0 International (CC BY 4.0).


