@@ -4,24 +4,27 @@
 import os
 import time
 import yaml
+from contextlib import nullcontext
 from pathlib import Path
 from pkg_resources import packaging
 import contextlib
 import gc
+from datetime import datetime
 
 import torch
 import torch.cuda.nccl as nccl
 import torch.distributed as dist
 from torch.distributed.fsdp import StateDictType
 from torch.distributed.fsdp.sharded_grad_scaler import ShardedGradScaler
-# from torch.utils.flop_counter import FlopCounterMode
 from tqdm import tqdm
 from transformers import LlamaTokenizer
+import json
 
 
 from llama_recipes.model_checkpointing import save_model_checkpoint, save_model_and_optimizer_sharded, save_optimizer_checkpoint
-from llama_recipes.policies import fpSixteen,bfSixteen_mixed, get_llama_wrapper
+from llama_recipes.policies import fpSixteen,bfSixteen, get_llama_wrapper
 from llama_recipes.utils.memory_utils import MemoryTrace
+
 from llama_recipes.utils.tflop_counter import FlopCounterMode
 
 @contextlib.contextmanager
@@ -50,10 +53,13 @@ def maybe_run_profiler(cfg, *args, **kwargs):
 def get_total_flops(model):
     return (sum([v for _, v in model.flop_counts["Global"].items()]))
 
+from accelerate.utils import is_xpu_available, is_ccl_available
+
+
 def set_tokenizer_params(tokenizer: LlamaTokenizer):
     tokenizer.pad_token_id = 0
     tokenizer.padding_side = "left"
-
+
 # Converting Bytes to Megabytes
 def byte2mb(x):
     return int(x / 2**20)
@@ -61,7 +67,7 @@ def byte2mb(x):
 def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_scheduler, gradient_accumulation_steps, train_config, fsdp_config=None, local_rank=None, rank=None):
     """
     Trains the model on the given dataloader
-
+
     Args:
         model: The model to be trained
         train_dataloader: The dataloader containing the training data
@@ -73,20 +79,33 @@ def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_sche
         train_config: The training configuration
         eval_dataloader: The dataloader containing the eval data
         tokenizer: tokenizer used in the eval for decoding the predictions
-
+
     Returns: results dictionary containing average training and validation perplexity and loss
     """
     # Create a gradient scaler for fp16
     if train_config.use_fp16 and train_config.enable_fsdp:
         scaler = ShardedGradScaler()
     elif train_config.use_fp16 and not train_config.enable_fsdp:
-        scaler = torch.cuda.amp.GradScaler()
+        scaler = torch.cuda.amp.GradScaler()
     if train_config.enable_fsdp:
         world_size = int(os.environ["WORLD_SIZE"])
+
+
+
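+    # nullcontext is a no-op context manager, so the `with autocast():` block in
+    # the training loop behaves the same when fp16 autocast is disabled.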
+    autocast = torch.cuda.amp.autocast if train_config.use_fp16 else nullcontext
+
     train_prep = []
     train_loss = []
     val_prep = []
     val_loss =[]
+
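+    # Each rank writes its own timestamped metrics file, so concurrent writers
+    # never collide; save_to_json() periodically dumps the lists kept below.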
+    if train_config.save_metrics:
+        metrics_filename = f"{train_config.output_dir}/metrics_data_{local_rank}-{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.json"
+        train_step_perplexity = []
+        train_step_loss = []
+        val_step_loss = []
+        val_step_perplexity = []
+
     epoch_times = []
     checkpoint_times = []
     results = {}
@@ -98,6 +117,7 @@ def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_sche
             total_loss = 0.0
             total_length = len(train_dataloader)//gradient_accumulation_steps
             pbar = tqdm(colour="blue", desc=f"Training Epoch: {epoch+1}", total=total_length, dynamic_ncols=True)
+
             with maybe_run_profiler(train_config) as torch_profiler:
                 for step, batch in enumerate(train_dataloader):
                     gc.collect(1)
@@ -152,39 +172,111 @@ def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_sche
                             pbar.update(1)
                     pbar.set_description(f"Training Epoch: {epoch+1}/{train_config.num_epochs}, step {step}/{len(train_dataloader)} completed (loss: {loss.detach().float()})")
             pbar.close()
-
+
+            for step, batch in enumerate(train_dataloader):
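+                # move every tensor in the batch to the local XPU or CUDA device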
+                for key in batch.keys():
+                    if train_config.enable_fsdp:
+                        if is_xpu_available():
+                            batch[key] = batch[key].to(torch.device(f"xpu:{local_rank}"))
+                        else:
+                            batch[key] = batch[key].to(local_rank)
+                    else:
+
+                        if is_xpu_available():
+                            batch[key] = batch[key].to('xpu:0')
+                        else:
+                            batch[key] = batch[key].to('cuda:0')
+                with autocast():
+                    loss = model(**batch).loss
+                loss = loss / gradient_accumulation_steps
+                if train_config.save_metrics:
+                    train_step_loss.append(loss.detach().float().item())
+                    train_step_perplexity.append(float(torch.exp(loss.detach().float())))
+                total_loss += loss.detach().float()
+                if train_config.use_fp16:
+                    # if fp16 is enabled, use gradient scaler to handle gradient update
+                    scaler.scale(loss).backward()
+                    if (step + 1) % gradient_accumulation_steps == 0 or step == len(train_dataloader) - 1:
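+                        # unscale fp16 gradients back to their true values before
+                        # clipping so the threshold applies to real gradient norms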
+                        if train_config.gradient_clipping and train_config.gradient_clipping_threshold > 0.0:
+                            scaler.unscale_(optimizer)
+                            if train_config.enable_fsdp:
+                                model.clip_grad_norm_(train_config.gradient_clipping_threshold)
+                            else:
+                                torch.nn.utils.clip_grad_norm_(model.parameters(), train_config.gradient_clipping_threshold)
+                        scaler.step(optimizer)
+                        scaler.update()
+                        optimizer.zero_grad()
+                        pbar.update(1)
+                else:
+                    # regular backpropagation when fp16 is not used
+                    loss.backward()
+                    if (step + 1) % gradient_accumulation_steps == 0 or step == len(train_dataloader) - 1:
+                        if train_config.gradient_clipping and train_config.gradient_clipping_threshold > 0.0:
+                            if train_config.enable_fsdp:
+                                model.clip_grad_norm_(train_config.gradient_clipping_threshold)
+                            else:
+                                torch.nn.utils.clip_grad_norm_(model.parameters(), train_config.gradient_clipping_threshold)
+                        optimizer.step()
+                        optimizer.zero_grad()
+                        pbar.update(1)
+
+                pbar.set_description(f"Training Epoch: {epoch+1}/{train_config.num_epochs}, step {step}/{len(train_dataloader)} completed (loss: {loss.detach().float()})")
+
+                if train_config.save_metrics:
+                    save_to_json(metrics_filename, train_step_loss, train_loss, train_step_perplexity, train_prep, val_step_loss, val_loss, val_step_perplexity, val_prep)
+            pbar.close()
+
+
         epoch_end_time = time.perf_counter()-epoch_start_time
-        epoch_times.append(epoch_end_time)
+        epoch_times.append(epoch_end_time)
         # Reducing total_loss across all devices if there's more than one CUDA device
-        if torch.cuda.device_count() > 1 and train_config.enable_fsdp:
+        if is_xpu_available() and (torch.xpu.device_count() > 1 and train_config.enable_fsdp):
+            dist.all_reduce(total_loss, op=dist.ReduceOp.SUM)
+        elif torch.cuda.device_count() > 1 and train_config.enable_fsdp:
             dist.all_reduce(total_loss, op=dist.ReduceOp.SUM)
         train_epoch_loss = total_loss / len(train_dataloader)
         if train_config.enable_fsdp:
             train_epoch_loss = train_epoch_loss/world_size
         train_perplexity = torch.exp(train_epoch_loss)
 
-        train_prep.append(train_perplexity)
-        train_loss.append(train_epoch_loss)
+        train_prep.append(float(train_perplexity))
+        train_loss.append(float(train_epoch_loss))
 
         if train_config.enable_fsdp:
             if rank==0:
+                if is_xpu_available():
+                    print(f"Max XPU memory allocated was {memtrace.peak} GB")
+                    print(f"Max XPU memory reserved was {memtrace.max_reserved} GB")
+                    print(f"Peak active XPU memory was {memtrace.peak_active_gb} GB")
+                    print(f"Xpu Malloc retires : {memtrace.xpu_malloc_retires}")
+                else:
+                    print(f"Max CUDA memory allocated was {memtrace.peak} GB")
+                    print(f"Max CUDA memory reserved was {memtrace.max_reserved} GB")
+                    print(f"Peak active CUDA memory was {memtrace.peak_active_gb} GB")
+                    print(f"Cuda Malloc retires : {memtrace.cuda_malloc_retires}")
+                print(f"CPU Total Peak Memory consumed during the train (max): {memtrace.cpu_peaked + memtrace.cpu_begin} GB")
+        else:
+            if is_xpu_available():
+                print(f"Max XPU memory allocated was {memtrace.peak} GB")
+                print(f"Max XPU memory reserved was {memtrace.max_reserved} GB")
+                print(f"Peak active XPU memory was {memtrace.peak_active_gb} GB")
+                print(f"Xpu Malloc retires : {memtrace.xpu_malloc_retires}")
+            else:
                 print(f"Max CUDA memory allocated was {memtrace.peak} GB")
                 print(f"Max CUDA memory reserved was {memtrace.max_reserved} GB")
                 print(f"Peak active CUDA memory was {memtrace.peak_active_gb} GB")
                 print(f"Cuda Malloc retires : {memtrace.cuda_malloc_retires}")
-                print(f"CPU Total Peak Memory consumed during the train (max): {memtrace.cpu_peaked + memtrace.cpu_begin} GB")
-        else:
-            print(f"Max CUDA memory allocated was {memtrace.peak} GB")
-            print(f"Max CUDA memory reserved was {memtrace.max_reserved} GB")
-            print(f"Peak active CUDA memory was {memtrace.peak_active_gb} GB")
-            print(f"Cuda Malloc retires : {memtrace.cuda_malloc_retires}")
             print(f"CPU Total Peak Memory consumed during the train (max): {memtrace.cpu_peaked + memtrace.cpu_begin} GB")
-
+
         # Update the learning rate as needed
         lr_scheduler.step()
-
+
         if train_config.run_validation:
-            eval_ppl, eval_epoch_loss = evaluation(model, train_config, eval_dataloader, local_rank, tokenizer)
+            eval_ppl, eval_epoch_loss, temp_val_loss, temp_step_perplexity = evaluation(model, train_config, eval_dataloader, local_rank, tokenizer)
+            if train_config.save_metrics:
+                val_step_loss.extend(temp_val_loss)
+                val_step_perplexity.extend(temp_step_perplexity)
+
             checkpoint_start_time = time.perf_counter()
             if train_config.save_model and eval_epoch_loss < best_val_loss:
                 if train_config.enable_fsdp:
@@ -195,23 +287,23 @@ def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_sche
                             print(f"we are about to save the PEFT modules")
                     else:
                         print(f"we are about to save the PEFT modules")
-                    model.save_pretrained(train_config.output_dir)
+                    model.save_pretrained(train_config.output_dir)
                     if train_config.enable_fsdp:
-                        if rank==0:
+                        if rank==0:
                             print(f"PEFT modules are saved in {train_config.output_dir} directory")
                     else:
                         print(f"PEFT modules are saved in {train_config.output_dir} directory")
-
+
                 else:
                     if not train_config.use_peft and fsdp_config.checkpoint_type == StateDictType.FULL_STATE_DICT:
-
+
                         save_model_checkpoint(
                             model, optimizer, rank, train_config, epoch=epoch
                         )
                     elif not train_config.use_peft and fsdp_config.checkpoint_type == StateDictType.SHARDED_STATE_DICT:
                         print(" Saving the FSDP model checkpoints using SHARDED_STATE_DICT")
                         print("=====================================================")
-
+
                         save_model_and_optimizer_sharded(model, rank, train_config)
                         if train_config.save_optimizer:
                             save_model_and_optimizer_sharded(model, rank, train_config, optim=optimizer)
@@ -223,7 +315,7 @@ def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_sche
                             model, optimizer, rank, train_config, epoch=epoch
                         )
                         print(" Saving the FSDP model checkpoints and optimizer using FULL_STATE_DICT")
-                        print("=====================================================")
+                        print("=====================================================")
             if train_config.enable_fsdp:
                 dist.barrier()
             checkpoint_end_time = time.perf_counter() - checkpoint_start_time
@@ -235,20 +327,25 @@ def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_sche
                         print(f"best eval loss on epoch {epoch+1} is {best_val_loss}")
                 else:
                     print(f"best eval loss on epoch {epoch+1} is {best_val_loss}")
-            val_loss.append(best_val_loss)
-            val_prep.append(eval_ppl)
+            val_loss.append(float(best_val_loss))
+            val_prep.append(float(eval_ppl))
         if train_config.enable_fsdp:
             if rank==0:
                 print(f"Epoch {epoch+1}: train_perplexity={train_perplexity:.4f}, train_epoch_loss={train_epoch_loss:.4f}, epoch time {epoch_end_time}s")
         else:
             print(f"Epoch {epoch+1}: train_perplexity={train_perplexity:.4f}, train_epoch_loss={train_epoch_loss:.4f}, epoch time {epoch_end_time}s")
+
+        # Saving the results every epoch to plot later
+        if train_config.save_metrics:
+            save_to_json(metrics_filename, train_step_loss, train_loss, train_step_perplexity, train_prep, val_step_loss, val_loss, val_step_perplexity, val_prep)
+
     avg_epoch_time = sum(epoch_times)/ len(epoch_times)
     avg_checkpoint_time = sum(checkpoint_times)/ len(checkpoint_times) if len(checkpoint_times) > 0 else 0
     avg_train_prep = sum(train_prep)/len(train_prep)
     avg_train_loss = sum(train_loss)/len(train_loss)
     if train_config.run_validation:
-        avg_eval_prep = sum(val_prep)/len(val_prep)
-        avg_eval_loss = sum(val_loss)/len(val_loss)
+        avg_eval_prep = sum(val_prep)/len(val_prep)
+        avg_eval_loss = sum(val_loss)/len(val_loss)
 
     results['avg_train_prep'] = avg_train_prep
     results['avg_train_loss'] = avg_train_loss
@@ -257,32 +354,37 @@ def train(model, train_dataloader,eval_dataloader, tokenizer, optimizer, lr_sche
         results['avg_eval_loss'] = avg_eval_loss
     results["avg_epoch_time"] = avg_epoch_time
     results["avg_checkpoint_time"] = avg_checkpoint_time
+
     if train_config.flop_counter:
         results["model_flops"]= TFlops
-
-
+
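+    # record where the per-step metrics were written so callers can locate the file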
+    if train_config.save_metrics:
+        results["metrics_filename"] = metrics_filename
+
     #saving the training params including fsdp setting for reference.
     if train_config.enable_fsdp and not train_config.use_peft:
         save_train_params(train_config, fsdp_config, rank)
-
+
     return results
 
 def evaluation(model,train_config, eval_dataloader, local_rank, tokenizer):
     """
     Evaluates the model on the given dataloader
-
+
     Args:
         model: The model to evaluate
         eval_dataloader: The dataloader containing the evaluation data
         local_rank: The rank of the current node in a distributed setting
         tokenizer: The tokenizer used to decode predictions
-
+
     Returns: eval_ppl, eval_epoch_loss
     """
     if train_config.enable_fsdp:
-        world_size = int(os.environ["WORLD_SIZE"])
+        world_size = int(os.environ["WORLD_SIZE"])
     model.eval()
     eval_preds = []
+    val_step_loss = []
+    val_step_perplexity = []
     eval_loss = 0.0  # Initialize evaluation loss
     with MemoryTrace() as memtrace:
         for step, batch in enumerate(tqdm(eval_dataloader,colour="green", desc="evaluating Epoch", dynamic_ncols=True)):
@@ -291,29 +393,38 @@ def evaluation(model,train_config, eval_dataloader, local_rank, tokenizer):
                 if train_config.enable_fsdp:
                     batch[key] = batch[key].to(local_rank)
                 else:
-                    batch[key] = batch[key].to('cuda:0')
+                    if is_xpu_available():
+                        batch[key] = batch[key].to('xpu:0')
+                    else:
+                        batch[key] = batch[key].to('cuda:0')
             # Ensure no gradients are computed for this scope to save memory
             with torch.no_grad():
                 # Forward pass and compute loss
                 outputs = model(**batch)
                 loss = outputs.loss
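+                # collect per-step eval metrics; these lists are returned to the caller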
+                if train_config.save_metrics:
+                    val_step_loss.append(loss.detach().float().item())
+                    val_step_perplexity.append(float(torch.exp(loss.detach().float())))
+
                 eval_loss += loss.detach().float()
             # Decode predictions and add to evaluation predictions list
             preds = torch.argmax(outputs.logits, -1)
             eval_preds.extend(
                 tokenizer.batch_decode(preds.detach().cpu().numpy(), skip_special_tokens=True)
             )
-
+
     # If there's more than one CUDA device, reduce evaluation loss across all devices
+    if is_xpu_available() and (torch.xpu.device_count() > 1 and train_config.enable_fsdp):
+        dist.all_reduce(eval_loss, op=dist.ReduceOp.SUM)
     if torch.cuda.device_count() > 1 and train_config.enable_fsdp:
         dist.all_reduce(eval_loss, op=dist.ReduceOp.SUM)
-
+
     # Compute average loss and perplexity
     eval_epoch_loss = eval_loss / len(eval_dataloader)
     if train_config.enable_fsdp:
         eval_epoch_loss = eval_epoch_loss/world_size
     eval_ppl = torch.exp(eval_epoch_loss)
-
+
     # Print evaluation metrics
     if train_config.enable_fsdp:
         if local_rank==0:
@@ -321,7 +432,7 @@ def evaluation(model,train_config, eval_dataloader, local_rank, tokenizer):
     else:
         print(f" {eval_ppl=} {eval_epoch_loss=}")
 
-    return eval_ppl, eval_epoch_loss
+    return eval_ppl, eval_epoch_loss, val_step_loss, val_step_perplexity
 
 def freeze_transformer_layers(model, num_layer):
     for i, layer in enumerate(model.model.layers):
@@ -334,11 +445,15 @@ def check_frozen_layers_peft_model(model):
     for i, layer in enumerate(model.base_model.model.model.layers):
         for name, param in layer.named_parameters():
             print(f"Layer {i}, parameter {name}: requires_grad = {param.requires_grad}")
-
-
+
+
 def setup():
     """Initialize the process group for distributed training"""
-    dist.init_process_group("nccl")
+    if is_ccl_available():
+        # distributed training on xpus
+        dist.init_process_group("ccl")
+    else:
+        dist.init_process_group("nccl")
 
 
 def setup_environ_flags(rank):
@@ -348,7 +463,7 @@ def setup_environ_flags(rank):
     # os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
     # This flag will help with CUDA memory fragmentations that can lead into OOM in some cases.
    # Note this is only available in PyTorch Nightlies (as of July 30 2023)
-    # os.environ['PYTORCH_CUDA_ALLOC_CONF']='expandable_segments:True'
+    # os.environ['PYTORCH_CUDA_ALLOC_CONF']='expandable_segments:True'
     if rank == 0:
         print(f"--> Running with torch dist debug set to detail")
 
@@ -362,7 +477,10 @@ def clear_gpu_cache(rank=None):
     """Clear the GPU cache for all ranks"""
     if rank == 0:
         print(f"Clearing GPU cache for all ranks")
-    torch.cuda.empty_cache()
+    if is_xpu_available():
+        torch.xpu.empty_cache()
+    else:
+        torch.cuda.empty_cache()
 
 
 def get_parameter_dtypes(model):
@@ -393,14 +511,16 @@ def print_model_size(model, config, rank: int = 0) -> None:
 
 def get_policies(cfg, rank):
     """Get the policies for mixed precision and fsdp wrapping"""
+
 
-    verify_bfloat_support = (
+    verify_bfloat_support = ((
     torch.version.cuda
     and torch.cuda.is_bf16_supported()
     and packaging.version.parse(torch.version.cuda).release >= (11, 0)
     and dist.is_nccl_available()
     and nccl.version() >= (2, 10)
-    )
+    ) or
+    (is_xpu_available()))
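+    # bf16 support is assumed on XPU devices, so the CUDA/NCCL version checks
+    # above are bypassed when is_xpu_available() is True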
 
 
     mixed_precision_policy = None
@@ -411,7 +531,7 @@ def get_policies(cfg, rank):
         bf16_ready = verify_bfloat_support
 
         if bf16_ready and not cfg.use_fp16:
-            mixed_precision_policy = bfSixteen_mixed
+            mixed_precision_policy = bfSixteen
             if rank == 0:
                 print(f"bFloat16 enabled for mixed precision - using bfSixteen policy")
         elif cfg.use_fp16:
@@ -429,7 +549,7 @@ def save_train_params(train_config, fsdp_config, rank):
     This will be used by converter script in the inference folder to fetch the HF model name or path.
     It also would be helpful as a log for future references.
     """
-    # Convert the train_config and fsdp_config objects to dictionaries,
+    # Convert the train_config and fsdp_config objects to dictionaries,
     # converting all values to strings to ensure they can be serialized into a YAML file
     train_config_dict = {k: str(v) for k, v in vars(train_config).items() if not k.startswith('__')}
     fsdp_config_dict = {k: str(v) for k, v in vars(fsdp_config).items() if not k.startswith('__')}
@@ -461,3 +581,17 @@ def save_train_params(train_config, fsdp_config, rank):
             f.write(config_yaml)
         if rank==0:
             print(f"training params are saved in {file_name}")
+
+def save_to_json(output_filename, train_step_loss, train_epoch_loss, train_step_ppl, train_epoch_ppl, val_step_loss, val_epoch_loss, val_step_ppl, val_epoch_ppl):
+    metrics_data = {
+        "train_step_loss": train_step_loss,
+        "train_epoch_loss": train_epoch_loss,
+        "train_step_perplexity": train_step_ppl,
+        "train_epoch_perplexity": train_epoch_ppl,
+        "val_step_loss": val_step_loss,
+        "val_epoch_loss": val_epoch_loss,
+        "val_step_perplexity": val_step_ppl,
+        "val_epoch_perplexity": val_epoch_ppl
+    }
+    with open(output_filename, "w") as f:
+        json.dump(metrics_data, f)
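+
+# Example (illustrative only, not used by the training code): a saved metrics
+# file can be reloaded for plotting, assuming matplotlib is installed:
+#   import json
+#   import matplotlib.pyplot as plt
+#   with open(results["metrics_filename"]) as f:
+#       metrics = json.load(f)
+#   plt.plot(metrics["train_step_loss"])
+#   plt.show()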