from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import copy
import logging
import warnings

import numpy as np

from collections import OrderedDict
from future.utils import viewitems, viewkeys, viewvalues
from multiprocessing import cpu_count

from caffe2.python import \
    model_helper, dyndep, scope, workspace, core, memonger, utils
from caffe2.proto import caffe2_pb2

dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/nccl:nccl_ops")
dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/gloo:gloo_ops")
dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/gloo:gloo_ops_gpu")

log = logging.getLogger("data_parallel_model")
log.setLevel(logging.INFO)

_DEFAULT_TIMEOUT_SEC = 30
_DEFAULT_BARRIER_NET_TIMEOUT_SEC = 300


def Parallelize_GPU(*args, **kwargs):
    kwargs['cpu_device'] = False
    Parallelize(*args, **kwargs)


def Parallelize_CPU(*args, **kwargs):
    kwargs['cpu_device'] = True
    Parallelize(*args, **kwargs)


def Parallelize_iDeep(*args, **kwargs):
    kwargs['ideep'] = True
    Parallelize(*args, **kwargs)


def Parallelize(
    model_helper_obj,
    input_builder_fun,
    forward_pass_builder_fun,
    param_update_builder_fun=None,
    optimizer_builder_fun=None,
    post_sync_builder_fun=None,
    pre_grad_net_transformer_fun=None,
    net_transformer_fun=None,
    devices=None,
    rendezvous=None,
    net_type='dag',
    broadcast_computed_params=True,
    optimize_gradient_memory=False,
    dynamic_memory_management=False,
    blobs_to_keep=None,
    use_nccl=False,
    max_concurrent_distributed_ops=16,
    cpu_device=False,
    ideep=False,
    num_threads_per_device=4,
    shared_model=False,
    combine_spatial_bn=False,
    barrier_net_timeout_sec=_DEFAULT_BARRIER_NET_TIMEOUT_SEC,
):
    '''
    Function to create a model that can run on many GPUs or CPUs.
      model_helper_obj: an object of ModelHelper
      input_builder_fun:
                        Function that adds the input operators
                        Note: Remember to instantiate reader outside of this
                        function so all devices share same reader object.
                        Signature: input_builder_fun(model)
      forward_pass_builder_fun:
                        Function to add the operators to the model.
                        Must return list of loss-blob references that
                        are used to build the gradient. Loss scale parameter
                        is passed, as you should scale the loss of your model
                        by 1.0 / the total number of devices.
                        Signature: forward_pass_builder_fun(model, loss_scale)
      param_update_builder_fun:
                        Function that adds operators that are run after
                        gradient update, such as updating the weights and
                        weight decaying. This is called for each GPU separately.
                        Signature: param_update_builder_fun(model)
      optimizer_builder_fun:
                        Alternative to param_update_builder_fun, allows one
                        to add an optimizer for the whole model. Called only
                        once, without name or devicescope.
      net_transformer_fun:
                        Optional function to transform the network after the
                        network is built. It will be called once (NOT once per
                        GPU.)
                        Signature: net_transformer_fun(
                            model, num_devices, device_prefix, device_type)
      pre_grad_net_transformer_fun:
                        Optional function to transform the network similar to
                        net_transformer_fun, but happens before gradient ops
                        are added.
                        Signature: pre_grad_net_transformer_fun(model)
      post_sync_builder_fun:
                        Function applied after initial parameter sync has been
                        completed, such as keeping multi-precision parameters
                        in sync.
                        Signature: post_sync_builder_fun(model)
      devices:          List of GPU ids, such as [0, 1, 2, 3],
      rendezvous:       used for rendezvous in distributed computation, if None
                        then only one node is used. To create rendezvous,
                        use <TBD>.
      net_type:         Network type
      optimize_gradient_memory: whether to apply 'memonger' to share blobs
      shared_model      (only for CPU) use same parameters on each device
                        in gradient computation to reduce memory footprint.
      dynamic_memory_management: Whether to apply dynamic memory optimization
                        by freeing unused blobs. The underlying (de)allocation
                        uses cached allocator. For GPU training PLEASE MAKE SURE
                        caffe2_cuda_memory_pool is set.
      blobs_to_keep :   A list of blob names to keep and don't free during
                        dynamic memory optimization (for example loss blob).
      cpu_device        Use CPU instead of GPU.
      ideep             Use ideep.
      combine_spatial_bn:
                        When set to True, applies batch normalization across
                        all devices within the node. If False, batch
                        normalization will be done separately for each device.
                        This option is currently only supported on the CPU.
      barrier_net_timeout_sec:
                        The timeout in seconds of the barrier net, which is run
                        to synchronize shards before a training epoch starts.
                        Defaults to 300 seconds.
    '''
    assert scope.CurrentDeviceScope() is None \
        or scope.CurrentDeviceScope().device_type == caffe2_pb2.CPU, \
        "Parallelize must be called without device-scope, \
        device scope was: {}".format(scope.CurrentDeviceScope())
    if devices is None:
        if not (cpu_device or ideep):
            devices = list(range(0, workspace.NumCudaDevices()))
        else:
            devices = list(range(0, cpu_count()))

    if not (cpu_device or ideep):
        for gpu in devices:
            if gpu >= workspace.NumGpuDevices():
                log.warning("** Only {} GPUs available, GPUs {} requested".format(
                    workspace.NumGpuDevices(), devices))
                break
        model_helper_obj._device_type = workspace.GpuDeviceType
        model_helper_obj._device_prefix = "gpu"
        model_helper_obj._shared_model = False
        device_name = "GPU"
        assert shared_model is False, "Shared model only supported on CPU"
    elif ideep:
        model_helper_obj._device_type = caffe2_pb2.IDEEP
        model_helper_obj._device_prefix = "ideep"
        device_name = "IDEEP"
        model_helper_obj._shared_model = shared_model
        if shared_model and rendezvous is not None:
            assert "Shared model only supported on single-node currently"
    else:
        model_helper_obj._device_type = caffe2_pb2.CPU
        model_helper_obj._device_prefix = "cpu"
        device_name = "CPU"
        model_helper_obj._shared_model = shared_model
        if shared_model and rendezvous is not None:
            assert "Shared model only supported on single-node currently"

    log.info("Parallelizing model for devices: {}".format(devices))
    extra_workers = 8 if rendezvous is not None else 0
    num_workers = len(devices) * num_threads_per_device + extra_workers
    max_concurrent_distributed_ops =\
        min(max_concurrent_distributed_ops, num_workers - 1)
    model_helper_obj.net.Proto().num_workers = num_workers
    model_helper_obj.net.Proto().type = net_type

    # Store some information in the model -- a bit ugly
    model_helper_obj._devices = devices
    model_helper_obj._rendezvous = rendezvous
    model_helper_obj._sync_barrier_net = None

    model_helper_obj._broadcast_context = None
    model_helper_obj._grad_names = []

    assert isinstance(model_helper_obj, model_helper.ModelHelper)

    # Keep track of params that were in the model before: they are not
    # data parallel, so we need to handle them separately
    non_datapar_params = copy.copy(model_helper_obj.params)
    # Add input and model
    log.info("Create input and model training operators")

    losses_by_gpu = {}
    num_shards = 1 if rendezvous is None else rendezvous['num_shards']
    loss_scale = 1.0 / (len(devices) * num_shards)

    has_parameter_updates = param_update_builder_fun is not None or \
        optimizer_builder_fun is not None
    assert not (
        param_update_builder_fun is not None and
        optimizer_builder_fun is not None
    ), 'Can only specify one of param_update_builder_fun, optimizer_builder_fun'

    # Check that a model meant for testing/validation has init_params=False,
    # otherwise running its param init net will overwrite synchronized values
    if not has_parameter_updates and model_helper_obj.init_params:
        log.warning("############# WARNING #############")
        log.warning("Model {}/{} is used for testing/validation but".format(
            model_helper_obj.name, model_helper_obj))
        log.warning("has init_params=True!")
        log.warning("This can conflict with model training.")
        log.warning("Please ensure model = ModelHelper(init_params=False)")
        log.warning('####################################')
    for device in devices:
        device_opt = core.DeviceOption(model_helper_obj._device_type, device)
        with core.DeviceScope(device_opt):
            with core.NameScope("{}_{}".format(model_helper_obj._device_prefix,
                                               device)):
                log.info("Model for {} : {}".format(device_name, device))
                input_builder_fun(model_helper_obj)
                losses = forward_pass_builder_fun(model_helper_obj, loss_scale)
                # Losses are not needed for test net
                if has_parameter_updates:
                    assert isinstance(losses, list), \
                        'Model builder function must return list of loss blobs'
                    for loss in losses:
                        assert isinstance(loss, core.BlobReference), \
                            'Model builder func must return list of loss blobs'

                losses_by_gpu[device] = losses
    _ValidateParams(model_helper_obj.params)
    # Create parameter map
    model_helper_obj._device_grouped_blobs =\
        _GroupByDevice(model_helper_obj, devices,
                       model_helper_obj.params, non_datapar_params)

    # computed params
    computed_params_grouped =\
        _GroupByDevice(model_helper_obj, devices,
                       model_helper_obj.GetComputedParams(''), [])
    model_helper_obj._device_grouped_blobs.update(computed_params_grouped)

    model_helper_obj._param_names =\
        list(viewkeys(model_helper_obj._device_grouped_blobs))
    model_helper_obj._computed_param_names =\
        list(viewkeys(computed_params_grouped))

    if pre_grad_net_transformer_fun:
        pre_grad_net_transformer_fun(model_helper_obj)
    if has_parameter_updates:
        log.info("Adding gradient operators")
        _AddGradientOperators(devices, model_helper_obj, losses_by_gpu)

    if net_transformer_fun:
        net_transformer_fun(
            model_helper_obj,
            len(devices),
            model_helper_obj._device_prefix,
            model_helper_obj._device_type)

    if not has_parameter_updates:
        log.info("Parameter update function not defined --> only forward")
        _InferBlobDevice(model_helper_obj)
        return

    if combine_spatial_bn:
        assert(has_parameter_updates), \
            'combine_spatial_bn should only be used for train model'
        _InterleaveOps(model_helper_obj)
        if cpu_device:
            _CPUInterDeviceBatchNormalization(model_helper_obj)
        else:
            _GPUInterDeviceBatchNormalization(model_helper_obj)

    _ValidateParams(model_helper_obj.params)
    # Group gradients by device and register to blob lookup
    param_to_grad = model_helper_obj.param_to_grad
    grads_ordered = [param_to_grad[p] for p in
                     model_helper_obj.params if p in param_to_grad]
    non_datapar_grads = [param_to_grad[p] for p in non_datapar_params]

    gradients_grouped = _GroupByDevice(
        model_helper_obj,
        devices,
        grads_ordered,
        non_datapar_grads
    )
    model_helper_obj._device_grouped_blobs.update(gradients_grouped)
    model_helper_obj._grad_names = list(viewkeys(gradients_grouped))
    model_helper_obj._losses_by_gpu = losses_by_gpu

    _InferBlobDevice(model_helper_obj)

    log.info("Add gradient all-reduces for SyncSGD")
    if broadcast_computed_params:
        _BroadcastComputedParams(devices, model_helper_obj, rendezvous, use_nccl)
    if len(model_helper_obj._grad_names) > 0:
        # Gradients in reverse order
        reverse_ordered_grads = _GetReverseOrderedGrads(model_helper_obj)
        assert(len(reverse_ordered_grads) > 0)
        _AllReduceBlobs(
            reverse_ordered_grads,
            devices,
            model_helper_obj,
            model_helper_obj.net,
            rendezvous,
            use_nccl,
            max_concurrent_distributed_ops,
        )
    else:
        log.info("NOTE: Param builder function did not create any parameters.")

    log.info("Post-iteration operators for updating params")
    num_shards = 1 if rendezvous is None else rendezvous['num_shards']

    all_params = set(model_helper_obj.GetParams(''))
    if shared_model:
        _PruneParametersForSharing(model_helper_obj)
    if param_update_builder_fun is not None:
        for device in devices:
            device_opt = core.DeviceOption(model_helper_obj._device_type, device)
            with core.DeviceScope(device_opt):
                with core.NameScope(
                    "{}_{}".format(model_helper_obj._device_prefix, device)
                ):
                    param_update_builder_fun(model_helper_obj)
    else:
        log.info("Calling optimizer builder function")
        optimizer = optimizer_builder_fun(model_helper_obj)
        model_helper_obj._optimizer = optimizer

    (sync_blobs, sync_names) = _ComputeBlobsToSync(model_helper_obj)
    sync_blobs_grouped = _GroupByDevice(
        model_helper_obj,
        devices,
        sync_blobs,
        []
    )
    model_helper_obj._device_grouped_blobs.update(sync_blobs_grouped)

    _InferBlobDevice(model_helper_obj)
    _AnalyzeOperators(model_helper_obj)
    # Configure dagnet to run with only one worker on the first iteration,
    # to prevent concurrency problems with allocs and nccl
    arg = model_helper_obj.Proto().arg.add()
    arg.name = "first_iter_only_one_worker"
    arg.i = 1

    # Add initial parameter sync
    log.info("Add initial parameter sync")
    _SyncAllParams(
        devices,
        model_helper_obj,
        model_helper_obj.param_init_net,
        model_helper_obj.param_init_net,
        rendezvous,
        sync_names,
        max_concurrent_distributed_ops=1
    )

    # Handle any operations that need to be done after parameter sync,
    # i.e. making sure multi-precision copies of parameters are up-to-date
    if post_sync_builder_fun is not None:
        for device in devices:
            device_opt = core.DeviceOption(model_helper_obj._device_type, device)
            with core.DeviceScope(device_opt):
                with core.NameScope(
                    "{}_{}".format(model_helper_obj._device_prefix, device)
                ):
                    post_sync_builder_fun(model_helper_obj)

    assert not (optimize_gradient_memory and dynamic_memory_management), \
        """It is not advised to use gradient optimization ('memonger')
        with dynamic memory management."""

    if optimize_gradient_memory:
        _OptimizeGradientMemorySimple(model_helper_obj, losses_by_gpu, devices)

    if dynamic_memory_management:
        _AddDynamicMemoryOptimization(model_helper_obj, blobs_to_keep, devices)

    model_helper_obj._data_parallel_model_init_nets = [
        model_helper_obj.param_init_net,
    ]

    model_helper_obj._data_parallel_model_nets = [
        model_helper_obj.net
    ]
    _AddBarrierToModelNets(model_helper_obj, barrier_net_timeout_sec)

    if shared_model:
        _RemapParameterBlobsForSharedModel(model_helper_obj, all_params)
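

# Example usage (a rough sketch, not part of the original source). The builder
# functions `add_input_ops`, `add_forward_pass` and `add_optimizer` are
# hypothetical placeholders that a caller would define per the docstring above:
#
#   train_model = model_helper.ModelHelper(name="train")
#   Parallelize(
#       train_model,
#       input_builder_fun=add_input_ops,
#       forward_pass_builder_fun=add_forward_pass,
#       optimizer_builder_fun=add_optimizer,
#       devices=[0, 1, 2, 3],
#   )
#   RunInitNet(train_model)
#   RunNet(train_model, num_iterations=100)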


def Parallelize_GPU_BMUF(*args, **kwargs):
    kwargs['cpu_device'] = False
    Parallelize_BMUF(*args, **kwargs)


def Parallelize_CPU_BMUF(*args, **kwargs):
    kwargs['cpu_device'] = True
    Parallelize_BMUF(*args, **kwargs)


def Parallelize_BMUF(
    model_helper_obj,
    input_builder_fun,
    forward_pass_builder_fun,
    param_update_builder_fun,
    block_learning_rate=1.0,
    block_momentum=None,
    devices=None,
    rendezvous=None,
    net_type='dag',
    master_device=None,
    use_nccl=False,
    nesterov=False,
    optimize_gradient_memory=False,
    reset_momentum_sgd=False,
    warmup_iterations=None,
    max_concurrent_distributed_ops=4,
    add_blobs_to_sync=None,
    num_threads_per_device=4,
    cpu_device=False,
    barrier_net_timeout_sec=_DEFAULT_BARRIER_NET_TIMEOUT_SEC,
):
    '''
    Function to create a model that runs on many GPUs and creates a net for
    parameter updates that can be run independently for a number of iterations,
    followed by another net that runs once to compute the final parameter
    updates according to the block-wise model update filtering rule described
    in: Scalable Training of Deep Learning Machines by Incremental Block
    Training with Intra-block Parallel Optimization and Blockwise Model-Update
    Filtering (ICASSP 2016).
    '''
    assert scope.CurrentDeviceScope() is None \
        or scope.CurrentDeviceScope().device_type == caffe2_pb2.CPU, \
        "Parallelize must be called without device-scope, \
        device scope was: {}".format(scope.CurrentDeviceScope())

    assert isinstance(model_helper_obj, model_helper.ModelHelper)
    if devices is None:
        devices = list(range(0, workspace.NumGpuDevices()))
    if master_device is None:
        master_device = devices[0]

    if not cpu_device:
        for gpu in devices:
            if gpu >= workspace.NumGpuDevices():
                log.warning("** Only {} GPUs available, GPUs {} requested".format(
                    workspace.NumGpuDevices(), devices))
                break
        model_helper_obj._device_type = workspace.GpuDeviceType
        model_helper_obj._device_prefix = "gpu"
    else:
        model_helper_obj._device_type = caffe2_pb2.CPU
        model_helper_obj._device_prefix = "cpu"

    model_helper_obj._devices = devices
    model_helper_obj._rendezvous = rendezvous
    model_helper_obj._sync_barrier_net = None
    model_helper_obj._broadcast_context = None
    model_helper_obj._shared_model = False
    master_dev_opt = core.DeviceOption(model_helper_obj._device_type, master_device)

    num_shards = rendezvous['num_shards'] if rendezvous else 1
    # num_devices is the number of devices across all nodes
    num_devices = len(devices) * num_shards

    num_workers = num_threads_per_device * len(devices)
    loss_scale = 1.0 / num_devices
    if block_momentum is None:
        block_momentum = 1.0 - 1.0 / num_devices

    max_concurrent_distributed_ops = min(
        max_concurrent_distributed_ops,
        num_workers - 1
    )

    model_helper_obj.net.Proto().num_workers = num_workers
    model_helper_obj.net.Proto().type = net_type

    # A net for initializing the global model parameters. It is run once,
    # in the same step as the net parameter initialization.
    model_helper_obj._global_model_init_net = core.Net('global_model_init')
    model_helper_obj._global_model_init_net.Proto().type = net_type
    model_helper_obj._global_model_init_net.Proto().num_workers = \
        num_workers

    # A net for computing the final parameter updates. It runs once after
    # the local model updates have run for a block of iterations.
    model_helper_obj._global_model_param_updates_net = core.Net('global_model')
    model_helper_obj._global_model_param_updates_net.Proto().type = net_type
    model_helper_obj._global_model_param_updates_net.Proto().num_workers = \
        num_workers

    def _v(param):
        return "{}_v".format(param)

    def _g(param):
        return "{}_g".format(param)

    def _v_prev(param):
        return "{}_prev".format(param)
    # Keep track of params that were in the model before: they are not
    # data parallel, so we need to handle them separately
    non_datapar_params = copy.copy(model_helper_obj.params)
    model_helper_obj._losses_by_gpu = {}

    def _InitializeModels(gpu_id):
        input_builder_fun(model_helper_obj)
        loss = forward_pass_builder_fun(model_helper_obj, loss_scale)
        model_helper_obj._losses_by_gpu[gpu_id] = loss
    _ForEachDevice(
        devices,
        _InitializeModels,
        device_type=model_helper_obj._device_type,
        device_prefix=model_helper_obj._device_prefix,
        scoped=True
    )
    _ValidateParams(model_helper_obj.params)

    model_helper_obj._device_grouped_blobs =\
        _GroupByDevice(model_helper_obj, devices,
                       model_helper_obj.params, non_datapar_params)

    model_helper_obj._param_names =\
        list(viewkeys(model_helper_obj._device_grouped_blobs))

    _AddGradientOperators(
        devices, model_helper_obj, model_helper_obj._losses_by_gpu
    )
    _ValidateParams(model_helper_obj.params)

    _InferBlobDevice(model_helper_obj)

    def _InitializeParamUpdate(gpu_id):
        param_update_builder_fun(model_helper_obj)
    _ForEachDevice(
        devices,
        _InitializeParamUpdate,
        device_type=model_helper_obj._device_type,
        device_prefix=model_helper_obj._device_prefix,
        scoped=True
    )
    model_parameter_names = list(
        viewkeys(model_helper_obj._device_grouped_blobs)
    )
    if warmup_iterations is not None:
        model_helper_obj._warmup_iterations = warmup_iterations
        # A net for broadcasting the master-shard parameters after the
        # warmup iterations have been run.
        model_helper_obj._warmup_broadcast = core.Net('warmup-broadcast')
        model_helper_obj._warmup_broadcast.Proto().type = net_type
        model_helper_obj._warmup_broadcast.Proto().num_workers = \
            num_workers

        _SyncAllParams(
            devices,
            model_helper_obj,
            model_helper_obj.param_init_net,
            model_helper_obj._warmup_broadcast,
            rendezvous,
            model_parameter_names,
            max_concurrent_distributed_ops
        )
        for param_name in viewkeys(model_helper_obj._device_grouped_blobs):
            param = model_helper_obj._device_grouped_blobs[param_name][master_device]
            with core.DeviceScope(master_dev_opt):
                model_helper_obj._warmup_broadcast.Copy(param, _g(param))

    # (Step-0) Initialize the momentum and global parameters on master device.
    for param_name in viewkeys(model_helper_obj._device_grouped_blobs):
        param = model_helper_obj._device_grouped_blobs[param_name][master_device]
        with core.DeviceScope(master_dev_opt):
            model_helper_obj._global_model_init_net.ConstantFill(
                param, _v(param), value=0.0
            )
            model_helper_obj._global_model_init_net.Copy(param, _g(param))
            if nesterov:
                model_helper_obj._global_model_init_net.ConstantFill(
                    param, _v_prev(param), value=0.0
                )
    # (Step-1) is running the local nets for a block of iterations; then
    # (Step-2) sum the local models across devices and shards.
    _AllReduceBlobs(
        model_parameter_names,
        devices,
        model_helper_obj,
        model_helper_obj._global_model_param_updates_net,
        rendezvous,
        use_nccl,
        max_concurrent_distributed_ops
    )

    # (Step-3) Apply block-wise model update filtering on the master device:
    #   param_v = block_momentum * param_v
    #             + block_learning_rate * (param_avg - param_g)
    #   param_g = param_g + param_v  (with an extra Nesterov correction
    #             when nesterov momentum is enabled)
    #   param   = param_g
    for param_name in model_parameter_names:
        param = model_helper_obj._device_grouped_blobs[param_name][master_device]
        with core.DeviceScope(master_dev_opt):
            model_helper_obj._global_model_param_updates_net.Scale(
                param, param, scale=1.0 / num_devices
            )
            model_helper_obj._global_model_param_updates_net.Sub(
                [param, _g(param)], param
            )
            model_helper_obj._global_model_param_updates_net.Scale(
                param, param, scale=block_learning_rate
            )
            model_helper_obj._global_model_param_updates_net.Scale(
                _v(param), _v(param), scale=block_momentum
            )
            model_helper_obj._global_model_param_updates_net.Add(
                [_v(param), param], _v(param)
            )
            model_helper_obj._global_model_param_updates_net.Add(
                [_g(param), _v(param)], _g(param)
            )
            if nesterov:
                model_helper_obj._global_model_param_updates_net.Sub(
                    [_v(param), _v_prev(param)], _v_prev(param)
                )
                model_helper_obj._global_model_param_updates_net.Scale(
                    _v_prev(param), _v_prev(param), scale=block_momentum
                )
                model_helper_obj._global_model_param_updates_net.Sub(
                    [_g(param), _v_prev(param)], _g(param)
                )
                model_helper_obj._global_model_param_updates_net.Copy(
                    _v(param), _v_prev(param)
                )
            model_helper_obj._global_model_param_updates_net.Copy(
                _g(param), param
            )

    _SyncAllParams(
        devices,
        model_helper_obj,
        model_helper_obj.param_init_net,
        model_helper_obj._global_model_param_updates_net,
        rendezvous,
        model_parameter_names,
        max_concurrent_distributed_ops
    )
    if add_blobs_to_sync is not None:
        AddBlobSync(
            model_helper_obj,
            add_blobs_to_sync,
            net=model_helper_obj._global_model_param_updates_net)

    # Reset momentum-SGD parameters to zero after each block update
    if reset_momentum_sgd:
        momentum_ops = [op for op in model_helper_obj.net.Proto().op
                        if op.type == 'MomentumSGDUpdate']
        for op in momentum_ops:
            momentum_blob = op.input[1]
            with core.DeviceScope(op.device_option):
                model_helper_obj._global_model_param_updates_net.ConstantFill(
                    [momentum_blob], momentum_blob, value=0.0
                )

    if optimize_gradient_memory:
        _OptimizeGradientMemorySimple(
            model_helper_obj, model_helper_obj._losses_by_gpu, devices
        )

    model_helper_obj._data_parallel_model_init_nets = [
        model_helper_obj.param_init_net,
        model_helper_obj._global_model_init_net
    ]

    model_helper_obj._data_parallel_model_nets = [
        model_helper_obj.net,
        (model_helper_obj._global_model_param_updates_net, 1)
    ]
    _AddBarrierToModelNets(model_helper_obj, barrier_net_timeout_sec)


def CreateNet(model, overwrite=False):
    for net_iters in model._data_parallel_model_nets:
        if isinstance(net_iters, tuple):
            workspace.CreateNet(net_iters[0], overwrite=overwrite)
        else:
            workspace.CreateNet(net_iters, overwrite=overwrite)


def RunInitNet(model):
    for init_net in model._data_parallel_model_init_nets:
        workspace.RunNetOnce(init_net)
    CreateNet(model)


def RunWarmup(model):
    workspace.RunNet(model.net, model._warmup_iterations)
    workspace.RunNetOnce(model._warmup_broadcast)


def RunNet(model, num_iterations):
    for net_iter in model._data_parallel_model_nets:
        if isinstance(net_iter, tuple):
            workspace.RunNet(net_iter[0].Proto().name, net_iter[1])
        else:
            workspace.RunNet(net_iter, num_iterations)
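

# Rough sketch of the BMUF training flow (not part of the original source;
# `num_blocks` and `iters_per_block` are hypothetical caller-chosen values
# and `model` is assumed to come from Parallelize_BMUF):
#
#   RunInitNet(model)
#   if hasattr(model, '_warmup_iterations'):
#       RunWarmup(model)
#   for _ in range(num_blocks):
#       # runs the local nets for iters_per_block iterations, then the
#       # global block-update net once
#       RunNet(model, iters_per_block)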


def _AddBarrierToModelNets(model, barrier_net_timeout_sec):
    if model._rendezvous is not None and model._rendezvous['engine'] == 'GLOO':
        # Create a barrier net when a rendezvous is specified for distributed
        # training, so that all hosts wait for each other before training
        # starts and slow hosts do not cause timeouts on the others.
        model._barrier_init_net = core.Net("barrier_init_net")
        model._barrier_net = _CreateBarrierNet(model, model._barrier_init_net,
                                               "pre_training", barrier_net_timeout_sec)
        model._data_parallel_model_init_nets.insert(0, model._barrier_init_net)
        model._data_parallel_model_nets.insert(0, model._barrier_net)


def _CreateBarrierNet(model, init_net, name_prefix, timeout_sec):
    log.info("Creating barrier net")
    assert model._rendezvous['engine'] == 'GLOO', "Engine does not support barrier"
    comm_world = _CreateOrCloneCommonWorld(
        init_net,
        name_prefix + "_barrier_cw",
        rendezvous=model._rendezvous,
        timeout_sec=timeout_sec,
    )
    barrier_net = core.Net(name_prefix + "_barrier_net")
    barrier_net.Barrier(
        inputs=[comm_world],
        outputs=[],
        engine=model._rendezvous['engine'],
    )
    return barrier_net


def Synchronize(model, timeout_sec=_DEFAULT_BARRIER_NET_TIMEOUT_SEC):
    warnings.warn("The Synchronize API has been deprecated. We now have a "
        "barrier net which runs before training to ensure all hosts wait "
        "before training starts. The default timeout for the barrier is "
        "300s and it can be overridden using the barrier_net_timeout_sec "
        "parameter when calling Parallelize.",
        category=DeprecationWarning, stacklevel=2)
    if model._rendezvous is None or model._rendezvous['num_shards'] <= 1:
        # Single host case
        return

    if model._sync_barrier_net is None:
        barrier_init_net = core.Net("sync_barrier_init_net")
        model._sync_barrier_net = _CreateBarrierNet(
            model, barrier_init_net, "sync", timeout_sec)
        workspace.RunNetOnce(barrier_init_net)
        workspace.CreateNet(model._sync_barrier_net)
        model._sync_barrier_net_timeout = timeout_sec
    assert model._sync_barrier_net_timeout == timeout_sec, \
        "Must use fixed timeout, {} != {}".format(
            model._sync_barrier_net_timeout, timeout_sec
        )
    log.info("Synchronize run barrier net.")
    workspace.RunNet(model._sync_barrier_net)


def ConvertNetForDevice(net, device=None):
    '''
    Converts all blobs in the net to have namescope gpu_X, and correct
    device scope. You can use this to enable AppendNet with a
    forward_pass_builder_fun:

       def builder_fun(model):
          ...
          model.net.AppendNet(
             data_parallel_model.ConvertNetForDevice(othermodel.net))
          model.param_init_net.AppendNet(
             data_parallel_model.ConvertNetForDevice(othermodel.param_init_net))
    '''
    mnet = copy.deepcopy(net)

    if device is None:
        device = scope.CurrentDeviceScope()
    if core.IsGPUDeviceType(device.device_type):
        device_prefix = "gpu"
    elif device.device_type == caffe2_pb2.IDEEP:
        device_prefix = "ideep"
    else:
        device_prefix = "cpu"

    namescope = "{}_{}/".format(device_prefix, device.device_id)
    for op in mnet.Proto().op:
        if "RecurrentNetwork" in op.type:
            raise NotImplementedError("RecurrentNetwork conversion not yet supported")
        for i, inputb in enumerate(op.input):
            op.input[i] = namescope + inputb
        for i, outputb in enumerate(op.output):
            op.output[i] = namescope + outputb
        for i, blob in enumerate(op.control_input):
            op.control_input[i] = namescope + blob
        op.device_option.CopyFrom(device)
    for i, einp in enumerate(mnet.Proto().external_input):
        mnet.Proto().external_input[i] = namescope + einp
    for i, eoutp in enumerate(mnet.Proto().external_output):
        mnet.Proto().external_output[i] = namescope + eoutp
    return mnet


def _ForEachDevice(devices, f, device_type, device_prefix, scoped=False,
                   *args, **kwargs):
    for device in devices:
        device_opt = core.DeviceOption(device_type, device)
        with core.DeviceScope(device_opt):
            if scoped:
                with core.NameScope("{}_{}".format(device_prefix, device)):
                    f(device, *args, **kwargs)
            else:
                f(device, *args, **kwargs)


def _AddGradientOperators(devices, model, losses_by_gpu):
    def create_grad(lossp):
        return model.ConstantFill(lossp, str(lossp) + "_grad", value=1.0)

    loss_grad = {}
    # Explicitly need to create gradients on each GPU
    for gpu_id in devices:
        device = core.DeviceOption(model._device_type, gpu_id)
        with core.DeviceScope(device):
            for l in losses_by_gpu[gpu_id]:
                lg = create_grad(l)
                loss_grad[str(l)] = str(lg)

    model.AddGradientOperators(loss_grad)


def ExtractPredictorNet(model, inputs, outputs, device):
    '''
    Returns (net, params) that can be exported to be used as a prediction
    net.
    '''
    master_device = model._devices[0]
    prefix = "{}_{}/".format(model._device_prefix, master_device)
    prefix_inputs = [prefix + str(b) for b in inputs]
    prefix_outputs = [prefix + str(b) for b in outputs]
    (predictor_net, export_blobs) = model_helper.ExtractPredictorNet(
        net_proto=model.net.Proto(),
        input_blobs=prefix_inputs,
        output_blobs=prefix_outputs,
        device=device,
        renames={
            a: b
            for (a, b) in zip(prefix_inputs + prefix_outputs, inputs + outputs)
        },
    )

    return (predictor_net, export_blobs)
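

# Rough usage sketch (not part of the original source; the blob names "data"
# and "softmax" are hypothetical examples):
#
#   predictor_net, export_blobs = ExtractPredictorNet(
#       train_model,
#       inputs=["data"],
#       outputs=["softmax"],
#       device=core.DeviceOption(caffe2_pb2.CPU),
#   )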


def GetCheckpointParams(model):
    '''
    Returns a set of blobs that are needed for a complete check point.
    They are blobs for the first gpu and iteration blobs.
    '''
    (all_blobs, _) = _ComputeBlobsToSync(model)
    first_gpu_blobs = {
        b
        for b in all_blobs
        if str(b)
        .startswith("{}_{}/".format(model._device_prefix, model._devices[0]))
    }

    # Add iteration blobs that do not have a namescope separately, since
    # it is important to checkpoint the iteration counter
    iteration_blobs = set()
    for op in model.net.Proto().op:
        if op.type == 'Iter' or op.type == 'AtomicIter':
            if not op.output[0].startswith("{}_".format(model._device_prefix)):
                iteration_blobs.add(op.output[0])

    return first_gpu_blobs.union(iteration_blobs)


def FinalizeAfterCheckpoint(model, blobs=None, cpu_mode=False):
    '''
    This function should be called after loading parameters from a
    checkpoint / initial parameters file.
    '''

    if not hasattr(model, "_checkpoint_net"):
        if blobs is None:
            (_, uniq_blob_names) = _ComputeBlobsToSync(model)
        else:
            uniq_blob_names = [stripBlobName(p) for p in blobs]

        # Add the provided blobs to the blob lookup map, as they might
        # include non-parameters, such as momentum blobs.
        log.info("Creating checkpoint synchronization net")
        devices = model.GetDevices()
        for name in uniq_blob_names:
            if name not in model._device_grouped_blobs:
                grouped = {
                    d:
                    core.BlobReference("{}_{}{}{}".format(
                        model._device_prefix,
                        d,
                        scope._NAMESCOPE_SEPARATOR,
                        name)
                    ) for d in devices}
                model._device_grouped_blobs[name] = grouped

        model._checkpoint_net = core.Net("checkpoint_sync_net")
        if not cpu_mode:
            model._checkpoint_net.RunAllOnGPU()

        checkpoint_init_net = None
        if (model._rendezvous is not None and model._rendezvous['num_shards'] > 1):
            checkpoint_init_net = core.Net("checkpoint_init_net")
            if not cpu_mode:
                checkpoint_init_net.RunAllOnGPU()

        _SyncAllParams(
            devices,
            model,
            checkpoint_init_net,
            model._checkpoint_net,
            model._rendezvous,
            uniq_blob_names,
            max_concurrent_distributed_ops=1
        )
        if (checkpoint_init_net):
            workspace.RunNetOnce(checkpoint_init_net)

        workspace.CreateNet(model._checkpoint_net)

    # Run the sync
    log.info("Run checkpoint net")
    workspace.RunNet(model._checkpoint_net.Proto().name)
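

# Typical checkpoint-restore flow (a sketch, not from the original source;
# `restore_blobs_from_checkpoint` stands in for whatever mechanism the caller
# uses to feed the saved master-device blobs back into the workspace):
#
#   params_to_save = GetCheckpointParams(train_model)
#   ...                                   # save those blobs externally
#   restore_blobs_from_checkpoint(params_to_save)   # hypothetical helper
#   FinalizeAfterCheckpoint(train_model)  # re-broadcast to all devices/shards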


def GetLearningRateBlobNames(model):
    '''
    Returns a list of learning rates blob names used in the optimizer.
    '''
    if model._optimizer is not None:
        if model._device_type == caffe2_pb2.CPU or model._device_type == caffe2_pb2.IDEEP:
            return [model._optimizer.get_cpu_blob_name('lr')]
        elif core.IsGPUDeviceType(model._device_type):
            return [model._optimizer.get_gpu_blob_name('lr', gpu, '')
                    for gpu in model._devices]
        else:
            raise Exception(
                "Unsupported device type : {}".format(model._device_type)
            )
    else:
        lr_blob_names = []
        for op in model.net.Proto().op:
            if op.type == "LearningRate":
                lr_blob_names.append(op.output[0])
        return lr_blob_names
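

# Example (a sketch, not from the original source): fetch the current
# learning-rate values after the nets have been created and run at least once.
#
#   for lr_blob in GetLearningRateBlobNames(train_model):
#       print(lr_blob, workspace.FetchBlob(lr_blob))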


def _Broadcast(devices, model, net, param, use_nccl=False):
    # Copy params from gpu_0 to other
    master_dev = devices[0]

    if use_nccl:
        if _IsGPUBlob(model, param):
            master_device_opt = core.DeviceOption(model._device_type, master_dev)
            with core.DeviceScope(master_device_opt):
                # Note that the root is the root _rank_ and not the root
                # _device_. Thus we always use root=0, regardless of the
                # devices used.
                net.NCCLBroadcast(
                    list(viewvalues(model._device_grouped_blobs[param])),
                    list(viewvalues(model._device_grouped_blobs[param])),
                    root=0,
                )
                return

    for dev_idx in devices[1:]:
        if _IsGPUBlob(model, param):
            device_opt = core.DeviceOption(workspace.GpuDeviceType, dev_idx)
        else:
            device_opt = core.DeviceOption(caffe2_pb2.IDEEP, 0) if _IsIDEEPBlob(model, param) else \
                core.DeviceOption(caffe2_pb2.CPU, 0)
        with core.DeviceScope(device_opt):
            net.Copy(
                model._device_grouped_blobs[param][master_dev],
                model._device_grouped_blobs[param][dev_idx]
            )


def _AllReduce(devices, model, net, param, use_nccl=False, control_input=None):
    blobs_group = list(viewvalues(model._device_grouped_blobs[param]))
    if model._device_type == caffe2_pb2.CUDA and use_nccl:
        # TODO: for _shared_model, do only NCCLReduce
        model.NCCLAllreduce(
            blobs_group, blobs_group, control_input=control_input
        )
        return

    if model._device_type == workspace.GpuDeviceType:
        p2p_access_pattern = workspace.GetGpuPeerAccessPattern()
    else:
        p2p_access_pattern = None

    def sumN(*dev_indices):
        """Create a Sum op for 2 or more blobs on different devices.
        Saves the result on the first device.

        Keyword arguments:
        dev_indices -- a list of device indices, which can be translated into
                       CUDA identifiers with model._devices
        """
        devices = [model._devices[idx] for idx in dev_indices]
        blobs = [blobs_group[idx] for idx in dev_indices]
        device_opt = core.DeviceOption(model._device_type, devices[0])
        with core.DeviceScope(device_opt):
            for i, peer in enumerate(devices):
                if i == 0:
                    continue  # Skip the first device
                if p2p_access_pattern is not None and p2p_access_pattern.size and not p2p_access_pattern[
                    devices[0], peer
                ]:
                    # Copy from peer to d0
                    blobs[i] = model.Copy(
                        blobs[i],
                        'gpu_{}/{}_gpu{}_copy'.format(devices[0], param, peer)
                    )
            net.Sum(blobs, [blobs[0]], name='dpm')

    if len(devices) == 16:
        # Special tree reduction for 16 gpus, TODO generalize like in muji.py
        for j in range(8):
            sumN(j * 2, j * 2 + 1)
        for j in range(4):
            sumN(j * 4, j * 4 + 2)
        for j in range(2):
            sumN(j * 8, j * 8 + 4)
        sumN(0, 8)
    elif len(devices) == 8:
        for j in range(4):
            sumN(j * 2, j * 2 + 1)
        for j in range(2):
            sumN(j * 4, j * 4 + 2)
        sumN(0, 4)
    elif len(devices) == 4:
        sumN(0, 1)
        sumN(2, 3)
        sumN(0, 2)
    else:
        sumN(*range(len(devices)))
    # TODO: for _shared_model, do not dupe the params
    _Broadcast(devices, model, net, param)


def _SyncAllParams(
    devices,
    model,
    init_net,
    net,
    rendezvous,
    unique_param_names,
    max_concurrent_distributed_ops=4
):
    if rendezvous is None or rendezvous['num_shards'] <= 1:
        _SyncAllParamsSingleHost(devices, model, net, unique_param_names)
    else:
        _SyncAllParamsDistributed(
            devices,
            model,
            init_net,
            net,
            rendezvous,
            unique_param_names,
            max_concurrent_distributed_ops
        )


def AddBlobSync(model, blobs, net=None):
    '''
    Sync a blob across devices and hosts
    '''
    if len(blobs) == 0:
        return
    net = model.net if net is None else net
    for b in blobs:
        assert not b.startswith(model._device_prefix), \
            "Provide unprefixed blob name: {}".format(b)
        model._device_grouped_blobs[b] = {
            d: core.BlobReference("{}_{}/{}".format(model._device_prefix, d, b))
            for d in model._devices
        }

    _SyncAllParams(
        model._devices,
        model,
        model.param_init_net,
        net,
        model._rendezvous,
        set(blobs),
        max_concurrent_distributed_ops=1
    )
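

# Example (a sketch, not from the original source): keep a custom counter blob
# identical on every device and shard. "total_examples" is a hypothetical
# unprefixed blob name that each device's builder function creates.
#
#   AddBlobSync(train_model, ["total_examples"])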


def AddDistributedBlobSync(model, blobs):
    '''
    Sync blobs across machines (but not across devices)
    '''
    if model._rendezvous is None:
        return
    synth_name = "_".join([str(b) for b in blobs])
    comm_world = _CreateOrCloneCommonWorld(
        model.param_init_net,
        "blob_sync_cw_" + synth_name,
        rendezvous=model._rendezvous,
    )

    model.net.Allreduce(
        inputs=[comm_world] + blobs,
        outputs=blobs,
        engine=model._rendezvous['engine'],
    )


def _SyncAllParamsDistributed(
    devices,
    model,
    init_net,
    net,
    rendezvous,
    unique_param_names,
    max_concurrent_distributed_ops
):
    assert rendezvous['num_shards'] > 1

    gpu_device_opt = core.DeviceOption(model._device_type, devices[0])
    cpu_device_opt = core.DeviceOption(caffe2_pb2.CPU)
    ideep_device_opt = core.DeviceOption(caffe2_pb2.IDEEP)

    if model._broadcast_context is None:
        model._broadcast_context = CollectivesConcurrencyControl(
            "broadcast",
            max_concurrent_distributed_ops,
            init_net,
            rendezvous
        )
    context = model._broadcast_context

    for param_name in sorted(unique_param_names):
        master_param = model._device_grouped_blobs[param_name][devices[0]]
        params_group = list(viewvalues(model._device_grouped_blobs[param_name]))

        def broadcast(params):
            comm_world, control_input = context.get_control_and_context(params)
            net.Broadcast(
                inputs=[comm_world] + params,
                outputs=params,
                name=param_name,
                engine=rendezvous['engine'],
                control_input=control_input
            )

        device_opt = gpu_device_opt if _IsGPUBlob(
            model, param_name
        ) else ideep_device_opt if _IsIDEEPBlob(model, param_name) else cpu_device_opt

        if rendezvous['engine'] == 'GLOO':
            with core.DeviceScope(device_opt):
                broadcast(params_group)
        else:
            # Copy between GPU and CPU
            with core.DeviceScope(device_opt):
                param_cpu = net.CopyGPUToCPU(
                    master_param,
                    str(master_param) + "cpu"
                )
            with core.DeviceScope(cpu_device_opt):
                broadcast([param_cpu])
            with core.DeviceScope(device_opt):
                net.CopyCPUToGPU(param_cpu, master_param)

            # Broadcast locally
            _Broadcast(devices, model, net, param_name)


def _SyncAllParamsSingleHost(devices, model, net, unique_param_names):
    for param in unique_param_names:
        _Broadcast(devices, model, net, param)


def _AllReduceBlobs(blob_names, devices, model, net, rendezvous, use_nccl,
                    max_concurrent_distributed_ops):
    if rendezvous is None or rendezvous['num_shards'] <= 1:
        _AllReduceBlobsSingleHost(
            blob_names,
            devices,
            model,
            net,
            use_nccl
        )
    else:
        _AllReduceBlobsDistributed(
            blob_names,
            devices,
            model,
            net,
            rendezvous,
            max_concurrent_distributed_ops,
        )


def _PruneParametersForSharing(model):
    assert model._shared_model
    master_prefix = "{}_{}/".format(model._device_prefix, model._devices[0])

    # Remove non-master parameters so that they will not receive parameter
    # update operators.
    model.params = model.GetParams(master_prefix)
    paramset = set(model.params)

    model.param_to_grad = {
        p: model.param_to_grad[p]
        for p in model.param_to_grad
        if p in paramset
    }
    model.weights = [w for w in model.weights if w in paramset]
    model.biases = [w for w in model.biases if w in paramset]


def _RemapParameterBlobsForSharedModel(model, all_params):
    assert model._shared_model
    master_prefix = "{}_{}/".format(
        model._device_prefix, model._devices[0])
    log.info("Remapping param blobs to master -> {}".format(master_prefix))
    master_params = set(model.GetParams())

    # Remove all but master params
    def modify_ops(net):
        ops = []
        for op in net.Proto().op:
            delete_op = False
            # Delete ops that output a non-master version of a parameter
            for outp in op.output:
                if outp in all_params and outp not in master_params:
                    delete_op = True
                    log.debug("Delete b/c {}: {}".format(outp, str(op)))
                    break
            if delete_op:
                continue
            # Remap inputs to point to the master param
            for j, inp in enumerate(op.input):
                if inp in all_params and inp not in master_params:
                    op.input[j] = master_prefix + stripBlobName(inp)
            ops.append(op)
        del net.Proto().op[:]
        net.Proto().op.extend(ops)

    modify_ops(model.param_init_net)
    modify_ops(model.net)


class CollectivesConcurrencyControl(object):
    """
    Creates common worlds (up to max_concurrent_context) and manage the
    sequential execution of collectives that shares the same context with
    cyclic control inputs.
    """
    def __init__(
        self,
        name,
        max_concurrent_context,
        param_init_net,
        rendezvous
    ):
        self.name = name
        self.param_init_net = param_init_net
        self.max_concurrent_context = max_concurrent_context
        self.counter = 0
        self.common_worlds = []
        self.control_inputs = []
        self.rendezvous = rendezvous

    def get_control_and_context(self, control_output_blob):
        common_world, control_input = [None, None]
        current_slot = self.counter % self.max_concurrent_context
        if len(self.common_worlds) < self.max_concurrent_context:
            common_world = _CreateOrCloneCommonWorld(
                self.param_init_net,
                "{}_{}_cw".format(self.name, current_slot),
                rendezvous=self.rendezvous,
            )
            self.common_worlds.append(common_world)
            self.control_inputs.append(control_output_blob)
        else:
            common_world = self.common_worlds[current_slot]
            control_input = self.control_inputs[current_slot]
            self.control_inputs[current_slot] = control_output_blob
        self.counter += 1
        return common_world, control_input


def _AllReduceBlobsDistributed(
    blob_names,
    devices,
    model,
    net,
    rendezvous,
    max_concurrent_distributed_ops,
):
    num_workers = model.net.Proto().num_workers
    assert num_workers > 1, "Please specify more than 1 worker"
    all_reduce_engine = rendezvous['engine']

    master_device_opt = core.DeviceOption(model._device_type, devices[0])

    reducing_device_opt = master_device_opt

    context = CollectivesConcurrencyControl(
        "allreduce",
        max_concurrent_distributed_ops,
        model.param_init_net,
        rendezvous
    )

    nccl_control_blob = None

    for blob_name in blob_names:
        master_blob = model._device_grouped_blobs[blob_name][devices[0]]
        blobs_group = list(viewvalues(model._device_grouped_blobs[blob_name]))

        assert master_blob in blobs_group

        # Remark: NCCLReduce does not support in-place modifications
        # so we need a temporary reduced blob
        reduced_blob = str(master_blob) + "_red"

        def allreduce(blobs, **kwargs):
            with core.DeviceScope(reducing_device_opt):
                comm_world, control_input = \
                    context.get_control_and_context(blobs[0])
                net.Allreduce(
                    inputs=[comm_world] + blobs,
                    outputs=blobs,
                    name=blob_name,
                    engine=all_reduce_engine,
                    control_input=control_input,
                    **kwargs
                )

        if rendezvous['engine'] == 'GLOO':
            # With Gloo, cross-GPU and cross-machine allreduce can be
            # executed in a single operation.
            # Try to use GPUDirect if transport == ibverbs.
            allreduce(
                blobs_group,
                gpu_direct=(rendezvous.get("transport", None) == "ibverbs"),
            )
        else:
            # Step 1: sum blobs from local GPUs to the master GPU
            with core.DeviceScope(master_device_opt):
                model.ConstantFill(master_blob, reduced_blob, value=0.0)

                # Temp fix since NCCLReduce does not work
                net.NCCLAllreduce(
                    blobs_group,
                    blobs_group,
                    control_input=nccl_control_blob,
                )
                nccl_control_blob = blobs_group[0]
                net.Copy(master_blob, reduced_blob)

            # Step 2: allreduce between all hosts, between master GPUs
            allreduce([reduced_blob])

            with core.DeviceScope(master_device_opt):
                net.Copy(reduced_blob, master_blob)

                # Step 3: broadcast locally
                _Broadcast(devices, model, net, blob_name)


def _AllReduceBlobsSingleHost(blob_names, devices, model, net, use_nccl):
    """Performs NCCL AllReduce to distribute blobs to all the GPUs."""

    if len(devices) == 1:
        return

    # Allreduce blobs on all the GPUs; a last op is tracked to serialize
    # the NCCL operations.
    master_device_opt = core.DeviceOption(model._device_type, devices[0])
    last_out = None
    concatenated_idx = set()

    for blob_name in blob_names:
        # Group by blob_name for reduce.
        blobs_group = list(viewvalues(model._device_grouped_blobs[blob_name]))
        if len(blobs_group) == 1:
            # Non-reducible
            continue
        assert len(blobs_group) == len(devices), \
            "Each GPU from {}, should have a copy of {}.".format(
                devices, blob_name)

        if _IsGPUBlob(model, blob_name):
            with core.DeviceScope(master_device_opt):
                if not isinstance(blobs_group[0], core.GradientSlice):
                    _AllReduce(
                        devices, model, net, blob_name, use_nccl, last_out
                    )
                    # last_out is used to serialize the execution of nccl ops
                    last_out = blobs_group[0]

                else:
                    # Sparse gradients: all-gather indices and values
                    master_ns = "{}_{}".format(model._device_prefix, devices[0])
                    '''
                    Skip if we have already copied concatenated indices
                    to the indices of GradientSlice. This happens when two
                    or more grad blobs are gathered with the same indices
                    blob
                    '''
                    skip_idx_concat = False
                    for g in blobs_group:
                        if g.indices in concatenated_idx:
                            skip_idx_concat = True

                    if not skip_idx_concat:
                        grad_idx_concat, _ = net.Concat(
                            [g.indices for g in blobs_group],
                            ["{}/{}_index_concat".format(master_ns, blob_name),
                             "{}/{}_index_splitinfo".format(master_ns, blob_name)],
                            axis=0,
                            name="note:data_parallel_model")

                        for gpu, g in viewitems(model._device_grouped_blobs[blob_name]):
                            device_opt = core.DeviceOption(model._device_type, gpu)
                            with core.DeviceScope(device_opt):
                                model.Copy(grad_idx_concat, g.indices)
                                concatenated_idx.add(g.indices)

                    grad_val_concat, _ = net.Concat(
                        [g.values for g in blobs_group],
                        ["{}/{}_val_concat".format(master_ns, blob_name),
                         "{}/{}_val_splitinfo".format(master_ns, blob_name)],
                        axis=0, name="note:data_parallel_model")

                    for gpu, g in viewitems(model._device_grouped_blobs[blob_name]):
                        device_opt = core.DeviceOption(model._device_type, gpu)
                        with core.DeviceScope(device_opt):
                            model.Copy(grad_val_concat, g.values)

        elif _IsIDEEPBlob(model, blob_name):
            assert not isinstance(blobs_group[0], core.GradientSlice), \
                "Synchronizing gradient slices not supported"
            with core.DeviceScope(core.DeviceOption(caffe2_pb2.IDEEP)):
                net.Sum(blobs_group, [blobs_group[0]])
            if not model._shared_model:
                _Broadcast(devices, model, net, blob_name)

        else:
            assert not isinstance(blobs_group[0], core.GradientSlice), \
                "Synchronizing gradient slices not supported"
            with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
                # Poor man's allreduce
                net.Sum(blobs_group, [blobs_group[0]])
                if not model._shared_model:
                    _Broadcast(devices, model, net, blob_name)


def _BroadcastComputedParams(devices, model, rendezvous, use_nccl=False):
    if rendezvous is None:
        _BroadcastComputedParamsSingleHost(devices, model, use_nccl)
    else:
        _BroadcastComputedParamsDistributed(devices, model, rendezvous, use_nccl)


def _BroadcastComputedParamsDistributed(
    devices,
    model,
    rendezvous,
    use_nccl=False
):
    _BroadcastComputedParamsSingleHost(devices, model, use_nccl)
    log.warn("Distributed broadcast of computed params is not implemented yet")


def _BroadcastComputedParamsSingleHost(devices, model, use_nccl=False):
    '''
    Average computed params over all devices
    '''
    if len(devices) == 1:
        return

    for param_name in model._computed_param_names:
        # Copy from master to others -- averaging would be perhaps better,
        # but currently NCCLAllReduce is too prone to deadlock
        _Broadcast(devices, model, model.net, param_name, use_nccl)


def _GetReverseOrderedGrads(model):
    '''
    Returns the gradients in reverse order (namespace stripped),
    for the optimal synchronization order.
    '''
    return list(reversed(model._grad_names))


# A helper function to extract a parameter's name
def stripBlobName(param):
    # Format is "a/b/c/d" -> "b/c/d"
    if isinstance(param, core.GradientSlice):
        return stripBlobName(param.indices) + ":" + stripBlobName(param.values)
    else:
        name = str(param)
    return name[name.index(scope._NAMESCOPE_SEPARATOR) + 1:]


def _AnalyzeOperators(model):
    '''
    Look at all the operators and check that they do not cross device scopes
    '''
    for op in model.Proto().op:
        if "NCCL" in op.type or "Copy" in op.type or "Concat" in op.type:
            continue
        if "Sum" == op.type and op.name == "dpm":
            continue
        if "Allreduce" in op.type and "GLOO" in op.engine:
            continue

        op_dev = op.device_option
        op_gpu = op_dev.device_id

        # This avoids failing on operators that are only for CPU
        if not core.IsGPUDeviceType(op_dev.device_type):
            continue

        namescope = "{}_{}/".format(model._device_prefix, op_gpu)
        for inp in list(op.input) + list(op.output):
            if inp.startswith("{}_".format(model._device_prefix)
                              ) and not inp.startswith(namescope):
                raise Exception(
                    "Blob {} of op {}, should have namescope {}. Op: {}".format(
                        inp,
                        op.type,
                        "{}_{}/".format(model._device_prefix, op_gpu),
                        str(op),
                    ))


def _InferBlobDevice(model):
    '''
    Assign blob to device option based on the operator outputing it
    '''
    mapping = {}

    def map_ops(proto):
        for op in proto.op:
            device_option = op.device_option
            if op.type == "Iter":
                # Hack for Iter blobs, which live in CPU context
                device_option = caffe2_pb2.DeviceOption()
                device_option.device_type = caffe2_pb2.CPU
            for b in list(op.input) + list(op.output):
                if b not in mapping:
                    mapping[b] = device_option
            if op.type.startswith('RecurrentNetwork'):
                step_args = [a for a in op.arg if a.name.endswith("step_net")]
                for step_arg in step_args:
                    map_ops(step_arg.n)
    map_ops(model.param_init_net.Proto())
    map_ops(model.net.Proto())
    model._blob_to_device = mapping


def _IsIDEEPBlob(model, blob_name):
    if blob_name in model._blob_to_device:
        return model._blob_to_device[blob_name].device_type == caffe2_pb2.IDEEP
    else:
        blob_name = "{}_{}/{}".format(
            model._device_prefix, model._devices[0], blob_name
        )
        if blob_name not in model._blob_to_device:
            return model._device_type == caffe2_pb2.IDEEP
        return model._blob_to_device[blob_name].device_type == caffe2_pb2.IDEEP


def _IsGPUBlob(model, blob_name):
    if blob_name in model._blob_to_device:
        return core.IsGPUDeviceType(model._blob_to_device[blob_name].device_type)
    else:
        blob_name = "{}_{}/{}".format(
            model._device_prefix, model._devices[0], blob_name
        )
        if blob_name not in model._blob_to_device:
            return core.IsGPUDeviceType(model._device_type)
        return core.IsGPUDeviceType(model._blob_to_device[blob_name].device_type)


def _GroupByDevice(model, devices, params, non_data_params):
    '''
    Groups blobs by device, returning a map of [blobname] = {0: BlobRef, 1: ..}.
    Returns ordered dictionary, ensuring the original order.
    '''
    grouped = OrderedDict()
    # Only consider params that were created to be "data parallel"
    params = params[len(non_data_params):]

    for _i, p in enumerate(params):
        assert isinstance(p, core.BlobReference) or \
            isinstance(p, core.GradientSlice), \
            "Param {} is not BlobReference or GradientSlice".format(p)

        name = stripBlobName(p)
        gpuid = None

        if isinstance(p, core.BlobReference):
            gpuid = int(p.GetNameScope().split("_")[1].split("/")[0])
            assert "{}_{}/".format(model._device_prefix, gpuid) in p.GetNameScope(),\
                "Param {} expected to have namescope '{}_{}'".format(str(p), model._device_prefix, gpuid)
        else:
            gpuid = int(p.indices.GetNameScope().split("_")[1].split("/")[0])
            assert "{}_{}/".format(model._device_prefix, gpuid) in p.indices.GetNameScope(),\
                "Indices {} expected to have namescope '{}_{}'".format(str(p), model._device_prefix, gpuid)
            assert "{}_{}/".format(model._device_prefix, gpuid) in p.values.GetNameScope(),\
                "Values {} expected to have namescope '{}_{}'".format(str(p), model._device_prefix, gpuid)

        if name not in grouped:
            grouped[name] = {}
        grouped[name][gpuid] = p

    return grouped


def _ValidateParams(params):
    set_params = set(params)
    if len(params) > len(set_params):
        dupes = []
        sp = sorted(params)
        for j, p in enumerate(sp):
            if j > 0 and sp[j - 1] == p:
                dupes.append(p)

        assert len(params) == len(set_params), \
            "Duplicate entries in params: {}".format(dupes)


def _ComputeBlobsToSync(model):
    '''
    We sync all blobs that are generated by param init net and
    are 'data parallel', i.e assigned to a device
    '''
    sync_names = set()

    # We don't sync params if the model is shared
    if model._shared_model:
        blobs_to_sync = [str(p) for p in model.GetComputedParams('')]
        sync_names = [stripBlobName(p) for p in blobs_to_sync]
    else:
        blobs_to_sync = []

        for op in model.param_init_net.Proto().op:
            dp_outputs = [
                o
                for o in op.output
                if o.startswith("{}_".format(model._device_prefix))
            ]
            sync_names.update([stripBlobName(o) for o in dp_outputs])
            blobs_to_sync.extend(dp_outputs)

        # Sanity check
        diff = set(model._param_names) - sync_names
        assert diff == set(), \
            "Some params not instantiated in param init net: {}".format(diff)

    # Remove duplicates and sort
    prefixlen = len(model._device_prefix) + 1

    def extract_sort_key(b):
        # Sort first based on device id, and then by whole string
        deviceid = int(b[prefixlen:b.index(scope._NAMESCOPE_SEPARATOR)])
        return (deviceid, b)

    blobs_to_sync = sorted(
        list(set(blobs_to_sync)),
        key=extract_sort_key)

    blobs_to_sync = [core.BlobReference(b) for b in blobs_to_sync]
    return (blobs_to_sync, sync_names)


def _OptimizeGradientMemorySimple(model, losses_by_gpu, devices):
    log.warning("------- DEPRECATED API, please use " +
                "data_parallel_model.OptimizeGradientMemory() ----- ")
    for device in devices:
        namescope = "{}_{}/".format(model._device_prefix, device)
        model.net._net = memonger.share_grad_blobs(
            model.net,
            losses_by_gpu[device],
            set(viewvalues(model.param_to_grad)),
            namescope,
            share_activations=False,
        )


def _AddDynamicMemoryOptimization(model, blobs_to_keep, devices):
    blobs_to_keep_all_devices = set()
    if blobs_to_keep is not None:
        for device in devices:
            for blob_name in blobs_to_keep:
                blobs_to_keep_all_devices.add(
                    "{}_{}/{}".format(model._device_prefix, device, blob_name)
                )

    if model._rendezvous is not None:
        # GLOO operators expect the tensor addresses to remain the same over
        # iterations, so keep the param grads out of dynamic memory management.
        blobs_to_keep_all_devices.update(
            [str(b) for b in viewvalues(model.param_to_grad)]
        )

    model.net._net = memonger.release_blobs_when_used(
        model.net.Proto(),
        blobs_to_keep_all_devices
    )


def OptimizeGradientMemory(model,
                           input_shapes,
                           excluded_blobs,
                           recycle_activations):
    """
    Optimize memory usage of the backward pass by recycling blobs for gradient
    inputs that have been 'used'.
    input_shapes:  dict of blob name to shape for the inputs of the model.
                   Pass empty dictionary if not known.
    excluded_blobs: list of blobs that cannot be recycled. These are blobs
                   that you will access externally.
    recycle_activations: whether to also recycle forward pass activations
    """
    if input_shapes is not None:
        input_shapes_all_devices = {}
        for b, shp in viewitems(input_shapes):
            for d in model._devices:
                input_shapes_all_devices["{}_{}/{}".
                                         format(model._device_prefix, d, b)] = shp

        (shapes, types) = workspace.InferShapesAndTypes(
            [model.param_init_net, model.net],
            input_shapes_all_devices,
        )
    else:
        shapes = None

    for device in model._devices:
        namescope = "{}_{}/".format(model._device_prefix, device)
        excluded_blobs_by_device = set(namescope + b for b in excluded_blobs)
        model.net._net = memonger.share_grad_blobs(
            model.net,
            model._losses_by_gpu[device],
            set(viewvalues(model.param_to_grad)),
            namescope,
            dont_share_blobs=excluded_blobs_by_device,
            share_activations=recycle_activations,
            blob_shapes=shapes,
        )
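

# Example call (a sketch, not from the original source; "data" and "loss" are
# hypothetical blob names and 32 a hypothetical per-device batch size):
#
#   OptimizeGradientMemory(
#       train_model,
#       input_shapes={"data": (32, 3, 224, 224)},
#       excluded_blobs=["loss"],
#       recycle_activations=False,
#   )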


def _CreateOrCloneCommonWorld(
        net,
        common_world_blob,
        rendezvous,
        name=None,
        timeout_sec=None):

    if timeout_sec is None:
        timeout_sec = _DEFAULT_TIMEOUT_SEC

    timeout_ms = timeout_sec * 1000

    # Check if there is an existing CreateCommonWorld
    # with the same timeout we're looking for. If so,
    # we can clone it instead of creating a new one.
    existing = None
    for op in net.Proto().op:
        if op.type != "CreateCommonWorld":
            continue

        # Find the common world timeout
        op_timeout_ms = -1
        for arg in op.arg:
            if arg.name == 'timeout_ms':
                op_timeout_ms = arg.i
                break
        if op_timeout_ms != timeout_ms:
            continue

        # This common world was created with the same timeout we're
        # looking for, so we can clone it
        existing = op.output[0]
        break

    if name is None:
        name = "{}_op".format(common_world_blob)

    if existing is not None:
        comm_world = net.CloneCommonWorld(
            [existing],
            common_world_blob,
            name=name,
            engine=rendezvous['engine'],
        )
    else:
        kwargs = dict()
        if 'transport' in rendezvous:
            kwargs['transport'] = rendezvous['transport']
        if 'interface' in rendezvous:
            kwargs['interface'] = rendezvous['interface']
        if 'mpi_rendezvous' in rendezvous:
            kwargs['mpi_rendezvous'] = rendezvous['mpi_rendezvous']
        comm_world = net.CreateCommonWorld(
            rendezvous['kv_handler'] or [],
            common_world_blob,
            name=name,
            size=rendezvous['num_shards'],
            rank=rendezvous['shard_id'],
            engine=rendezvous['engine'],
            timeout_ms=timeout_ms,
            **kwargs
        )

    return comm_world


def _RunComparison(model, blob_name, device=None):
    if device is None:
        device = model._blob_to_device[blob_name]
    with core.DeviceScope(device):
        rendezvous = model._rendezvous
        if rendezvous is None or rendezvous['num_shards'] == 1:
            return True

        test_data_arr = np.zeros(rendezvous['num_shards']).astype(np.float32)
        test_data_arr[rendezvous['shard_id']] = 1
        workspace.FeedBlob("compare_arr", test_data_arr)

        comparison_net = core.Net("allcompare_net")

        kwargs = dict()
        if 'mpi_rendezvous' in rendezvous:
            kwargs['mpi_rendezvous'] = rendezvous['mpi_rendezvous']
        comm_world = comparison_net.CreateCommonWorld(
            rendezvous['kv_handler'] or [],
            "initial_sync",
            name=model.net.Proto().name + ".cw_master_select",
            size=rendezvous['num_shards'],
            rank=rendezvous['shard_id'],
            engine=rendezvous['engine'],
            **kwargs
        )

        blob_name_checksum = blob_name + "_checksum"
        comparison_net.SumSqrElements(
            [blob_name], [blob_name_checksum], average=False
        )

        blob_name_gather = blob_name + "_gather"
        comparison_net.Mul(
            inputs=["compare_arr", blob_name_checksum],
            outputs=blob_name_gather,
            broadcast=1
        )

        comparison_net.Allreduce(
            inputs=[comm_world, blob_name_gather],
            outputs=[blob_name_gather],
            engine=rendezvous['engine'],
        )

        workspace.RunNetOnce(comparison_net)
        gather_arr = workspace.FetchBlob(blob_name_gather)

        baseline = gather_arr[0]
        for i in range(rendezvous['num_shards']):
            assert gather_arr[i] == baseline, \
                "allcompare failed on shard {}.".format(rendezvous['shard_id'])

        return True


def _InterleaveOps(model):
    '''
    Data Parallel Model creates a net with ops in one device grouped together.
    This will interleave the ops so that each op for each device is next
    to each other in the net. Kind of like combining decks of cards. This
    ensures that progress is made along the critical path roughly concurrently
    for each device, which is important due to the extra intra-node
    synchronization required for multi-device batch normalization.
    '''
    orig_ops = list(model.net.Proto().op)
    num_devices = len(model._devices)
    num_ops_per_dev = len(orig_ops) // num_devices
    assert num_devices * num_ops_per_dev == len(orig_ops), \
        'Number of ops per device in original net is not uniform'
    new_ops = []
    ops = {d: [] for d in range(num_devices)}
    for op in orig_ops:
        ops[op.device_option.device_id].append(op)

    for j in range(num_ops_per_dev):
        tp = None
        for d in model._devices:
            if tp is None:
                tp = ops[d][j].type
            new_ops.append(ops[d][j])
            # Sanity
            assert ops[d][j].type == tp, \
                "Type mismatch {} / {}".format(tp, ops[d][j].type)

    del model.net.Proto().op[:]
    model.net.Proto().op.extend(new_ops)


def _CPUInterDeviceBatchNormalization(model):
    orig_ops = list(model.net.Proto().op)
    new_ops = []
    num_devices = len(model._devices)
    batch_norm_ops = []
    injected_ops = []

    spatial_bn_phase = False
    sums_blobs = []
    sumsq_blobs = []
    name = []
    input_blob_name = None

    spatial_bn_gradient_phase = False
    scale_grad_blobs = []
    bias_grad_blobs = []

    def _cpuReduce(param, input_blobs, destination_blobs):
        """
        Reduce results from multiple cpus and distributes the results back
        to each device. This is done by copying values to cpu_0 and summing
        them. The cpu_0 result is then copied back to each of the devices.

        param: the name of the data (blobs) to reduce
        input_blobs: the list of blobs to reduce
        destination_blobs: list of blobs to copy the result to
        """
        added_ops = []
        result_blob = "cpu_0/" + param + "_combined"
        added_ops.append(core.CreateOperator("Sum", input_blobs, result_blob))
        for blob in destination_blobs:
            added_ops.append(core.CreateOperator("Copy", result_blob, blob))
        return added_ops

    for op in orig_ops:
        if op.type != 'SpatialBN' and op.type != 'SpatialBNGradient':
            if spatial_bn_phase:
                new_ops.extend(injected_ops)
                new_ops.append(
                    core.CreateOperator("Sum",
                                        sums_blobs,
                                        input_blob_name + "_sums_combined"))
                new_ops.append(
                    core.CreateOperator("Sum",
                                        sumsq_blobs,
                                        input_blob_name + "_sumsq_combined"))
                new_ops.extend(batch_norm_ops)
                injected_ops = []
                batch_norm_ops = []
                sums_blobs = []
                sumsq_blobs = []
                spatial_bn_phase = False
                input_blob_name = None
            elif spatial_bn_gradient_phase:
                new_ops.extend(injected_ops)
                new_ops.extend(_cpuReduce(
                    stripBlobName(scale_grad_blobs[0]),
                    scale_grad_blobs,
                    scale_grad_blobs))
                new_ops.extend(_cpuReduce(
                    stripBlobName(bias_grad_blobs[0]),
                    bias_grad_blobs,
                    bias_grad_blobs))
                new_ops.extend(batch_norm_ops)
                injected_ops = []
                batch_norm_ops = []
                scale_grad_blobs = []
                bias_grad_blobs = []
                spatial_bn_gradient_phase = False
            new_ops.append(op)
        elif op.type == 'SpatialBN':
            spatial_bn_phase = True
            if input_blob_name is None:
                input_blob_name = op.input[0]
            name = op.input[0]
            injected_ops.append(
                core.CreateOperator(
                    "ChannelStats",
                    name,
                    [name + "_sums", name + "_sumsq"]))
            sums_blobs.append(name + "_sums")
            sumsq_blobs.append(name + "_sumsq")
            op.input.append(input_blob_name + "_sums_combined")
            op.input.append(input_blob_name + "_sumsq_combined")
            op.arg.extend([utils.MakeArgument("num_batches", num_devices)])
            batch_norm_ops.append(op)
        elif op.type == 'SpatialBNGradient':
            spatial_bn_gradient_phase = True
            injected_ops.append(
                core.CreateOperator("ChannelBackpropStats",
                                    [op.input[0], op.input[3], op.input[4],
                                     op.output[0]],
                                    [op.output[1], op.output[2]]))
            scale_grad_blobs.append(op.output[1])
            bias_grad_blobs.append(op.output[2])
            op.arg.extend([utils.MakeArgument("num_batches", num_devices)])
            op.input.extend([op.output[1], op.output[2]])
            batch_norm_ops.append(op)

    assert not spatial_bn_phase, \
        "Net modification for cpu inter-device batch normalization failed"
    del model.net.Proto().op[:]
    model.net.Proto().op.extend(new_ops)


def _GPUInterDeviceBatchNormalization(model):
    orig_ops = list(model.net.Proto().op)
    new_ops = []
    num_devices = len(model._devices)
    batch_norm_ops = []
    injected_ops = []

    spatial_bn_phase = False
    sums_blobs = []
    sumsq_blobs = []
    name = []
    input_blob_name = None

    spatial_bn_gradient_phase = False
    scale_grad_blobs = []
    bias_grad_blobs = []
    master_device = "cpu_0"
    master_device_option = core.DeviceOption(caffe2_pb2.CPU)

    def _gpuReduce(param, num_devices, master_device, result_blobs=None):
        """
        Reduces results from multiple gpus and distributes the results back
        to each device. This is done by copying values to the master device
        and summing them. The master device result is then copied back to
        each of the devices.

        param: the name of the data (blobs) to reduce
        num_devices: the number of devices
        master_device: the device to copy/compute values on
        result_blobs: optional list of result blobs to copy to
        """
        added_ops = []
        source_blobs = []
        destination_blobs = []
        if result_blobs is None:
            result_blobs = [
                "gpu_{}/{}_combined".format(i, param) for i in range(num_devices)
            ]
        for i in range(num_devices):
            device_option = core.DeviceOption(model._device_type, i)
            source_blobs.append("gpu_{}/{}".format(i, param))
            destination_blobs.append(
                "{}/{}_gpu_{}_copy".format(master_device, param, i))
            added_ops.append(
                core.CreateOperator(
                    "CopyGPUToCPU",
                    source_blobs[i],
                    destination_blobs[i],
                    device_option=device_option))
        added_ops.append(
            core.CreateOperator(
                "Sum",
                destination_blobs,
                "{}/{}_combined".format(master_device, param),
                device_option=master_device_option))
        for i in range(num_devices):
            device_option = core.DeviceOption(model._device_type, i)
            added_ops.append(
                core.CreateOperator(
                    "CopyCPUToGPU",
                    "{}/{}_combined".format(master_device, param),
                    result_blobs[i],
                    device_option=device_option))
        return added_ops

    for op in orig_ops:
        if op.type != 'SpatialBN' and op.type != 'SpatialBNGradient':
            if spatial_bn_phase:
                new_ops.extend(injected_ops)
                new_ops.extend(_gpuReduce(
                    stripBlobName(input_blob_name) + "_sums",
                    num_devices,
                    master_device,
                ))
                new_ops.extend(_gpuReduce(
                    stripBlobName(input_blob_name) + "_sumsq",
                    num_devices,
                    master_device,
                ))
                new_ops.extend(batch_norm_ops)
                injected_ops = []
                batch_norm_ops = []
                sums_blobs = []
                sumsq_blobs = []
                spatial_bn_phase = False
                input_blob_name = None
            elif spatial_bn_gradient_phase:
                new_ops.extend(injected_ops)
                new_ops.extend(_gpuReduce(
                    stripBlobName(scale_grad_blobs[0]),
                    num_devices,
                    master_device,
                    scale_grad_blobs,
                ))
                new_ops.extend(_gpuReduce(
                    stripBlobName(bias_grad_blobs[0]),
                    num_devices,
                    master_device,
                    bias_grad_blobs,
                ))
                new_ops.extend(batch_norm_ops)
                injected_ops = []
                batch_norm_ops = []
                scale_grad_blobs = []
                bias_grad_blobs = []
                spatial_bn_gradient_phase = False
            new_ops.append(op)
        elif op.type == 'SpatialBN':
            spatial_bn_phase = True
            if input_blob_name is None:
                input_blob_name = op.input[0]
            name = op.input[0]
            device_option = core.DeviceOption(
                model._device_type,
                op.device_option.device_id,
            )
            injected_ops.append(
                core.CreateOperator(
                    "ChannelStats",
                    name,
                    [name + "_sums", name + "_sumsq"],
                    device_option=device_option))
            sums_blobs.append(name + "_sums")
            sumsq_blobs.append(name + "_sumsq")
            op.input.append(name + "_sums_combined")
            op.input.append(name + "_sumsq_combined")
            op.arg.extend([utils.MakeArgument("num_batches", num_devices)])
            batch_norm_ops.append(op)
        elif op.type == 'SpatialBNGradient':
            spatial_bn_gradient_phase = True
            device_option = core.DeviceOption(
                model._device_type,
                op.device_option.device_id,
            )
            injected_ops.append(
                core.CreateOperator("ChannelBackpropStats",
                                    [op.input[0], op.input[3], op.input[4],
                                     op.output[0]],
                                    [op.output[1], op.output[2]],
                                    device_option=device_option))
            scale_grad_blobs.append(op.output[1])
            bias_grad_blobs.append(op.output[2])
            op.arg.extend([utils.MakeArgument("num_batches", num_devices)])
            op.input.extend([op.output[1], op.output[2]])
            batch_norm_ops.append(op)

    assert not spatial_bn_phase, \
        "Net modification for gpu inter-device batch normalization failed"
    del model.net.Proto().op[:]
    model.net.Proto().op.extend(new_ops)