Caffe2 - Python API
A deep learning, cross-platform ML framework
data_parallel_model.py
1 ## @package data_parallel_model
2 # Module caffe2.python.data_parallel_model
3 from __future__ import absolute_import
4 from __future__ import division
5 from __future__ import print_function
6 
7 from collections import OrderedDict
8 from future.utils import viewitems, viewkeys, viewvalues
9 import logging
10 import copy
11 
12 from caffe2.python import \
13  model_helper, dyndep, scope, workspace, core, memonger, utils
14 from caffe2.proto import caffe2_pb2
15 
16 import numpy as np
17 
# Load the contrib NCCL and Gloo operator libraries at import time.
# NOTE(review): ops used in this module such as NCCLBroadcast / NCCLAllreduce
# and the Gloo-based common-world / Barrier ops presumably come from these
# libraries — confirm against the contrib build targets.
dyndep.InitOpsLibrary("@/pytorch/pytorch/contrib/nccl:nccl_ops")
dyndep.InitOpsLibrary("@/pytorch/pytorch/contrib/gloo:gloo_ops")
dyndep.InitOpsLibrary("@/pytorch/pytorch/contrib/gloo:gloo_ops_gpu")

# Module logger. Level is INFO so the log.info progress messages emitted while
# building the data-parallel nets are not filtered at this logger.
log = logging.getLogger("data_parallel_model")
log.setLevel(logging.INFO)

# Default timeout (seconds) for the barrier common world built by
# Synchronize().
_DEFAULT_TIMEOUT_SEC = 30
26 
27 
def Parallelize_GPU(*args, **kwargs):
    '''
    Convenience wrapper around Parallelize() that forces cpu_device=False.
    Every other positional and keyword argument is forwarded unchanged.
    '''
    kwargs.update(cpu_device=False)
    Parallelize(*args, **kwargs)
31 
32 
def Parallelize_CPU(*args, **kwargs):
    '''
    Convenience wrapper around Parallelize() that forces cpu_device=True.
    Every other positional and keyword argument is forwarded unchanged.
    '''
    kwargs.update(cpu_device=True)
    Parallelize(*args, **kwargs)
36 
37 
def Parallelize(
    model_helper_obj,
    input_builder_fun,
    forward_pass_builder_fun,
    param_update_builder_fun=None,
    optimizer_builder_fun=None,
    post_sync_builder_fun=None,
    devices=None,
    rendezvous=None,
    net_type='dag',
    broadcast_computed_params=True,
    optimize_gradient_memory=False,
    dynamic_memory_management=False,
    blobs_to_keep=None,
    use_nccl=False,
    max_concurrent_distributed_ops=16,
    cpu_device=False,
    num_threads_per_device=4,
    shared_model=False,
    combine_spatial_bn=False,
):
    '''
    Function to create a model that can run on many GPUs or CPUs.
    The model is modified in place; nothing is returned.

      model_helper_obj: an object of ModelHelper
      input_builder_fun:
                         Function that adds the input operators
                         Note: Remember to instantiate reader outside of this
                         function so all devices share same reader object.
                         Signature:  input_builder_fun(model)
      forward_pass_builder_fun:
                        Function to add the operators to the model.
                        Must return list of loss-blob references that
                        are used to build the gradient. Loss scale parameter
                        is passed, as you should scale the loss of your model
                        by 1.0 / the total number of devices.
                        Signature: forward_pass_builder_fun(model, loss_scale)
      param_update_builder_fun:
                        Function that adds operators that are run after
                        gradient update, such as updating the weights and
                        weight decaying. This is called for each GPU separately.
                        Signature: param_update_builder_fun(model)
      optimizer_builder_fun:
                        Alternative to param_update_builder_fun, allows one
                        to add an optimizer for the whole model. Called only
                        once, without name or devicescope.
      post_sync_builder_fun:
                        Function applied after initial parameter sync has been
                        completed, such as keeping multi-precision parameters
                        in sync.
                        Signature: post_sync_builder_fun(model)
      devices:          List of GPU ids, such as [0, 1, 2, 3]. Defaults to
                        every device reported by workspace.NumCudaDevices().
      rendezvous:       used for rendezvous in distributed computation, if None
                        then only one node is used. Must provide 'num_shards'.
      net_type:         Network type
      broadcast_computed_params:
                        If True, computed params (GetComputedParams) are
                        broadcast across devices via _BroadcastComputedParams.
      optimize_gradient_memory: whether to apply 'memonger' to share blobs
      shared_model      (only for CPU) use same parameters on each device.
                        Not supported together with a rendezvous.
                        in gradient computation to reduce memory footprint.
      dynamic_memory_management: Whether to apply dynamic memory optimization
                        by freeing unused blobs. The underlying (de)allocation
                        uses cached allocator. For GPU training PLEASE MAKE SURE
                        caffe2_cuda_memory_pool is set.
      blobs_to_keep :   A list of blob names to keep and don't free during
                        dynamic memory optimization (for example loss blob).
      use_nccl:         Forwarded to the computed-param broadcast and gradient
                        all-reduce helpers.
      max_concurrent_distributed_ops:
                        Upper bound for concurrent distributed ops in the
                        gradient all-reduce; capped at num_workers - 1.
      cpu_device        Use CPU instead of GPU.
      num_threads_per_device:
                        DAG workers per device; the net gets
                        len(devices) * num_threads_per_device workers, plus 8
                        when a rendezvous is given.
      combine_spatial_bn:
                        When set to True, applies batch normalization across
                        all devices within the node. If False, batch
                        normalization will be done separately for each device.
                        This option is currently only supported on the CPU.
    '''
    assert scope.CurrentDeviceScope() is None \
        or scope.CurrentDeviceScope().device_type == caffe2_pb2.CPU, \
        "Parallelize must be called without device-scope, \
        device scope was: {}".format(scope.CurrentDeviceScope())

    if devices is None:
        # Default to every visible CUDA device (a plain list, not a tuple).
        devices = list(range(0, workspace.NumCudaDevices()))

    if not cpu_device:
        for gpu in devices:
            if gpu >= workspace.NumCudaDevices():
                log.warning("** Only {} GPUs available, GPUs {} requested".format(
                    workspace.NumCudaDevices(), devices))
                break
        model_helper_obj._device_type = caffe2_pb2.CUDA
        model_helper_obj._device_prefix = "gpu"
        model_helper_obj._shared_model = False
        device_name = "GPU"
        assert shared_model is False, "Shared model only supported on CPU"
    else:
        model_helper_obj._device_type = caffe2_pb2.CPU
        model_helper_obj._device_prefix = "cpu"
        device_name = "CPU"
        model_helper_obj._shared_model = shared_model
        # Actually enforce the restriction (asserting a bare string is a no-op).
        assert not (shared_model and rendezvous is not None), \
            "Shared model only supported on single-node currently"

    log.info("Parallelizing model for devices: {}".format(devices))
    extra_workers = 8 if rendezvous is not None else 0  # best-guess
    num_workers = len(devices) * num_threads_per_device + extra_workers
    max_concurrent_distributed_ops =\
        min(max_concurrent_distributed_ops, num_workers - 1)
    model_helper_obj.net.Proto().num_workers = num_workers
    model_helper_obj.net.Proto().type = net_type

    # Store some information in the model -- a bit ugly
    model_helper_obj._devices = devices
    model_helper_obj._rendezvous = rendezvous
    model_helper_obj._broadcast_context = None
    model_helper_obj._grad_names = []

    assert isinstance(model_helper_obj, model_helper.ModelHelper)

    # Keep track of params that were in the model before: they are not
    # data parallel, so we need to handle them separately
    non_datapar_params = copy.copy(model_helper_obj.params)

    # Add input and model
    log.info("Create input and model training operators")

    losses_by_gpu = {}
    num_shards = 1 if rendezvous is None else rendezvous['num_shards']
    # Each device's loss is scaled so the summed gradient is an average over
    # all devices on all shards.
    loss_scale = 1.0 / (len(devices) * num_shards)

    has_parameter_updates = param_update_builder_fun is not None or \
        optimizer_builder_fun is not None
    assert not (
        param_update_builder_fun is not None and
        optimizer_builder_fun is not None
    ), 'Can only specify one of param_update_builder_fun, optimizer_builder_fun'

    # Check that a model that is used for validation/testing has
    # init_params False, otherwise running the param init net will overwrite
    # synchronized values by the training net
    if not has_parameter_updates and model_helper_obj.init_params:
        log.warning('')
        log.warning("############# WARNING #############")
        log.warning("Model {}/{} is used for testing/validation but".format(
            model_helper_obj.name, model_helper_obj))
        log.warning("has init_params=True!")
        log.warning("This can conflict with model training.")
        log.warning("Please ensure model = ModelHelper(init_params=False)")
        log.warning('####################################')
        log.warning('')
        # TODO: make into assert

    for device in devices:
        device_opt = core.DeviceOption(model_helper_obj._device_type, device)
        with core.DeviceScope(device_opt):
            with core.NameScope("{}_{}".format(model_helper_obj._device_prefix,
                                               device)):
                log.info("Model for {} : {}".format(device_name, device))
                input_builder_fun(model_helper_obj)
                losses = forward_pass_builder_fun(model_helper_obj, loss_scale)
                # Losses are not needed for test net
                if has_parameter_updates:
                    assert isinstance(losses, list), \
                        'Model builder function must return list of loss blobs'
                    for loss in losses:
                        assert isinstance(loss, core.BlobReference), \
                            'Model builder func must return list of loss blobs'

                losses_by_gpu[device] = losses
    _ValidateParams(model_helper_obj.params)

    # Create parameter map
    model_helper_obj._device_grouped_blobs =\
        _GroupByDevice(model_helper_obj, devices,
                       model_helper_obj.params, non_datapar_params)

    # computed params
    computed_params_grouped =\
        _GroupByDevice(model_helper_obj, devices,
                       model_helper_obj.GetComputedParams(''), [])
    model_helper_obj._device_grouped_blobs.update(computed_params_grouped)

    model_helper_obj._param_names =\
        list(viewkeys(model_helper_obj._device_grouped_blobs))
    model_helper_obj._computed_param_names =\
        list(viewkeys(computed_params_grouped))

    if not has_parameter_updates:
        log.info("Parameter update function not defined --> only forward")
        _InferBlobDevice(model_helper_obj)
        return

    log.info("Adding gradient operators")
    _AddGradientOperators(devices, model_helper_obj, losses_by_gpu)

    if combine_spatial_bn:
        assert(cpu_device), \
            'combine_spatial_bn is currently only supported on the CPU'
        assert(has_parameter_updates), \
            'combine_spatial_bn should only be used for train model'
        _InterleaveOps(model_helper_obj)
        _InterDeviceBatchNormalization(model_helper_obj)

    _ValidateParams(model_helper_obj.params)

    # Group gradients by device and register to blob lookup
    param_to_grad = model_helper_obj.param_to_grad
    grads_ordered = [param_to_grad[p] for p in
                     model_helper_obj.params if p in param_to_grad]
    non_datapar_grads = [param_to_grad[p] for p in non_datapar_params]

    gradients_grouped = _GroupByDevice(
        model_helper_obj,
        devices,
        grads_ordered,
        non_datapar_grads
    )
    model_helper_obj._device_grouped_blobs.update(gradients_grouped)
    model_helper_obj._grad_names = list(viewkeys(gradients_grouped))
    model_helper_obj._losses_by_gpu = losses_by_gpu

    _InferBlobDevice(model_helper_obj)

    log.info("Add gradient all-reduces for SyncSGD")
    if broadcast_computed_params:
        _BroadcastComputedParams(devices, model_helper_obj, rendezvous, use_nccl)

    if len(model_helper_obj._grad_names) > 0:
        # Gradients in reverse order
        reverse_ordered_grads = _GetReverseOrderedGrads(model_helper_obj)
        assert(len(reverse_ordered_grads) > 0)
        _AllReduceBlobs(
            reverse_ordered_grads,
            devices,
            model_helper_obj,
            model_helper_obj.net,
            rendezvous,
            use_nccl,
            max_concurrent_distributed_ops,
        )
    else:
        log.info("NOTE: Param builder function did not create any parameters.")

    log.info("Post-iteration operators for updating params")

    # Snapshot of all params before pruning; needed to remap shared params.
    all_params = set(model_helper_obj.GetParams(''))
    if shared_model:
        _PruneParametersForSharing(model_helper_obj)

    if param_update_builder_fun is not None:
        for device in devices:
            device_opt = core.DeviceOption(model_helper_obj._device_type, device)
            with core.DeviceScope(device_opt):
                with core.NameScope(
                    "{}_{}".format(model_helper_obj._device_prefix, device)
                ):
                    param_update_builder_fun(model_helper_obj)
    else:
        log.info("Calling optimizer builder function")
        optimizer = optimizer_builder_fun(model_helper_obj)
        model_helper_obj._optimizer = optimizer

    (sync_blobs, sync_names) = _ComputeBlobsToSync(model_helper_obj)
    sync_blobs_grouped = _GroupByDevice(
        model_helper_obj,
        devices,
        sync_blobs,
        [],
    )
    model_helper_obj._device_grouped_blobs.update(sync_blobs_grouped)

    _InferBlobDevice(model_helper_obj)
    _AnalyzeOperators(model_helper_obj)

    # Configure dagnet to run with only one worker on the first iteration,
    # to prevent concurrency problems with allocs and nccl.
    arg = model_helper_obj.Proto().arg.add()
    arg.name = "first_iter_only_one_worker"
    arg.i = 1

    # Add initial parameter syncs
    log.info("Add initial parameter sync")
    _SyncAllParams(
        devices,
        model_helper_obj,
        model_helper_obj.param_init_net,
        model_helper_obj.param_init_net,
        rendezvous,
        sync_names,
        max_concurrent_distributed_ops=1
    )

    # Handle any operations that need to be done after parameter sync
    # i.e. making sure multi-precision copies of parameters are up-to-date
    if post_sync_builder_fun is not None:
        for device in devices:
            device_opt = core.DeviceOption(model_helper_obj._device_type, device)
            with core.DeviceScope(device_opt):
                with core.NameScope(
                    "{}_{}".format(model_helper_obj._device_prefix, device)
                ):
                    post_sync_builder_fun(model_helper_obj)

    assert not (optimize_gradient_memory and dynamic_memory_management), \
        """It is not advised to use gradient optimization ('memonger')
        with dynamic memory management."""

    if optimize_gradient_memory:
        _OptimizeGradientMemorySimple(model_helper_obj, losses_by_gpu, devices)

    if dynamic_memory_management:
        _AddDynamicMemoryOptimization(model_helper_obj, blobs_to_keep, devices)

    model_helper_obj._data_parallel_model_init_nets = [
        model_helper_obj.param_init_net,
    ]
    model_helper_obj._data_parallel_model_nets = [model_helper_obj.net]

    if shared_model:
        _RemapParameterBlobsForSharedModel(model_helper_obj, all_params)
