# Understanding Megatron-LM's core - MPU
---
NVIDIA's Megatron-LM makes training very large language models (up to one trillion parameters) a reality. Its core, the MPU (Model Parallelism Unit), forms the base for subsequent optimization efforts on training very large models, such as [DeepSpeed](https://www.deepspeed.ai/features/#model-parallelism).

However, a poorly chosen training configuration can easily lead to low GPU utilization (the screenshot below shows a training run whose configuration resulted in low or inconsistent GPU utilization).

Therefore, we first take a step toward understanding how the core of Megatron-LM's mpu works, and thereafter
take a closer look at the performance of Megatron-LM's training runs.

![a training run with low or inconsistent gpus utils example](./Megatron-LM/pics/naive_run.JPG)

## Learning Objectives
The goal of this lab is to understand how the core of Megatron-LM, the mpu (model parallelism unit), works:

- How Megatron-LM groups GPUs (GPU affinity) per model parallel configuration ( pipeline parallel | tensor parallel )
- Tensor Parallel : Column Parallel
- Tensor Parallel : Row Parallel


---------------------------------------------------------------------------
## How Megatron-LM groups GPU affinity per model-data parallel configuration

Parallelism : Model & Data 
- p = Pipeline Model Parallel 
- t = Tensor Model Parallel
- d = Data Parallel
- n = Total number of GPUs used in the training

**Note : Megatron-LM requires p * t * d = n**

Parallel group - grouping for torch distributed (NCCL)
- num_tensor_model_parallel_groups = n / t
- num_pipeline_model_parallel_groups = n / p
- num_data_parallel_groups = n / d

Assume the following configuration:

tensor_model_parallel_size_=2 

pipeline_model_parallel_size_= 4

let's say we have a total of 16 GPUs denoted by g0 ... g15 

therefore, world_size=16 

Then, according to Megatron-LM's [initialize.py](https://github.com/NVIDIA/Megatron-LM/blob/90e0a0dd08159e1c95f4f9d99bb8687f327d36c3/megatron/mpu/initialize.py), we should see the following:

 8 data_parallel groups:
 [g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]
 8 tensor model-parallel groups:
 [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]
 4 pipeline model-parallel groups:
 [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]
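
The listing above follows a regular pattern: tensor-parallel ranks are adjacent, data-parallel ranks sit t apart inside one pipeline stage, and pipeline stages sit n / p apart. The sketch below is an illustration of that pattern, not code from Megatron-LM; the helper names `rank_to_coords` and `group_of` are hypothetical. It decomposes a global rank into (pipeline, data, tensor) indices and recovers the group a rank belongs to.

```python
def rank_to_coords(rank, t, p, n):
    """Map a global rank to (pipeline_idx, data_idx, tensor_idx).

    Assumes the layout of the listing above: rank = pipeline_idx * (n // p)
    + data_idx * t + tensor_idx. This is an illustrative helper, not a
    Megatron-LM function.
    """
    assert n % (t * p) == 0, "n must be divisible by t * p"
    stage_size = n // p                      # GPUs per pipeline stage (= t * d)
    pipeline_idx = rank // stage_size
    data_idx = (rank % stage_size) // t
    tensor_idx = rank % t
    return pipeline_idx, data_idx, tensor_idx


def group_of(rank, kind, t, p, n):
    """List the ranks that share a group of the given kind with `rank`."""
    # Index of the coordinate that varies inside each kind of group.
    varying = {"pipeline": 0, "data": 1, "tensor": 2}[kind]
    mine = rank_to_coords(rank, t, p, n)
    members = []
    for other in range(n):
        theirs = rank_to_coords(other, t, p, n)
        # Same group if every coordinate except the varying one matches.
        if all(a == b for i, (a, b) in enumerate(zip(mine, theirs)) if i != varying):
            members.append(other)
    return members


print(rank_to_coords(6, t=2, p=4, n=16))
for kind in ("tensor", "data", "pipeline"):
    print(kind, group_of(6, kind, t=2, p=4, n=16))
```

For g6 this reproduces the entries [g6, g7], [g4, g6] and [g2, g6, g10, g14] from the listing.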

**Note that for efficiency, the caller should make sure adjacent ranks are on the same DGX box.**

For example if we are using 2 DGX-1 boxes
with a total of 16 GPUs, rank 0 to 7 belong to the first box and
ranks 8 to 15 belong to the second box.
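
To see what this rank placement means for the groups listed earlier, the following sketch checks which groups stay inside a single box. It assumes 8 GPUs per box and ranks assigned box by box, as in the example above; `nodes_spanned` is a hypothetical helper written for this illustration.

```python
GPUS_PER_NODE = 8  # assumption: each DGX-1 box holds 8 GPUs, ranks assigned box by box


def nodes_spanned(group, gpus_per_node=GPUS_PER_NODE):
    """Return the sorted list of box indices that a group of ranks touches."""
    return sorted({rank // gpus_per_node for rank in group})


# Groups copied from the 16-GPU listing (t = 2, p = 4, d = 2).
tensor_groups = [[i, i + 1] for i in range(0, 16, 2)]
data_groups = [[0, 2], [1, 3], [4, 6], [5, 7], [8, 10], [9, 11], [12, 14], [13, 15]]
pipeline_groups = [list(range(i, 16, 4)) for i in range(4)]

for name, groups in [("tensor", tensor_groups),
                     ("data", data_groups),
                     ("pipeline", pipeline_groups)]:
    single_node = all(len(nodes_spanned(g)) == 1 for g in groups)
    print("{:8s} groups stay inside one box: {}".format(name, single_node))
```

With this placement the tensor-parallel and data-parallel groups each stay inside one box, while every pipeline-parallel group spans both boxes.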

In [1]:
import itertools


def ensure_divisibility(numerator, denominator):
    """Ensure that numerator is divisible by the denominator."""
    assert numerator % denominator == 0, '{} is not divisible by {}'.format(
        numerator, denominator)


def initialize_model_parallel(tensor_model_parallel_size_=2,
                              pipeline_model_parallel_size_=4,
                              world_size=16):
    print(' ---------- world size is set to : {} ---------- '.format(world_size))
    print('> initializing tensor model parallel with size {}'.format(tensor_model_parallel_size_))
    print('> initializing pipeline model parallel with size {}'.format(pipeline_model_parallel_size_))

    tensor_model_parallel_size = min(tensor_model_parallel_size_, world_size)
    pipeline_model_parallel_size = min(pipeline_model_parallel_size_, world_size)

    # Make sure world_size is divisible by t * p.
    ensure_divisibility(world_size, tensor_model_parallel_size * pipeline_model_parallel_size)

    data_parallel_size = world_size // (tensor_model_parallel_size * pipeline_model_parallel_size)
    print("> data parallel size is set to : ", data_parallel_size)
    num_tensor_model_parallel_groups = world_size // tensor_model_parallel_size
    num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size
    num_data_parallel_groups = world_size // data_parallel_size
    print("---------- parallel groups ----------")
    print("num_tensor_model_parallel_groups : ", num_tensor_model_parallel_groups)
    print("num_pipeline_model_parallel_groups : ", num_pipeline_model_parallel_groups)
    print("num_data_parallel_groups : ", num_data_parallel_groups)

    _DATA_PARALLEL_GROUP = []
    _MODEL_PARALLEL_GROUP = []
    _TENSOR_MODEL_PARALLEL_GROUP = []
    _PIPE_MODEL_PARALLEL_GROUP = []

    # Build the data-parallel groups: within each pipeline stage,
    # take every t-th rank starting at offset j.
    all_data_parallel_group_ranks = []
    for i in range(pipeline_model_parallel_size):
        start_rank = i * num_pipeline_model_parallel_groups
        end_rank = (i + 1) * num_pipeline_model_parallel_groups
        for j in range(tensor_model_parallel_size):
            ranks = range(start_rank + j, end_rank,
                          tensor_model_parallel_size)
            all_data_parallel_group_ranks.append(list(ranks))
    _DATA_PARALLEL_GROUP = all_data_parallel_group_ranks

    # Build the pipeline model-parallel groups: one rank from each stage.
    for i in range(num_pipeline_model_parallel_groups):
        ranks = range(i, world_size,
                      num_pipeline_model_parallel_groups)
        _PIPE_MODEL_PARALLEL_GROUP.append(list(ranks))

    # Build the model-parallel groups: the i-th member of every data-parallel
    # group together holds one full copy of the model.
    for i in range(data_parallel_size):
        ranks = [data_parallel_group_ranks[i]
                 for data_parallel_group_ranks in all_data_parallel_group_ranks]
        _MODEL_PARALLEL_GROUP.append(ranks)

    # Build the tensor model-parallel groups: blocks of t adjacent ranks.
    for i in range(num_tensor_model_parallel_groups):
        ranks = range(i * tensor_model_parallel_size,
                      (i + 1) * tensor_model_parallel_size)
        _TENSOR_MODEL_PARALLEL_GROUP.append(list(ranks))

    print("-----" * 20)
    print("_DATA_PARALLEL_GROUP \n :", _DATA_PARALLEL_GROUP)
    print("-----" * 20)
    print("_TENSOR_MODEL_PARALLEL_GROUP \n :", _TENSOR_MODEL_PARALLEL_GROUP)
    print("-----" * 20)
    print("_PIPE_MODEL_PARALLEL_GROUP \n :", _PIPE_MODEL_PARALLEL_GROUP)
    print("-----" * 20)
    print("Total :{} full models being partitioned into :{} GPUs ".format(len(_MODEL_PARALLEL_GROUP), world_size))
    for idx, m in enumerate(_MODEL_PARALLEL_GROUP):
        print("model {} : is partitioned into gpus :{}".format(idx, ','.join(str(r) for r in m)))


---
## Sanity check: verify that the output below matches [megatron/mpu/initialize.py](https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/mpu/initialize.py#L63)
 Let's say we have a total of 16 GPUs denoted by g0 ... g15 
 8 data_parallel groups:
 [g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]
 8 tensor model-parallel groups:
 [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]
 4 pipeline model-parallel groups:
 [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]
 Note that for efficiency, the caller should make sure adjacent ranks
 are on the same DGX box. For example if we are using 2 DGX-1 boxes
 with a total of 16 GPUs, rank 0 to 7 belong to the first box and
 ranks 8 to 15 belong to the second box.


In [2]:
initialize_model_parallel(tensor_model_parallel_size_=2,
 pipeline_model_parallel_size_= 4,
 world_size=16)

 ---------- world size is set to : 16 ---------- 
> initializing tensor model parallel with size 2
> initializing pipeline model parallel with size 4
> data parallel size is set to : 2
---------- parallel groups ----------
num_tensor_model_parallel_groups : 8
num_pipeline_model_parallel_groups : 4
num_data_parallel_groups : 8
----------------------------------------------------------------------------------------------------
_DATA_PARALLEL_GROUP 
 : [[0, 2], [1, 3], [4, 6], [5, 7], [8, 10], [9, 11], [12, 14], [13, 15]]
----------------------------------------------------------------------------------------------------
_TENSOR_MODEL_PARALLEL_GROUP 
 : [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10, 11], [12, 13], [14, 15]]
----------------------------------------------------------------------------------------------------
_PIPE_MODEL_PARALLEL_GROUP 
 : [[0, 4, 8, 12], [1, 5, 9, 13], [2, 6, 10, 14], [3, 7, 11, 15]]
---------------------------------------------------------------------------

In [3]:
tensor_model_parallel_size= 4 # try a different tensor_model_parallel_size_
pipeline_model_parallel_size= 2 # try a different pipeline_model_parallel_size_
world_size=16
assert world_size%(tensor_model_parallel_size * pipeline_model_parallel_size)==0,'please make sure world_size is divisible by tensor_model_parallel_size * pipeline_model_parallel_size' 

initialize_model_parallel(tensor_model_parallel_size_=tensor_model_parallel_size,pipeline_model_parallel_size_= pipeline_model_parallel_size,world_size=world_size)

 ---------- world size is set to : 16 ---------- 
> initializing tensor model parallel with size 4
> initializing pipeline model parallel with size 2
> data parallel size is set to : 2
---------- parallel groups ----------
num_tensor_model_parallel_groups : 4
num_pipeline_model_parallel_groups : 8
num_data_parallel_groups : 8
----------------------------------------------------------------------------------------------------
_DATA_PARALLEL_GROUP 
 : [[0, 4], [1, 5], [2, 6], [3, 7], [8, 12], [9, 13], [10, 14], [11, 15]]
----------------------------------------------------------------------------------------------------
_TENSOR_MODEL_PARALLEL_GROUP 
 : [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
----------------------------------------------------------------------------------------------------
_PIPE_MODEL_PARALLEL_GROUP 
 : [[0, 8], [1, 9], [2, 10], [3, 11], [4, 12], [5, 13], [6, 14], [7, 15]]
---------------------------------------------------------------------------

----------------------------------------------------------------------
## Megatron-LM's Column Parallel 
[ColumnParallel reference](https://github.com/NVIDIA/Megatron-LM/blob/90e0a0dd08159e1c95f4f9d99bb8687f327d36c3/megatron/mpu/layers.py#L201)
![ColumnParallel](./Megatron-LM/pics/ColumnParallel.JPG)

In [3]:
import sys
sys.path.append("./Megatron-LM")
from megatron.mpu import layers
from torch.nn.parameter import Parameter
import torch.nn.init as init
import torch
import torch.nn.functional as F  # F.linear is used in the forward passes below
import random
from megatron import *
from megatron.mpu.tests import *

from megatron.mpu.utils import *

In [4]:
## The class below is slightly modified from the original Megatron repo to skip environment variable initialization such as world_size.
world_size = 16  # module-level value read by the classes and helpers below
class myColumnParallelLinear(torch.nn.Module):
    """Linear layer with column parallelism.

    The linear layer is defined as Y = XA + b. A is parallelized along
    its second dimension as A = [A_1, ..., A_p].

    Arguments:
        input_size: first dimension of matrix A.
        output_size: second dimension of matrix A.
        bias: If true, add bias
        gather_output: If true, call all-gather on output and make Y available
                       to all GPUs, otherwise, every GPU will have its output
                       which is Y_i = XA_i
        init_method: method to initialize weights. Note that bias is always set
                     to zero.
        stride: For the strided linear layers.
        keep_master_weight_for_test: This was added for testing and should be
                                     set to False. It returns the master weights
                                     used for initialization.
        skip_bias_add: This was added to enable performance optimizations where bias
                       can be fused with other elementwise operations. We skip
                       adding bias but instead return it.
    """

    def __init__(self, input_size, output_size, bias=True, gather_output=True,
                 init_method=init.xavier_normal_, stride=1,
                 keep_master_weight_for_test=False,
                 skip_bias_add=False):
        super(myColumnParallelLinear, self).__init__()

        # Keep input parameters
        self.input_size = input_size
        self.output_size = output_size
        self.gather_output = gather_output
        # Divide the weight matrix along the last dimension.
        self.output_size_per_partition = divide(output_size, world_size)
        self.skip_bias_add = skip_bias_add

        # Parameters.
        # Note: torch.nn.functional.linear performs XA^T + b and as a result
        # we allocate the transpose.
        # Initialize weight.
        use_cpu_initialization = True  # hard coded to use cpu
        params_dtype = torch.float  # skipping need of args

        if use_cpu_initialization:
            self.weight = Parameter(torch.empty(self.output_size_per_partition,
                                                self.input_size,
                                                dtype=params_dtype))
            self.master_weight = m_initialize_affine_weight_cpu(
                self.weight, self.output_size, self.input_size,
                self.output_size_per_partition, 0, init_method,
                stride=stride, return_master_weight=keep_master_weight_for_test)
        else:
            self.weight = Parameter(torch.empty(
                self.output_size_per_partition, self.input_size,
                device=torch.cuda.current_device(), dtype=params_dtype))
            layers._initialize_affine_weight_gpu(self.weight, init_method,
                                                 partition_dim=0, stride=stride)

        if bias:
            if use_cpu_initialization:
                self.bias = Parameter(torch.empty(
                    self.output_size_per_partition, dtype=params_dtype))
            else:
                self.bias = Parameter(torch.empty(
                    self.output_size_per_partition,
                    device=torch.cuda.current_device(),
                    dtype=params_dtype))
            # Always initialize bias to zero.
            with torch.no_grad():
                self.bias.zero_()
        else:
            self.register_parameter('bias', None)

    def forward(self, input_):
        # Set up backprop all-reduce.
        print("in Column parallel forward")
        input_parallel = copy_to_tensor_model_parallel_region(input_)
        # Matrix multiply.
        bias = self.bias if not self.skip_bias_add else None
        output_parallel = F.linear(input_parallel, self.weight, bias)
        if self.gather_output:
            # All-gather across the partitions.
            output = gather_from_tensor_model_parallel_region(output_parallel)
        else:
            output = output_parallel
        output_bias = self.bias if self.skip_bias_add else None
        return output, output_bias

In [5]:
def get_weight_list(master_weight, tensor_model_parallel_gp):
    """Slice master_weight into one shard per rank and print each shard's size.

    Relies on the module-level globals which_model_parallel ('col' or 'row')
    and world_size.
    """
    my_weight_list = []
    print("A = [")
    # Flatten [[0, 1], [2, 3], ...] into [0, 1, 2, 3, ...].
    tensor_model_parallel_gp = list(itertools.chain(*tensor_model_parallel_gp))
    cnt = 0
    for gp in tensor_model_parallel_gp:
        if which_model_parallel == 'col':
            # Rows gp, gp + world_size, ... of the stored weight (A transposed),
            # i.e. a strided set of columns of A.
            temp = master_weight[gp::world_size].T
            if cnt < world_size - 1:
                print("A{}=".format(str(cnt)), temp.size(), end=',')
            else:
                print("A{}=".format(str(cnt)), temp.size())
        elif which_model_parallel == 'row':
            # Rows gp, gp + world_size, ... of A itself.
            temp = master_weight.T
            temp = temp[gp::world_size]
            if cnt < world_size - 1:
                print("A{}=".format(str(cnt)), temp.size(), ',')
            else:
                print("A{}=".format(str(cnt)), temp.size())
        else:
            raise ValueError("set which_model_parallel to 'col' or 'row'")
        cnt += 1
        my_weight_list.append(temp)

    print(" ]")
    print(len(my_weight_list))
    return my_weight_list


def m_initialize_affine_weight_cpu(weight, output_size, input_size,
                                   per_partition_size, partition_dim,
                                   init_method, stride=1,
                                   return_master_weight=False):
    """Initialize affine weight for model parallel.

    Build the master weight on all processes and scatter
    the relevant chunk."""
    params_dtype = torch.float
    # Initialize master weight
    master_weight = torch.empty(output_size, input_size,
                                dtype=torch.float,
                                requires_grad=False)
    init_method(master_weight)
    master_weight = master_weight.to(dtype=params_dtype)
    # Split and copy
    per_partition_per_stride_size = divide(per_partition_size, stride)
    print("per_partition_per_stride_size ", per_partition_per_stride_size)
    # Contiguous split kept from the original code for reference;
    # this notebook uses get_weight_list below instead.
    weight_list = torch.split(master_weight, per_partition_per_stride_size,
                              dim=partition_dim)
    # tensor_model_parallel_gp below is hard-coded for
    # tensor_model_parallel_size=2, pipeline_model_parallel_size=4.
    # If you use another model parallel configuration, copy and paste its
    # _TENSOR_MODEL_PARALLEL_GROUP output here.
    tensor_model_parallel_gp = [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10, 11], [12, 13], [14, 15]]
    my_weight_list = get_weight_list(master_weight, tensor_model_parallel_gp)

    with torch.no_grad():
        torch.cat(my_weight_list, dim=partition_dim, out=weight)
    if return_master_weight:
        return master_weight
    return None

## Peek inside Column Parallel Class

In [6]:
tensor_model_parallel_size= 2 
pipeline_model_parallel_size= 4 
input_size = 1024 # 1024 rows
output_size = 512 # 512 columns
which_model_parallel='col'
print("this is how A is sliced column-wised ...\n")
testCol=myColumnParallelLinear(input_size, output_size, bias=True, gather_output=True,
 init_method=init.xavier_normal_, stride=1,
 keep_master_weight_for_test=False,
 skip_bias_add=False)

this is how A is sliced column-wised ...

per_partition_per_stride_size 32
A = [
A0= torch.Size([1024, 32]),A1= torch.Size([1024, 32]),A2= torch.Size([1024, 32]),A3= torch.Size([1024, 32]),A4= torch.Size([1024, 32]),A5= torch.Size([1024, 32]),A6= torch.Size([1024, 32]),A7= torch.Size([1024, 32]),A8= torch.Size([1024, 32]),A9= torch.Size([1024, 32]),A10= torch.Size([1024, 32]),A11= torch.Size([1024, 32]),A12= torch.Size([1024, 32]),A13= torch.Size([1024, 32]),A14= torch.Size([1024, 32]),A15= torch.Size([1024, 32])
 ]
16


In [7]:
per_partition_per_stride_size=32
assert 16* per_partition_per_stride_size == 512 

In [8]:
type(testCol)

__main__.myColumnParallelLinear

In [9]:
testCol.input_size, testCol.output_size

(1024, 512)
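
The `myColumnParallelLinear` docstring defines Y = XA + b with A split along its second dimension as A = [A_1, ..., A_p], so each GPU computes Y_i = XA_i and an all-gather concatenates the pieces. The sketch below checks that identity on a tiny integer example. It is a single-process illustration under simplifying assumptions: plain Python lists stand in for tensors so it runs without a GPU, the bias is omitted, and A is split into contiguous column blocks (the notebook's `get_weight_list` uses strided slices, which works the same way as long as the outputs are reassembled in matching order). The helpers `matmul` and `split_columns` are hypothetical names.

```python
def matmul(X, A):
    """Multiply two matrices given as lists of rows."""
    return [[sum(x * a for x, a in zip(row, col)) for col in zip(*A)] for row in X]


def split_columns(A, parts):
    """Split A into `parts` contiguous column blocks A_1, ..., A_p."""
    width = len(A[0]) // parts
    return [[row[i * width:(i + 1) * width] for row in A] for i in range(parts)]


X = [[1, 2, 3],
     [4, 5, 6]]            # 2 x 3 input
A = [[1, 0, 2, 1],
     [0, 1, 1, 3],
     [2, 1, 0, 1]]         # 3 x 4 weight

A_shards = split_columns(A, parts=2)                   # one column block per "GPU"
partial_outputs = [matmul(X, A_i) for A_i in A_shards]  # Y_i = X A_i

# Stand-in for the all-gather: concatenate the Y_i along the column dimension.
Y_gathered = [sum((Y_i[r] for Y_i in partial_outputs), []) for r in range(len(X))]

assert Y_gathered == matmul(X, A)
print(Y_gathered)
```

Each shard needs the full input X but only its own columns of A, which is why the forward pass copies the input to every rank before the local matrix multiply.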

----------------------------------------------------------------------
## Megatron-LM's Row Parallel 
[RowParallel reference](https://github.com/NVIDIA/Megatron-LM/blob/90e0a0dd08159e1c95f4f9d99bb8687f327d36c3/megatron/mpu/layers.py#L294)
![RowParallel](./Megatron-LM/pics/RowParallel.JPG)

In [10]:
class myRowParallelLinear(torch.nn.Module):
    """Linear layer with row parallelism.

    The linear layer is defined as Y = XA + b. A is parallelized along
    its first dimension and X along its second dimension as:
               -   -
              | A_1 |
              | .   |
          A = | .   |        X = [X_1, ..., X_p]
              | .   |
              | A_p |
               -   -
    Arguments:
        input_size: first dimension of matrix A.
        output_size: second dimension of matrix A.
        bias: If true, add bias. Note that bias is not parallelized.
        input_is_parallel: If true, we assume that the input is already
                           split across the GPUs and we do not split
                           again.
        init_method: method to initialize weights. Note that bias is always set
                     to zero.
        stride: For the strided linear layers.
        keep_master_weight_for_test: This was added for testing and should be
                                     set to False. It returns the master weights
                                     used for initialization.
        skip_bias_add: This was added to enable performance optimizations where bias
                       can be fused with other elementwise operations. We skip
                       adding bias but instead return it.
    """

    def __init__(self, input_size, output_size, bias=True,
                 input_is_parallel=False,
                 init_method=init.xavier_normal_, stride=1,
                 keep_master_weight_for_test=False,
                 skip_bias_add=False):
        super(myRowParallelLinear, self).__init__()

        # Keep input parameters
        self.input_size = input_size
        self.output_size = output_size
        self.input_is_parallel = input_is_parallel
        # Divide the weight matrix along the last dimension.
        self.input_size_per_partition = divide(input_size, world_size)
        self.skip_bias_add = skip_bias_add
        print("input_size_per_partition ", self.input_size_per_partition)

        # Parameters.
        # Note: torch.nn.functional.linear performs XA^T + b and as a result
        # we allocate the transpose.
        # Initialize weight.
        use_cpu_initialization = True  # hard coded to use cpu
        params_dtype = torch.float  # skipping need of args
        if use_cpu_initialization:
            self.weight = Parameter(torch.empty(self.output_size,
                                                self.input_size_per_partition,
                                                dtype=params_dtype))
            self.master_weight = m_initialize_affine_weight_cpu(
                self.weight, self.output_size, self.input_size,
                self.input_size_per_partition, 1, init_method,
                stride=stride, return_master_weight=keep_master_weight_for_test)
        else:
            self.weight = Parameter(torch.empty(
                self.output_size, self.input_size_per_partition,
                device=torch.cuda.current_device(), dtype=params_dtype))
            layers._initialize_affine_weight_gpu(self.weight, init_method,
                                                 partition_dim=1, stride=stride)
        if bias:
            if use_cpu_initialization:
                self.bias = Parameter(torch.empty(self.output_size,
                                                  dtype=params_dtype))
            else:
                self.bias = Parameter(torch.empty(
                    self.output_size, device=torch.cuda.current_device(),
                    dtype=params_dtype))
            # Always initialize bias to zero.
            with torch.no_grad():
                self.bias.zero_()
        else:
            self.register_parameter('bias', None)

    def forward(self, input_):
        # Set up backprop all-reduce.
        if self.input_is_parallel:
            input_parallel = input_
        else:
            input_parallel = scatter_to_tensor_model_parallel_region(input_)
        # Matrix multiply.
        output_parallel = F.linear(input_parallel, self.weight)
        # All-reduce across all the partitions.
        output_ = reduce_from_tensor_model_parallel_region(output_parallel)
        if not self.skip_bias_add:
            output = output_ + self.bias if self.bias is not None else output_
            output_bias = None
        else:
            output = output_
            output_bias = self.bias
        return output, output_bias

In [11]:
tensor_model_parallel_size= 2 
pipeline_model_parallel_size= 4 
input_size = 1024 # first dimension of the matrix
output_size = 512 # 2nd dimension of the matrix
print("this is how A is sliced Row-wised ...\n")
which_model_parallel='row'
testRow=myRowParallelLinear(input_size,output_size, bias=True,
 input_is_parallel=False,
 init_method=init.xavier_normal_, stride=1,
 keep_master_weight_for_test=False,
 skip_bias_add=False)

this is how A is sliced Row-wised ...

input_size_per_partition 64
per_partition_per_stride_size 64
A = [
A0= torch.Size([64, 512]) ,
A1= torch.Size([64, 512]) ,
A2= torch.Size([64, 512]) ,
A3= torch.Size([64, 512]) ,
A4= torch.Size([64, 512]) ,
A5= torch.Size([64, 512]) ,
A6= torch.Size([64, 512]) ,
A7= torch.Size([64, 512]) ,
A8= torch.Size([64, 512]) ,
A9= torch.Size([64, 512]) ,
A10= torch.Size([64, 512]) ,
A11= torch.Size([64, 512]) ,
A12= torch.Size([64, 512]) ,
A13= torch.Size([64, 512]) ,
A14= torch.Size([64, 512]) ,
A15= torch.Size([64, 512])
 ]
16


In [72]:
per_partition_per_stride_size=64
assert 16* per_partition_per_stride_size == 1024 

In [58]:
testRow.input_size, testRow.output_size

(1024, 512)
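
The `myRowParallelLinear` docstring splits A along its first dimension and X along its second, so each GPU computes a partial product X_i A_i of full output size and an all-reduce sums them: Y = X_1 A_1 + ... + X_p A_p. The sketch below verifies this on a small integer example. As a single-process illustration it makes simplifying assumptions: plain Python lists replace tensors, the bias is omitted, and the splits are contiguous blocks. The helper names `matmul`, `split_columns` and `split_rows` are hypothetical.

```python
def matmul(X, A):
    """Multiply two matrices given as lists of rows."""
    return [[sum(x * a for x, a in zip(row, col)) for col in zip(*A)] for row in X]


def split_columns(X, parts):
    """Split X into `parts` contiguous column blocks X_1, ..., X_p."""
    width = len(X[0]) // parts
    return [[row[i * width:(i + 1) * width] for row in X] for i in range(parts)]


def split_rows(A, parts):
    """Split A into `parts` contiguous row blocks A_1, ..., A_p."""
    height = len(A) // parts
    return [A[i * height:(i + 1) * height] for i in range(parts)]


X = [[1, 2, 3, 4],
     [5, 6, 7, 8]]         # 2 x 4 input
A = [[1, 0],
     [0, 1],
     [2, 1],
     [1, 3]]               # 4 x 2 weight

X_shards = split_columns(X, parts=2)   # scatter the input along its last dimension
A_shards = split_rows(A, parts=2)      # one row block of A per "GPU"
partials = [matmul(X_i, A_i) for X_i, A_i in zip(X_shards, A_shards)]

# Stand-in for the all-reduce: element-wise sum of the partial outputs.
Y_reduced = [[sum(vals) for vals in zip(*rows)] for rows in zip(*partials)]

assert Y_reduced == matmul(X, A)
print(Y_reduced)
```

Unlike the column-parallel case, every partial output already has the full output shape, so the results are summed rather than concatenated, and the bias is added once after the reduction.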

--- 

## Additional Resources

Efficient Large-Scale Language Model Training on GPU Clusters : https://arxiv.org/pdf/2104.04473.pdf 

Pushing Forward the Frontiers of Natural Language Processing : https://blogs.nvidia.com/blog/2021/09/16/nlp-frontiers-ai-hardware-summit/

---
## Up Next -
[About GPT's tokenizer](./Day2-3_GPT_vocab_merge_files.ipynb)
## Back To Start Menu
[start menu](../Start_Here.ipynb)

-----


## Licensing 

This material is released by OpenACC-Standard.org, in collaboration with NVIDIA Corporation, under the Creative Commons Attribution 4.0 International (CC BY 4.0). 