#!/usr/bin/env python
# coding: utf-8

# %%
import argparse
import sys
import time

import tensorflow as tf
import horovod.tensorflow.keras as hvd


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--batch-size", type=int, default=256, help="Batch size")
    args = parser.parse_args()
    return args


args = parse_args()
g_args = args
batch_size = args.batch_size

# Horovod: initialize Horovod.
hvd.init()

# Horovod: pin GPU to be used to process local rank (one GPU per process).
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# Each rank loads its own copy of MNIST into a rank-specific cache file.
(mnist_images, mnist_labels), _ = \
    tf.keras.datasets.mnist.load_data(path='mnist-%d.npz' % hvd.rank())

dataset = tf.data.Dataset.from_tensor_slices(
    (tf.cast(mnist_images[..., tf.newaxis] / 255.0, tf.float32),
     tf.cast(mnist_labels, tf.int64))
)
dataset = dataset.repeat().shuffle(10000).batch(batch_size)

mnist_model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, [3, 3], activation='relu'),
    tf.keras.layers.Conv2D(64, [3, 3], activation='relu'),
    tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
    tf.keras.layers.Dropout(0.25),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.5),
    tf.keras.layers.Dense(10, activation='softmax')
])

# Horovod: adjust learning rate based on number of GPUs.
scaled_lr = 0.001 * hvd.size()
opt = tf.optimizers.Adam(scaled_lr)

# Horovod: add Horovod DistributedOptimizer.
opt = hvd.DistributedOptimizer(opt,
                               backward_passes_per_step=1,
                               average_aggregated_gradients=True)

# Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
# uses hvd.DistributedOptimizer() to compute gradients.
mnist_model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
                    optimizer=opt,
                    metrics=['accuracy'],
                    experimental_run_tf_function=False)


class PrintLR(tf.keras.callbacks.Callback):
    """Reports per-epoch wall-clock time and throughput (images/sec) on rank 0."""

    def __init__(self, total_images=0):
        super().__init__()
        self.total_images = total_images

    def on_train_begin(self, logs=None):
        global seconds1
        seconds1 = time.time()

    def on_epoch_begin(self, epoch, logs=None):
        self.epoch_start_time = time.time()

    def on_epoch_end(self, epoch, logs=None):
        if hvd.rank() == 0:
            epoch_time = time.time() - self.epoch_start_time
            print('Epoch time : {}'.format(epoch_time))
            images_per_sec = round(self.total_images / epoch_time, 2)
            print('Images/sec: {}'.format(images_per_sec))


callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),

    # Horovod: average metrics among workers at the end of every epoch.
    #
    # Note: This callback must be in the list before the ReduceLROnPlateau,
    # TensorBoard or other metrics-based callbacks.
    hvd.callbacks.MetricAverageCallback(),

    # Throughput calculator.
    PrintLR(total_images=len(mnist_labels)),
]

# Horovod: write logs on worker 0.
verbose = 2 if hvd.rank() == 0 else 0

# Train the model.
# Horovod: adjust number of steps based on number of GPUs.
mnist_model.fit(dataset,
                steps_per_epoch=len(mnist_labels) // (batch_size * hvd.size()),
                callbacks=callbacks,
                epochs=8,
                verbose=verbose)
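
# %%
# Usage sketch (assumptions: the script is saved as train_mnist_hvd.py and Horovod
# was built with GPU/NCCL support; the file name and process count are illustrative):
#
#   horovodrun -np 4 python train_mnist_hvd.py --batch-size 256
#
# horovodrun launches one process per GPU. Each process pins its own device via
# hvd.local_rank(), gradients are averaged across processes by the
# DistributedOptimizer, and steps_per_epoch is divided by hvd.size() so that one
# "epoch" still corresponds to a single pass over the dataset across all workers.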