354 
355 
def Parallelize_GPU_BMUF(*args, **kwargs):
    '''
    Convenience wrapper around Parallelize_BMUF() that forces cpu_device=False.
    All remaining arguments are forwarded unchanged.
    '''
    kwargs.update(cpu_device=False)
    Parallelize_BMUF(*args, **kwargs)
359 
360 
def Parallelize_CPU_BMUF(*args, **kwargs):
    '''
    Convenience wrapper around Parallelize_BMUF() that forces cpu_device=True.
    All remaining arguments are forwarded unchanged.
    '''
    kwargs.update(cpu_device=True)
    Parallelize_BMUF(*args, **kwargs)
364 
365 
def Parallelize_BMUF(
    model_helper_obj,
    input_builder_fun,
    forward_pass_builder_fun,
    param_update_builder_fun,
    block_learning_rate=1.0,
    block_momentum=None,
    devices=None,
    rendezvous=None,
    net_type='dag',
    master_device=None,
    use_nccl=False,
    nesterov=False,
    optimize_gradient_memory=False,
    reset_momentum_sgd=False,
    warmup_iterations=None,
    max_concurrent_distributed_ops=4,
    add_blobs_to_sync=None,
    num_threads_per_device=4,
    cpu_device=False
):
    '''
    Function to create model that run on many GPUs and creates a net for
    parameter_updates that can be run independently for number of iterations
    then followed by another net that runs once to compute the final parameter
    updates according to block wise model update filtering rule described
    in : Scalable Training of Deep Learning Machines by Incremental Block
    Training with Intra-block Parallel Optimization and Blockwise Model-Update
    Filtering (ICASSP 2016).

    The model is modified in place. Nets produced:
      model.net                               local (per-device) training net
      model._global_model_init_net            initializes the global state
      model._global_model_param_updates_net   BMUF global update, run once
                                              per block (see RunNet)
      model._warmup_broadcast                 only when warmup_iterations is set

    Extra arguments (beyond those shared with Parallelize):
      block_learning_rate: scales (param_avg - param_g) in the momentum update.
      block_momentum:      defaults to 1 - 1 / num_devices, where num_devices
                           is len(devices) * num_shards.
      master_device:       device holding the global state blobs
                           ("<param>_v", "<param>_g", "<param>_prev");
                           defaults to devices[0].
      nesterov:            use the Nesterov form of the block momentum update.
      reset_momentum_sgd:  zero the momentum input (input[1]) of every
                           MomentumSGDUpdate op in model.net at the end of the
                           global update net.
      warmup_iterations:   if set, stored on the model and a warmup broadcast
                           net is built (see RunWarmup).
      add_blobs_to_sync:   extra blobs synced by the global update net via
                           AddBlobSync.
      max_concurrent_distributed_ops: capped at num_workers - 1.
    '''
    assert scope.CurrentDeviceScope() is None \
        or scope.CurrentDeviceScope().device_type == caffe2_pb2.CPU, \
        "Parallelize must be called without device-scope, \
        device scope was: {}".format(scope.CurrentDeviceScope())

    assert isinstance(model_helper_obj, model_helper.ModelHelper)

    if devices is None:
        devices = list(range(0, workspace.NumCudaDevices()))
    if master_device is None:
        master_device = devices[0]

    if not cpu_device:
        for gpu in devices:
            if gpu >= workspace.NumCudaDevices():
                log.warning("** Only {} GPUs available, GPUs {} requested".format(
                    workspace.NumCudaDevices(), devices))
                break
        model_helper_obj._device_type = caffe2_pb2.CUDA
        model_helper_obj._device_prefix = "gpu"
    else:
        model_helper_obj._device_type = caffe2_pb2.CPU
        model_helper_obj._device_prefix = "cpu"

    model_helper_obj._devices = devices
    model_helper_obj._rendezvous = rendezvous
    model_helper_obj._broadcast_context = None
    model_helper_obj._shared_model = False
    # All global (BMUF) state ops are placed on the master device.
    master_dev_opt = core.DeviceOption(model_helper_obj._device_type, master_device)

    # NOTE(review): rendezvous is only read for 'num_shards' here; its full
    # structure is not documented in this function.
    num_shards = rendezvous['num_shards'] if rendezvous else 1
    # num_devices is #devices across all machines
    num_devices = len(devices) * num_shards
    # num_workers is #threads to execute the DAG per shard
    num_workers = num_threads_per_device * len(devices)
    if rendezvous:
        num_workers += 8

    loss_scale = 1.0 / num_devices
    if block_momentum is None:
        block_momentum = 1.0 - 1.0 / num_devices

    max_concurrent_distributed_ops = min(
        max_concurrent_distributed_ops,
        num_workers - 1
    )

    model_helper_obj.net.Proto().num_workers = num_workers
    model_helper_obj.net.Proto().type = net_type

    # A net for initializing global model parameters. It is run once in the
    # same step as the net parameters initialization (see RunInitNet).
    model_helper_obj._global_model_init_net = core.Net('global_model_init')
    model_helper_obj._global_model_init_net.Proto().type = net_type
    model_helper_obj._global_model_init_net.Proto().num_workers = \
        num_workers

    # A net for computing final parameter updates. It runs once after
    # running net (local models updates) for `num_local_iterations` times.
    model_helper_obj._global_model_param_updates_net = core.Net('global_model')
    model_helper_obj._global_model_param_updates_net.Proto().type = net_type
    model_helper_obj._global_model_param_updates_net.Proto().num_workers = \
        num_workers

    # Blob-name helpers for the per-param global state kept on the master:
    #   _v: block momentum, _g: global model copy, _v_prev: previous momentum
    #   (Nesterov only).
    def _v(param):
        return "{}_v".format(param)

    def _g(param):
        return "{}_g".format(param)

    def _v_prev(param):
        return "{}_prev".format(param)

    # Keep track of params that were in the model before: they are not
    # data parallel, so we need to handle them separately
    non_datapar_params = copy.copy(model_helper_obj.params)
    model_helper_obj._losses_by_gpu = {}

    # Build input + forward pass on each device (under its name scope) and
    # record the returned losses per device.
    def _InitializeModels(gpu_id):
        input_builder_fun(model_helper_obj)
        loss = forward_pass_builder_fun(model_helper_obj, loss_scale)
        model_helper_obj._losses_by_gpu[gpu_id] = loss
    _ForEachDevice(
        devices,
        _InitializeModels,
        device_type=model_helper_obj._device_type,
        device_prefix=model_helper_obj._device_prefix,
        scoped=True
    )
    _ValidateParams(model_helper_obj.params)

    model_helper_obj._device_grouped_blobs =\
        _GroupByDevice(model_helper_obj, devices,
                       model_helper_obj.params, non_datapar_params)

    model_helper_obj._param_names =\
        list(viewkeys(model_helper_obj._device_grouped_blobs))

    _AddGradientOperators(
        devices, model_helper_obj, model_helper_obj._losses_by_gpu
    )
    _ValidateParams(model_helper_obj.params)

    _InferBlobDevice(model_helper_obj)

    # Local (per-device) parameter updates, each under its device name scope.
    def _InitializeParamUpdate(gpu_id):
        param_update_builder_fun(model_helper_obj)
    _ForEachDevice(
        devices,
        _InitializeParamUpdate,
        device_type=model_helper_obj._device_type,
        device_prefix=model_helper_obj._device_prefix,
        scoped=True
    )

    model_parameter_names = list(
        viewkeys(model_helper_obj._device_grouped_blobs)
    )
    if warmup_iterations is not None:
        model_helper_obj._warmup_iterations = warmup_iterations
        # A net for broadcasting gpu-0 (master shard) parameters after
        # running net for `warmup_iterations`.
        model_helper_obj._warmup_broadcast = core.Net('warmup-broadcast')
        model_helper_obj._warmup_broadcast.Proto().type = net_type
        model_helper_obj._warmup_broadcast.Proto().num_workers = \
            num_workers

        _SyncAllParams(
            devices,
            model_helper_obj,
            model_helper_obj.param_init_net,
            model_helper_obj._warmup_broadcast,
            rendezvous,
            model_parameter_names,
            max_concurrent_distributed_ops
        )
        # After warmup, the global model copy starts from the master params.
        for param_name in viewkeys(model_helper_obj._device_grouped_blobs):
            param = model_helper_obj._device_grouped_blobs[param_name][master_device]
            with core.DeviceScope(master_dev_opt):
                model_helper_obj._warmup_broadcast.Copy(param, _g(param))

    # (Step-0) Initialize momentum parameters on master device.
    for param_name in viewkeys(model_helper_obj._device_grouped_blobs):
        param = model_helper_obj._device_grouped_blobs[param_name][master_device]
        with core.DeviceScope(master_dev_opt):
            model_helper_obj._global_model_init_net.ConstantFill(
                param, _v(param), value=0.0
            )
            model_helper_obj._global_model_init_net.Copy(param, _g(param))
            if nesterov:
                model_helper_obj._global_model_init_net.ConstantFill(
                    param, _v_prev(param), value=0.0
                )

    # (Step-1) Update models for num_local_iterations.

    # (Step-2) Compute post-local-updates average of the params.
    # Sum model params across GPUs and store results in param_avg blob.
    _AllReduceBlobs(
        model_parameter_names,
        devices,
        model_helper_obj,
        model_helper_obj._global_model_param_updates_net,
        rendezvous,
        use_nccl,
        max_concurrent_distributed_ops
    )

    # (Step-3) Update momentum params :
    # param_v = block_momentum * param_v
    #   + block_learning_Rate * (param_avg - param)
    # if nesterov momentum:
    #   param = param + param_v
    #       - block_momentum * (param_v - param_v_prev)
    #   param_v_prev = param_v
    # else:
    #   param = param + param_v
    # (In the ops below, "param" on the master is reused as a scratch blob:
    # it first holds the all-reduced sum, then the average, then the
    # scaled difference; the global copy lives in _g(param).)
    for param_name in model_parameter_names:
        param = model_helper_obj._device_grouped_blobs[param_name][master_device]
        with core.DeviceScope(master_dev_opt):
            # TODO(ataei) : Stop building the graph here to get model average ?
            model_helper_obj._global_model_param_updates_net.Scale(
                param, param, scale=1.0 / num_devices
            )
            model_helper_obj._global_model_param_updates_net.Sub(
                [param, _g(param)], param
            )
            model_helper_obj._global_model_param_updates_net.Scale(
                param, param, scale=block_learning_rate
            )
            model_helper_obj._global_model_param_updates_net.Scale(
                _v(param), _v(param), scale=block_momentum
            )
            model_helper_obj._global_model_param_updates_net.Add(
                [_v(param), param], _v(param)
            )
            model_helper_obj._global_model_param_updates_net.Add(
                [_g(param), _v(param)], _g(param)
            )
            if nesterov:
                model_helper_obj._global_model_param_updates_net.Sub(
                    [_v(param), _v_prev(param)], _v_prev(param)
                )
                model_helper_obj._global_model_param_updates_net.Scale(
                    _v_prev(param), _v_prev(param), scale=block_momentum
                )
                model_helper_obj._global_model_param_updates_net.Sub(
                    [_g(param), _v_prev(param)], _g(param)
                )
                model_helper_obj._global_model_param_updates_net.Copy(
                    _v(param), _v_prev(param)
                )
            model_helper_obj._global_model_param_updates_net.Copy(
                _g(param), param
            )

    # Broadcast the updated master params to every device (and shard).
    _SyncAllParams(
        devices,
        model_helper_obj,
        model_helper_obj.param_init_net,
        model_helper_obj._global_model_param_updates_net,
        rendezvous,
        model_parameter_names,
        max_concurrent_distributed_ops
    )

    # Add additional syncs
    if add_blobs_to_sync is not None:
        AddBlobSync(
            model_helper_obj,
            add_blobs_to_sync,
            net=model_helper_obj._global_model_param_updates_net)

    # Reset momentum-SGD parameters
    if reset_momentum_sgd:
        momentum_ops = [op for op in model_helper_obj.net.Proto().op
                        if op.type == 'MomentumSGDUpdate']
        for op in momentum_ops:
            momentum_blob = op.input[1]
            with core.DeviceScope(op.device_option):
                model_helper_obj._global_model_param_updates_net.ConstantFill(
                    [momentum_blob], momentum_blob, value=0.0
                )

    if optimize_gradient_memory:
        _OptimizeGradientMemorySimple(
            model_helper_obj, model_helper_obj._losses_by_gpu, devices
        )

    model_helper_obj._data_parallel_model_init_nets = [
        model_helper_obj.param_init_net,
        model_helper_obj._global_model_init_net
    ]

    # A (net, 1) tuple means "run this net once per RunNet call".
    model_helper_obj._data_parallel_model_nets = [
        model_helper_obj.net,
        (model_helper_obj._global_model_param_updates_net, 1)
    ]
