#!/usr/bin/env python # coding: utf-8 # %% import argparse import tensorflow as tf from tensorflow.keras.datasets import cifar10 from tensorflow.keras.layers import Input, Add, Dense, Activation, ZeroPadding2D, BatchNormalization, Flatten, Conv2D, AveragePooling2D, MaxPooling2D, GlobalMaxPooling2D from tensorflow.keras.models import Model, load_model from tensorflow.keras.preprocessing import image from tensorflow.keras.applications.imagenet_utils import preprocess_input from tensorflow.keras import backend as K from tensorflow.keras.initializers import glorot_uniform import horovod.tensorflow.keras as hvd import sys import time def parse_args(): parser = argparse.ArgumentParser() parser.add_argument("--batch-size", type=int, default=256, help="Batch size") args = parser.parse_args() return args args = parse_args() global g_args g_args = args batch_size = args.batch_size # Horovod: initialize Horovod. hvd.init() # Horovod: pin GPU to be used to process local rank (one GPU per process) gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True) if gpus: tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU') (images, labels), _ = tf.keras.datasets.cifar10.load_data() dataset = tf.data.Dataset.from_tensor_slices( (tf.cast(images[...] / 255.0, tf.float32), tf.cast(labels, tf.int64)) ) dataset = dataset.repeat().shuffle(10000).batch(batch_size) def convolutional_block(X, f, filters, stage, block, s=2): # Defining name basis conv_name_base = 'res' + str(stage) + block + '_branch' bn_name_base = 'bn' + str(stage) + block + '_branch' # Retrieve Filters F1, F2, F3 = filters # Save the input value X_shortcut = X ##### MAIN PATH ##### # First component of main path X = Conv2D(filters=F1, kernel_size=(1, 1), strides=(s, s), padding='valid', name=conv_name_base + '2a', kernel_initializer=glorot_uniform(seed=0))(X) X = BatchNormalization(axis=3, name=bn_name_base + '2a')(X) X = Activation('relu')(X) # Second component of main path X = Conv2D(filters=F2, kernel_size=(f, f), strides=(1, 1), padding='same', name=conv_name_base + '2b', kernel_initializer=glorot_uniform(seed=0))(X) X = BatchNormalization(axis=3, name=bn_name_base + '2b')(X) X = Activation('relu')(X) # Third component of main path X = Conv2D(filters=F3, kernel_size=(1, 1), strides=(1, 1), padding='valid', name=conv_name_base + '2c', kernel_initializer=glorot_uniform(seed=0))(X) X = BatchNormalization(axis=3, name=bn_name_base + '2c')(X) ##### SHORTCUT PATH #### X_shortcut = Conv2D(filters=F3, kernel_size=(1, 1), strides=(s, s), padding='valid', name=conv_name_base + '1', kernel_initializer=glorot_uniform(seed=0))(X_shortcut) X_shortcut = BatchNormalization(axis=3, name=bn_name_base + '1')(X_shortcut) # Final step: Add shortcut value to main path, and pass it through a RELU activation X = Add()([X, X_shortcut]) X = Activation('relu')(X) return X def ResNet(input_shape = (28, 28, 1), classes = 10): # Define the input as a tensor with shape input_shape X_input = Input(shape=input_shape) # Zero-Padding X = ZeroPadding2D((3, 3))(X_input) # Stage 1 X = Conv2D(64, (7, 7), strides = (2, 2), name = 'conv1', kernel_initializer = glorot_uniform(seed=0))(X) X = BatchNormalization(axis = 3, name = 'bn_conv1')(X) X = Activation('relu')(X) X = MaxPooling2D((3, 3), strides=(2, 2))(X) # Stage 2 X = convolutional_block(X, f = 3, filters = [64, 64, 256], stage = 2, block='a', s = 1) # Stage 3 X = convolutional_block(X, f=3, filters=[128, 128, 512], stage=3, block='a', s=2) # AVGPOOL X = AveragePooling2D(pool_size=(2,2), padding='same')(X) # Output layer X = Flatten()(X) X = Dense(classes, activation='softmax', name='fc' + str(classes), kernel_initializer = glorot_uniform(seed=0))(X) # Create model model = Model(inputs = X_input, outputs = X, name='ResNet') return model model = ResNet(input_shape = (32, 32, 3), classes = 10) # %% # Horovod: adjust learning rate based on number of GPUs. scaled_lr = 0.001 * hvd.size() # opt = tf.optimizers.Adam(scaled_lr) from tensorflow_addons.optimizers import LAMB # Replace the Adam optimizer with NovoGrad: opt = LAMB(learning_rate=scaled_lr) # Horovod: add Horovod DistributedOptimizer. opt = hvd.DistributedOptimizer( opt, backward_passes_per_step=1, average_aggregated_gradients=True) # Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow # uses hvd.DistributedOptimizer() to compute gradients. model.compile(loss=tf.losses.SparseCategoricalCrossentropy(), optimizer=opt, metrics=['accuracy'], experimental_run_tf_function=False) class PrintLR(tf.keras.callbacks.Callback): def __init__(self, total_images=0): self.total_images = total_images def on_epoch_begin(self, epoch, logs=None): self.epoch_start_time = time.time() def on_epoch_end(self, epoch, logs=None): if hvd.rank() == 0 : epoch_time = time.time() - self.epoch_start_time print('Epoch time : {}'.format(epoch_time)) images_per_sec = round(self.total_images / epoch_time, 2) print('Images/sec: {}'.format(images_per_sec)) callbacks = [ # Horovod: broadcast initial variable states from rank 0 to all other processes. # This is necessary to ensure consistent initialization of all workers when # training is started with random weights or restored from a checkpoint. hvd.callbacks.BroadcastGlobalVariablesCallback(0), # Horovod: average metrics among workers at the end of every epoch. # # Note: This callback must be in the list before the ReduceLROnPlateau, # TensorBoard or other metrics-based callbacks. hvd.callbacks.MetricAverageCallback(), PrintLR(total_images=len(labels)), hvd.callbacks.LearningRateWarmupCallback(initial_lr=scaled_lr, warmup_epochs=3, verbose=1), ] # model.summary() # Horovod: write logs on worker 0. verbose = 1 if hvd.rank() == 0 else 0 # Train the model. # Horovod: adjust number of steps based on number of GPUs. model.fit(dataset, steps_per_epoch=len(labels) // (batch_size*hvd.size()), callbacks=callbacks, epochs=20, verbose=verbose)