inception_distributed_train.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. # Copyright 2016 Google Inc. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ==============================================================================
  15. """A library to train Inception using multiple replicas with synchronous update.
  16. Please see accompanying README.md for details and instructions.
  17. """
  18. from __future__ import absolute_import
  19. from __future__ import division
  20. from __future__ import print_function
  21. from datetime import datetime
  22. import os.path
  23. import time
  24. import numpy as np
  25. import tensorflow as tf
  26. from inception import image_processing
  27. from inception import inception_model as inception
  28. from inception.slim import slim
  29. FLAGS = tf.app.flags.FLAGS
  30. tf.app.flags.DEFINE_string('job_name', '', 'One of "ps", "worker"')
  31. tf.app.flags.DEFINE_string('ps_hosts', '',
  32. """Comma-separated list of hostname:port for the """
  33. """parameter server jobs. e.g. """
  34. """'machine1:2222,machine2:1111,machine2:2222'""")
  35. tf.app.flags.DEFINE_string('worker_hosts', '',
  36. """Comma-separated list of hostname:port for the """
  37. """worker jobs. e.g. """
  38. """'machine1:2222,machine2:1111,machine2:2222'""")
  39. tf.app.flags.DEFINE_string('train_dir', '/tmp/imagenet_train',
  40. """Directory where to write event logs """
  41. """and checkpoint.""")
  42. tf.app.flags.DEFINE_integer('max_steps', 1000000, 'Number of batches to run.')
  43. tf.app.flags.DEFINE_string('subset', 'train', 'Either "train" or "validation".')
  44. tf.app.flags.DEFINE_boolean('log_device_placement', False,
  45. 'Whether to log device placement.')
  46. # Task ID is used to select the chief and also to access the local_step for
  47. # each replica to check staleness of the gradients in sync_replicas_optimizer.
  48. tf.app.flags.DEFINE_integer(
  49. 'task_id', 0, 'Task ID of the worker/replica running the training.')
  50. # More details can be found in the sync_replicas_optimizer class:
  51. # tensorflow/python/training/sync_replicas_optimizer.py
  52. tf.app.flags.DEFINE_integer('num_replicas_to_aggregate', -1,
  53. """Number of gradients to collect before """
  54. """updating the parameters.""")
  55. tf.app.flags.DEFINE_integer('save_interval_secs', 10 * 60,
  56. 'Save interval seconds.')
  57. tf.app.flags.DEFINE_integer('save_summaries_secs', 180,
  58. 'Save summaries interval seconds.')
  59. # **IMPORTANT**
  60. # Please note that this learning rate schedule is heavily dependent on the
  61. # hardware architecture, batch size and any changes to the model architecture
  62. # specification. Selecting a finely tuned learning rate schedule is an
  63. # empirical process that requires some experimentation. Please see README.md
  64. # more guidance and discussion.
  65. #
  66. # Learning rate decay factor selected from https://arxiv.org/abs/1604.00981
  67. tf.app.flags.DEFINE_float('initial_learning_rate', 0.045,
  68. 'Initial learning rate.')
  69. tf.app.flags.DEFINE_float('num_epochs_per_decay', 2.0,
  70. 'Epochs after which learning rate decays.')
  71. tf.app.flags.DEFINE_float('learning_rate_decay_factor', 0.94,
  72. 'Learning rate decay factor.')
  73. # Constants dictating the learning rate schedule.
  74. RMSPROP_DECAY = 0.9 # Decay term for RMSProp.
  75. RMSPROP_MOMENTUM = 0.9 # Momentum in RMSProp.
  76. RMSPROP_EPSILON = 1.0 # Epsilon term for RMSProp.
  77. def train(target, dataset, cluster_spec):
  78. """Train Inception on a dataset for a number of steps."""
  79. # Number of workers and parameter servers are infered from the workers and ps
  80. # hosts string.
  81. num_workers = len(cluster_spec.as_dict()['worker'])
  82. num_parameter_servers = len(cluster_spec.as_dict()['ps'])
  83. # If no value is given, num_replicas_to_aggregate defaults to be the number of
  84. # workers.
  85. if FLAGS.num_replicas_to_aggregate == -1:
  86. num_replicas_to_aggregate = num_workers
  87. else:
  88. num_replicas_to_aggregate = FLAGS.num_replicas_to_aggregate
  89. # Both should be greater than 0 in a distributed training.
  90. assert num_workers > 0 and num_parameter_servers > 0, (' num_workers and '
  91. 'num_parameter_servers'
  92. ' must be > 0.')
  93. # Choose worker 0 as the chief. Note that any worker could be the chief
  94. # but there should be only one chief.
  95. is_chief = (FLAGS.task_id == 0)
  96. # Ops are assigned to worker by default.
  97. with tf.device('/job:worker/task:%d' % FLAGS.task_id):
  98. # Variables and its related init/assign ops are assigned to ps.
  99. with slim.scopes.arg_scope(
  100. [slim.variables.variable, slim.variables.global_step],
  101. device=slim.variables.VariableDeviceChooser(num_parameter_servers)):
  102. # Create a variable to count the number of train() calls. This equals the
  103. # number of updates applied to the variables.
  104. global_step = slim.variables.global_step()
  105. # Calculate the learning rate schedule.
  106. num_batches_per_epoch = (dataset.num_examples_per_epoch() /
  107. FLAGS.batch_size)
  108. # Decay steps need to be divided by the number of replicas to aggregate.
  109. decay_steps = int(num_batches_per_epoch * FLAGS.num_epochs_per_decay /
  110. num_replicas_to_aggregate)
  111. # Decay the learning rate exponentially based on the number of steps.
  112. lr = tf.train.exponential_decay(FLAGS.initial_learning_rate,
  113. global_step,
  114. decay_steps,
  115. FLAGS.learning_rate_decay_factor,
  116. staircase=True)
  117. # Add a summary to track the learning rate.
  118. tf.scalar_summary('learning_rate', lr)
  119. # Create an optimizer that performs gradient descent.
  120. opt = tf.train.RMSPropOptimizer(lr,
  121. RMSPROP_DECAY,
  122. momentum=RMSPROP_MOMENTUM,
  123. epsilon=RMSPROP_EPSILON)
  124. images, labels = image_processing.distorted_inputs(
  125. dataset,
  126. batch_size=FLAGS.batch_size,
  127. num_preprocess_threads=FLAGS.num_preprocess_threads)
  128. # Number of classes in the Dataset label set plus 1.
  129. # Label 0 is reserved for an (unused) background class.
  130. num_classes = dataset.num_classes() + 1
  131. logits = inception.inference(images, num_classes, for_training=True)
  132. # Add classification loss.
  133. inception.loss(logits, labels)
  134. # Gather all of the losses including regularization losses.
  135. losses = tf.get_collection(slim.losses.LOSSES_COLLECTION)
  136. losses += tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)
  137. total_loss = tf.add_n(losses, name='total_loss')
  138. if is_chief:
  139. # Compute the moving average of all individual losses and the
  140. # total loss.
  141. loss_averages = tf.train.ExponentialMovingAverage(0.9, name='avg')
  142. loss_averages_op = loss_averages.apply(losses + [total_loss])
  143. # Attach a scalar summmary to all individual losses and the total loss;
  144. # do the same for the averaged version of the losses.
  145. for l in losses + [total_loss]:
  146. loss_name = l.op.name
  147. # Name each loss as '(raw)' and name the moving average version of the
  148. # loss as the original loss name.
  149. tf.scalar_summary(loss_name + ' (raw)', l)
  150. tf.scalar_summary(loss_name, loss_averages.average(l))
  151. # Add dependency to compute loss_averages.
  152. with tf.control_dependencies([loss_averages_op]):
  153. total_loss = tf.identity(total_loss)
  154. # Track the moving averages of all trainable variables.
  155. # Note that we maintain a 'double-average' of the BatchNormalization
  156. # global statistics.
  157. # This is not needed when the number of replicas are small but important
  158. # for synchronous distributed training with tens of workers/replicas.
  159. exp_moving_averager = tf.train.ExponentialMovingAverage(
  160. inception.MOVING_AVERAGE_DECAY, global_step)
  161. variables_to_average = (
  162. tf.trainable_variables() + tf.moving_average_variables())
  163. # Add histograms for model variables.
  164. for var in variables_to_average:
  165. tf.histogram_summary(var.op.name, var)
  166. # Create synchronous replica optimizer.
  167. opt = tf.train.SyncReplicasOptimizer(
  168. opt,
  169. replicas_to_aggregate=num_replicas_to_aggregate,
  170. replica_id=FLAGS.task_id,
  171. total_num_replicas=num_workers,
  172. variable_averages=exp_moving_averager,
  173. variables_to_average=variables_to_average)
  174. batchnorm_updates = tf.get_collection(slim.ops.UPDATE_OPS_COLLECTION)
  175. assert batchnorm_updates, 'Batchnorm updates are missing'
  176. batchnorm_updates_op = tf.group(*batchnorm_updates)
  177. # Add dependency to compute batchnorm_updates.
  178. with tf.control_dependencies([batchnorm_updates_op]):
  179. total_loss = tf.identity(total_loss)
  180. # Compute gradients with respect to the loss.
  181. grads = opt.compute_gradients(total_loss)
  182. # Add histograms for gradients.
  183. for grad, var in grads:
  184. if grad is not None:
  185. tf.histogram_summary(var.op.name + '/gradients', grad)
  186. apply_gradients_op = opt.apply_gradients(grads, global_step=global_step)
  187. with tf.control_dependencies([apply_gradients_op]):
  188. train_op = tf.identity(total_loss, name='train_op')
  189. # Get chief queue_runners, init_tokens and clean_up_op, which is used to
  190. # synchronize replicas.
  191. # More details can be found in sync_replicas_optimizer.
  192. chief_queue_runners = [opt.get_chief_queue_runner()]
  193. init_tokens_op = opt.get_init_tokens_op()
  194. clean_up_op = opt.get_clean_up_op()
  195. # Create a saver.
  196. saver = tf.train.Saver()
  197. # Build the summary operation based on the TF collection of Summaries.
  198. summary_op = tf.merge_all_summaries()
  199. # Build an initialization operation to run below.
  200. init_op = tf.initialize_all_variables()
  201. # We run the summaries in the same thread as the training operations by
  202. # passing in None for summary_op to avoid a summary_thread being started.
  203. # Running summaries and training operations in parallel could run out of
  204. # GPU memory.
  205. sv = tf.train.Supervisor(is_chief=is_chief,
  206. logdir=FLAGS.train_dir,
  207. init_op=init_op,
  208. summary_op=None,
  209. global_step=global_step,
  210. saver=saver,
  211. save_model_secs=FLAGS.save_interval_secs)
  212. tf.logging.info('%s Supervisor' % datetime.now())
  213. sess_config = tf.ConfigProto(
  214. allow_soft_placement=True,
  215. log_device_placement=FLAGS.log_device_placement)
  216. # Get a session.
  217. sess = sv.prepare_or_wait_for_session(target, config=sess_config)
  218. # Start the queue runners.
  219. queue_runners = tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)
  220. sv.start_queue_runners(sess, queue_runners)
  221. tf.logging.info('Started %d queues for processing input data.',
  222. len(queue_runners))
  223. if is_chief:
  224. sv.start_queue_runners(sess, chief_queue_runners)
  225. sess.run(init_tokens_op)
  226. # Train, checking for Nans. Concurrently run the summary operation at a
  227. # specified interval. Note that the summary_op and train_op never run
  228. # simultaneously in order to prevent running out of GPU memory.
  229. next_summary_time = time.time() + FLAGS.save_summaries_secs
  230. while not sv.should_stop():
  231. try:
  232. start_time = time.time()
  233. loss_value, step = sess.run([train_op, global_step])
  234. assert not np.isnan(loss_value), 'Model diverged with loss = NaN'
  235. if step > FLAGS.max_steps:
  236. break
  237. duration = time.time() - start_time
  238. if step % 30 == 0:
  239. examples_per_sec = FLAGS.batch_size / float(duration)
  240. format_str = ('Worker %d: %s: step %d, loss = %.2f'
  241. '(%.1f examples/sec; %.3f sec/batch)')
  242. tf.logging.info(format_str %
  243. (FLAGS.task_id, datetime.now(), step, loss_value,
  244. examples_per_sec, duration))
  245. # Determine if the summary_op should be run on the chief worker.
  246. if is_chief and next_summary_time < time.time():
  247. tf.logging.info('Running Summary operation on the chief.')
  248. summary_str = sess.run(summary_op)
  249. sv.summary_computed(sess, summary_str)
  250. tf.logging.info('Finished running Summary operation.')
  251. # Determine the next time for running the summary.
  252. next_summary_time += FLAGS.save_summaries_secs
  253. except:
  254. if is_chief:
  255. tf.logging.info('About to execute sync_clean_up_op!')
  256. sess.run(clean_up_op)
  257. raise
  258. # Stop the supervisor. This also waits for service threads to finish.
  259. sv.stop()
  260. # Save after the training ends.
  261. if is_chief:
  262. saver.save(sess,
  263. os.path.join(FLAGS.train_dir, 'model.ckpt'),
  264. global_step=global_step)