656 
657 
def RunInitNet(model):
    '''
    Run each init net recorded on the model once, then create (without
    running) every training net in the workspace.

    Entries of model._data_parallel_model_nets are either a net or a tuple
    whose first element is the net; only the net is created here.
    '''
    for init_net in model._data_parallel_model_init_nets:
        workspace.RunNetOnce(init_net)
    for entry in model._data_parallel_model_nets:
        net_to_create = entry[0] if isinstance(entry, tuple) else entry
        workspace.CreateNet(net_to_create)
666 
667 
def RunWarmup(model):
    '''
    Run model.net for model._warmup_iterations iterations, then run the
    warmup broadcast net (built by Parallelize_BMUF) exactly once.
    '''
    training_net = model.net
    workspace.RunNet(training_net, model._warmup_iterations)
    workspace.RunNetOnce(model._warmup_broadcast)
671 
672 
def RunNet(model, num_iterations):
    '''
    Run every data-parallel net of the model in order.

    A tuple entry (net, n, ...) runs that net, by name, n times; a bare net
    runs num_iterations times.
    '''
    for entry in model._data_parallel_model_nets:
        if not isinstance(entry, tuple):
            workspace.RunNet(entry, num_iterations)
            continue
        workspace.RunNet(entry[0].Proto().name, entry[1])
679 
680 
# Counter that makes the net / common-world / status blob names created by
# each Synchronize() call unique.
barrier_instance = 0


def Synchronize(model, timeout_sec=_DEFAULT_TIMEOUT_SEC):
    '''
    Run a cross-shard barrier so that every shard reaches this point.

    Does nothing without a rendezvous or with a single shard. Only the GLOO
    engine is supported (asserted).
    '''
    rendezvous = model._rendezvous
    if rendezvous is None or rendezvous['num_shards'] <= 1:
        # Single host case
        return

    log.info("Creating synchronization barrier net")
    assert rendezvous['engine'] == 'GLOO', "Engine does not support barrier"

    global barrier_instance
    suffix = str(barrier_instance)
    barrier_instance += 1

    barrier_net = core.Net("sync_barrier_net_" + suffix)
    comm_world = _CreateOrCloneCommonWorld(
        barrier_net,
        "sync_barrier_cw_" + suffix,
        rendezvous=rendezvous,
        status_blob="sync_barrier_cw_status_" + suffix,
        timeout_sec=timeout_sec,
    )
    barrier_net.Barrier(
        inputs=[comm_world],
        outputs=[],
        engine=rendezvous['engine'],
        status_blob="sync_barrier_status_" + suffix,
    )
    workspace.RunNetOnce(barrier_net)
709 
710 
def ConvertNetForDevice(net, device=None):
    '''
    Converts all blobs in the net to have namescope gpu_X, and correct
    device scope. You can use this to enable AppendNet with a
    forward_pass_builder_fun:

       def builder_fun(model):
          ...
          model.net.AppendNet(
             data_parallel_model.ConvertNetForDevice(othermodel.net))
          model.param_init_net.AppendNet(
             data_parallel_model.ConvertNetForDevice(othermodel.param_init_net))

    Args:
        net: net to convert. It is deep-copied; the original is not modified.
        device: DeviceOption to assign to every op. Defaults to the current
            device scope. The namescope prefix is "gpu" for CUDA devices and
            "cpu" otherwise, followed by "_<device.cuda_gpu_id>/".

    Returns:
        The converted copy. Op inputs, outputs, control inputs and the net's
        external inputs/outputs are all prefixed with the namescope.

    Raises:
        NotImplementedError: the net contains a RecurrentNetwork op.
        AssertionError: no device was given and no device scope is active.
    '''
    mnet = copy.deepcopy(net)

    if device is None:
        device = scope.CurrentDeviceScope()
    assert device is not None, \
        "ConvertNetForDevice needs a device or an active device scope"

    device_prefix = "gpu" if device.device_type == caffe2_pb2.CUDA else "cpu"

    namescope = "{}_{}/".format(device_prefix, device.cuda_gpu_id)
    for op in mnet.Proto().op:
        if "RecurrentNetwork" in op.type:
            # Raising a plain string is itself a TypeError on Python 3;
            # raise a real exception instead.
            raise NotImplementedError(
                "RecurrentNetwork conversion not yet supported")
        for i, inputb in enumerate(op.input):
            op.input[i] = namescope + inputb
        for i, outputb in enumerate(op.output):
            op.output[i] = namescope + outputb
        for i, blob in enumerate(op.control_input):
            op.control_input[i] = namescope + blob
        op.device_option.CopyFrom(device)
    for i, einp in enumerate(mnet.Proto().external_input):
        mnet.Proto().external_input[i] = namescope + einp
    for i, eoutp in enumerate(mnet.Proto().external_output):
        mnet.Proto().external_output[i] = namescope + eoutp
    return mnet
