from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import argparse
import logging
import os
import time

import numpy as np

from caffe2.python import core, workspace, experiment_util, data_parallel_model
from caffe2.python import dyndep, optimizer
from caffe2.python import timeout_guard, model_helper, brew
from caffe2.proto import caffe2_pb2

import caffe2.python.models.resnet as resnet
import caffe2.python.predictor.predictor_exporter as pred_exp
import caffe2.python.predictor.predictor_py_utils as pred_utils
from caffe2.python.predictor_constants import predictor_constants as predictor_constants
from caffe2.python.modeling.initializers import Initializer, PseudoFP16Initializer
'''
Parallelized multi-GPU distributed trainer for ResNe(X)t.
Can be used to train on imagenet data, for example.
The default parameters can train a standard Resnet-50 (1x64d), and parameters
can be provided to train ResNe(X)t models (e.g., ResNeXt-101 32x4d).

To run the trainer in single-machine multi-gpu mode, set num_shards = 1.

To run the trainer in multi-machine multi-gpu mode with M machines,
run the same program on all machines, specifying num_shards = M, and
shard_id = a unique integer in the set [0, M-1].

For rendezvous (the trainer processes have to know about each other),
you can either use a directory path that is visible to all processes
(e.g. an NFS directory), or use a Redis instance. Use the former by
passing the `file_store_path` argument. Use the latter by passing the
`redis_host` and `redis_port` arguments. Example invocations are
sketched in the comments below.
'''

logging.basicConfig()
log = logging.getLogger("ResNe(X)t_trainer")
log.setLevel(logging.DEBUG)

dyndep.InitOpsLibrary('@/caffe2/caffe2/distributed:file_store_handler_ops')
dyndep.InitOpsLibrary('@/caffe2/caffe2/distributed:redis_store_handler_ops')
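# The command lines below are illustrative sketches only: the script name and
# all paths/hosts are placeholders, not part of this file. They use the flags
# defined in main() at the bottom of this file.
#
# Single machine, 2 GPUs:
#
#   python resnext_trainer.py \
#       --train_data /path/to/imagenet_train_lmdb \
#       --num_gpus 2 --batch_size 64 --num_shards 1
#
# Two machines (num_shards = 2), rendezvous over a shared NFS directory; run
# the same command on each machine with a distinct --shard_id in {0, 1}:
#
#   python resnext_trainer.py \
#       --train_data /path/to/imagenet_train_lmdb \
#       --num_gpus 2 --batch_size 64 \
#       --num_shards 2 --shard_id 0 \
#       --file_store_path /mnt/nfs/rendezvous --run_id run0
#
# The same multi-machine run can rendezvous through Redis instead by passing
# --redis_host and --redis_port in place of --file_store_path.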
def AddImageInput(model, reader, batch_size, img_size, dtype, is_test,
                  mean_per_channel=None, std_per_channel=None):
    '''
    The image input operator loads image and label data from the reader and
    applies transformations to the images (random cropping, mirroring, ...).
    '''
    data, label = brew.image_input(
        model,
        reader, ["data", "label"],
        batch_size=batch_size,
        output_type=dtype,
        use_gpu_transform=core.IsGPUDeviceType(model._device_type),
        mean_per_channel=mean_per_channel,
        std_per_channel=std_per_channel,
        scale=256,
        crop=img_size,
        mirror=1,
        is_test=is_test,
    )

    # Prevent the gradient from propagating back into the input data.
    data = model.StopGradient(data, data)
def AddNullInput(model, reader, batch_size, img_size, dtype):
    '''
    The null input function uses a gaussian fill operator to emulate real image
    input. A label blob is hardcoded to a single value. This is useful if you
    want to test compute throughput or don't have a dataset available.
    '''
    suffix = "_fp16" if dtype == "float16" else ""
    model.param_init_net.GaussianFill(
        [],
        ["data" + suffix],
        shape=[batch_size, 3, img_size, img_size],
    )
    if dtype == "float16":
        model.param_init_net.FloatToHalf("data" + suffix, "data")

    model.param_init_net.ConstantFill(
        [],
        ["label"],
        shape=[batch_size],
        value=1,
        dtype=core.DataType.INT32,
    )
def SaveModel(args, train_model, epoch, use_ideep):
    prefix = "{}_{}".format(train_model._device_prefix, train_model._devices[0])
    predictor_export_meta = pred_exp.PredictorExportMeta(
        predict_net=train_model.net.Proto(),
        parameters=data_parallel_model.GetCheckpointParams(train_model),
        inputs=[prefix + "/data"],
        outputs=[prefix + "/softmax"],
        shapes={
            prefix + "/softmax": (1, args.num_labels),
            prefix + "/data": (args.num_channels, args.image_size, args.image_size)
        }
    )

    # Save the train_model for the current epoch
    model_path = "%s/%s_%d.mdl" % (
        args.file_store_path,
        args.save_model_name,
        epoch,
    )

    # Save to a minidb so it can be reloaded later with LoadModel
    pred_exp.save_to_db(
        db_type="minidb",
        db_destination=model_path,
        predictor_export_meta=predictor_export_meta,
        use_ideep=use_ideep
    )
def LoadModel(path, model, use_ideep):
    '''
    Load pretrained model from file
    '''
    log.info("Loading path: {}".format(path))
    meta_net_def = pred_exp.load_from_db(path, 'minidb')
    init_net = core.Net(pred_utils.GetNet(
        meta_net_def, predictor_constants.GLOBAL_INIT_NET_TYPE))
    predict_init_net = core.Net(pred_utils.GetNet(
        meta_net_def, predictor_constants.PREDICT_INIT_NET_TYPE))

    if use_ideep:
        predict_init_net.RunAllOnIDEEP()
    else:
        predict_init_net.RunAllOnGPU()

    if use_ideep:
        init_net.RunAllOnIDEEP()
    else:
        init_net.RunAllOnGPU()

    assert workspace.RunNetOnce(predict_init_net)
    assert workspace.RunNetOnce(init_net)

    # The iteration counter is loaded in a GPU context; move it back to CPU
    # so the optimizer can keep updating it there.
    itercnt = workspace.FetchBlob("optimizer_iteration")
    workspace.FeedBlob(
        "optimizer_iteration",
        itercnt,
        device_option=core.DeviceOption(caffe2_pb2.CPU, 0)
    )
def RunEpoch(
    args,
    epoch,
    train_model,
    test_model,
    total_batch_size,
    num_shards,
    expname,
    explog
):
    '''
    Run one epoch of the trainer.
    TODO: add checkpointing here.
    '''
    log.info("Starting epoch {}/{}".format(epoch, args.num_epochs))
    epoch_iters = int(args.epoch_size / total_batch_size / num_shards)
    test_epoch_iters = int(args.test_epoch_size / total_batch_size / num_shards)
    for i in range(epoch_iters):
        # This timeout is required (temporarily) since CUDA-NCCL
        # operators might deadlock when synchronizing between GPUs.
        timeout = args.first_iter_timeout if i == 0 else args.timeout
        with timeout_guard.CompleteInTimeOrDie(timeout):
            t1 = time.time()
            workspace.RunNet(train_model.net.Proto().name)
            t2 = time.time()
            dt = t2 - t1

        fmt = "Finished iteration {}/{} of epoch {} ({:.2f} images/sec)"
        log.info(fmt.format(i + 1, epoch_iters, epoch, total_batch_size / dt))
        prefix = "{}_{}".format(
            train_model._device_prefix,
            train_model._devices[0])
        accuracy = workspace.FetchBlob(prefix + '/accuracy')
        loss = workspace.FetchBlob(prefix + '/loss')
        train_fmt = "Training loss: {}, accuracy: {}"
        log.info(train_fmt.format(loss, accuracy))

    num_images = epoch * epoch_iters * total_batch_size
    prefix = "{}_{}".format(train_model._device_prefix, train_model._devices[0])
    accuracy = workspace.FetchBlob(prefix + '/accuracy')
    loss = workspace.FetchBlob(prefix + '/loss')
    learning_rate = workspace.FetchBlob(
        data_parallel_model.GetLearningRateBlobNames(train_model)[0]
    )
    test_accuracy = 0
    test_accuracy_top5 = 0
    if test_model is not None:
        # Run a test epoch and average the accuracy over iterations and devices
        ntests = 0
        for _ in range(test_epoch_iters):
            workspace.RunNet(test_model.net.Proto().name)
            for g in test_model._devices:
                test_accuracy += np.asscalar(workspace.FetchBlob(
                    "{}_{}".format(test_model._device_prefix, g) + '/accuracy'
                ))
                test_accuracy_top5 += np.asscalar(workspace.FetchBlob(
                    "{}_{}".format(test_model._device_prefix, g) + '/accuracy_top5'
                ))
                ntests += 1
        test_accuracy /= ntests
        test_accuracy_top5 /= ntests
    else:
        test_accuracy = (-1)
        test_accuracy_top5 = (-1)

    explog.log(
        input_count=num_images,
        batch_count=(i + epoch * epoch_iters),
        additional_values={
            'accuracy': accuracy,
            'loss': loss,
            'learning_rate': learning_rate,
            'epoch': epoch,
            'top1_test_accuracy': test_accuracy,
            'top5_test_accuracy': test_accuracy_top5,
        }
    )
    assert loss < 40, "Exploded gradients :("

    return epoch + 1


def Train(args):
    # Either use the specified device list or generate one
    if args.gpus is not None:
        gpus = [int(x) for x in args.gpus.split(',')]
        num_gpus = len(gpus)
    else:
        gpus = list(range(args.num_gpus))
        num_gpus = args.num_gpus

    log.info("Running on GPUs: {}".format(gpus))

    # Verify valid batch size
    total_batch_size = args.batch_size
    batch_per_device = total_batch_size // num_gpus
    assert total_batch_size % num_gpus == 0, \
        "Number of GPUs must divide batch size"

    # Verify valid image mean/std per channel
    if args.image_mean_per_channel:
        assert len(args.image_mean_per_channel) == args.num_channels, \
            "The number of channels of image mean doesn't match input"

    if args.image_std_per_channel:
        assert len(args.image_std_per_channel) == args.num_channels, \
            "The number of channels of image std doesn't match input"

    # Round down epoch size to the closest multiple of the global batch size
    global_batch_size = total_batch_size * args.num_shards
    epoch_iters = int(args.epoch_size / global_batch_size)

    assert epoch_iters > 0, \
        "Epoch size must be larger than batch size times shard count"

    args.epoch_size = epoch_iters * global_batch_size
    log.info("Using epoch size: {}".format(args.epoch_size))
    # Create ModelHelper object
    if args.use_ideep:
        train_arg_scope = {'use_cudnn': False, 'cudnn_exhaustive_search': False}
    else:
        train_arg_scope = {
            'order': 'NCHW',
            'use_cudnn': True,
            'cudnn_exhaustive_search': True,
            'ws_nbytes_limit': (args.cudnn_workspace_limit_mb * 1024 * 1024),
        }
    train_model = model_helper.ModelHelper(
        name='resnext' + str(args.num_layers), arg_scope=train_arg_scope
    )

    num_shards = args.num_shards
    shard_id = args.shard_id

    # Expect interfaces to be comma separated.
    # Use of multiple network interfaces is not yet complete,
    # so simply use the first one in the list.
    interfaces = args.distributed_interfaces.split(",")

    # Rendezvous using MPI when run with mpirun
    if os.getenv("OMPI_COMM_WORLD_SIZE") is not None:
        num_shards = int(os.getenv("OMPI_COMM_WORLD_SIZE", 1))
        shard_id = int(os.getenv("OMPI_COMM_WORLD_RANK", 0))
        if num_shards > 1:
            rendezvous = dict(
                kv_handler=None,
                num_shards=num_shards,
                shard_id=shard_id,
                engine="GLOO",
                transport=args.distributed_transport,
                interface=interfaces[0],
                mpi_rendezvous=True,
                exit_nets=None)

    elif num_shards > 1:
        # Create rendezvous for distributed computation
        store_handler = "store_handler"
        if args.redis_host is not None:
            # Use Redis for rendezvous if a Redis host is specified
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "RedisStoreHandlerCreate", [], [store_handler],
                    host=args.redis_host,
                    port=args.redis_port,
                    prefix=args.run_id,
                )
            )
        else:
            # Use the filesystem for rendezvous otherwise
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "FileStoreHandlerCreate", [], [store_handler],
                    path=args.file_store_path,
                    prefix=args.run_id,
                )
            )

        rendezvous = dict(
            kv_handler=store_handler,
            shard_id=shard_id,
            num_shards=num_shards,
            engine="GLOO",
            transport=args.distributed_transport,
            interface=interfaces[0],
            exit_nets=None)

    else:
        rendezvous = None
    # Model building functions
    def create_resnext_model_ops(model, loss_scale):
        initializer = (PseudoFP16Initializer
                       if args.dtype == 'float16' else Initializer)

        with brew.arg_scope([brew.conv, brew.fc],
                            WeightInitializer=initializer,
                            BiasInitializer=initializer,
                            enable_tensor_core=args.enable_tensor_core,
                            float16_compute=args.float16_compute):
            pred = resnet.create_resnext(
                model,
                "data",
                num_input_channels=args.num_channels,
                num_labels=args.num_labels,
                num_layers=args.num_layers,
                num_groups=args.resnext_num_groups,
                num_width_per_group=args.resnext_width_per_group,
                no_bias=True,
                no_loss=True,
            )

        if args.dtype == 'float16':
            pred = model.net.HalfToFloat(pred, pred + '_fp32')

        softmax, loss = model.SoftmaxWithLoss([pred, 'label'],
                                              ['softmax', 'loss'])
        loss = model.Scale(loss, scale=loss_scale)
        brew.accuracy(model, [softmax, "label"], "accuracy", top_k=1)
        brew.accuracy(model, [softmax, "label"], "accuracy_top5", top_k=5)
        return [loss]
    def add_optimizer(model):
        stepsz = int(30 * args.epoch_size / total_batch_size / num_shards)

        if args.float16_compute:
            # TODO: merge with the multi-precision optimizer
            opt = optimizer.build_fp16_sgd(
                model,
                args.base_learning_rate,
                momentum=0.9, nesterov=1,
                weight_decay=args.weight_decay,
                policy="step", stepsize=stepsz, gamma=0.1
            )
        else:
            optimizer.add_weight_decay(model, args.weight_decay)
            opt = optimizer.build_multi_precision_sgd(
                model,
                args.base_learning_rate,
                momentum=0.9, nesterov=1,
                policy="step", stepsize=stepsz, gamma=0.1
            )
        return opt
    # Input. Note that the reader is shared between all GPUs.
    if args.train_data == "null":
        def add_image_input(model):
            AddNullInput(
                model, None,
                batch_size=batch_per_device,
                img_size=args.image_size,
                dtype=args.dtype,
            )
    else:
        reader = train_model.CreateDB(
            "reader",
            db=args.train_data,
            db_type=args.db_type,
            num_shards=num_shards,
            shard_id=shard_id,
        )

        def add_image_input(model):
            AddImageInput(
                model,
                reader,
                batch_size=batch_per_device,
                img_size=args.image_size,
                dtype=args.dtype,
                is_test=False,
                mean_per_channel=args.image_mean_per_channel,
                std_per_channel=args.image_std_per_channel,
            )
    def add_post_sync_ops(model):
        """Add ops applied after initial parameter sync."""
        for param_info in model.GetOptimizationParamInfo(model.GetParams()):
            if param_info.blob_copy is not None:
                model.param_init_net.HalfToFloat(
                    param_info.blob,
                    param_info.blob_copy[core.DataType.FLOAT]
                )

    data_parallel_model.Parallelize(
        train_model,
        input_builder_fun=add_image_input,
        forward_pass_builder_fun=create_resnext_model_ops,
        optimizer_builder_fun=add_optimizer,
        post_sync_builder_fun=add_post_sync_ops,
        devices=gpus,
        rendezvous=rendezvous,
        optimize_gradient_memory=False,
        cpu_device=args.use_cpu,
        ideep=args.use_ideep,
        shared_model=args.use_cpu,
        combine_spatial_bn=args.use_cpu,
    )

    data_parallel_model.OptimizeGradientMemory(train_model, {}, set(), False)

    workspace.RunNetOnce(train_model.param_init_net)
    workspace.CreateNet(train_model.net)
    # Add test model, if specified
    test_model = None
    if (args.test_data is not None):
        log.info("----- Create test net ----")
        if args.use_ideep:
            test_arg_scope = {
                'use_cudnn': False,
                'cudnn_exhaustive_search': False,
            }
        else:
            test_arg_scope = {
                'order': "NCHW",
                'use_cudnn': True,
                'cudnn_exhaustive_search': True,
            }
        test_model = model_helper.ModelHelper(
            name='resnext' + str(args.num_layers) + "_test",
            arg_scope=test_arg_scope,
            init_params=False,
        )

        test_reader = test_model.CreateDB(
            "test_reader",
            db=args.test_data,
            db_type=args.db_type,
        )

        def test_input_fn(model):
            AddImageInput(
                model,
                test_reader,
                batch_size=batch_per_device,
                img_size=args.image_size,
                dtype=args.dtype,
                is_test=True,
                mean_per_channel=args.image_mean_per_channel,
                std_per_channel=args.image_std_per_channel,
            )

        data_parallel_model.Parallelize(
            test_model,
            input_builder_fun=test_input_fn,
            forward_pass_builder_fun=create_resnext_model_ops,
            post_sync_builder_fun=add_post_sync_ops,
            param_update_builder_fun=None,
            devices=gpus,
            cpu_device=args.use_cpu,
        )
        workspace.RunNetOnce(test_model.param_init_net)
        workspace.CreateNet(test_model.net)
    epoch = 0
    # Load the pre-trained model and reset epoch
    if args.load_model_path is not None:
        LoadModel(args.load_model_path, train_model, args.use_ideep)

        # Sync the model params
        data_parallel_model.FinalizeAfterCheckpoint(train_model)

        # Reset epoch: load_model_path should end with *_X.mdl,
        # where X is the epoch number
        last_str = args.load_model_path.split('_')[-1]
        if last_str.endswith('.mdl'):
            epoch = int(last_str[:-4])
            log.info("Reset epoch to {}".format(epoch))
        else:
            log.warning("The format of load_model_path doesn't match!")

    expname = "resnext_%d_gpu%d_b%d_L%d_lr%.2f_v2" % (
        args.num_layers,
        args.num_gpus,
        total_batch_size,
        args.num_labels,
        args.base_learning_rate,
    )

    explog = experiment_util.ModelTrainerLog(expname, args)

    # Run the training one epoch at a time
    while epoch < args.num_epochs:
        epoch = RunEpoch(
            args, epoch, train_model, test_model,
            total_batch_size, num_shards, expname, explog
        )

        # Save the model for each epoch
        SaveModel(args, train_model, epoch, args.use_ideep)

        model_path = "%s/%s_" % (
            args.file_store_path,
            args.save_model_name,
        )
        # Remove the saved model from the previous epoch if it exists
        if os.path.isfile(model_path + str(epoch - 1) + ".mdl"):
            os.remove(model_path + str(epoch - 1) + ".mdl")
def main():
    parser = argparse.ArgumentParser(
        description="Caffe2: ResNe(X)t training"
    )
    parser.add_argument("--train_data", type=str, default=None, required=True,
                        help="Path to training data (or 'null' to simulate)")
    parser.add_argument("--num_layers", type=int, default=50,
                        help="The number of layers in ResNe(X)t model")
    parser.add_argument("--resnext_num_groups", type=int, default=1,
                        help="The cardinality of resnext")
    parser.add_argument("--resnext_width_per_group", type=int, default=64,
                        help="The width per group of resnext")
    parser.add_argument("--test_data", type=str, default=None,
                        help="Path to test data")
    parser.add_argument("--image_mean_per_channel", type=float, nargs='+',
                        help="The per channel mean for the images")
    parser.add_argument("--image_std_per_channel", type=float, nargs='+',
                        help="The per channel standard deviation for the images")
    parser.add_argument("--test_epoch_size", type=int, default=50000,
                        help="Number of test images")
    parser.add_argument("--db_type", type=str, default="lmdb",
                        help="Database type (such as lmdb or leveldb)")
    parser.add_argument("--gpus", type=str,
                        help="Comma separated list of GPU devices to use")
    parser.add_argument("--num_gpus", type=int, default=1,
                        help="Number of GPU devices (instead of --gpus)")
    parser.add_argument("--num_channels", type=int, default=3,
                        help="Number of color channels")
    parser.add_argument("--image_size", type=int, default=224,
                        help="Input image size (to crop to)")
    parser.add_argument("--num_labels", type=int, default=1000,
                        help="Number of labels")
    parser.add_argument("--batch_size", type=int, default=32,
                        help="Batch size, total over all GPUs")
    parser.add_argument("--epoch_size", type=int, default=1500000,
                        help="Number of images/epoch, total over all machines")
    parser.add_argument("--num_epochs", type=int, default=1000,
                        help="Number of epochs to run")
    parser.add_argument("--base_learning_rate", type=float, default=0.1,
                        help="Initial learning rate.")
    parser.add_argument("--weight_decay", type=float, default=1e-4,
                        help="Weight decay (L2 regularization)")
    parser.add_argument("--cudnn_workspace_limit_mb", type=int, default=64,
                        help="CuDNN workspace limit in MBs")
    parser.add_argument("--num_shards", type=int, default=1,
                        help="Number of machines in distributed run")
    parser.add_argument("--shard_id", type=int, default=0,
                        help="Shard id of this machine")
    parser.add_argument("--run_id", type=str,
                        help="Unique run identifier (e.g. uuid)")
    parser.add_argument("--redis_host", type=str,
                        help="Host of Redis server (for rendezvous)")
    parser.add_argument("--redis_port", type=int, default=6379,
                        help="Port of Redis server (for rendezvous)")
    parser.add_argument("--file_store_path", type=str, default="/tmp",
                        help="Path to directory to use for rendezvous")
    parser.add_argument("--save_model_name", type=str, default="resnext_model",
                        help="Save the trained model to a given name")
    parser.add_argument("--load_model_path", type=str, default=None,
                        help="Load previously saved model to continue training")
    parser.add_argument("--use_cpu", type=bool, default=False,
                        help="Use CPU instead of GPU")
    parser.add_argument("--use_ideep", type=bool, default=False,
                        help="Use ideep")
    parser.add_argument('--dtype', default='float',
                        choices=['float', 'float16'],
                        help='Data type used for training')
    parser.add_argument('--float16_compute', action='store_true',
                        help="Use float 16 compute, if available")
    parser.add_argument('--enable_tensor_core', action='store_true',
                        help='Enable Tensor Core math for Conv and FC ops')
    parser.add_argument("--distributed_transport", type=str, default="tcp",
                        help="Transport to use for distributed run [tcp|ibverbs]")
    parser.add_argument("--distributed_interfaces", type=str, default="",
                        help="Network interfaces to use for distributed run")
    parser.add_argument("--first_iter_timeout", type=int, default=600,
                        help="Timeout (secs) of the first iteration "
                             "(default: %(default)s)")
    parser.add_argument("--timeout", type=int, default=60,
                        help="Timeout (secs) of each (except the first) iteration "
                             "(default: %(default)s)")

    args = parser.parse_args()

    Train(args)
if __name__ == '__main__':
    workspace.GlobalInit(['caffe2', '--caffe2_log_level=2'])
    main()