Caffe2 - Python API
A deep learning, cross-platform ML framework
resnet50_trainer.py
# Copyright (c) 2016-present, Facebook, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##############################################################################

# Module caffe2.python.examples.resnet50_trainer
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import argparse
import logging
import numpy as np
import time
import os

from caffe2.python import core, workspace, experiment_util, data_parallel_model
from caffe2.python import data_parallel_model_utils, dyndep, optimizer
from caffe2.python import timeout_guard, model_helper, brew
from caffe2.proto import caffe2_pb2

import caffe2.python.models.resnet as resnet
from caffe2.python.modeling.initializers import Initializer, pFP16Initializer
import caffe2.python.predictor.predictor_exporter as pred_exp
import caffe2.python.predictor.predictor_py_utils as pred_utils
from caffe2.python.predictor_constants import predictor_constants as predictor_constants

'''
Parallelized multi-GPU distributed trainer for ResNet-50. Can be used to train
on ImageNet data, for example.

To run the trainer in single-machine multi-GPU mode, set num_shards = 1.

To run the trainer in multi-machine multi-GPU mode with M machines,
run the same program on all machines, specifying num_shards = M, and
shard_id = a unique integer in the set [0, M-1].

For rendezvous (the trainer processes have to know about each other),
you can either use a directory path that is visible to all processes
(e.g. an NFS directory), or use a Redis instance. Use the former by
passing the `file_store_path` argument. Use the latter by passing the
`redis_host` and `redis_port` arguments.
'''
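
# Example invocations (illustrative only: the flags are defined in main()
# below; the data paths, hostnames and run ids are placeholders):
#
#   Single machine, two GPUs:
#     python resnet50_trainer.py --train_data /path/to/train_lmdb --num_gpus 2
#
#   Two machines, rendezvous over a shared (e.g. NFS) directory:
#     python resnet50_trainer.py --train_data /path/to/train_lmdb \
#         --num_gpus 8 --num_shards 2 --shard_id 0 --run_id some_run_id \
#         --file_store_path /mnt/shared/rendezvous
#     (run the same command on the second machine with --shard_id 1)
#
#   Two machines, rendezvous over Redis:
#     python resnet50_trainer.py --train_data /path/to/train_lmdb \
#         --num_gpus 8 --num_shards 2 --shard_id 0 --run_id some_run_id \
#         --redis_host redis.example.com
#
#   Under OpenMPI, num_shards and shard_id are instead taken from the
#   OMPI_COMM_WORLD_SIZE / OMPI_COMM_WORLD_RANK environment variables
#   that mpirun sets:
#     mpirun -np 2 -H host1,host2 python resnet50_trainer.py \
#         --train_data /path/to/train_lmdb --num_gpus 8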

logging.basicConfig()
log = logging.getLogger("resnet50_trainer")
log.setLevel(logging.DEBUG)

dyndep.InitOpsLibrary('@/caffe2/caffe2/distributed:file_store_handler_ops')
dyndep.InitOpsLibrary('@/caffe2/caffe2/distributed:redis_store_handler_ops')


def AddImageInput(model, reader, batch_size, img_size, dtype, is_test):
    '''
    The image input operator loads image and label data from the reader and
    applies transformations to the images (random cropping, mirroring, ...).
    '''
    data, label = brew.image_input(
        model,
        reader, ["data", "label"],
        batch_size=batch_size,
        output_type=dtype,
        use_gpu_transform=True if model._device_type == 1 else False,
        use_caffe_datum=True,
        mean=128.,
        std=128.,
        scale=256,
        crop=img_size,
        mirror=1,
        is_test=is_test,
    )

    data = model.StopGradient(data, data)


def AddNullInput(model, reader, batch_size, img_size, dtype):
    '''
    The null input function uses a Gaussian fill operator to emulate real image
    input. A label blob is hardcoded to a single value. This is useful if you
    want to test compute throughput or don't have a dataset available.
    '''
    suffix = "_fp16" if dtype == "float16" else ""
    model.param_init_net.GaussianFill(
        [],
        ["data" + suffix],
        shape=[batch_size, 3, img_size, img_size],
    )
    if dtype == "float16":
        model.param_init_net.FloatToHalf("data" + suffix, "data")

    model.param_init_net.ConstantFill(
        [],
        ["label"],
        shape=[batch_size],
        value=1,
        dtype=core.DataType.INT32,
    )


def SaveModel(args, train_model, epoch):
    prefix = "{}_{}".format(train_model._device_prefix, train_model._devices[0])
    predictor_export_meta = pred_exp.PredictorExportMeta(
        predict_net=train_model.net.Proto(),
        parameters=data_parallel_model.GetCheckpointParams(train_model),
        inputs=[prefix + "/data"],
        outputs=[prefix + "/softmax"],
        shapes={
            prefix + "/softmax": (1, args.num_labels),
            prefix + "/data": (args.num_channels, args.image_size, args.image_size)
        }
    )

    # save the train_model for the current epoch
    model_path = "%s/%s_%d.mdl" % (
        args.file_store_path,
        args.save_model_name,
        epoch,
    )

    # db_type is set to "minidb" instead of "log_file_db", which currently
    # breaks serialization in save_to_db; switch back to log_file_db after
    # the migration
    pred_exp.save_to_db(
        db_type="minidb",
        db_destination=model_path,
        predictor_export_meta=predictor_export_meta,
    )


def LoadModel(path, model):
    '''
    Load pretrained model from file
    '''
    log.info("Loading path: {}".format(path))
    meta_net_def = pred_exp.load_from_db(path, 'minidb')
    init_net = core.Net(pred_utils.GetNet(
        meta_net_def, predictor_constants.GLOBAL_INIT_NET_TYPE))
    predict_init_net = core.Net(pred_utils.GetNet(
        meta_net_def, predictor_constants.PREDICT_INIT_NET_TYPE))

    predict_init_net.RunAllOnGPU()
    init_net.RunAllOnGPU()

    assert workspace.RunNetOnce(predict_init_net)
    assert workspace.RunNetOnce(init_net)

    # Hack: fix the iteration counter, which sits in a CUDA context after
    # loading the model, by feeding it back on CPU
    itercnt = workspace.FetchBlob("optimizer_iteration")
    workspace.FeedBlob(
        "optimizer_iteration",
        itercnt,
        device_option=core.DeviceOption(caffe2_pb2.CPU, 0)
    )


def RunEpoch(
    args,
    epoch,
    train_model,
    test_model,
    total_batch_size,
    num_shards,
    expname,
    explog,
):
    '''
    Run one epoch of the trainer.
    TODO: add checkpointing here.
    '''
    # TODO: add loading from checkpoint
    log.info("Starting epoch {}/{}".format(epoch, args.num_epochs))
    epoch_iters = int(args.epoch_size / total_batch_size / num_shards)
    for i in range(epoch_iters):
        # This timeout is required (temporarily) since CUDA-NCCL
        # operators might deadlock when synchronizing between GPUs.
        timeout = 600.0 if i == 0 else 60.0
        with timeout_guard.CompleteInTimeOrDie(timeout):
            t1 = time.time()
            workspace.RunNet(train_model.net.Proto().name)
            t2 = time.time()
            dt = t2 - t1

        fmt = "Finished iteration {}/{} of epoch {} ({:.2f} images/sec)"
        log.info(fmt.format(i + 1, epoch_iters, epoch, total_batch_size / dt))
        prefix = "{}_{}".format(
            train_model._device_prefix,
            train_model._devices[0])
        accuracy = workspace.FetchBlob(prefix + '/accuracy')
        loss = workspace.FetchBlob(prefix + '/loss')
        train_fmt = "Training loss: {}, accuracy: {}"
        log.info(train_fmt.format(loss, accuracy))

    num_images = epoch * epoch_iters * total_batch_size
    prefix = "{}_{}".format(train_model._device_prefix, train_model._devices[0])
    accuracy = workspace.FetchBlob(prefix + '/accuracy')
    loss = workspace.FetchBlob(prefix + '/loss')
    learning_rate = workspace.FetchBlob(
        data_parallel_model.GetLearningRateBlobNames(train_model)[0]
    )
    test_accuracy = 0
    if (test_model is not None):
        # Run 100 iters of testing
        ntests = 0
        for _ in range(0, 100):
            workspace.RunNet(test_model.net.Proto().name)
            for g in test_model._devices:
                test_accuracy += np.asscalar(workspace.FetchBlob(
                    "{}_{}".format(test_model._device_prefix, g) + '/accuracy'
                ))
                ntests += 1
        test_accuracy /= ntests
    else:
        test_accuracy = (-1)

    explog.log(
        input_count=num_images,
        batch_count=(i + epoch * epoch_iters),
        additional_values={
            'accuracy': accuracy,
            'loss': loss,
            'learning_rate': learning_rate,
            'epoch': epoch,
            'test_accuracy': test_accuracy,
        }
    )
    assert loss < 40, "Exploded gradients :("

    # TODO: add checkpointing
    return epoch + 1


def Train(args):
    # Either use specified device list or generate one
    if args.gpus is not None:
        gpus = [int(x) for x in args.gpus.split(',')]
        num_gpus = len(gpus)
    else:
        gpus = list(range(args.num_gpus))
        num_gpus = args.num_gpus

    log.info("Running on GPUs: {}".format(gpus))

    # Verify valid batch size
    total_batch_size = args.batch_size
    batch_per_device = total_batch_size // num_gpus
    assert \
        total_batch_size % num_gpus == 0, \
        "Number of GPUs must divide batch size"

    # Round down epoch size to closest multiple of batch size across machines
    global_batch_size = total_batch_size * args.num_shards
    epoch_iters = int(args.epoch_size / global_batch_size)

    assert \
        epoch_iters > 0, \
        "Epoch size must be larger than batch size times shard count"

    args.epoch_size = epoch_iters * global_batch_size
    log.info("Using epoch size: {}".format(args.epoch_size))
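    # For example: with the default epoch_size=1500000 and batch_size=32 on a
    # single shard, epoch_iters = 1500000 // 32 = 46875 and the epoch size is
    # already a multiple, so it is unchanged; with num_shards=2 the global
    # batch size is 64, epoch_iters = 23437, and the epoch size is rounded
    # down to 23437 * 64 = 1499968.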

    # Create ModelHelper object
    train_arg_scope = {
        'order': 'NCHW',
        'use_cudnn': True,
        'cudnn_exhaustive_search': True,
        'ws_nbytes_limit': (args.cudnn_workspace_limit_mb * 1024 * 1024),
    }
    train_model = model_helper.ModelHelper(
        name="resnet50", arg_scope=train_arg_scope
    )

    num_shards = args.num_shards
    shard_id = args.shard_id

    # Expect interfaces to be comma separated.
    # Use of multiple network interfaces is not yet complete,
    # so simply use the first one in the list.
    interfaces = args.distributed_interfaces.split(",")

    # Rendezvous using MPI when run with mpirun
    if os.getenv("OMPI_COMM_WORLD_SIZE") is not None:
        num_shards = int(os.getenv("OMPI_COMM_WORLD_SIZE", 1))
        shard_id = int(os.getenv("OMPI_COMM_WORLD_RANK", 0))
        if num_shards > 1:
            rendezvous = dict(
                kv_handler=None,
                num_shards=num_shards,
                shard_id=shard_id,
                engine="GLOO",
                transport=args.distributed_transport,
                interface=interfaces[0],
                mpi_rendezvous=True,
                exit_nets=None)
        else:
            # avoid leaving `rendezvous` unbound when launched under MPI
            # with a single process
            rendezvous = None

    elif num_shards > 1:
        # Create rendezvous for distributed computation
        store_handler = "store_handler"
        if args.redis_host is not None:
            # Use Redis for rendezvous if Redis host is specified
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "RedisStoreHandlerCreate", [], [store_handler],
                    host=args.redis_host,
                    port=args.redis_port,
                    prefix=args.run_id,
                )
            )
        else:
            # Use filesystem for rendezvous otherwise
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "FileStoreHandlerCreate", [], [store_handler],
                    path=args.file_store_path,
                    prefix=args.run_id,
                )
            )

        rendezvous = dict(
            kv_handler=store_handler,
            shard_id=shard_id,
            num_shards=num_shards,
            engine="GLOO",
            transport=args.distributed_transport,
            interface=interfaces[0],
            exit_nets=None)

    else:
        rendezvous = None

    # Model building functions
    def create_resnet50_model_ops(model, loss_scale):
        initializer = (pFP16Initializer if args.dtype == 'float16'
                       else Initializer)

        with brew.arg_scope([brew.conv, brew.fc],
                            WeightInitializer=initializer,
                            BiasInitializer=initializer,
                            enable_tensor_core=args.enable_tensor_core,
                            float16_compute=args.float16_compute):
            pred = resnet.create_resnet50(
                model,
                "data",
                num_input_channels=args.num_channels,
                num_labels=args.num_labels,
                no_bias=True,
                no_loss=True,
            )

        if args.dtype == 'float16':
            pred = model.net.HalfToFloat(pred, pred + '_fp32')

        softmax, loss = model.SoftmaxWithLoss([pred, 'label'],
                                              ['softmax', 'loss'])
        loss = model.Scale(loss, scale=loss_scale)
        brew.accuracy(model, [softmax, "label"], "accuracy")
        return [loss]

    def add_optimizer(model):
        stepsz = int(30 * args.epoch_size / total_batch_size / num_shards)
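        # With the "step" policy below, the learning rate is multiplied by
        # gamma=0.1 every `stepsz` iterations, i.e. every 30 epochs of this
        # shard's training (epoch_size / total_batch_size / num_shards
        # iterations per epoch).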

        if args.float16_compute:
            # TODO: merge with multi-precision optimizer
            opt = optimizer.build_fp16_sgd(
                model,
                args.base_learning_rate,
                momentum=0.9,
                nesterov=1,
                weight_decay=args.weight_decay,  # weight decay included
                policy="step",
                stepsize=stepsz,
                gamma=0.1
            )
        else:
            optimizer.add_weight_decay(model, args.weight_decay)
            opt = optimizer.build_multi_precision_sgd(
                model,
                args.base_learning_rate,
                momentum=0.9,
                nesterov=1,
                policy="step",
                stepsize=stepsz,
                gamma=0.1
            )
        return opt

    # Define add_image_input function.
    # Depends on the "train_data" argument.
    # Note that the reader will be shared between all GPUs.
    if args.train_data == "null":
        def add_image_input(model):
            AddNullInput(
                model,
                None,
                batch_size=batch_per_device,
                img_size=args.image_size,
                dtype=args.dtype,
            )
    else:
        reader = train_model.CreateDB(
            "reader",
            db=args.train_data,
            db_type=args.db_type,
            num_shards=num_shards,
            shard_id=shard_id,
        )

        def add_image_input(model):
            AddImageInput(
                model,
                reader,
                batch_size=batch_per_device,
                img_size=args.image_size,
                dtype=args.dtype,
                is_test=False,
            )

    def add_post_sync_ops(model):
        """Add ops applied after initial parameter sync."""
        for param_info in model.GetOptimizationParamInfo(model.GetParams()):
            if param_info.blob_copy is not None:
                model.param_init_net.HalfToFloat(
                    param_info.blob,
                    param_info.blob_copy[core.DataType.FLOAT]
                )

    # Create parallelized model
    data_parallel_model.Parallelize(
        train_model,
        input_builder_fun=add_image_input,
        forward_pass_builder_fun=create_resnet50_model_ops,
        optimizer_builder_fun=add_optimizer,
        post_sync_builder_fun=add_post_sync_ops,
        devices=gpus,
        rendezvous=rendezvous,
        optimize_gradient_memory=False,
        cpu_device=args.use_cpu,
        shared_model=args.use_cpu,
        combine_spatial_bn=args.use_cpu,
    )

    if args.model_parallel:
        # Shift half of the activations to another GPU: the blobs for the
        # second half of the network are moved from data-parallel GPU g to
        # GPU num_gpus + g (e.g. 0 -> 2 and 1 -> 3 when num_gpus=2)
        assert workspace.NumCudaDevices() >= 2 * args.num_gpus
        activations = data_parallel_model_utils.GetActivationBlobs(train_model)
        data_parallel_model_utils.ShiftActivationDevices(
            train_model,
            activations=activations[len(activations) // 2:],
            shifts={g: args.num_gpus + g for g in range(args.num_gpus)},
        )

    data_parallel_model.OptimizeGradientMemory(train_model, {}, set(), False)

    workspace.RunNetOnce(train_model.param_init_net)
    workspace.CreateNet(train_model.net)

    # Add test model, if specified
    test_model = None
    if (args.test_data is not None):
        log.info("----- Create test net ----")
        test_arg_scope = {
            'order': "NCHW",
            'use_cudnn': True,
            'cudnn_exhaustive_search': True,
        }
        test_model = model_helper.ModelHelper(
            name="resnet50_test", arg_scope=test_arg_scope, init_params=False
        )

        test_reader = test_model.CreateDB(
            "test_reader",
            db=args.test_data,
            db_type=args.db_type,
        )

        def test_input_fn(model):
            AddImageInput(
                model,
                test_reader,
                batch_size=batch_per_device,
                img_size=args.image_size,
                dtype=args.dtype,
                is_test=True,
            )

        data_parallel_model.Parallelize(
            test_model,
            input_builder_fun=test_input_fn,
            forward_pass_builder_fun=create_resnet50_model_ops,
            post_sync_builder_fun=add_post_sync_ops,
            param_update_builder_fun=None,
            devices=gpus,
            cpu_device=args.use_cpu,
        )
        workspace.RunNetOnce(test_model.param_init_net)
        workspace.CreateNet(test_model.net)

    epoch = 0
    # load the pre-trained model and reset epoch
    if args.load_model_path is not None:
        LoadModel(args.load_model_path, train_model)

        # Sync the model params
        data_parallel_model.FinalizeAfterCheckpoint(train_model)

        # reset epoch. load_model_path should end with *_X.mdl,
        # where X is the epoch number
        last_str = args.load_model_path.split('_')[-1]
        if last_str.endswith('.mdl'):
            epoch = int(last_str[:-4])
            log.info("Reset epoch to {}".format(epoch))
        else:
            log.warning("The format of load_model_path doesn't match!")

    expname = "resnet50_gpu%d_b%d_L%d_lr%.2f_v2" % (
        args.num_gpus,
        total_batch_size,
        args.num_labels,
        args.base_learning_rate,
    )

    explog = experiment_util.ModelTrainerLog(expname, args)

    # Run the training one epoch at a time
    while epoch < args.num_epochs:
        epoch = RunEpoch(
            args,
            epoch,
            train_model,
            test_model,
            total_batch_size,
            num_shards,
            expname,
            explog
        )

        # Save the model for each epoch
        SaveModel(args, train_model, epoch)

        model_path = "%s/%s_" % (
            args.file_store_path,
            args.save_model_name
        )
        # remove the saved model from the previous epoch if it exists
        if os.path.isfile(model_path + str(epoch - 1) + ".mdl"):
            os.remove(model_path + str(epoch - 1) + ".mdl")


def main():
    # TODO: use argv
    parser = argparse.ArgumentParser(
        description="Caffe2: Resnet-50 training"
    )
    parser.add_argument("--train_data", type=str, default=None, required=True,
                        help="Path to training data (or 'null' to simulate)")
    parser.add_argument("--test_data", type=str, default=None,
                        help="Path to test data")
    parser.add_argument("--db_type", type=str, default="lmdb",
                        help="Database type (such as lmdb or leveldb)")
    parser.add_argument("--gpus", type=str,
                        help="Comma separated list of GPU devices to use")
    parser.add_argument("--num_gpus", type=int, default=1,
                        help="Number of GPU devices (instead of --gpus)")
    parser.add_argument("--model_parallel", type=bool, default=False,
                        help="Split model over 2 x num_gpus")
    parser.add_argument("--num_channels", type=int, default=3,
                        help="Number of color channels")
    parser.add_argument("--image_size", type=int, default=227,
                        help="Input image size (to crop to)")
    parser.add_argument("--num_labels", type=int, default=1000,
                        help="Number of labels")
    parser.add_argument("--batch_size", type=int, default=32,
                        help="Batch size, total over all GPUs")
    parser.add_argument("--epoch_size", type=int, default=1500000,
                        help="Number of images/epoch, total over all machines")
    parser.add_argument("--num_epochs", type=int, default=1000,
                        help="Number of epochs")
    parser.add_argument("--base_learning_rate", type=float, default=0.1,
                        help="Initial learning rate")
    parser.add_argument("--weight_decay", type=float, default=1e-4,
                        help="Weight decay (L2 regularization)")
    parser.add_argument("--cudnn_workspace_limit_mb", type=int, default=64,
                        help="CuDNN workspace limit in MBs")
    parser.add_argument("--num_shards", type=int, default=1,
                        help="Number of machines in distributed run")
    parser.add_argument("--shard_id", type=int, default=0,
                        help="Shard id")
    parser.add_argument("--run_id", type=str,
                        help="Unique run identifier (e.g. uuid)")
    parser.add_argument("--redis_host", type=str,
                        help="Host of Redis server (for rendezvous)")
    parser.add_argument("--redis_port", type=int, default=6379,
                        help="Port of Redis server (for rendezvous)")
    parser.add_argument("--file_store_path", type=str, default="/tmp",
                        help="Path to directory to use for rendezvous")
    parser.add_argument("--save_model_name", type=str, default="resnet50_model",
                        help="Save the trained model to a given name")
    parser.add_argument("--load_model_path", type=str, default=None,
                        help="Load previously saved model to continue training")
    parser.add_argument("--use_cpu", type=bool, default=False,
                        help="Use CPU instead of GPU")
    parser.add_argument('--dtype', default='float',
                        choices=['float', 'float16'],
                        help='Data type used for training')
    parser.add_argument('--float16_compute', action='store_true',
                        help="Use float16 compute, if available")
    parser.add_argument('--enable-tensor-core', action='store_true',
                        help='Enable Tensor Core math for Conv and FC ops')
    parser.add_argument("--distributed_transport", type=str, default="tcp",
                        help="Transport to use for distributed run [tcp|ibverbs]")
    parser.add_argument("--distributed_interfaces", type=str, default="",
                        help="Network interfaces to use for distributed run")

    args = parser.parse_args()

    Train(args)


if __name__ == '__main__':
    workspace.GlobalInit(['caffe2', '--caffe2_log_level=2'])
    main()