747 
748 
def _ForEachDevice(devices, f, device_type, device_prefix, scoped=False,
                   *args, **kwargs):
    '''
    Call f(device, *args, **kwargs) once per device inside that device's
    DeviceScope. When `scoped` is true, the call is additionally wrapped in
    the NameScope "<device_prefix>_<device>".
    '''
    for dev in devices:
        with core.DeviceScope(core.DeviceOption(device_type, dev)):
            if not scoped:
                f(dev, *args, **kwargs)
                continue
            with core.NameScope("{}_{}".format(device_prefix, dev)):
                f(dev, *args, **kwargs)
759 
760 
def _AddGradientOperators(devices, model, losses_by_gpu):
    '''
    Seed every loss with a gradient of 1.0 and build the backward pass.

    For each device, under that device's DeviceScope, a ConstantFill blob
    named "<loss>_grad" is created for each of its losses. The mapping
    {loss name: grad name} is then passed to model.AddGradientOperators.
    '''
    loss_to_grad = {}
    for dev in devices:
        with core.DeviceScope(core.DeviceOption(model._device_type, dev)):
            for loss in losses_by_gpu[dev]:
                seed = model.ConstantFill(loss, str(loss) + "_grad", value=1.0)
                loss_to_grad[str(loss)] = str(seed)

    model.AddGradientOperators(loss_to_grad)
775 
776 
def ExtractPredictorNet(model, inputs, outputs, device):
    '''
    Returns (net, params) that can be exported to be used as a prediction
    net.

    The first device's replica of model.net is used. Blob names are looked up
    under its namescope ("<prefix>_<device>/") and renamed back to the given
    unprefixed `inputs` / `outputs` names.
    '''
    scope_prefix = "{}_{}/".format(model._device_prefix, model._devices[0])
    scoped_inputs = [scope_prefix + str(blob) for blob in inputs]
    scoped_outputs = [scope_prefix + str(blob) for blob in outputs]
    predictor_net, export_blobs = model_helper.ExtractPredictorNet(
        net_proto=model.net.Proto(),
        input_blobs=scoped_inputs,
        output_blobs=scoped_outputs,
        device=device,
        renames=dict(zip(scoped_inputs + scoped_outputs, inputs + outputs)),
    )
    return predictor_net, export_blobs
798 
799 
def GetCheckpointParams(model):
    '''
    Returns a set of blobs that are needed for a complete check point:
    every blob from _ComputeBlobsToSync that belongs to the first device's
    namescope, plus the output of each Iter / AtomicIter op whose name lies
    outside any device namescope (the iteration counter must be saved too).
    '''
    def _on_first_device(blob):
        return str(blob).startswith(
            "{}_{}/".format(model._device_prefix, model._devices[0]))

    all_blobs, _ = _ComputeBlobsToSync(model)
    checkpoint_blobs = set(filter(_on_first_device, all_blobs))

    for op in model.net.Proto().op:
        if op.type not in ('Iter', 'AtomicIter'):
            continue
        counter = op.output[0]
        if not counter.startswith("{}_".format(model._device_prefix)):
            checkpoint_blobs.add(counter)

    return checkpoint_blobs
822 
823 
def FinalizeAfterCheckpoint(model, blobs=None):
    '''
    This function should be called after loading parameters from a
    checkpoint / initial parameters file.

    The first call builds model._checkpoint_net, which syncs the blobs
    across devices (and shards). For a multi-shard rendezvous, a one-off
    checkpoint init net is also built and run immediately. Every call then
    runs model._checkpoint_net once.

    Args:
        model: a model prepared by Parallelize.
        blobs: blobs to synchronize. Defaults to the names computed by
            _ComputeBlobsToSync. Only used when the checkpoint net is first
            built.
    '''
    if not hasattr(model, "_checkpoint_net"):
        if blobs is not None:
            uniq_blob_names = [stripBlobName(p) for p in blobs]
        else:
            _, uniq_blob_names = _ComputeBlobsToSync(model)

        # Synchronize to the blob lookup map, as the provided
        # blobs might have non-parameters, such as momentum blobs.
        log.info("Creating checkpoint synchronization net")
        devices = model.GetDevices()
        for name in uniq_blob_names:
            if name in model._device_grouped_blobs:
                continue
            model._device_grouped_blobs[name] = {
                d: core.BlobReference("{}_{}{}{}".format(
                    model._device_prefix,
                    d,
                    scope._NAMESCOPE_SEPARATOR,
                    name))
                for d in devices
            }

        model._checkpoint_net = core.Net("checkpoint_sync_net")
        model._checkpoint_net.RunAllOnGPU()

        multi_shard = (model._rendezvous is not None and
                       model._rendezvous['num_shards'] > 1)
        checkpoint_init_net = None
        if multi_shard:
            checkpoint_init_net = core.Net("checkpoint_init_net")
            checkpoint_init_net.RunAllOnGPU()

        _SyncAllParams(
            devices,
            model,
            checkpoint_init_net,
            model._checkpoint_net,
            model._rendezvous,
            uniq_blob_names,
            max_concurrent_distributed_ops=1
        )
        if checkpoint_init_net:
            workspace.RunNetOnce(checkpoint_init_net)

        workspace.CreateNet(model._checkpoint_net)

    # Run the sync
    log.info("Run checkpoint net")
    workspace.RunNet(model._checkpoint_net.Proto().name)
877 
878 
def GetLearningRateBlobNames(model):
    '''
    Returns a list of learning rates blob names used in the optimizer.

    If the model has an optimizer (set when Parallelize is called with
    optimizer_builder_fun), the optimizer's 'lr' blob name(s) are returned:
    a single CPU blob for CPU models, or one blob per device for CUDA models.
    Otherwise, the first output of every LearningRate op in model.net is
    returned, in op order.

    Raises:
        Exception: the optimizer path is taken and model._device_type is
            neither CPU nor CUDA.
    '''
    # Models built via param_update_builder_fun may never set _optimizer.
    optimizer = getattr(model, '_optimizer', None)
    if optimizer is not None:
        if model._device_type == caffe2_pb2.CPU:
            return [optimizer.get_cpu_blob_name('lr')]
        elif model._device_type == caffe2_pb2.CUDA:
            return [optimizer.get_gpu_blob_name('lr', gpu, '')
                    for gpu in model._devices]
        else:
            raise Exception(
                "Unsupported device type : {}".format(model._device_type)
            )
    # op.output is a repeated proto field: it must be indexed, not called.
    return [op.output[0] for op in model.net.Proto().op
            if op.type == "LearningRate"]
899 
900 
def _Broadcast(devices, model, net, param, use_nccl=False):
    '''
    Copy `param` from the master device (devices[0]) to the other devices.

    When use_nccl is set and the blob lives on GPU, a single NCCLBroadcast
    op over all per-device copies is added under the master device scope.
    Otherwise one Copy op per non-master device is added to `net`. It is
    placed on the target CUDA device for GPU blobs, or on CPU 0 otherwise.
    '''
    master_dev = devices[0]

    if use_nccl and _IsGPUBlob(model, param):
        # NCCL's root is a *rank* among the participating devices, not a
        # device id, so the master (first) rank is always 0.
        with core.DeviceScope(
            core.DeviceOption(model._device_type, master_dev)
        ):
            net.NCCLBroadcast(
                list(viewvalues(model._device_grouped_blobs[param])),
                list(viewvalues(model._device_grouped_blobs[param])),
                root=0,
            )
        return

    for target in devices[1:]:
        if _IsGPUBlob(model, param):
            target_opt = core.DeviceOption(caffe2_pb2.CUDA, target)
        else:
            target_opt = core.DeviceOption(caffe2_pb2.CPU, 0)
        with core.DeviceScope(target_opt):
            replicas = model._device_grouped_blobs[param]
            net.Copy(replicas[master_dev], replicas[target])
929 
930 
def _AllReduce(devices, model, net, param, use_nccl=False, control_input=None):
    '''
    All-reduce the per-device copies of `param` on a single host.

    For CUDA models with use_nccl=True, one NCCLAllreduce op over all copies
    is added through the model helper, with `control_input` as its control
    input. Otherwise the copies are summed onto the first device with Sum
    ops. A fixed reduction tree is used for 4, 8 or 16 devices, and a
    single Sum otherwise. The result is then copied back to the other
    devices with _Broadcast.

    Arguments:
        devices -- device ids taking part in the reduction
        model -- the data-parallel model helper
        net -- net that receives the Sum ops and the broadcast copies
        param -- unprefixed name, key into model._device_grouped_blobs
        use_nccl -- use NCCLAllreduce for CUDA models
        control_input -- control input for the NCCL op; callers pass the
            previous reduced blob to serialize NCCL ops
    '''
    blobs_group = list(viewvalues(model._device_grouped_blobs[param]))
    if model._device_type == caffe2_pb2.CUDA and use_nccl:
        # TODO: for _shared_model, do only NCCLReduce
        model.NCCLAllreduce(
            blobs_group, blobs_group, control_input=control_input
        )
        return

    if model._device_type == caffe2_pb2.CUDA:
        p2p_access_pattern = workspace.GetCudaPeerAccessPattern()
    else:
        p2p_access_pattern = None

    def sumN(*dev_indices):
        """Create a Sum op for 2 or more blobs on different devices.
        Saves the result on the first device.

        Arguments:
          dev_indices -- a list of device indices, which can be translated into
                         CUDA identifiers with model._devices
        """
        devices = [model._devices[idx] for idx in dev_indices]
        blobs = [blobs_group[idx] for idx in dev_indices]
        device_opt = core.DeviceOption(model._device_type, devices[0])
        # Every op of this partial sum is placed on its destination device,
        # devices[0]. That includes the peer copies, which previously
        # inherited whatever device scope the caller had set (the global
        # master), even for sub-tree sums rooted on another device.
        with core.DeviceScope(device_opt):
            for i, peer in enumerate(devices):
                if i == 0:
                    continue  # Skip the first device
                if p2p_access_pattern is not None and not p2p_access_pattern[
                    devices[0], peer
                ]:
                    # No peer access: copy from peer to devices[0] first.
                    blobs[i] = model.Copy(
                        blobs[i],
                        'gpu_{}/{}_gpu{}_copy'.format(devices[0], param, peer)
                    )
            net.Sum(blobs, [blobs[0]], name='dpm')

    if len(devices) == 16:
        # Special tree reduction for 16 gpus, TODO generalize like in muji.py
        for j in range(8):
            sumN(j * 2, j * 2 + 1)
        for j in range(4):
            sumN(j * 4, j * 4 + 2)
        for j in range(2):
            sumN(j * 8, j * 8 + 4)
        sumN(0, 8)
    elif len(devices) == 8:
        for j in range(4):
            sumN(j * 2, j * 2 + 1)
        for j in range(2):
            sumN(j * 4, j * 4 + 2)
        sumN(0, 4)
    elif len(devices) == 4:
        sumN(0, 1)
        sumN(2, 3)
        sumN(0, 2)
    else:
        sumN(*range(len(devices)))
    # TODO: for _shared_model, no need to broadcast
    _Broadcast(devices, model, net, param)
993 
994 
def _SyncAllParams(
    devices,
    model,
    init_net,
    net,
    rendezvous,
    unique_param_names,
    max_concurrent_distributed_ops=4
):
    '''
    Add ops to `net` that give every device the master copy of each param.

    Without a rendezvous, or with a single shard, params are only copied
    between local devices; init_net and max_concurrent_distributed_ops are
    then unused. Otherwise the distributed path is taken, which also
    synchronizes across hosts.
    '''
    is_single_host = rendezvous is None or rendezvous['num_shards'] <= 1
    if not is_single_host:
        _SyncAllParamsDistributed(
            devices,
            model,
            init_net,
            net,
            rendezvous,
            unique_param_names,
            max_concurrent_distributed_ops
        )
        return
    _SyncAllParamsSingleHost(devices, model, net, unique_param_names)
1016 
1017 
def AddBlobSync(model, blobs, net=None):
    '''
    Sync a blob across devices and hosts.

    blobs: unprefixed blob names. Each one is registered in
        model._device_grouped_blobs as one BlobReference per model device
        ("<prefix>_<device>/<name>"). An empty collection is a no-op.
    net: net that receives the sync ops; defaults to model.net. When the
        sync is distributed, model.param_init_net is used as the init net.
    '''
    if not len(blobs):
        return
    target_net = model.net if net is None else net
    for blob in blobs:
        assert not blob.startswith(model._device_prefix), \
            "Provide unprefixed blob name: {}".format(blob)
        per_device = {}
        for dev in model._devices:
            per_device[dev] = core.BlobReference(
                "{}_{}/{}".format(model._device_prefix, dev, blob)
            )
        model._device_grouped_blobs[blob] = per_device

    _SyncAllParams(
        model._devices,
        model,
        model.param_init_net,
        target_net,
        model._rendezvous,
        set(blobs),
    )
1040 
1041 
def AddDistributedBlobSync(model, blobs):
    '''
    Sync blobs across machines (but not across devices).

    Adds a common-world op to model.param_init_net and an in-place Allreduce
    over `blobs` to model.net, using the model's rendezvous engine. Does
    nothing when the model has no rendezvous or when `blobs` is empty.
    `blobs` may be any iterable of blob names / references.
    '''
    if model._rendezvous is None:
        return
    # Materialize as a list: it is concatenated with another list below.
    blobs = list(blobs)
    if not blobs:
        # Nothing to reduce; avoid creating a common world with an empty
        # synthesized name and an Allreduce with no tensors.
        return
    synth_name = "_".join([str(b) for b in blobs])
    comm_world = _CreateOrCloneCommonWorld(
        model.param_init_net,
        "blob_sync_cw_" + synth_name,
        rendezvous=model._rendezvous,
        status_blob="create_blob_sync_cw_{}_cw_status".format(
            synth_name,
        ),
    )

    model.net.Allreduce(
        inputs=[comm_world] + blobs,
        outputs=blobs,
        engine=model._rendezvous['engine'],
        status_blob="blob_sync_allred_{}_status".format(synth_name),
    )
1064 
1065 
def _SyncAllParamsDistributed(
    devices,
    model,
    init_net,
    net,
    rendezvous,
    unique_param_names,
    max_concurrent_distributed_ops
):
    '''
    Synchronize params across hosts, then across the local devices.

    For each param, in sorted name order, a Broadcast collective is added to
    `net`. Common worlds are shared through the model's "broadcast"
    CollectivesConcurrencyControl, which is created lazily (in init_net) on
    first use.
    - With the GLOO engine, the broadcast covers all local replicas at once.
    - With other engines, the master replica is copied with CopyGPUToCPU,
      broadcast from the CPU, and copied back with CopyCPUToGPU.
    Afterwards the master replica is copied to the other local devices.
    '''
    assert rendezvous['num_shards'] > 1

    master_dev = devices[0]
    gpu_device_opt = core.DeviceOption(model._device_type, master_dev)
    cpu_device_opt = core.DeviceOption(caffe2_pb2.CPU)

    if model._broadcast_context is None:
        model._broadcast_context = CollectivesConcurrencyControl(
            "broadcast",
            max_concurrent_distributed_ops,
            init_net,
            rendezvous
        )
    context = model._broadcast_context

    def add_broadcast(param_name, params):
        # The params list itself is handed to the context as the control
        # output for the next collective in the same slot.
        comm_world, control_input = context.get_control_and_context(params)
        net.Broadcast(
            inputs=[comm_world] + params,
            outputs=params,
            name=param_name,
            engine=rendezvous['engine'],
            status_blob="broadcast_{}_status".format(str(param_name)),
            control_input=control_input
        )

    for param_name in sorted(unique_param_names):
        replicas = model._device_grouped_blobs[param_name]
        master_param = replicas[master_dev]
        if _IsGPUBlob(model, param_name):
            device_opt = gpu_device_opt
        else:
            device_opt = cpu_device_opt

        if rendezvous['engine'] != 'GLOO':
            with core.DeviceScope(device_opt):
                param_cpu = net.CopyGPUToCPU(
                    master_param,
                    str(master_param) + "cpu"
                )
            with core.DeviceScope(cpu_device_opt):
                add_broadcast(param_name, [param_cpu])
            with core.DeviceScope(device_opt):
                net.CopyCPUToGPU(param_cpu, master_param)
        else:
            with core.DeviceScope(device_opt):
                add_broadcast(param_name, list(viewvalues(replicas)))

        # Fan the synchronized master copy out to the local devices.
        _Broadcast(devices, model, net, param_name)
1125 
1126 
def _SyncAllParamsSingleHost(devices, model, net, unique_param_names):
    '''Copy every named param from devices[0] to the other local devices.'''
    for name in unique_param_names:
        _Broadcast(devices, model, net, name)
1130 
1131 
def _AllReduceBlobs(blob_names, devices, model, net, rendezvous, use_nccl,
                    max_concurrent_distributed_ops):
    '''
    Add ops to `net` that all-reduce each blob in blob_names.

    Without a rendezvous, or with a single shard, only the local devices
    take part (use_nccl applies; max_concurrent_distributed_ops is unused).
    Otherwise the cross-host path is used (use_nccl is not passed on).
    '''
    distributed = rendezvous is not None and rendezvous['num_shards'] > 1
    if distributed:
        _AllReduceBlobsDistributed(
            blob_names,
            devices,
            model,
            net,
            rendezvous,
            max_concurrent_distributed_ops,
        )
    else:
        _AllReduceBlobsSingleHost(
            blob_names,
            devices,
            model,
            net,
            use_nccl
        )
1151 
1152 
def _PruneParametersForSharing(model):
    '''
    Keep only the master device's parameters in the model's bookkeeping.

    For a shared model, model.params, param_to_grad, weights and biases are
    filtered down to blobs in the "<prefix>_<devices[0]>/" namescope. Non-
    master parameters then receive no parameter update operators.
    '''
    assert model._shared_model
    master_ns = "{}_{}/".format(model._device_prefix, model._devices[0])

    model.params = model.GetParams(master_ns)
    kept = set(model.params)

    def only_kept(blobs):
        return [b for b in blobs if b in kept]

    model.param_to_grad = dict(
        (p, g) for p, g in model.param_to_grad.items() if p in kept
    )
    model.weights = only_kept(model.weights)
    model.biases = only_kept(model.biases)
1168 
1169 
def _RemapParameterBlobsForSharedModel(model, all_params):
    '''
    Make a shared model use only the master device's parameter blobs.

    A "replica" is a blob in all_params that is not in model.GetParams().
    In both param_init_net and net:
    - ops that write a replica are removed;
    - replica inputs of the remaining ops are renamed into the master
      namescope "<prefix>_<devices[0]>/".
    '''
    assert model._shared_model
    master_prefix = "{}_{}/".format(
        model._device_prefix, model._devices[0])
    log.info("Remapping param blobs to master -> {}".format(master_prefix))
    master_params = set(model.GetParams())

    def is_replica(blob):
        return blob in all_params and blob not in master_params

    def modify_ops(net):
        kept_ops = []
        for op in net.Proto().op:
            # Drop ops that produce a non-master version of a parameter.
            offending = next((o for o in op.output if is_replica(o)), None)
            if offending is not None:
                log.debug("Delete b/c {}: {}".format(offending, str(op)))
                continue
            # Point replica inputs at the master parameter instead.
            for idx, blob in enumerate(op.input):
                if is_replica(blob):
                    op.input[idx] = master_prefix + stripBlobName(blob)
            kept_ops.append(op)
        del net.Proto().op[:]
        net.Proto().op.extend(kept_ops)

    for target_net in (model.param_init_net, model.net):
        modify_ops(target_net)
1200 
1201 
class CollectivesConcurrencyControl(object):
    """
    Creates common worlds (up to max_concurrent_context) and manages the
    sequential execution of collectives that share the same context via
    cyclic control inputs.

    Collectives are assigned to slots round-robin. While fewer than
    max_concurrent_context common worlds exist, each request creates a new
    one in param_init_net. Afterwards a request reuses its slot's common
    world and receives as control input the blob registered by the previous
    collective in that slot. This serializes collectives that share a
    common world.
    """
    def __init__(
        self,
        name,
        max_concurrent_context,
        param_init_net,
        rendezvous
    ):
        self.name = name
        self.param_init_net = param_init_net
        self.max_concurrent_context = max_concurrent_context
        # Number of collectives handed out so far; selects the slot.
        self.counter = 0
        # Per-slot common world blobs.
        self.common_worlds = []
        # Per-slot blob of the most recent collective in that slot.
        self.control_inputs = []
        self.rendezvous = rendezvous

    def get_control_and_context(self, control_output_blob):
        '''
        Return (common_world, control_input) for the next collective.

        control_output_blob is recorded as the control input for the next
        collective that lands in the same slot. control_input is None for
        the first collective of each slot (a fresh common world).
        '''
        slot = self.counter % self.max_concurrent_context
        if len(self.common_worlds) < self.max_concurrent_context:
            common_world = _CreateOrCloneCommonWorld(
                self.param_init_net,
                "{}_{}_cw".format(self.name, slot),
                rendezvous=self.rendezvous,
                status_blob="create_{}_cw_{}_status".format(
                    self.name,
                    slot
                ),
            )
            self.common_worlds.append(common_world)
            self.control_inputs.append(control_output_blob)
            control_input = None
        else:
            common_world = self.common_worlds[slot]
            control_input = self.control_inputs[slot]
            self.control_inputs[slot] = control_output_blob
        self.counter += 1
        return common_world, control_input
1244 
1245 
def _AllReduceBlobsDistributed(
    blob_names,
    devices,
    model,
    net,
    rendezvous,
    max_concurrent_distributed_ops,
):
    '''
    All-reduce blobs across hosts (and across local devices).

    - With the GLOO engine, each blob's per-device copies go to a single
      Allreduce op. GPUDirect is requested when the rendezvous transport is
      "ibverbs".
    - With other engines, the copies are NCCL-allreduced locally. The master
      copy is then copied into "<master>_red", allreduced across hosts,
      copied back, and broadcast to the local devices.

    Cross-host Allreduce ops share at most max_concurrent_distributed_ops
    common worlds through a CollectivesConcurrencyControl.
    Requires model.net to be configured with more than one worker.
    '''
    num_workers = model.net.Proto().num_workers
    assert num_workers > 1, "Please specify more than 1 worker"
    all_reduce_engine = rendezvous['engine']

    master_device_opt = core.DeviceOption(model._device_type, devices[0])

    reducing_device_opt = master_device_opt

    context = CollectivesConcurrencyControl(
        "allreduce",
        max_concurrent_distributed_ops,
        model.param_init_net,
        rendezvous
    )

    nccl_control_blob = None

    for blob_name in blob_names:
        master_blob = model._device_grouped_blobs[blob_name][devices[0]]
        blobs_group = list(viewvalues(model._device_grouped_blobs[blob_name]))

        assert master_blob in blobs_group

        # Remark: NCCLReduce does not support in-place modifications
        # so we need a temporary blob
        reduced_blob = str(master_blob) + "_red"

        # Defined per iteration and only called within the same iteration,
        # so the captured `blob_name` is the current one.
        def allreduce(blobs, **kwargs):
            with core.DeviceScope(reducing_device_opt):
                comm_world, control_input = \
                    context.get_control_and_context(blobs[0])
                net.Allreduce(
                    inputs=[comm_world] + blobs,
                    outputs=blobs,
                    name=blob_name,
                    engine=all_reduce_engine,
                    control_input=control_input,
                    status_blob="allreduce_{}_status".format(blob_name),
                    **kwargs
                )

        if rendezvous['engine'] == 'GLOO':
            # With Gloo cross GPU and cross machine allreduce
            # can be executed in a single operation.
            # Try to use GPUDirect if transport == ibverbs.
            allreduce(
                blobs_group,
                gpu_direct=(rendezvous.get("transport", None) == "ibverbs"),
            )
        else:
            # Step 1: sum blobs from local GPUs to master GPU
            with core.DeviceScope(master_device_opt):
                model.ConstantFill(master_blob, reduced_blob, value=0.0)

                # Temp fix since NCCLReduce does not work
                net.NCCLAllreduce(
                    blobs_group,
                    blobs_group,
                    control_input=nccl_control_blob,
                )
                nccl_control_blob = blobs_group[0]
                net.Copy(master_blob, reduced_blob)

            # Step 2: allreduce between all hosts, between master GPUs
            allreduce([reduced_blob])

            with core.DeviceScope(master_device_opt):
                net.Copy(reduced_blob, master_blob)

            # Step 3: broadcast locally
            _Broadcast(devices, model, net, blob_name)
1325 
1326 
def _AllReduceBlobsSingleHost(blob_names, devices, model, net, use_nccl):
    """All-reduce the per-device copies of each blob on a single host.

    - Dense GPU blobs go through _AllReduce (NCCL when use_nccl is set). The
      previous dense blob is passed as control input to serialize NCCL ops.
    - Sparse GPU gradients (GradientSlice) are all-gathered: indices and
      values from every device are concatenated in the master namescope and
      copied back to each device. Indices blobs shared by several slices
      are gathered only once.
    - CPU blobs are summed into the first copy and, unless the model is
      shared, broadcast back.

    Blobs with a single copy are skipped. With only one device this does
    nothing.
    """
    if len(devices) == 1:
        return

    # devices[0] acts as the master device.
    master_device_opt = core.DeviceOption(model._device_type, devices[0])
    prev_dense = None
    gathered_indices = set()

    def all_gather_slices(blob_name, blobs_group):
        master_ns = "{}_{}".format(model._device_prefix, devices[0])
        per_device = model._device_grouped_blobs[blob_name]
        # Two or more gradient slices may share one indices blob; only
        # concatenate and copy indices that have not been gathered yet.
        if not any(g.indices in gathered_indices for g in blobs_group):
            idx_concat, _ = net.Concat(
                [g.indices for g in blobs_group],
                ["{}/{}_index_concat".format(master_ns, blob_name),
                 "{}/{}_index_splitinfo".format(master_ns, blob_name)],
                axis=0,
                name="note:data_parallel_model")
            for dev, g in viewitems(per_device):
                with core.DeviceScope(
                    core.DeviceOption(model._device_type, dev)
                ):
                    model.Copy(idx_concat, g.indices)
                    gathered_indices.add(g.indices)

        val_concat, _ = net.Concat(
            [g.values for g in blobs_group],
            ["{}/{}_val_concat".format(master_ns, blob_name),
             "{}/{}_val_splitinfo".format(master_ns, blob_name)],
            axis=0, name="note:data_parallel_model")
        for dev, g in viewitems(per_device):
            with core.DeviceScope(core.DeviceOption(model._device_type, dev)):
                model.Copy(val_concat, g.values)

    for blob_name in blob_names:
        blobs_group = list(viewvalues(model._device_grouped_blobs[blob_name]))
        if len(blobs_group) == 1:
            continue  # Non-reducible
        assert len(blobs_group) == len(devices), \
            "Each GPU from {}, should have a copy of {}.".format(
                devices, blob_name)

        if not _IsGPUBlob(model, blob_name):
            assert not isinstance(blobs_group[0], core.GradientSlice), \
                "Synchronizing gradient slices not supported"
            with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
                # Poor man's allreduce: sum into the first copy.
                net.Sum(blobs_group, [blobs_group[0]])
                if not model._shared_model:
                    _Broadcast(devices, model, net, blob_name)
            continue

        with core.DeviceScope(master_device_opt):
            if isinstance(blobs_group[0], core.GradientSlice):
                all_gather_slices(blob_name, blobs_group)
            else:
                _AllReduce(
                    devices, model, net, blob_name, use_nccl, prev_dense
                )
                prev_dense = blobs_group[0]
1405 
1406 
def _BroadcastComputedParams(devices, model, rendezvous, use_nccl=False):
    '''
    Copy computed params from the master device to the other devices.

    When a rendezvous is configured, the distributed variant is used.
    Otherwise only the local devices are involved.
    '''
    if rendezvous is not None:
        _BroadcastComputedParamsDistributed(devices, model, rendezvous, use_nccl)
        return
    _BroadcastComputedParamsSingleHost(devices, model, use_nccl)
1412 
1413 
def _BroadcastComputedParamsDistributed(
    devices,
    model,
    rendezvous,
    use_nccl=False
):
    '''
    Broadcast computed params for a distributed model.

    Cross-host broadcast is not implemented yet. Only the local
    (single-host) broadcast is performed, and a warning is logged.
    `rendezvous` is currently unused.
    '''
    _BroadcastComputedParamsSingleHost(devices, model, use_nccl)
    # Logger.warn is a deprecated alias of Logger.warning.
    log.warning(
        "Distributed broadcast of computed params is not implemented yet")
1422 
1423 
def _BroadcastComputedParamsSingleHost(devices, model, use_nccl=False):
    '''
    Copy each computed param from devices[0] to the other devices.

    Values are copied from the master, not averaged. Averaging would arguably
    be better, but NCCLAllReduce is currently too prone to deadlock.
    Nothing happens with a single device.
    '''
    if len(devices) != 1:
        for name in model._computed_param_names:
            _Broadcast(devices, model, model.net, name, use_nccl)
1435 
1436 
def _GetReverseOrderedGrads(model):
    '''
    Return model._grad_names (namespace-stripped) as a new list in reverse
    order, which is the preferred synchronization order. The model's own
    list is not modified.
    '''
    ordered = list(model._grad_names)
    ordered.reverse()
    return ordered
1443 
1444 
# Helper that removes the leading namescope component from a blob name.
def stripBlobName(param):
    '''
    Drop everything up to and including the first namescope separator,
    e.g. "a/b/c/d" -> "b/c/d".

    For a GradientSlice, returns "<stripped indices>:<stripped values>".
    Raises ValueError (from str.index) if the name has no separator.
    '''
    if isinstance(param, core.GradientSlice):
        return ":".join(
            [stripBlobName(param.indices), stripBlobName(param.values)]
        )
    full_name = str(param)
    sep_pos = full_name.index(scope._NAMESCOPE_SEPARATOR)
    return full_name[sep_pos + 1:]
1453 
1454 
def _AnalyzeOperators(model):
    '''
    Check that no CUDA operator touches another device's blobs.

    Every input/output of a CUDA op that starts with "<prefix>_" must lie in
    the op's own "<prefix>_<gpu_id>/" namescope. NCCL/Copy/Concat ops, Sum
    ops named "dpm", and GLOO Allreduce ops are exempt, as are non-CUDA ops.
    Raises Exception on the first violation.
    '''
    def is_exempt(op):
        if "NCCL" in op.type or "Copy" in op.type or "Concat" in op.type:
            return True
        if op.type == "Sum" and op.name == "dpm":
            return True
        return "Allreduce" in op.type and "GLOO" in op.engine

    for op in model.Proto().op:
        if is_exempt(op):
            continue

        dev = op.device_option
        # This avoids failing on operators that are only for CPU
        if dev.device_type != caffe2_pb2.CUDA:
            continue

        gpu_id = dev.cuda_gpu_id
        expected_ns = "{}_{}/".format(model._device_prefix, gpu_id)
        dp_head = "{}_".format(model._device_prefix)
        for blob in tuple(op.input) + tuple(op.output):
            if blob.startswith(dp_head) and not blob.startswith(expected_ns):
                raise Exception(
                    "Blob {} of op {}, should have namescope {}. Op: {}".format(
                        blob,
                        op.type,
                        expected_ns,
                        str(op),
                    )
                )
1486 
1487 
def _InferBlobDevice(model):
    '''
    Build model._blob_to_device, a map from blob name to DeviceOption.

    Each blob gets the device option of the first operator that reads or
    writes it, scanning param_init_net first and then net. Iter ops are
    treated as CPU ops. Step nets of RecurrentNetwork* ops (args whose name
    ends with "step_net") are scanned recursively, right after their op.
    '''
    blob_devices = {}

    def visit(net_proto):
        for op in net_proto.op:
            if op.type == "Iter":
                # Hack for Iters which have blob in CPU context
                dev = caffe2_pb2.DeviceOption()
                dev.device_type = caffe2_pb2.CPU
            else:
                dev = op.device_option
            for blob in list(op.input) + list(op.output):
                blob_devices.setdefault(blob, dev)
            if op.type.startswith('RecurrentNetwork'):
                for arg in op.arg:
                    if arg.name.endswith("step_net"):
                        visit(arg.n)

    visit(model.param_init_net.Proto())
    visit(model.net.Proto())
    model._blob_to_device = blob_devices
1511 
def _IsGPUBlob(model, blob_name):
    '''
    Return True if `blob_name` is placed on a CUDA device.

    The name is looked up in model._blob_to_device as given. If absent, it
    is retried with the first device's prefix ("<prefix>_<devices[0]>/").
    If that is also unknown, the model's device type decides.
    '''
    placements = model._blob_to_device
    if blob_name not in placements:
        blob_name = "{}_{}/{}".format(
            model._device_prefix, model._devices[0], blob_name
        )
        if blob_name not in placements:
            return model._device_type == caffe2_pb2.CUDA
    return placements[blob_name].device_type == caffe2_pb2.CUDA
1522 
1523 
def _GroupByDevice(model, devices, params, non_data_params):
    '''
    Group data-parallel blobs by their unprefixed name.

    Returns an OrderedDict {stripped_name: {device_id: blob}}, keyed in the
    order names first appear in `params`. The leading len(non_data_params)
    entries of `params` are skipped because they are not data parallel.
    The device id is parsed from the blob's namescope ("<prefix>_<id>/...").
    For a GradientSlice it is taken from the indices blob, and both indices
    and values must lie in that device's namescope. `devices` is unused.
    '''
    def device_id_of(blob_ref):
        # Text between the first "_" and the following "/" of the namescope.
        return int(blob_ref.GetNameScope().split("_")[1].split("/")[0])

    def expected_scope(gpuid):
        return "{}_{}/".format(model._device_prefix, gpuid)

    grouped = OrderedDict()
    for p in params[len(non_data_params):]:
        assert isinstance(p, (core.BlobReference, core.GradientSlice)), \
            "Param {} is not BlobReference or GradientSlice".format(p)

        name = stripBlobName(p)

        if isinstance(p, core.BlobReference):
            gpuid = device_id_of(p)
            assert expected_scope(gpuid) in p.GetNameScope(), \
                "Param {} expected to have namescope '{}_{}'".format(
                    str(p), model._device_prefix, gpuid)
        else:
            gpuid = device_id_of(p.indices)
            assert expected_scope(gpuid) in p.indices.GetNameScope(), \
                "Indices {} expected to have namescope '{}_{}'".format(
                    str(p), model._device_prefix, gpuid)
            assert expected_scope(gpuid) in p.values.GetNameScope(), \
                "Values {} expected to have namescope '{}_{}'".format(
                    str(p), model._device_prefix, gpuid)

        grouped.setdefault(name, {})[gpuid] = p

    return grouped
1557 
1558 
def _ValidateParams(params):
    '''
    Check that `params` contains no duplicate entries.

    Raises:
        AssertionError: listing each duplicated entry once, in order of
        first appearance. It is raised explicitly rather than via `assert`
        so that the check is not stripped under `python -O`.
    '''
    # Local import: only this helper needs Counter.
    from collections import Counter

    # Counter only needs hashable entries (as set() did), not orderable ones.
    counts = Counter(params)
    if len(counts) < len(params):
        dupes = [p for p, n in counts.items() if n > 1]
        raise AssertionError(
            "Duplicate entries in params: {}".format(dupes))
1570 
1571 
def _ComputeBlobsToSync(model):
    '''
    Collect the data-parallel blobs that must be synchronized.

    - Shared models: only computed params are synced (model.GetComputedParams('')).
    - Otherwise: every output of param_init_net in a "<prefix>_" namescope
      is synced. Every name in model._param_names must be among them.

    Returns (blob_refs, sync_names):
    - blob_refs: deduplicated BlobReferences, sorted by device id and then
      by full name;
    - sync_names: the stripped names. This is a list for shared models and
      a set otherwise.
    '''
    if model._shared_model:
        # We don't sync params if the model is shared.
        raw_blobs = [str(p) for p in model.GetComputedParams('')]
        sync_names = [stripBlobName(b) for b in raw_blobs]
    else:
        raw_blobs = []
        sync_names = set()
        dp_head = "{}_".format(model._device_prefix)
        for op in model.param_init_net.Proto().op:
            for out in op.output:
                if out.startswith(dp_head):
                    sync_names.add(stripBlobName(out))
                    raw_blobs.append(out)

        # Sanity check
        missing = set(model._param_names) - sync_names
        assert missing == set(), \
            "Some params not instantiated in param init net: {}".format(
                missing)

    prefixlen = len(model._device_prefix) + 1

    def by_device_then_name(blob_name):
        # "<prefix>_<id>/rest": numeric device id first, then the full name.
        sep_at = blob_name.index(scope._NAMESCOPE_SEPARATOR)
        return (int(blob_name[prefixlen:sep_at]), blob_name)

    ordered = sorted(set(raw_blobs), key=by_device_then_name)
    return ([core.BlobReference(b) for b in ordered], sync_names)
1613 
1614 
def _OptimizeGradientMemorySimple(model, losses_by_gpu, devices):
    '''
    Deprecated: share gradient blobs of each device via memonger, without
    sharing activations. Use OptimizeGradientMemory() instead.
    '''
    log.warning("------- DEPRECATED API, please use "
                "data_parallel_model.OptimizeGradientMemory() ----- ")
    for dev in devices:
        model.net._net = memonger.share_grad_blobs(
            model.net,
            losses_by_gpu[dev],
            set(viewvalues(model.param_to_grad)),
            "{}_{}/".format(model._device_prefix, dev),
            share_activations=False,
        )
1627 
1628 
def _AddDynamicMemoryOptimization(model, blobs_to_keep, devices):
    '''
    Rewrite model.net with memonger.release_blobs_when_used.

    Protected blobs:
    - blobs_to_keep (unprefixed names), on every device in `devices`;
    - with a rendezvous, every param gradient as well, because GLOO
      operators expect tensor addresses to stay the same across iterations.
    '''
    keep = set()
    if blobs_to_keep is not None:
        keep.update(
            "{}_{}/{}".format(model._device_prefix, dev, name)
            for dev in devices
            for name in blobs_to_keep
        )

    if model._rendezvous is not None:
        keep.update(str(grad) for grad in viewvalues(model.param_to_grad))

    model.net._net = memonger.release_blobs_when_used(
        model.net.Proto(),
        keep
    )
1650 
1651 
def OptimizeGradientMemory(model,
                           input_shapes,
                           excluded_blobs,
                           recycle_activations):
    """
    Optimize memory usage of the backward pass by recycling blobs for gradient
    inputs that have been 'used'.
    input_shapes: dict of blob name to shape for the inputs of the model.
                  Pass empty dictionary if not known. If None, shape
                  inference is skipped and no blob shapes are passed on.
    excluded_blobs: list of blobs that cannot be recycled. These are blobs
                  that you will access externally. Names are unprefixed and
                  are excluded on every device. None means no exclusions.
    recycle_activations: whether to also recycle forward pass activations
    """
    shapes = None
    if input_shapes is not None:
        # Replicate every input shape into each device's namescope.
        input_shapes_all_devices = {}
        for b, shp in viewitems(input_shapes):
            for d in model._devices:
                input_shapes_all_devices[
                    "{}_{}/{}".format(model._device_prefix, d, b)] = shp

        shapes, _types = workspace.InferShapesAndTypes(
            [model.param_init_net, model.net],
            input_shapes_all_devices,
        )

    # Materialize once so a generator argument applies to every device,
    # not just the first one; accept None as "nothing excluded".
    excluded = [] if excluded_blobs is None else list(excluded_blobs)

    for device in model._devices:
        namescope = "{}_{}/".format(model._device_prefix, device)
        excluded_blobs_by_device = set(namescope + b for b in excluded)
        model.net._net = memonger.share_grad_blobs(
            model.net,
            model._losses_by_gpu[device],
            set(viewvalues(model.param_to_grad)),
            namescope,
            dont_share_blobs=excluded_blobs_by_device,
            share_activations=recycle_activations,
            blob_shapes=shapes,
        )
1691 
1692 
def _CreateOrCloneCommonWorld(
        net,
        common_world_blob,
        rendezvous,
        name=None,
        status_blob=None,
        timeout_sec=None):
    '''
    Add an op to `net` that produces the common world `common_world_blob`.

    If `net` already has a CreateCommonWorld op whose timeout_ms arg equals
    the requested timeout, the first such op's output is cloned with
    CloneCommonWorld. Otherwise a new CreateCommonWorld op is added from the
    rendezvous' kv_handler, num_shards, shard_id and engine, plus transport,
    interface and mpi_rendezvous when present.

    timeout_sec defaults to _DEFAULT_TIMEOUT_SEC; name defaults to
    "<common_world_blob>_op". Returns the result of the net op call.
    '''
    if timeout_sec is None:
        timeout_sec = _DEFAULT_TIMEOUT_SEC
    timeout_ms = timeout_sec * 1000
    op_name = "{}_op".format(common_world_blob) if name is None else name

    def timeout_of(op):
        # -1 when the op carries no timeout_ms argument.
        for arg in op.arg:
            if arg.name == 'timeout_ms':
                return arg.i
        return -1

    existing = next(
        (op.output[0] for op in net.Proto().op
         if op.type == "CreateCommonWorld" and timeout_of(op) == timeout_ms),
        None,
    )

    if existing is None:
        optional = {}
        for key in ('transport', 'interface', 'mpi_rendezvous'):
            if key in rendezvous:
                optional[key] = rendezvous[key]
        return net.CreateCommonWorld(
            rendezvous['kv_handler'] or [],
            common_world_blob,
            name=op_name,
            size=rendezvous['num_shards'],
            rank=rendezvous['shard_id'],
            engine=rendezvous['engine'],
            status_blob=status_blob,
            timeout_ms=timeout_ms,
            **optional
        )

    return net.CloneCommonWorld(
        [existing],
        common_world_blob,
        name=op_name,
        engine=rendezvous['engine'],
        status_blob=status_blob,
    )
1760 
1761 
def _RunComparison(model, blob_name, device=None):
    '''
    Check that `blob_name` holds the same value on every shard.

    Each shard computes the (non-averaged) sum of squares of the blob and
    writes it into its own slot of a one-hot vector. The vectors are
    combined with an Allreduce (presumably a sum, so every shard sees all
    checksums), and every entry must equal shard 0's. Everything runs under
    `device`, which defaults to the blob's inferred device option.

    Returns True, including when there is no rendezvous or only one shard.
    Raises AssertionError on mismatch.
    '''
    if device is None:
        device = model._blob_to_device[blob_name]
    with core.DeviceScope(device):
        rendezvous = model._rendezvous
        if rendezvous is None or rendezvous['num_shards'] == 1:
            return True
        num_shards = rendezvous['num_shards']
        shard_id = rendezvous['shard_id']

        one_hot = np.zeros(num_shards).astype(np.float32)
        one_hot[shard_id] = 1
        workspace.FeedBlob("compare_arr", one_hot)

        check_net = core.Net("allcompare_net")

        cw_kwargs = {}
        if 'mpi_rendezvous' in rendezvous:
            cw_kwargs['mpi_rendezvous'] = rendezvous['mpi_rendezvous']
        comm_world = check_net.CreateCommonWorld(
            rendezvous['kv_handler'] or [],
            "initial_sync",
            name=model.net.Proto().name + ".cw_master_select",
            size=num_shards,
            rank=shard_id,
            engine=rendezvous['engine'],
            status_blob="cw_master_select",
            **cw_kwargs
        )

        checksum = blob_name + "_checksum"
        check_net.SumSqrElements([blob_name], [checksum], average=False)

        gathered = blob_name + "_gather"
        check_net.Mul(
            inputs=["compare_arr", checksum],
            outputs=gathered,
            broadcast=1
        )

        check_net.Allreduce(
            inputs=[comm_world, gathered],
            outputs=[gathered],
            engine=rendezvous['engine'],
            status_blob="all_reduce_master_select_status",
        )

        workspace.RunNetOnce(check_net)
        gather_arr = workspace.FetchBlob(gathered)

        baseline = gather_arr[0]
        for shard in range(num_shards):
            assert gather_arr[shard] == baseline, \
                "allcompare failed on shard {}.".format(shard_id)

        return True
1818 
1819 
def _InterleaveOps(model):
    '''
    Data Parallel Model creates a net with ops in one device grouped together.
    This will interleave the ops so that each op for each device is next
    to each other in the net. Kind of like combining decks of cards. This
    ensures that progress is made along the critical path roughly concurrently
    for each device, which is important due to the extra intra-node
    synchronization required for multi-device batch normalization.

    Ops are bucketed by op.device_option.cuda_gpu_id using the ids in
    model._devices, which need not be 0..N-1. Raises AssertionError if the
    devices do not all have the same number of ops, or if ops at the same
    position have different types.
    '''
    orig_ops = list(model.net.Proto().op)
    num_devices = len(model._devices)
    num_ops_per_dev = len(orig_ops) // num_devices
    assert num_devices * num_ops_per_dev == len(orig_ops), \
        'Number of ops per device in original net is not uniform'

    # Key the buckets by the actual device ids: they are read back by
    # iterating model._devices below.
    ops = {d: [] for d in model._devices}
    for op in orig_ops:
        ops[op.device_option.cuda_gpu_id].append(op)
    for d in model._devices:
        assert len(ops[d]) == num_ops_per_dev, \
            'Device {} has {} ops, expected {}'.format(
                d, len(ops[d]), num_ops_per_dev)

    new_ops = []
    for j in range(num_ops_per_dev):
        tp = None
        for d in model._devices:
            if tp is None:
                tp = ops[d][j].type
            new_ops.append(ops[d][j])
            # Sanity
            assert ops[d][j].type == tp, \
                "Type mismatch {} / {}".format(tp, ops[d][j].type)

    del model.net.Proto().op[:]
    model.net.Proto().op.extend(new_ops)
1851 
1852 
def _InterDeviceBatchNormalization(model):
    '''
    Rewrite model.net in place so that SpatialBN and SpatialBNGradient ops
    combine their statistics across all of the model's devices.

    The net's op list is split into maximal runs of consecutive ops that
    share a type. (Presumably this runs after _InterleaveOps, so each BN run
    holds the same layer's op for every device -- confirm at the call site.)
    Each run is then handled by type:

      * SpatialBN runs: see _CombineSpatialBNRun. They are replaced by one
        ChannelStats op per op in the run, two Sum ops that combine the
        per-op `_sums` / `_sumsq` blobs, then the original SpatialBN ops with
        the combined blobs appended as inputs and a `num_batches` argument.
      * SpatialBNGradient runs: see _CombineSpatialBNGradientRun. They are
        replaced by one ChannelBackpropStats op per op in the run, Sum ops
        combining the scale / bias gradients, Copy ops writing the combined
        gradients back to every per-op blob, then the original
        SpatialBNGradient ops with a `num_batches` argument.

    All other ops keep their original order.

    Unlike a phase-flag scan, this approach has two properties:
      * A BN run at the very end of the net is rewritten like any other run.
        It is neither dropped nor rejected.
      * A SpatialBN run directly followed by a SpatialBNGradient run, or the
        reverse, is treated as two independent runs.

    model: model helper whose `_devices` has one entry per device. Its net's
        op list is replaced, and the BN ops in it are mutated in place.
    '''
    import itertools

    num_devices = len(model._devices)
    new_ops = []
    # Snapshot the ops first: the repeated field is cleared and refilled below.
    orig_ops = list(model.net.Proto().op)
    for op_type, run in itertools.groupby(orig_ops, key=lambda op: op.type):
        run = list(run)
        if op_type == 'SpatialBN':
            new_ops.extend(_CombineSpatialBNRun(run, num_devices))
        elif op_type == 'SpatialBNGradient':
            new_ops.extend(_CombineSpatialBNGradientRun(run, num_devices))
        else:
            new_ops.extend(run)

    del model.net.Proto().op[:]
    model.net.Proto().op.extend(new_ops)


def _CombineSpatialBNRun(bn_ops, num_devices):
    '''
    Return the ops replacing one run of consecutive SpatialBN ops.

    Every op in bn_ops is mutated. Two inputs are appended,
    `<first input>_sums_combined` and `<first input>_sumsq_combined`, both
    named after the first op's first input. A `num_batches` argument equal
    to num_devices is also added.

    bn_ops: non-empty list of SpatialBN operator protos.
    num_devices: value used for the `num_batches` argument.
    Returns: the ChannelStats ops (one per op, in order), the two combining
        Sum ops, then the mutated bn_ops.
    '''
    # All ops in the run share combined blobs named after the first op's
    # input; read it before any inputs are appended.
    combined_prefix = bn_ops[0].input[0]
    sums_combined = combined_prefix + "_sums_combined"
    sumsq_combined = combined_prefix + "_sumsq_combined"

    stats_ops = []
    sums_blobs = []
    sumsq_blobs = []
    for op in bn_ops:
        name = op.input[0]
        stats_ops.append(
            core.CreateOperator(
                "ChannelStats",
                name,
                [name + "_sums", name + "_sumsq"]))
        sums_blobs.append(name + "_sums")
        sumsq_blobs.append(name + "_sumsq")
        # NOTE(review): presumably SpatialBN reads these extra inputs as
        # precomputed statistics when `num_batches` is set -- confirm
        # against the operator implementation.
        op.input.append(sums_combined)
        op.input.append(sumsq_combined)
        op.arg.extend([utils.MakeArgument("num_batches", num_devices)])

    combine_ops = [
        core.CreateOperator("Sum", sums_blobs, sums_combined),
        core.CreateOperator("Sum", sumsq_blobs, sumsq_combined),
    ]
    return stats_ops + combine_ops + list(bn_ops)


def _CombineSpatialBNGradientRun(grad_ops, num_devices):
    '''
    Return the ops replacing one run of consecutive SpatialBNGradient ops.

    Each op's output[1] and output[2] (presumably the scale and bias
    gradients -- confirm against the operator schema) are first produced
    by a ChannelBackpropStats op. They are then summed across the run, and
    the sums are copied back into every op's output[1] / output[2]. Every op
    in grad_ops is mutated: those two blobs are appended as extra inputs,
    and a `num_batches` argument equal to num_devices is added.

    grad_ops: non-empty list of SpatialBNGradient operator protos.
    num_devices: value used for the `num_batches` argument.
    Returns: the ChannelBackpropStats ops, the two Sum ops, the Copy ops for
        the scale gradients then the bias gradients, then the mutated
        grad_ops.
    '''
    backprop_ops = []
    scale_grad_blobs = []
    bias_grad_blobs = []
    for op in grad_ops:
        # NOTE(review): in caffe2's SpatialBNGradient schema, inputs
        # 0/3/4/2 are presumably X, saved mean, saved inv-std and dY --
        # verify before changing this ordering.
        backprop_ops.append(
            core.CreateOperator("ChannelBackpropStats",
                                [op.input[0], op.input[3], op.input[4],
                                 op.input[2]],
                                [op.output[1], op.output[2]]))
        scale_grad_blobs.append(op.output[1])
        bias_grad_blobs.append(op.output[2])
        op.arg.extend([utils.MakeArgument("num_batches", num_devices)])
        op.input.extend([op.output[1], op.output[2]])

    # NOTE(review): the combined blobs are always placed under "cpu_0/",
    # whatever the device type -- confirm this is intended for GPU runs.
    scale_blob = "cpu_0/" + stripBlobName(scale_grad_blobs[0]) + "_combined"
    bias_blob = "cpu_0/" + stripBlobName(bias_grad_blobs[0]) + "_combined"

    ops = list(backprop_ops)
    ops.append(core.CreateOperator("Sum", scale_grad_blobs, scale_blob))
    ops.append(core.CreateOperator("Sum", bias_grad_blobs, bias_blob))
    # Broadcast the combined gradients back to every per-op blob.
    for blob in scale_grad_blobs:
        ops.append(core.CreateOperator("Copy", scale_blob, blob))
    for blob in bias_grad_blobs:
        ops.append(core.CreateOperator("Copy", bias_blob, blob))
    ops.extend(grad_ops)
    return ops
Module caffe2.python.layers.split.