Caffe2 - Python API
A deep learning, cross-platform ML framework
data_parallel_model.py
1 ## @package data_parallel_model
2 # Module caffe2.python.data_parallel_model
3 from __future__ import absolute_import
4 from __future__ import division
5 from __future__ import print_function
6 
7 from collections import OrderedDict
8 from future.utils import viewitems, viewkeys, viewvalues
9 import logging
10 import copy
11 
12 from multiprocessing import cpu_count
13 
14 from caffe2.python import \
15  model_helper, dyndep, scope, workspace, core, memonger, utils
16 from caffe2.proto import caffe2_pb2
17 
18 import numpy as np
19 import warnings
20 
21 dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/nccl:nccl_ops")
22 dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/gloo:gloo_ops")
23 dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/gloo:gloo_ops_gpu")
24 
25 log = logging.getLogger("data_parallel_model")
26 log.setLevel(logging.INFO)
27 
28 _DEFAULT_TIMEOUT_SEC = 30
29 _DEFAULT_BARRIER_NET_TIMEOUT_SEC = 300
30 
31 
32 def Parallelize_GPU(*args, **kwargs):
33  kwargs['cpu_device'] = False
34  Parallelize(*args, **kwargs)
35 
36 
37 def Parallelize_CPU(*args, **kwargs):
38  kwargs['cpu_device'] = True
39  Parallelize(*args, **kwargs)
40 
41 def Parallelize_iDeep(*args, **kwargs):
42  kwargs['ideep'] = True
43  Parallelize(*args, **kwargs)
44 
45 def Parallelize(
46  model_helper_obj,
47  input_builder_fun,
48  forward_pass_builder_fun,
49  param_update_builder_fun=None,
50  optimizer_builder_fun=None,
51  post_sync_builder_fun=None,
52  pre_grad_net_transformer_fun=None,
53  net_transformer_fun=None,
54  devices=None,
55  rendezvous=None,
56  net_type='dag',
57  broadcast_computed_params=True,
58  optimize_gradient_memory=False,
59  dynamic_memory_management=False,
60  blobs_to_keep=None,
61  use_nccl=False,
62  max_concurrent_distributed_ops=16,
63  cpu_device=False,
64  ideep=False,
65  num_threads_per_device=4,
66  shared_model=False,
67  combine_spatial_bn=False,
68  barrier_net_timeout_sec=_DEFAULT_BARRIER_NET_TIMEOUT_SEC,
69 ):
70  '''
71  Function to create a model that can run on many GPUs or CPUs.
72  model_helper_obj: an object of ModelHelper
73  input_builder_fun:
74  Function that adds the input operators
75  Note: Remember to instantiate the reader outside of this
76  function so all devices share the same reader object.
77  Signature: input_builder_fun(model)
78  forward_pass_builder_fun:
79  Function to add the operators to the model.
80  Must return a list of loss-blob references that
81  are used to build the gradient. A loss_scale parameter
82  is passed in; scale the loss of your model
83  by 1.0 / the total number of devices.
84  Signature: forward_pass_builder_fun(model, loss_scale)
85  param_update_builder_fun:
86  Function that adds operators that are run after the
87  gradient update, such as updating the weights and
88  applying weight decay. This is called for each GPU separately.
89  Signature: param_update_builder_fun(model)
90  optimizer_builder_fun:
91  Alternative to param_update_builder_fun; allows one
92  to add an optimizer for the whole model. Called only
93  once, without a name or device scope.
94  net_transformer_fun:
95  Optional function to transform the network after the
96  network is built. It will be called once (NOT once per
97  GPU).
98  Signature:
99  net_transformer_fun(
100  model, num_devices, device_prefix, device_type)
101  pre_grad_net_transformer_fun:
102  Optional function to transform the network, similar to
103  net_transformer_fun, but called before gradient ops
104  have been added.
105  Signature: pre_grad_net_transformer_fun(model)
106  post_sync_builder_fun:
107  Function applied after initial parameter sync has been
108  completed, such as keeping multi-precision parameters
109  in sync.
110  Signature: post_sync_builder_fun(model)
111  devices: List of GPU ids, such as [0, 1, 2, 3].
112  rendezvous: used for rendezvous in distributed computation; if None,
113  then only one node is used. To create a rendezvous,
114  use <TBD>.
115  net_type: Network type
116  optimize_gradient_memory: whether to apply 'memonger' to share blobs
117  shared_model (only for CPU): use the same parameters on each device
118  in gradient computation to reduce the memory footprint.
119  dynamic_memory_management: Whether to apply dynamic memory optimization
120  by freeing unused blobs. The underlying (de)allocation
121  uses a cached allocator. For GPU training PLEASE MAKE SURE
122  caffe2_cuda_memory_pool is set.
123  blobs_to_keep: A list of blob names to keep and not free during
124  dynamic memory optimization (for example, the loss blob).
125  cpu_device: Use CPU instead of GPU.
126  ideep: Use ideep.
127  combine_spatial_bn:
128  When set to True, applies batch normalization across
129  all devices within the node. If False, batch
130  normalization will be done separately for each device.
131  This option is currently only supported on the CPU.
132  barrier_net_timeout_sec:
133  The timeout in seconds of the barrier net, which is run
134  to synchronize shards before a training epoch starts.
135  Defaults to 300 seconds.
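
    Example:
    A minimal usage sketch (illustrative only; the builder functions and
    the reader are hypothetical and must be defined by the caller):

        train_model = model_helper.ModelHelper(name="train")

        def add_inputs(model):
            # read a batch from a reader shared by all devices
            ...

        def add_forward_pass(model, loss_scale):
            # build the forward pass; return a list of loss blobs,
            # each already scaled by loss_scale
            return [loss]

        def add_optimizer(model):
            # e.g. optimizer.build_sgd(model, base_learning_rate=0.1)
            ...

        Parallelize(
            train_model,
            input_builder_fun=add_inputs,
            forward_pass_builder_fun=add_forward_pass,
            optimizer_builder_fun=add_optimizer,
            devices=[0, 1, 2, 3],
        )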
136  '''
137  assert scope.CurrentDeviceScope() is None \
138  or scope.CurrentDeviceScope().device_type == caffe2_pb2.CPU, \
139  "Parallelize must be called without device-scope, \
140  device scope was: {}".format(scope.CurrentDeviceScope())
141 
142  if devices is None:
143  if not (cpu_device or ideep):
144  devices = list(range(0, workspace.NumCudaDevices()))
145  else:
146  devices = list(range(0, cpu_count()))
147 
148  if not (cpu_device or ideep):
149  for gpu in devices:
150  if gpu >= workspace.NumGpuDevices():
151  log.warning("** Only {} GPUs available, GPUs {} requested".format(
152  workspace.NumGpuDevices(), devices))
153  break
154  model_helper_obj._device_type = workspace.GpuDeviceType
155  model_helper_obj._device_prefix = "gpu"
156  model_helper_obj._shared_model = False
157  device_name = "GPU"
158  assert shared_model is False, "Shared model only supported on CPU"
159  elif ideep:
160  model_helper_obj._device_type = caffe2_pb2.IDEEP
161  model_helper_obj._device_prefix = "ideep"
162  device_name = "IDEEP"
163  model_helper_obj._shared_model = shared_model
164  if shared_model and rendezvous is not None:
165  raise AssertionError("Shared model only supported on single-node currently")
166  else:
167  model_helper_obj._device_type = caffe2_pb2.CPU
168  model_helper_obj._device_prefix = "cpu"
169  device_name = "CPU"
170  model_helper_obj._shared_model = shared_model
171  if shared_model and rendezvous is not None:
172  raise AssertionError("Shared model only supported on single-node currently")
173 
174  log.info("Parallelizing model for devices: {}".format(devices))
175  extra_workers = 8 if rendezvous is not None else 0 # best-guess
176  num_workers = len(devices) * num_threads_per_device + extra_workers
177  max_concurrent_distributed_ops =\
178  min(max_concurrent_distributed_ops, num_workers - 1)
179  model_helper_obj.net.Proto().num_workers = num_workers
180  model_helper_obj.net.Proto().type = net_type
181 
182  # Store some information in the model -- a bit ugly
183  model_helper_obj._devices = devices
184  model_helper_obj._rendezvous = rendezvous
185  model_helper_obj._sync_barrier_net = None
186 
187  model_helper_obj._broadcast_context = None
188  model_helper_obj._grad_names = []
189 
190  assert isinstance(model_helper_obj, model_helper.ModelHelper)
191 
192  # Keep track of params that were in the model before: they are not
193  # data parallel, so we need to handle them separately
194  non_datapar_params = copy.copy(model_helper_obj.params)
195 
196  # Add input and model
197  log.info("Create input and model training operators")
198 
199  losses_by_gpu = {}
200  num_shards = 1 if rendezvous is None else rendezvous['num_shards']
201  loss_scale = 1.0 / (len(devices) * num_shards)
202 
203  has_parameter_updates = param_update_builder_fun is not None or \
204  optimizer_builder_fun is not None
205  assert not (
206  param_update_builder_fun is not None and
207  optimizer_builder_fun is not None
208  ), 'Can only specify one of param_update_builder_fun, optimizer_builder_fun'
209 
210  # Check that a model that is used for validation/testing has
211  # init_params False; otherwise running the param init net will overwrite
212  # values synchronized by the training net
213  if not has_parameter_updates and model_helper_obj.init_params:
214  log.warning('')
215  log.warning("############# WARNING #############")
216  log.warning("Model {}/{} is used for testing/validation but".format(
217  model_helper_obj.name, model_helper_obj))
218  log.warning("has init_params=True!")
219  log.warning("This can conflict with model training.")
220  log.warning("Please ensure model = ModelHelper(init_params=False)")
221  log.warning('####################################')
222  log.warning('')
223  # TODO: make into assert
224 
225  for device in devices:
226  device_opt = core.DeviceOption(model_helper_obj._device_type, device)
227  with core.DeviceScope(device_opt):
228  with core.NameScope("{}_{}".format(model_helper_obj._device_prefix,
229  device)):
230  log.info("Model for {} : {}".format(device_name, device))
231  input_builder_fun(model_helper_obj)
232  losses = forward_pass_builder_fun(model_helper_obj, loss_scale)
233  # Losses are not needed for test net
234  if has_parameter_updates:
235  assert isinstance(losses, list), \
236  'Model builder function must return list of loss blobs'
237  for loss in losses:
238  assert isinstance(loss, core.BlobReference), \
239  'Model builder func must return list of loss blobs'
240 
241  losses_by_gpu[device] = losses
242  _ValidateParams(model_helper_obj.params)
243 
244  # Create parameter map
245  model_helper_obj._device_grouped_blobs =\
246  _GroupByDevice(model_helper_obj, devices,
247  model_helper_obj.params, non_datapar_params)
248 
249  # computed params
250  computed_params_grouped =\
251  _GroupByDevice(model_helper_obj, devices,
252  model_helper_obj.GetComputedParams(''), [])
253  model_helper_obj._device_grouped_blobs.update(computed_params_grouped)
254 
255  model_helper_obj._param_names =\
256  list(viewkeys(model_helper_obj._device_grouped_blobs))
257  model_helper_obj._computed_param_names =\
258  list(viewkeys(computed_params_grouped))
259 
260  if pre_grad_net_transformer_fun:
261  pre_grad_net_transformer_fun(model_helper_obj)
262 
263  if has_parameter_updates:
264  log.info("Adding gradient operators")
265  _AddGradientOperators(devices, model_helper_obj, losses_by_gpu)
266 
267  if net_transformer_fun:
268  net_transformer_fun(
269  model_helper_obj,
270  len(devices),
271  model_helper_obj._device_prefix,
272  model_helper_obj._device_type)
273 
274  if not has_parameter_updates:
275  log.info("Parameter update function not defined --> only forward")
276  _InferBlobDevice(model_helper_obj)
277  return
278 
279  if combine_spatial_bn:
280  assert(has_parameter_updates), \
281  'combine_spatial_bn should only be used for train model'
282  _InterleaveOps(model_helper_obj)
283  if cpu_device:
284  _CPUInterDeviceBatchNormalization(model_helper_obj)
285  else:
286  _GPUInterDeviceBatchNormalization(model_helper_obj)
287 
288  _ValidateParams(model_helper_obj.params)
289 
290  # Group gradients by device and register to blob lookup
291  param_to_grad = model_helper_obj.param_to_grad
292  grads_ordered = [param_to_grad[p] for p in
293  model_helper_obj.params if p in param_to_grad]
294  non_datapar_grads = [param_to_grad[p] for p in non_datapar_params]
295 
296  gradients_grouped = _GroupByDevice(
297  model_helper_obj,
298  devices,
299  grads_ordered,
300  non_datapar_grads
301  )
302  model_helper_obj._device_grouped_blobs.update(gradients_grouped)
303  model_helper_obj._grad_names = list(viewkeys(gradients_grouped))
304  model_helper_obj._losses_by_gpu = losses_by_gpu
305 
306  _InferBlobDevice(model_helper_obj)
307 
308  log.info("Add gradient all-reduces for SyncSGD")
309  if broadcast_computed_params:
310  _BroadcastComputedParams(devices, model_helper_obj, rendezvous, use_nccl)
311 
312  if len(model_helper_obj._grad_names) > 0:
313  # Gradients in reverse order
314  reverse_ordered_grads = _GetReverseOrderedGrads(model_helper_obj)
315  assert(len(reverse_ordered_grads) > 0)
316  _AllReduceBlobs(
317  reverse_ordered_grads,
318  devices,
319  model_helper_obj,
320  model_helper_obj.net,
321  rendezvous,
322  use_nccl,
323  max_concurrent_distributed_ops,
324  )
325  else:
326  log.info("NOTE: Param builder function did not create any parameters.")
327 
328  log.info("Post-iteration operators for updating params")
329  num_shards = 1 if rendezvous is None else rendezvous['num_shards']
330 
331  all_params = set(model_helper_obj.GetParams(''))
332  if shared_model:
333  _PruneParametersForSharing(model_helper_obj)
334 
335  if param_update_builder_fun is not None:
336  for device in devices:
337  device_opt = core.DeviceOption(model_helper_obj._device_type, device)
338  with core.DeviceScope(device_opt):
339  with core.NameScope(
340  "{}_{}".format(model_helper_obj._device_prefix, device)
341  ):
342  param_update_builder_fun(model_helper_obj)
343  else:
344  log.info("Calling optimizer builder function")
345  optimizer = optimizer_builder_fun(model_helper_obj)
346  model_helper_obj._optimizer = optimizer
347 
348  (sync_blobs, sync_names) = _ComputeBlobsToSync(model_helper_obj)
349  sync_blobs_grouped = _GroupByDevice(
350  model_helper_obj,
351  devices,
352  sync_blobs,
353  [],
354  )
355  model_helper_obj._device_grouped_blobs.update(sync_blobs_grouped)
356 
357  _InferBlobDevice(model_helper_obj)
358  _AnalyzeOperators(model_helper_obj)
359 
360  # Configure dagnet to run with only one worker on the first iteration,
361  # to prevent concurrency problems with allocs and nccl.
362  arg = model_helper_obj.Proto().arg.add()
363  arg.name = "first_iter_only_one_worker"
364  arg.i = 1
365 
366  # Add initial parameter syncs
367  log.info("Add initial parameter sync")
368  _SyncAllParams(
369  devices,
370  model_helper_obj,
371  model_helper_obj.param_init_net,
372  model_helper_obj.param_init_net,
373  rendezvous,
374  sync_names,
375  max_concurrent_distributed_ops=1
376  )
377 
378  # Handle any operations that need to be done after parameter sync
379  # e.g., making sure multi-precision copies of parameters are up-to-date
380  if post_sync_builder_fun is not None:
381  for device in devices:
382  device_opt = core.DeviceOption(model_helper_obj._device_type, device)
383  with core.DeviceScope(device_opt):
384  with core.NameScope(
385  "{}_{}".format(model_helper_obj._device_prefix, device)
386  ):
387  post_sync_builder_fun(model_helper_obj)
388 
389  assert not (optimize_gradient_memory and dynamic_memory_management), \
390  """It is not advised to use gradient optimization ('memonger')
391  with dynamic memory management."""
392 
393  if optimize_gradient_memory:
394  _OptimizeGradientMemorySimple(model_helper_obj, losses_by_gpu, devices)
395 
396  if dynamic_memory_management:
397  _AddDynamicMemoryOptimization(model_helper_obj, blobs_to_keep, devices)
398 
399 
400  model_helper_obj._data_parallel_model_init_nets = [
401  model_helper_obj.param_init_net,
402  ]
403 
404  model_helper_obj._data_parallel_model_nets = [
405  model_helper_obj.net
406  ]
407  _AddBarrierToModelNets(model_helper_obj, barrier_net_timeout_sec)
408 
409  if shared_model:
410  _RemapParameterBlobsForSharedModel(model_helper_obj, all_params)
411 
412 
413 def Parallelize_GPU_BMUF(*args, **kwargs):
414  kwargs['cpu_device'] = False
415  Parallelize_BMUF(*args, **kwargs)
416 
417 
418 def Parallelize_CPU_BMUF(*args, **kwargs):
419  kwargs['cpu_device'] = True
420  Parallelize_BMUF(*args, **kwargs)
421 
422 
423 def Parallelize_BMUF(
424  model_helper_obj,
425  input_builder_fun,
426  forward_pass_builder_fun,
427  param_update_builder_fun,
428  block_learning_rate=1.0,
429  block_momentum=None,
430  devices=None,
431  rendezvous=None,
432  net_type='dag',
433  master_device=None,
434  use_nccl=False,
435  nesterov=False,
436  optimize_gradient_memory=False,
437  reset_momentum_sgd=False,
438  warmup_iterations=None,
439  max_concurrent_distributed_ops=4,
440  add_blobs_to_sync=None,
441  num_threads_per_device=4,
442  cpu_device=False,
443  barrier_net_timeout_sec=_DEFAULT_BARRIER_NET_TIMEOUT_SEC,
444 ):
445  '''
446  Function to create a model that runs on many GPUs and creates a net for
447  parameter updates that can be run independently for a number of iterations,
448  followed by another net that runs once to compute the final parameter
449  updates according to the block-wise model-update filtering rule described
450  in: Scalable Training of Deep Learning Machines by Incremental Block
451  Training with Intra-block Parallel Optimization and Blockwise Model-Update
452  Filtering (ICASSP 2016).
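
    A rough usage sketch (illustrative only; the builder functions are
    hypothetical and follow the same signatures documented for Parallelize):

        Parallelize_BMUF(
            train_model,
            input_builder_fun=add_inputs,
            forward_pass_builder_fun=add_forward_pass,
            param_update_builder_fun=add_parameter_update,
            devices=[0, 1, 2, 3],
            block_learning_rate=1.0,
            nesterov=True,
        )
        RunInitNet(train_model)
        # each RunNet call runs the local-update net num_local_iterations
        # times, then runs the global model-update net once
        RunNet(train_model, num_local_iterations)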
453  '''
454  assert scope.CurrentDeviceScope() is None \
455  or scope.CurrentDeviceScope().device_type == caffe2_pb2.CPU, \
456  "Parallelize must be called without device-scope, \
457  device scope was: {}".format(scope.CurrentDeviceScope())
458 
459  assert isinstance(model_helper_obj, model_helper.ModelHelper)
460 
461  if devices is None:
462  devices = list(range(0, workspace.NumGpuDevices()))
463  if master_device is None:
464  master_device = devices[0]
465 
466  if not cpu_device:
467  for gpu in devices:
468  if gpu >= workspace.NumGpuDevices():
469  log.warning("** Only {} GPUs available, GPUs {} requested".format(
470  workspace.NumGpuDevices(), devices))
471  break
472  model_helper_obj._device_type = workspace.GpuDeviceType
473  model_helper_obj._device_prefix = "gpu"
474  else:
475  model_helper_obj._device_type = caffe2_pb2.CPU
476  model_helper_obj._device_prefix = "cpu"
477 
478  model_helper_obj._devices = devices
479  model_helper_obj._rendezvous = rendezvous
480  model_helper_obj._sync_barrier_net = None
481  model_helper_obj._broadcast_context = None
482  model_helper_obj._shared_model = False
483  master_dev_opt = core.DeviceOption(model_helper_obj._device_type, master_device)
484 
485  # question: rendezvous structure
486  num_shards = rendezvous['num_shards'] if rendezvous else 1
487  # num_devices is #devices across all machines
488  num_devices = len(devices) * num_shards
489  # num_workers is #threads to execute the DAG per shard
490  num_workers = num_threads_per_device * len(devices)
491  if rendezvous:
492  num_workers += 8
493 
494  loss_scale = 1.0 / num_devices
495  if block_momentum is None:
496  block_momentum = 1.0 - 1.0 / num_devices
497 
498  max_concurrent_distributed_ops = min(
499  max_concurrent_distributed_ops,
500  num_workers - 1
501  )
502 
503  model_helper_obj.net.Proto().num_workers = num_workers
504  model_helper_obj.net.Proto().type = net_type
505 
506  # A net for initializing global model parameters. It's called once in the
507  # same step as net parameter initialization.
508  model_helper_obj._global_model_init_net = core.Net('global_model_init')
509  model_helper_obj._global_model_init_net.Proto().type = net_type
510  model_helper_obj._global_model_init_net.Proto().num_workers = \
511  num_workers
512 
513  # A net for computing final parameter updates. It will run once after
514  # running the net (local model updates) for `num_local_iterations` iterations.
515  model_helper_obj._global_model_param_updates_net = core.Net('global_model')
516  model_helper_obj._global_model_param_updates_net.Proto().type = net_type
517  model_helper_obj._global_model_param_updates_net.Proto().num_workers = \
518  num_workers
519 
520  def _v(param):
521  return "{}_v".format(param)
522 
523  def _g(param):
524  return "{}_g".format(param)
525 
526  def _v_prev(param):
527  return "{}_prev".format(param)
528 
529  # Keep track of params that were in the model before: they are not
530  # data parallel, so we need to handle them separately
531  non_datapar_params = copy.copy(model_helper_obj.params)
532  model_helper_obj._losses_by_gpu = {}
533 
534  def _InitializeModels(gpu_id):
535  input_builder_fun(model_helper_obj)
536  loss = forward_pass_builder_fun(model_helper_obj, loss_scale)
537  model_helper_obj._losses_by_gpu[gpu_id] = loss
538  _ForEachDevice(
539  devices,
540  _InitializeModels,
541  device_type=model_helper_obj._device_type,
542  device_prefix=model_helper_obj._device_prefix,
543  scoped=True
544  )
545  _ValidateParams(model_helper_obj.params)
546 
547  model_helper_obj._device_grouped_blobs =\
548  _GroupByDevice(model_helper_obj, devices,
549  model_helper_obj.params, non_datapar_params)
550 
551  model_helper_obj._param_names =\
552  list(viewkeys(model_helper_obj._device_grouped_blobs))
553 
554  _AddGradientOperators(
555  devices, model_helper_obj, model_helper_obj._losses_by_gpu
556  )
557  _ValidateParams(model_helper_obj.params)
558 
559  _InferBlobDevice(model_helper_obj)
560 
561  def _InitializeParamUpdate(gpu_id):
562  param_update_builder_fun(model_helper_obj)
563  _ForEachDevice(
564  devices,
565  _InitializeParamUpdate,
566  device_type=model_helper_obj._device_type,
567  device_prefix=model_helper_obj._device_prefix,
568  scoped=True
569  )
570 
571  model_parameter_names = list(
572  viewkeys(model_helper_obj._device_grouped_blobs)
573  )
574  if warmup_iterations is not None:
575  model_helper_obj._warmup_iterations = warmup_iterations
576  # A net for broadcasting gpu-0 (master shard) parameters after
577  # running the net for `warmup_iterations`.
578  model_helper_obj._warmup_broadcast = core.Net('warmup-broadcast')
579  model_helper_obj._warmup_broadcast.Proto().type = net_type
580  model_helper_obj._warmup_broadcast.Proto().num_workers = \
581  num_workers
582 
583  _SyncAllParams(
584  devices,
585  model_helper_obj,
586  model_helper_obj.param_init_net,
587  model_helper_obj._warmup_broadcast,
588  rendezvous,
589  model_parameter_names,
590  max_concurrent_distributed_ops
591  )
592  for param_name in viewkeys(model_helper_obj._device_grouped_blobs):
593  param = model_helper_obj._device_grouped_blobs[param_name][master_device]
594  with core.DeviceScope(master_dev_opt):
595  model_helper_obj._warmup_broadcast.Copy(param, _g(param))
596 
597  # (Step-0) Initialize momentum parameters on master device.
598  for param_name in viewkeys(model_helper_obj._device_grouped_blobs):
599  param = model_helper_obj._device_grouped_blobs[param_name][master_device]
600  with core.DeviceScope(master_dev_opt):
601  model_helper_obj._global_model_init_net.ConstantFill(
602  param, _v(param), value=0.0
603  )
604  model_helper_obj._global_model_init_net.Copy(param, _g(param))
605  if nesterov:
606  model_helper_obj._global_model_init_net.ConstantFill(
607  param, _v_prev(param), value=0.0
608  )
609 
610  # (Step-1) Update models for num_local_iterations.
611 
612  # (Step-2) Compute post-local-updates average of the params.
613  # Sum model params across GPUs and store results in the param_avg blob.
614  _AllReduceBlobs(
615  model_parameter_names,
616  devices,
617  model_helper_obj,
618  model_helper_obj._global_model_param_updates_net,
619  rendezvous,
620  use_nccl,
621  max_concurrent_distributed_ops
622  )
623 
624  # (Step-3) Update momentum params :
625  # param_v = block_momentum * param_v
626  # + block_learning_rate * (param_avg - param)
627  # if nesterov momentum:
628  # param = param + param_v
629  # - block_momentum * (param_v - param_v_prev)
630  # param_v_prev = param_v
631  # else:
632  # param = param + param_v
633  for param_name in model_parameter_names:
634  param = model_helper_obj._device_grouped_blobs[param_name][master_device]
635  with core.DeviceScope(master_dev_opt):
636  # TODO(ataei) : Stop building the graph here to get model average ?
637  model_helper_obj._global_model_param_updates_net.Scale(
638  param, param, scale=1.0 / num_devices
639  )
640  model_helper_obj._global_model_param_updates_net.Sub(
641  [param, _g(param)], param
642  )
643  model_helper_obj._global_model_param_updates_net.Scale(
644  param, param, scale=block_learning_rate
645  )
646  model_helper_obj._global_model_param_updates_net.Scale(
647  _v(param), _v(param), scale=block_momentum
648  )
649  model_helper_obj._global_model_param_updates_net.Add(
650  [_v(param), param], _v(param)
651  )
652  model_helper_obj._global_model_param_updates_net.Add(
653  [_g(param), _v(param)], _g(param)
654  )
655  if nesterov:
656  model_helper_obj._global_model_param_updates_net.Sub(
657  [_v(param), _v_prev(param)], _v_prev(param)
658  )
659  model_helper_obj._global_model_param_updates_net.Scale(
660  _v_prev(param), _v_prev(param), scale=block_momentum
661  )
662  model_helper_obj._global_model_param_updates_net.Sub(
663  [_g(param), _v_prev(param)], _g(param)
664  )
665  model_helper_obj._global_model_param_updates_net.Copy(
666  _v(param), _v_prev(param)
667  )
668  model_helper_obj._global_model_param_updates_net.Copy(
669  _g(param), param
670  )
671 
672 
673  _SyncAllParams(
674  devices,
675  model_helper_obj,
676  model_helper_obj.param_init_net,
677  model_helper_obj._global_model_param_updates_net,
678  rendezvous,
679  model_parameter_names,
680  max_concurrent_distributed_ops
681  )
682 
683  # Add additional syncs
684  if add_blobs_to_sync is not None:
685  AddBlobSync(
686  model_helper_obj,
687  add_blobs_to_sync,
688  net=model_helper_obj._global_model_param_updates_net)
689 
690  # Reset momentum-SGD parameters
691  if reset_momentum_sgd:
692  momentum_ops = [op for op in model_helper_obj.net.Proto().op
693  if op.type == 'MomentumSGDUpdate']
694  for op in momentum_ops:
695  momentum_blob = op.input[1]
696  with core.DeviceScope(op.device_option):
697  model_helper_obj._global_model_param_updates_net.ConstantFill(
698  [momentum_blob], momentum_blob, value=0.0
699  )
700 
701  if optimize_gradient_memory:
702  _OptimizeGradientMemorySimple(
703  model_helper_obj, model_helper_obj._losses_by_gpu, devices
704  )
705 
706  model_helper_obj._data_parallel_model_init_nets = [
707  model_helper_obj.param_init_net,
708  model_helper_obj._global_model_init_net
709  ]
710 
711  model_helper_obj._data_parallel_model_nets = [
712  model_helper_obj.net,
713  (model_helper_obj._global_model_param_updates_net, 1)
714  ]
715  _AddBarrierToModelNets(model_helper_obj, barrier_net_timeout_sec)
716 
717 def CreateNet(model, overwrite=False):
718  for net_iters in model._data_parallel_model_nets:
719  if isinstance(net_iters, tuple):
720  workspace.CreateNet(net_iters[0], overwrite=overwrite)
721  else:
722  workspace.CreateNet(net_iters, overwrite=overwrite)
723 
724 
725 def RunInitNet(model):
726  for init_net in model._data_parallel_model_init_nets:
727  workspace.RunNetOnce(init_net)
728  CreateNet(model)
729 
730 
731 def RunWarmup(model):
732  workspace.RunNet(model.net, model._warmup_iterations)
733  workspace.RunNetOnce(model._warmup_broadcast)
734 
735 
736 def RunNet(model, num_iterations):
737  for net_iter in model._data_parallel_model_nets:
738  if isinstance(net_iter, tuple):
739  workspace.RunNet(net_iter[0].Proto().name, net_iter[1])
740  else:
741  workspace.RunNet(net_iter, num_iterations)
742 
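# A typical driver loop (an illustrative sketch; `train_model`, `num_epochs`
# and `epoch_iters` are hypothetical and assume the model was built with
# Parallelize or Parallelize_BMUF above):
#
#     RunInitNet(train_model)
#     for _epoch in range(num_epochs):
#         RunNet(train_model, epoch_iters)
#
# For BMUF models created with warmup_iterations set, call
# RunWarmup(train_model) once after RunInitNet and before the loop.
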
743 
744 def _AddBarrierToModelNets(model, barrier_net_timeout_sec):
745  if model._rendezvous is not None and model._rendezvous['engine'] == 'GLOO':
746  # Synchronize DPM at the start of each epoch. This allows shards that
747  # start an epoch sooner to wait for slower shards. Without this,
748  # shards that are faster than others will begin training the next epoch
749  # while stragglers are blocked on IO, and may time out after 30 seconds
750  # (_DEFAULT_TIMEOUT_SEC).
751  # We pass in model.param_init_net so that the barrier net can be run as
752  # part of the param_init_net.
753 
754  model._barrier_init_net = core.Net("barrier_init_net")
755 
756  model._barrier_net = _CreateBarrierNet(model, model._barrier_init_net,
757  "pre_training", barrier_net_timeout_sec)
758 
759  model._data_parallel_model_init_nets.insert(0, model._barrier_init_net)
760 
761  model._data_parallel_model_nets.insert(0, model._barrier_net)
762 
763 
764 def _CreateBarrierNet(model, init_net, name_prefix, timeout_sec):
765  log.info("Creating barrier net")
766  assert model._rendezvous['engine'] == 'GLOO', "Engine does not support barrier"
767  comm_world = _CreateOrCloneCommonWorld(
768  init_net,
769  name_prefix + "_barrier_cw",
770  rendezvous=model._rendezvous,
771  timeout_sec=timeout_sec,
772  )
773  barrier_net = core.Net(name_prefix + "_barrier_net")
774  barrier_net.Barrier(
775  inputs=[comm_world],
776  outputs=[],
777  engine=model._rendezvous['engine'],
778  )
779  return barrier_net
780 
781 
782 # DEPRECATED: See warnings below.
783 def Synchronize(model, timeout_sec=_DEFAULT_BARRIER_NET_TIMEOUT_SEC):
784  warnings.warn("The Synchronize API has been deprecated. We now have a "
785  "barrier net which runs before training to ensure all hosts wait "
786  "before training starts. The default timeout for the barrier is "
787  "300s and it can be overridden using the barrier_net_timeout_sec "
788  "parameter when calling Parallelize.",
789  category=DeprecationWarning, stacklevel=2)
790  if model._rendezvous is None or model._rendezvous['num_shards'] <= 1:
791  # Single host case
792  return
793 
794  if model._sync_barrier_net is None:
795  barrier_init_net = core.Net("sync_barrier_init_net")
796  model._sync_barrier_net = _CreateBarrierNet(
797  model, barrier_init_net, "sync", timeout_sec)
798  workspace.RunNetOnce(barrier_init_net)
799  workspace.CreateNet(model._sync_barrier_net)
800  model._sync_barrier_net_timeout = timeout_sec
801  assert model._sync_barrier_net_timeout == timeout_sec, \
802  "Must use fixed timeout, {} != {}".format(
803  model._sync_barrier_net_timeout, timeout_sec
804  )
805  log.info("Synchronize run barrier net.")
806  workspace.RunNet(model._sync_barrier_net)
807 
808 
809 def ConvertNetForDevice(net, device=None):
810  '''
811  Converts all blobs in the net to have namescope gpu_X, and correct
812  device scope. You can use this to enable AppendNet with a
813  forward_pass_builder_fun:
814 
815  def builder_fun(model):
816  ...
817  model.net.AppendNet(
818  data_parallel_model.ConvertNetForDevice(othermodel.net))
819  model.param_init_net.AppendNet(
820  data_parallel_model.ConvertNetForDevice(othermodel.param_init_net))
821  '''
822  mnet = copy.deepcopy(net)
823 
824  if device is None:
825  device = scope.CurrentDeviceScope()
826  if core.IsGPUDeviceType(device.device_type):
827  device_prefix = "gpu"
828  elif device.device_type == caffe2_pb2.IDEEP:
829  device_prefix = "ideep"
830  else:
831  device_prefix = "cpu"
832 
833  namescope = "{}_{}/".format(device_prefix, device.device_id)
834  for op in mnet.Proto().op:
835  if "RecurrentNetwork" in op.type:
836  raise NotImplementedError("RecurrentNetwork conversion not yet supported")
837  for i, inputb in enumerate(op.input):
838  op.input[i] = namescope + inputb
839  for i, outputb in enumerate(op.output):
840  op.output[i] = namescope + outputb
841  for i, blob in enumerate(op.control_input):
842  op.control_input[i] = namescope + blob
843  op.device_option.CopyFrom(device)
844  for i, einp in enumerate(mnet.Proto().external_input):
845  mnet.Proto().external_input[i] = namescope + einp
846  for i, eoutp in enumerate(mnet.Proto().external_output):
847  mnet.Proto().external_output[i] = namescope + eoutp
848  return mnet
849 
850 
851 def _ForEachDevice(devices, f, device_type, device_prefix, scoped=False,
852  *args, **kwargs):
853  for device in devices:
854  device_opt = core.DeviceOption(device_type, device)
855  with core.DeviceScope(device_opt):
856  if scoped:
857  with core.NameScope("{}_{}".format(device_prefix, device)):
858  f(device, *args, **kwargs)
859  else:
860  f(device, *args, **kwargs)
861 
862 
863 def _AddGradientOperators(devices, model, losses_by_gpu):
864  def create_grad(lossp):
865  return model.ConstantFill(lossp, str(lossp) + "_grad", value=1.0)
866 
867  loss_grad = {}
868  # Explicitly need to create gradients on each GPU
869  for gpu_id in devices:
870  device = core.DeviceOption(model._device_type, gpu_id)
871  with core.DeviceScope(device):
872  for l in losses_by_gpu[gpu_id]:
873  lg = create_grad(l)
874  loss_grad[str(l)] = str(lg)
875 
876  model.AddGradientOperators(loss_grad)
877 
878 
879 def ExtractPredictorNet(model, inputs, outputs, device):
880  '''
881  Returns (net, params) that can be exported to be used as a prediction
882  net.
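
    A minimal sketch (blob names and the trained model are hypothetical):

        predictor_net, export_blobs = ExtractPredictorNet(
            train_model,
            inputs=["data"],
            outputs=["softmax"],
            device=core.DeviceOption(caffe2_pb2.CPU, 0),
        )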
883  '''
884  master_device = model._devices[0]
885  prefix = "{}_{}/".format(model._device_prefix, master_device)
886  prefix_inputs = [prefix + str(b) for b in inputs]
887  prefix_outputs = [prefix + str(b) for b in outputs]
888  (predictor_net, export_blobs) = model_helper.ExtractPredictorNet(
889  net_proto=model.net.Proto(),
890  input_blobs=prefix_inputs,
891  output_blobs=prefix_outputs,
892  device=device,
893  renames={
894  a: b
895  for (a, b) in zip(prefix_inputs + prefix_outputs, inputs + outputs)
896  },
897  )
898 
899  return (predictor_net, export_blobs)
900 
901 
902 def GetCheckpointParams(model):
903  '''
904  Returns a set of blobs that are needed for a complete checkpoint.
905  They are the blobs for the first GPU plus the iteration blobs.
906  '''
907  (all_blobs, _) = _ComputeBlobsToSync(model)
908  first_gpu_blobs = {
909  b
910  for b in all_blobs
911  if str(b)
912  .startswith("{}_{}/".format(model._device_prefix, model._devices[0]))
913  }
914 
915  # Add iteration blobs that do not have a namescope separately, since
916  # it is important to checkpoint the iteration counter
917  iteration_blobs = set()
918  for op in model.net.Proto().op:
919  if op.type == 'Iter' or op.type == 'AtomicIter':
920  if not op.output[0].startswith("{}_".format(model._device_prefix)):
921  iteration_blobs.add(op.output[0])
922 
923  return first_gpu_blobs.union(iteration_blobs)
924 
925 
926 def FinalizeAfterCheckpoint(model, blobs=None, cpu_mode=False):
927  '''
928  This function should be called after loading parameters from a
929  checkpoint / initial parameters file.
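
    A rough sketch of the intended flow (loading the checkpoint itself is
    outside this module; `load_checkpoint_into_workspace` is hypothetical):

        blobs = GetCheckpointParams(train_model)
        ...  # save `blobs` as a checkpoint; later, load them back
        load_checkpoint_into_workspace(checkpoint_path)
        FinalizeAfterCheckpoint(train_model)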
930  '''
931 
932  if not hasattr(model, "_checkpoint_net"):
933  if blobs is None:
934  (_, uniq_blob_names) = _ComputeBlobsToSync(model)
935  else:
936  uniq_blob_names = [stripBlobName(p) for p in blobs]
937 
938  # Synchronize to the blob lookup map, as the provided
939  # blobs might include non-parameters, such as momentum blobs.
940  log.info("Creating checkpoint synchronization net")
941  devices = model.GetDevices()
942  for name in uniq_blob_names:
943  if name not in model._device_grouped_blobs:
944  grouped = {
945  d:
946  core.BlobReference("{}_{}{}{}".format(
947  model._device_prefix,
948  d,
949  scope._NAMESCOPE_SEPARATOR,
950  name)
951  ) for d in devices}
952  model._device_grouped_blobs[name] = grouped
953 
954  model._checkpoint_net = core.Net("checkpoint_sync_net")
955  if not cpu_mode:
956  model._checkpoint_net.RunAllOnGPU()
957 
958  checkpoint_init_net = None
959  if (model._rendezvous is not None and model._rendezvous['num_shards'] > 1):
960  checkpoint_init_net = core.Net("checkpoint_init_net")
961  if not cpu_mode:
962  checkpoint_init_net.RunAllOnGPU()
963 
964  _SyncAllParams(
965  devices,
966  model,
967  checkpoint_init_net,
968  model._checkpoint_net,
969  model._rendezvous,
970  uniq_blob_names,
971  max_concurrent_distributed_ops=1
972  )
973  if (checkpoint_init_net):
974  workspace.RunNetOnce(checkpoint_init_net)
975 
976  workspace.CreateNet(model._checkpoint_net)
977 
978  # Run the sync
979  log.info("Run checkpoint net")
980  workspace.RunNet(model._checkpoint_net.Proto().name)
981 
982 
983 def GetLearningRateBlobNames(model):
984  '''
985  Returns a list of learning rate blob names used in the optimizer.
986  '''
987  if model._optimizer is not None:
988  if model._device_type == caffe2_pb2.CPU or model._device_type == caffe2_pb2.IDEEP:
989  return [model._optimizer.get_cpu_blob_name('lr')]
990  elif core.IsGPUDeviceType(model._device_type):
991  return [model._optimizer.get_gpu_blob_name('lr', gpu, '')
992  for gpu in model._devices]
993  else:
994  raise Exception(
995  "Unsupported device type : {}".format(model._device_type)
996  )
997  else:
998  lr_blob_names = []
999  for op in model.net.Proto().op:
1000  if op.type == "LearningRate":
1001  lr_blob_names.append(op.output(0))
1002  return lr_blob_names
1003 
1004 
1005 def _Broadcast(devices, model, net, param, use_nccl=False):
1006  # Copy params from gpu_0 to the other devices
1007  master_dev = devices[0]
1008 
1009  if use_nccl:
1010  if _IsGPUBlob(model, param):
1011  master_device_opt = core.DeviceOption(model._device_type, master_dev)
1012  with core.DeviceScope(master_device_opt):
1013  # Note that the root is the root _rank_ and not the root
1014  # _device_. Thus we always use root=0, regardless of the
1015  # devices used.
1016  net.NCCLBroadcast(
1017  list(viewvalues(model._device_grouped_blobs[param])),
1018  list(viewvalues(model._device_grouped_blobs[param])),
1019  root=0,
1020  )
1021  return
1022 
1023  for dev_idx in devices[1:]:
1024  if _IsGPUBlob(model, param):
1025  device_opt = core.DeviceOption(workspace.GpuDeviceType, dev_idx)
1026  else:
1027  device_opt = core.DeviceOption(caffe2_pb2.IDEEP, 0) if _IsIDEEPBlob(model, param) else \
1028  core.DeviceOption(caffe2_pb2.CPU, 0)
1029  with core.DeviceScope(device_opt):
1030  net.Copy(
1031  model._device_grouped_blobs[param][master_dev],
1032  model._device_grouped_blobs[param][dev_idx]
1033  )
1034 
1035 
1036 def _AllReduce(devices, model, net, param, use_nccl=False, control_input=None):
1037  blobs_group = list(viewvalues(model._device_grouped_blobs[param]))
1038  if model._device_type == caffe2_pb2.CUDA and use_nccl:
1039  # TODO: for _shared_model, do only NCCLReduce
1040  model.NCCLAllreduce(
1041  blobs_group, blobs_group, control_input=control_input
1042  )
1043  return
1044 
1045  if model._device_type == workspace.GpuDeviceType:
1046  p2p_access_pattern = workspace.GetGpuPeerAccessPattern()
1047  else:
1048  p2p_access_pattern = None
1049 
1050  def sumN(*dev_indices):
1051  """Create a Sum op for 2 or more blobs on different devices.
1052  Saves the result on the first device.
1053 
1054  Arguments:
1055  dev_indices -- a list of device indices, which can be translated into
1056  CUDA identifiers with model._devices
1057  """
1058  devices = [model._devices[idx] for idx in dev_indices]
1059  blobs = [blobs_group[idx] for idx in dev_indices]
1060  device_opt = core.DeviceOption(model._device_type, devices[0])
1061  with core.DeviceScope(device_opt):
1062  for i, peer in enumerate(devices):
1063  if i == 0:
1064  continue # Skip the first device
1065  if p2p_access_pattern is not None and p2p_access_pattern.size and not p2p_access_pattern[
1066  devices[0], peer
1067  ]:
1068  # Copy from peer to d0
1069  blobs[i] = model.Copy(
1070  blobs[i],
1071  'gpu_{}/{}_gpu{}_copy'.format(devices[0], param, peer)
1072  )
1073  net.Sum(blobs, [blobs[0]], name='dpm')
1074 
1075  if len(devices) == 16:
1076  # Special tree reduction for 16 gpus, TODO generalize like in muji.py
1077  for j in range(8):
1078  sumN(j * 2, j * 2 + 1)
1079  for j in range(4):
1080  sumN(j * 4, j * 4 + 2)
1081  for j in range(2):
1082  sumN(j * 8, j * 8 + 4)
1083  sumN(0, 8)
1084  elif len(devices) == 8:
1085  for j in range(4):
1086  sumN(j * 2, j * 2 + 1)
1087  for j in range(2):
1088  sumN(j * 4, j * 4 + 2)
1089  sumN(0, 4)
1090  elif len(devices) == 4:
1091  sumN(0, 1)
1092  sumN(2, 3)
1093  sumN(0, 2)
1094  else:
1095  sumN(*range(len(devices)))
1096  # TODO: for _shared_model, no need to broadcast
1097  _Broadcast(devices, model, net, param)
1098 
1099 
1100 def _SyncAllParams(
1101  devices,
1102  model,
1103  init_net,
1104  net,
1105  rendezvous,
1106  unique_param_names,
1107  max_concurrent_distributed_ops=4
1108 ):
1109  if rendezvous is None or rendezvous['num_shards'] <= 1:
1110  _SyncAllParamsSingleHost(devices, model, net, unique_param_names)
1111  else:
1112  _SyncAllParamsDistributed(
1113  devices,
1114  model,
1115  init_net,
1116  net,
1117  rendezvous,
1118  unique_param_names,
1119  max_concurrent_distributed_ops
1120  )
1121 
1122 
1123 def AddBlobSync(model, blobs, net=None):
1124  '''
1125  Sync blobs across devices and hosts
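
    Example (a sketch; 'running_mean' is a hypothetical blob assumed to exist
    on every device under its device namescope, e.g. gpu_0/running_mean):

        AddBlobSync(model, ["running_mean"])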
1126  '''
1127  if len(blobs) == 0:
1128  return
1129  net = model.net if net is None else net
1130  for b in blobs:
1131  assert not b.startswith(model._device_prefix), \
1132  "Provide unprefixed blob name: {}".format(b)
1133  model._device_grouped_blobs[b] = {
1134  d: core.BlobReference("{}_{}/{}".format(model._device_prefix, d, b))
1135  for d in model._devices
1136  }
1137 
1138  _SyncAllParams(
1139  model._devices,
1140  model,
1141  model.param_init_net,
1142  net,
1143  model._rendezvous,
1144  set(blobs))
1145 
1146 
1147 def AddDistributedBlobSync(model, blobs):
1148  '''
1149  Sync blobs across machines (but not across devices)
1150  '''
1151  if model._rendezvous is None:
1152  return
1153  synth_name = "_".join([str(b) for b in blobs])
1154  comm_world = _CreateOrCloneCommonWorld(
1155  model.param_init_net,
1156  "blob_sync_cw_" + synth_name,
1157  rendezvous=model._rendezvous,
1158  )
1159 
1160  model.net.Allreduce(
1161  inputs=[comm_world] + blobs,
1162  outputs=blobs,
1163  engine=model._rendezvous['engine'],
1164  )
1165 
1166 
1167 def _SyncAllParamsDistributed(
1168  devices,
1169  model,
1170  init_net,
1171  net,
1172  rendezvous,
1173  unique_param_names,
1174  max_concurrent_distributed_ops
1175 ):
1176  assert rendezvous['num_shards'] > 1
1177 
1178  gpu_device_opt = core.DeviceOption(model._device_type, devices[0])
1179  cpu_device_opt = core.DeviceOption(caffe2_pb2.CPU)
1180  ideep_device_opt = core.DeviceOption(caffe2_pb2.IDEEP)
1181 
1182  if model._broadcast_context is None:
1183  model._broadcast_context = CollectivesConcurrencyControl(
1184  "broadcast",
1185  max_concurrent_distributed_ops,
1186  init_net,
1187  rendezvous
1188  )
1189  context = model._broadcast_context
1190 
1191  for param_name in sorted(unique_param_names):
1192  master_param = model._device_grouped_blobs[param_name][devices[0]]
1193  params_group = list(viewvalues(model._device_grouped_blobs[param_name]))
1194 
1195  def broadcast(params):
1196  comm_world, control_input = context.get_control_and_context(params)
1197  net.Broadcast(
1198  inputs=[comm_world] + params,
1199  outputs=params,
1200  name=param_name,
1201  engine=rendezvous['engine'],
1202  control_input=control_input
1203  )
1204 
1205  device_opt = gpu_device_opt if _IsGPUBlob(
1206  model, param_name
1207  ) else ideep_device_opt if _IsIDEEPBlob(model, param_name) else cpu_device_opt
1208 
1209  if rendezvous['engine'] == 'GLOO':
1210  with core.DeviceScope(device_opt):
1211  broadcast(params_group)
1212  else:
1213  # Copy between GPU and CPU
1214  with core.DeviceScope(device_opt):
1215  param_cpu = net.CopyGPUToCPU(
1216  master_param,
1217  str(master_param) + "cpu"
1218  )
1219  with core.DeviceScope(cpu_device_opt):
1220  broadcast([param_cpu])
1221  with core.DeviceScope(device_opt):
1222  net.CopyCPUToGPU(param_cpu, master_param)
1223 
1224  # Broadcast locally
1225  _Broadcast(devices, model, net, param_name)
1226 
1227 
1228 def _SyncAllParamsSingleHost(devices, model, net, unique_param_names):
1229  for param in unique_param_names:
1230  _Broadcast(devices, model, net, param)
1231 
1232 
1233 def _AllReduceBlobs(blob_names, devices, model, net, rendezvous, use_nccl,
1234  max_concurrent_distributed_ops):
1235  if rendezvous is None or rendezvous['num_shards'] <= 1:
1236  _AllReduceBlobsSingleHost(
1237  blob_names,
1238  devices,
1239  model,
1240  net,
1241  use_nccl
1242  )
1243  else:
1244  _AllReduceBlobsDistributed(
1245  blob_names,
1246  devices,
1247  model,
1248  net,
1249  rendezvous,
1250  max_concurrent_distributed_ops,
1251  )
1252 
1253 
1254 def _PruneParametersForSharing(model):
1255  assert model._shared_model
1256  master_prefix = "{}_{}/".format(model._device_prefix, model._devices[0])
1257 
1258  # Remove non-master parameters so that they will not receive parameter
1259  # update operators.
1260  model.params = model.GetParams(master_prefix)
1261  paramset = set(model.params)
1262 
1263  model.param_to_grad = {
1264  p: model.param_to_grad[p]
1265  for p in model.param_to_grad if p in paramset
1266  }
1267  model.weights = [w for w in model.weights if w in paramset]
1268  model.biases = [w for w in model.biases if w in paramset]
1269 
1270 
1271 def _RemapParameterBlobsForSharedModel(model, all_params):
1272  assert model._shared_model
1273  master_prefix = "{}_{}/".format(
1274  model._device_prefix, model._devices[0])
1275  log.info("Remapping param blobs to master -> {}".format(master_prefix))
1276  master_params = set(model.GetParams())
1277 
1278  # Remove all but master params
1279  def modify_ops(net):
1280  ops = []
1281  for op in net.Proto().op:
1282  delete_op = False
1283  # Delete ops that output non-master version of parameter
1284  for outp in op.output:
1285  if outp in all_params and outp not in master_params:
1286  delete_op = True
1287  log.debug("Delete b/c {}: {}".format(outp, str(op)))
1288  break
1289  if delete_op:
1290  continue
1291  # Remap inputs to point to the master param
1292  for j, inp in enumerate(op.input):
1293  if inp in all_params and inp not in master_params:
1294  op.input[j] = master_prefix + stripBlobName(inp)
1295  ops.append(op)
1296  del net.Proto().op[:]
1297  net.Proto().op.extend(ops)
1298 
1299  modify_ops(model.param_init_net)
1300  modify_ops(model.net)
1301 
1302 
1303 class CollectivesConcurrencyControl(object):
1304  """
1305  Creates common worlds (up to max_concurrent_context) and manages the
1306  sequential execution of collectives that share the same context with
1307  cyclic control inputs.
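
    Sketch of the intended usage pattern (mirrors the real call sites in
    _SyncAllParamsDistributed and _AllReduceBlobsDistributed below):

        context = CollectivesConcurrencyControl(
            "allreduce", max_concurrent_distributed_ops,
            model.param_init_net, rendezvous
        )
        comm_world, control_input = context.get_control_and_context(blobs[0])
        net.Allreduce(
            inputs=[comm_world] + blobs,
            outputs=blobs,
            engine=rendezvous['engine'],
            control_input=control_input,
        )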
1308  """
1309  def __init__(
1310  self,
1311  name,
1312  max_concurrent_context,
1313  param_init_net,
1314  rendezvous
1315  ):
1316  self.name = name
1317  self.param_init_net = param_init_net
1318  self.max_concurrent_context = max_concurrent_context
1319  self.counter = 0
1320  self.common_worlds = []
1321  self.control_inputs = []
1322  self.rendezvous = rendezvous
1323 
1324  def get_control_and_context(self, control_output_blob):
1325  common_world, control_input = [None, None]
1326  current_slot = self.counter % self.max_concurrent_context
1327  if len(self.common_worlds) < self.max_concurrent_context:
1328  common_world = _CreateOrCloneCommonWorld(
1329  self.param_init_net,
1330  "{}_{}_cw".format(self.name, current_slot),
1331  rendezvous=self.rendezvous,
1332  )
1333  self.common_worlds.append(common_world)
1334  self.control_inputs.append(control_output_blob)
1335  else:
1336  common_world = self.common_worlds[current_slot]
1337  control_input = self.control_inputs[current_slot]
1338  self.control_inputs[current_slot] = control_output_blob
1339  self.counter += 1
1340  return common_world, control_input
1341 
1342 
1343 def _AllReduceBlobsDistributed(
1344  blob_names,
1345  devices,
1346  model,
1347  net,
1348  rendezvous,
1349  max_concurrent_distributed_ops,
1350 ):
1351  num_workers = model.net.Proto().num_workers
1352  assert num_workers > 1, "Please specify more than 1 worker"
1353  all_reduce_engine = rendezvous['engine']
1354 
1355  master_device_opt = core.DeviceOption(model._device_type, devices[0])
1356 
1357  reducing_device_opt = master_device_opt
1358 
1359  context = CollectivesConcurrencyControl(
1360  "allreduce",
1361  max_concurrent_distributed_ops,
1362  model.param_init_net,
1363  rendezvous
1364  )
1365 
1366  nccl_control_blob = None
1367 
1368  for blob_name in blob_names:
1369  master_blob = model._device_grouped_blobs[blob_name][devices[0]]
1370  blobs_group = list(viewvalues(model._device_grouped_blobs[blob_name]))
1371 
1372  assert master_blob in blobs_group
1373 
1374  # Remark: NCCLReduce does not support in-place modifications
1375  # so we need a temporary blob
1376  reduced_blob = str(master_blob) + "_red"
1377 
1378  def allreduce(blobs, **kwargs):
1379  with core.DeviceScope(reducing_device_opt):
1380  comm_world, control_input = \
1381  context.get_control_and_context(blobs[0])
1382  net.Allreduce(
1383  inputs=[comm_world] + blobs,
1384  outputs=blobs,
1385  name=blob_name,
1386  engine=all_reduce_engine,
1387  control_input=control_input,
1388  **kwargs
1389  )
1390 
1391  if rendezvous['engine'] == 'GLOO':
1392  # With Gloo, cross-GPU and cross-machine allreduce
1393  # can be executed in a single operation.
1394  # Try to use GPUDirect if transport == ibverbs.
1395  allreduce(
1396  blobs_group,
1397  gpu_direct=(rendezvous.get("transport", None) == "ibverbs"),
1398  )
1399  else:
1400  # Step 1: sum blobs from local GPUs to master GPU
1401  with core.DeviceScope(master_device_opt):
1402  model.ConstantFill(master_blob, reduced_blob, value=0.0)
1403 
1404  # Temp fix since NCCLReduce does not work
1405  net.NCCLAllreduce(
1406  blobs_group,
1407  blobs_group,
1408  control_input=nccl_control_blob,
1409  )
1410  nccl_control_blob = blobs_group[0]
1411  net.Copy(master_blob, reduced_blob)
1412 
1413  # Step 2: allreduce between all hosts, between master GPUs
1414  allreduce([reduced_blob])
1415 
1416  with core.DeviceScope(master_device_opt):
1417  net.Copy(reduced_blob, master_blob)
1418 
1419  # Step 3: broadcast locally
1420  _Broadcast(devices, model, net, blob_name)
1421 
1422 
1423 def _AllReduceBlobsSingleHost(blob_names, devices, model, net, use_nccl):
1424  """Performs NCCL AllReduce to distribute blobs to all the GPUs."""
1425 
1426  if len(devices) == 1:
1427  return
1428 
1429  # Now we need to Allreduce blobs on all the GPUs.
1430  # Pick GPU #0 as a master GPU.
1431  master_device_opt = core.DeviceOption(model._device_type, devices[0])
1432  last_out = None
1433  concatenated_idx = set()
1434 
1435  for blob_name in blob_names:
1436  # Group by blob_name for reduce.
1437  blobs_group = list(viewvalues(model._device_grouped_blobs[blob_name]))
1438  if len(blobs_group) == 1:
1439  # Non-reducible
1440  continue
1441  assert len(blobs_group) == len(devices), \
1442  "Each GPU from {}, should have a copy of {}.".format(
1443  devices, blob_name)
1444 
1445  if _IsGPUBlob(model, blob_name):
1446  with core.DeviceScope(master_device_opt):
1447  if not isinstance(blobs_group[0], core.GradientSlice):
1448  _AllReduce(
1449  devices, model, net, blob_name, use_nccl, last_out
1450  )
1451  # last_out is used to serialize the execution of nccls
1452  last_out = blobs_group[0]
1453 
1454  else:
1455  # Sparse gradients: all-gather for indices and values
1456  master_ns = "{}_{}".format(model._device_prefix, devices[0])
1457  '''
1458  Skip if we have already copied concatenated indices
1459  to the indices of GradientSlice. This happens when two
1460  or more grad blobs are gathered with the same indices
1461  blob
1462  '''
1463  skip_idx_concat = False
1464  for g in blobs_group:
1465  if g.indices in concatenated_idx:
1466  skip_idx_concat = True
1467 
1468  if not skip_idx_concat:
1469  grad_idx_concat, _ = net.Concat(
1470  [g.indices for g in blobs_group],
1471  ["{}/{}_index_concat".format(master_ns, blob_name),
1472  "{}/{}_index_splitinfo".format(master_ns, blob_name)],
1473  axis=0,
1474  name="note:data_parallel_model")
1475 
1476  for gpu, g in viewitems(model._device_grouped_blobs[blob_name]):
1477  device_opt = core.DeviceOption(model._device_type, gpu)
1478  with core.DeviceScope(device_opt):
1479  model.Copy(grad_idx_concat, g.indices)
1480  concatenated_idx.add(g.indices)
1481 
1482  grad_val_concat, _ = net.Concat(
1483  [g.values for g in blobs_group],
1484  ["{}/{}_val_concat".format(master_ns, blob_name),
1485  "{}/{}_val_splitinfo".format(master_ns, blob_name)],
1486  axis=0, name="note:data_parallel_model")
1487 
1488  for gpu, g in viewitems(model._device_grouped_blobs[blob_name]):
1489  device_opt = core.DeviceOption(model._device_type, gpu)
1490  with core.DeviceScope(device_opt):
1491  model.Copy(grad_val_concat, g.values)
1492 
1493  elif _IsIDEEPBlob(model, blob_name):
1494  assert not isinstance(blobs_group[0], core.GradientSlice), \
1495  "Synchronizing gradient slices not supported"
1496  with core.DeviceScope(core.DeviceOption(caffe2_pb2.IDEEP)):
1497  net.Sum(blobs_group, [blobs_group[0]])
1498  if not model._shared_model:
1499  _Broadcast(devices, model, net, blob_name)
1500 
1501  else:
1502  assert not isinstance(blobs_group[0], core.GradientSlice), \
1503  "Synchronizing gradient slices not supported"
1504  with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
1505  # Poor man's allreduce
1506  net.Sum(blobs_group, [blobs_group[0]])
1507  if not model._shared_model:
1508  _Broadcast(devices, model, net, blob_name)
1509 
1510 
1511 def _BroadcastComputedParams(devices, model, rendezvous, use_nccl=False):
1512  if rendezvous is None:
1513  _BroadcastComputedParamsSingleHost(devices, model, use_nccl)
1514  else:
1515  _BroadcastComputedParamsDistributed(devices, model, rendezvous, use_nccl)
1516 
1517 
1518 def _BroadcastComputedParamsDistributed(
1519  devices,
1520  model,
1521  rendezvous,
1522  use_nccl=False
1523 ):
1524  _BroadcastComputedParamsSingleHost(devices, model, use_nccl)
1525  log.warning("Distributed broadcast of computed params is not implemented yet")
1526 
1527 
1528 def _BroadcastComputedParamsSingleHost(devices, model, use_nccl=False):
1529  '''
1530  Average computed params over all devices
1531  '''
1532  if len(devices) == 1:
1533  return
1534 
1535  for param_name in model._computed_param_names:
1536  # Copy from master to others -- averaging would perhaps be better,
1537  # but currently NCCLAllReduce is too prone to deadlock
1538  _Broadcast(devices, model, model.net, param_name, use_nccl)
1539 
1540 
1541 def _GetReverseOrderedGrads(model):
1542  '''
1543  Returns the gradients in reverse order (namespace stripped),
1544  for the optimal synchronization order.
1545  '''
1546  return list(reversed(model._grad_names))
1547 
1548 
1549 # A helper function to extract a parameter's name
1550 def stripBlobName(param):
1551  # Format is "a/b/c/d" -> "b/c/d"
1552  if isinstance(param, core.GradientSlice):
1553  return stripBlobName(param.indices) + ":" + stripBlobName(param.values)
1554  else:
1555  name = str(param)
1556  return name[name.index(scope._NAMESCOPE_SEPARATOR) + 1:]
1557 
1558 
1559 def _AnalyzeOperators(model):
1560  '''
1561  Look at all the operators and check that they do not cross device scopes
1562  '''
1563  for op in model.Proto().op:
1564  if "NCCL" in op.type or "Copy" in op.type or "Concat" in op.type:
1565  continue
1566  if "Sum" == op.type and op.name == "dpm":
1567  continue
1568  if "Allreduce" in op.type and "GLOO" in op.engine:
1569  continue
1570 
1571  op_dev = op.device_option
1572  op_gpu = op_dev.device_id
1573 
1574  # This avoids failing on operators that are only for CPU
1575  if not core.IsGPUDeviceType(op_dev.device_type):
1576  continue
1577 
1578  namescope = "{}_{}/".format(model._device_prefix, op_gpu)
1579  for inp in list(op.input) + list(op.output):
1580  if inp.startswith("{}_".format(model._device_prefix)
1581  ) and not inp.startswith(namescope):
1582  raise Exception(
1583  "Blob {} of op {}, should have namescope {}. Op: {}".format(
1584  inp,
1585  op.type,
1586  "{}_{}/".format(model._device_prefix, op_gpu),
1587  str(op),
1588  )
1589  )
1590 
1591 
1592 def _InferBlobDevice(model):
1593  '''
1594  Assign each blob to a device option based on the operator outputting it
1595  '''
1596  mapping = {}
1597 
1598  def map_ops(proto):
1599  for op in proto.op:
1600  device_option = op.device_option
1601  if op.type == "Iter":
1602  # Hack for Iters which have blob in CPU context
1603  device_option = caffe2_pb2.DeviceOption()
1604  device_option.device_type = caffe2_pb2.CPU
1605  for b in list(op.input) + list(op.output):
1606  if b not in mapping:
1607  mapping[b] = device_option
1608  if op.type.startswith('RecurrentNetwork'):
1609  step_args = [a for a in op.arg if a.name.endswith("step_net")]
1610  for step_arg in step_args:
1611  map_ops(step_arg.n)
1612  map_ops(model.param_init_net.Proto())
1613  map_ops(model.net.Proto())
1614  model._blob_to_device = mapping
1615 
1616 def _IsIDEEPBlob(model, blob_name):
1617  if blob_name in model._blob_to_device:
1618  return model._blob_to_device[blob_name].device_type == caffe2_pb2.IDEEP
1619  else:
1620  blob_name = "{}_{}/{}".format(
1621  model._device_prefix, model._devices[0], blob_name
1622  )
1623  if blob_name not in model._blob_to_device:
1624  return model._device_type == caffe2_pb2.IDEEP
1625  return model._blob_to_device[blob_name].device_type == caffe2_pb2.IDEEP
1626 
1627 def _IsGPUBlob(model, blob_name):
1628  if blob_name in model._blob_to_device:
1629  return core.IsGPUDeviceType(model._blob_to_device[blob_name].device_type)
1630  else:
1631  blob_name = "{}_{}/{}".format(
1632  model._device_prefix, model._devices[0], blob_name
1633  )
1634  if blob_name not in model._blob_to_device:
1635  return core.IsGPUDeviceType(model._device_type)
1636  return core.IsGPUDeviceType(model._blob_to_device[blob_name].device_type)
1637 
1638 
1639 def _GroupByDevice(model, devices, params, non_data_params):
1640  '''
1641  Groups blobs by device, returning a map of [blobname] = {0: BlobRef, 1: ..}.
1642  Returns an OrderedDict so that the original parameter order is preserved.
1643  '''
1644  grouped = OrderedDict()
1645  # Only consider params that were created to be "data parallel"
1646  params = params[len(non_data_params):]
1647 
1648  for _i, p in enumerate(params):
1649  assert isinstance(p, core.BlobReference) or \
1650  isinstance(p, core.GradientSlice), \
1651  "Param {} is not BlobReference or GradientSlice".format(p)
1652 
1653  name = stripBlobName(p)
1654  gpuid = None
1655 
1656  if isinstance(p, core.BlobReference):
1657  gpuid = int(p.GetNameScope().split("_")[1].split("/")[0])
1658  assert "{}_{}/".format(model._device_prefix, gpuid) in p.GetNameScope(),\
1659  "Param {} expected to have namescope '{}_{}'".format(str(p), model._device_prefix, gpuid)
1660  else:
1661  gpuid = int(p.indices.GetNameScope().split("_")[1].split("/")[0])
1662  assert "{}_{}/".format(model._device_prefix, gpuid) in p.indices.GetNameScope(),\
1663  "Indices {} expected to have namescope '{}_{}'".format(str(p), model._device_prefix, gpuid)
1664  assert "{}_{}/".format(model._device_prefix, gpuid) in p.values.GetNameScope(),\
1665  "Values {} expected to have namescope '{}_{}'".format(str(p), model._device_prefix, gpuid)
1666 
1667  if name not in grouped:
1668  grouped[name] = {}
1669  grouped[name][gpuid] = p
1670 
1671  return grouped
1672 
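# Illustrative sketch (hypothetical parameter names): for two GPU devices the
# grouping produced above looks like
#
#   {"fc_w": {0: BlobReference("gpu_0/fc_w"), 1: BlobReference("gpu_1/fc_w")},
#    "fc_b": {0: BlobReference("gpu_0/fc_b"), 1: BlobReference("gpu_1/fc_b")}}
#
# i.e. grouped[name][device_id] gives the device-local copy of each parameter.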
1673 
1674 def _ValidateParams(params):
1675  set_params = set(params)
1676  if len(params) > len(set_params):
1677  dupes = []
1678  sp = sorted(params)
1679  for j, p in enumerate(sp):
1680  if j > 0 and sp[j - 1] == p:
1681  dupes.append(p)
1682 
1683  assert len(params) == len(set_params), \
1684  "Duplicate entries in params: {}".format(dupes)
1685 
1686 
1687 def _ComputeBlobsToSync(model):
1688  '''
1689  We sync all blobs that are generated by the param init net and
1690  are 'data parallel', i.e. assigned to a device.
1691  '''
1692  sync_names = set()
1693 
1694  # We don't sync params if the model is shared
1695  if model._shared_model:
1696  blobs_to_sync = [str(p) for p in model.GetComputedParams('')]
1697  sync_names = [stripBlobName(p) for p in blobs_to_sync]
1698  else:
1699  blobs_to_sync = []
1700 
1701  for op in model.param_init_net.Proto().op:
1702  dp_outputs = [
1703  o for o in op.output
1704  if o.startswith("{}_".format(model._device_prefix))
1705  ]
1706  sync_names.update([stripBlobName(o) for o in dp_outputs])
1707  blobs_to_sync.extend(dp_outputs)
1708 
1709  # Sanity check
1710  diff = set(model._param_names) - sync_names
1711  assert diff == set(), \
1712  "Some params not instantiated in param init net: {}".format(diff)
1713 
1714  # Remove duplicates and sort
1715  prefixlen = len(model._device_prefix) + 1
1716 
1717  def extract_sort_key(b):
1718  # Sort first based on device id, and then by whole string
1719  deviceid = int(b[prefixlen:b.index(scope._NAMESCOPE_SEPARATOR)])
1720  return (deviceid, b)
1721 
1722  blobs_to_sync = sorted(
1723  list(set(blobs_to_sync)),
1724  key=extract_sort_key)
1725 
1726  blobs_to_sync = [core.BlobReference(b) for b in blobs_to_sync]
1727  return (blobs_to_sync, sync_names)
1728 
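# Illustrative note (hypothetical blob names): extract_sort_key above orders the
# synced blobs first by device id and then by the full name, e.g.
#   "gpu_0/fc_w"    -> (0, "gpu_0/fc_w")
#   "gpu_1/conv1_b" -> (1, "gpu_1/conv1_b")
# so all of device 0's blobs are synced before device 1's.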
1729 
1730 def _OptimizeGradientMemorySimple(model, losses_by_gpu, devices):
1731  log.warning("------- DEPRECATED API, please use " +
1732  "data_parallel_model.OptimizeGradientMemory() ----- ")
1733  for device in devices:
1734  namescope = "{}_{}/".format(model._device_prefix, device)
1735  model.net._net = memonger.share_grad_blobs(
1736  model.net,
1737  losses_by_gpu[device],
1738  set(viewvalues(model.param_to_grad)),
1739  namescope,
1740  share_activations=False,
1741  )
1742 
1743 
1744 def _AddDynamicMemoryOptimization(model, blobs_to_keep, devices):
1745  blobs_to_keep_all_devices = set()
1746  if blobs_to_keep is not None:
1747  for device in devices:
1748  for blob_name in blobs_to_keep:
1749  blobs_to_keep_all_devices.add(
1750  "{}_{}/{}".format(model._device_prefix, device, blob_name)
1751  )
1752 
1753  if model._rendezvous is not None:
1754  # GLOO operators expect the tensor addresses to remain same over
1755  # iterations so we need to remove param grads from the dynamic memory
1756  # management.
1757  blobs_to_keep_all_devices.update(
1758  [str(b) for b in viewvalues(model.param_to_grad)]
1759  )
1760 
1761  model.net._net = memonger.release_blobs_when_used(
1762  model.net.Proto(),
1763  blobs_to_keep_all_devices
1764  )
1765 
1766 
1767 def OptimizeGradientMemory(model,
1768  input_shapes,
1769  excluded_blobs,
1770  recycle_activations):
1771  """
1772  Optimize memory usage of the backward pass by recycling blobs for gradient
1773  inputs that have been 'used'.
1774  input_shapes: dict of blob name to shape for the inputs of the model.
1775  Pass empty dictionary if not known.
1776  excluded_blobs: list of blobs that cannot be recycled. These are blobs
1777  that you will access externally.
1778  recycle_activations: whether to also recycle forward pass activations
1779  """
1780  if input_shapes is not None:
1781  input_shapes_all_devices = {}
1782  for b, shp in viewitems(input_shapes):
1783  for d in model._devices:
1784  input_shapes_all_devices["{}_{}/{}".
1785  format(model._device_prefix, d, b)] = shp
1786 
1787  (shapes, types) = workspace.InferShapesAndTypes(
1788  [model.param_init_net, model.net],
1789  input_shapes_all_devices,
1790  )
1791  else:
1792  shapes = None
1793 
1794  for device in model._devices:
1795  namescope = "{}_{}/".format(model._device_prefix, device)
1796  excluded_blobs_by_device = set(namescope + b for b in excluded_blobs)
1797  model.net._net = memonger.share_grad_blobs(
1798  model.net,
1799  model._losses_by_gpu[device],
1800  set(viewvalues(model.param_to_grad)),
1801  namescope,
1802  dont_share_blobs=excluded_blobs_by_device,
1803  share_activations=recycle_activations,
1804  blob_shapes=shapes,
1805  )
1806 
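# Hedged usage sketch (train_model, the "data" blob, and its shape are
# hypothetical): after Parallelize() has built the nets, backward-pass blobs can
# be recycled while keeping blobs read externally, such as the loss, alive.
def _example_optimize_gradient_memory(train_model):
    OptimizeGradientMemory(
        train_model,
        input_shapes={"data": (32, 3, 224, 224)},
        excluded_blobs=["loss"],
        recycle_activations=False,
    )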
1807 
1808 def _CreateOrCloneCommonWorld(
1809  net,
1810  common_world_blob,
1811  rendezvous,
1812  name=None,
1813  timeout_sec=None):
1814 
1815  if timeout_sec is None:
1816  timeout_sec = _DEFAULT_TIMEOUT_SEC
1817 
1818  timeout_ms = timeout_sec * 1000
1819 
1820  # Check if there is an existing CreateCommonWorld
1821  # with the same timeout we're looking for. If so,
1822  # we can clone it instead of creating a new one.
1823  existing = None
1824  for op in net.Proto().op:
1825  if op.type != "CreateCommonWorld":
1826  continue
1827 
1828  # Find common world timeout
1829  op_timeout_ms = -1
1830  for arg in op.arg:
1831  if arg.name == 'timeout_ms':
1832  op_timeout_ms = arg.i
1833  break
1834  if op_timeout_ms != timeout_ms:
1835  continue
1836 
1837  # This common world was created with the same timeout we're
1838  # looking for, so we can clone it
1839  existing = op.output[0]
1840  break
1841 
1842  if name is None:
1843  name = "{}_op".format(common_world_blob)
1844 
1845  if existing is not None:
1846  comm_world = net.CloneCommonWorld(
1847  [existing],
1848  common_world_blob,
1849  name=name,
1850  engine=rendezvous['engine'],
1851  )
1852  else:
1853  kwargs = dict()
1854  if 'transport' in rendezvous:
1855  kwargs['transport'] = rendezvous['transport']
1856  if 'interface' in rendezvous:
1857  kwargs['interface'] = rendezvous['interface']
1858  if 'mpi_rendezvous' in rendezvous:
1859  kwargs['mpi_rendezvous'] = rendezvous['mpi_rendezvous']
1860  comm_world = net.CreateCommonWorld(
1861  rendezvous['kv_handler'] or [],
1862  common_world_blob,
1863  name=name,
1864  size=rendezvous['num_shards'],
1865  rank=rendezvous['shard_id'],
1866  engine=rendezvous['engine'],
1867  timeout_ms=timeout_ms,
1868  **kwargs
1869  )
1870 
1871  return comm_world
1872 
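# Illustrative sketch of the rendezvous dict consumed above; the values are
# placeholders, only the keys are taken from the code. 'kv_handler',
# 'num_shards', 'shard_id' and 'engine' are required, while 'transport',
# 'interface' and 'mpi_rendezvous' are optional.
def _example_rendezvous():
    return {
        'kv_handler': 'store_handler',  # hypothetical KV-store handler blob
        'num_shards': 2,
        'shard_id': 0,
        'engine': 'GLOO',
        'transport': 'tcp',   # optional
        'interface': '',      # optional
    }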
1873 
1874 def _RunComparison(model, blob_name, device=None):
1875  if device is None:
1876  device = model._blob_to_device[blob_name]
1877  with core.DeviceScope(device):
1878  rendezvous = model._rendezvous
1879  if rendezvous is None or rendezvous['num_shards'] == 1:
1880  return True
1881 
1882  test_data_arr = np.zeros(rendezvous['num_shards']).astype(np.float32)
1883  test_data_arr[rendezvous['shard_id']] = 1
1884  workspace.FeedBlob("compare_arr", test_data_arr)
1885 
1886  comparison_net = core.Net("allcompare_net")
1887 
1888  kwargs = dict()
1889  if 'mpi_rendezvous' in rendezvous:
1890  kwargs['mpi_rendezvous'] = rendezvous['mpi_rendezvous']
1891  comm_world = comparison_net.CreateCommonWorld(
1892  rendezvous['kv_handler'] or [],
1893  "initial_sync",
1894  name=model.net.Proto().name + ".cw_master_select",
1895  size=rendezvous['num_shards'],
1896  rank=rendezvous['shard_id'],
1897  engine=rendezvous['engine'],
1898  **kwargs
1899  )
1900 
1901  blob_name_checksum = blob_name + "_checksum"
1902  comparison_net.SumSqrElements(
1903  [blob_name], [blob_name_checksum], average=False
1904  )
1905 
1906  blob_name_gather = blob_name + "_gather"
1907  comparison_net.Mul(
1908  inputs=["compare_arr", blob_name_checksum],
1909  outputs=blob_name_gather,
1910  broadcast=1
1911  )
1912 
1913  comparison_net.Allreduce(
1914  inputs=[comm_world, blob_name_gather],
1915  outputs=[blob_name_gather],
1916  engine=rendezvous['engine'],
1917  )
1918 
1919  workspace.RunNetOnce(comparison_net)
1920  gather_arr = workspace.FetchBlob(blob_name_gather)
1921 
1922  baseline = gather_arr[0]
1923  for i in range(rendezvous['num_shards']):
1924  assert gather_arr[i] == baseline, \
1925  "allcompare failed on shard {}.".format(rendezvous['shard_id'])
1926 
1927  return True
1928 
1929 
1930 def _InterleaveOps(model):
1931  '''
1932  Data parallel model creates a net with the ops for each device grouped
1933  together. This interleaves the ops so that the corresponding ops across
1934  devices sit next to each other in the net, like riffling decks of cards. This
1935  ensures that progress is made along the critical path roughly concurrently
1936  for each device, which is important due to the extra intra-node
1937  synchronization required for multi-device batch normalization.
1938  '''
1939  orig_ops = list(model.net.Proto().op)
1940  num_devices = len(model._devices)
1941  num_ops_per_dev = len(orig_ops) // num_devices
1942  assert num_devices * num_ops_per_dev == len(orig_ops), \
1943  'Number of ops per device in original net is not uniform'
1944  new_ops = []
1945  ops = {d: [] for d in range(num_devices)}
1946  for op in orig_ops:
1947  ops[op.device_option.device_id].append(op)
1948 
1949  for j in range(num_ops_per_dev):
1950  tp = None
1951  for d in model._devices:
1952  if tp is None:
1953  tp = ops[d][j].type
1954  new_ops.append(ops[d][j])
1955  # Sanity
1956  assert ops[d][j].type == tp, \
1957  "Type mismatch {} / {}".format(tp, ops[d][j].type)
1958 
1959  del model.net.Proto().op[:]
1960  model.net.Proto().op.extend(new_ops)
1961 
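# Pure-list illustration of the interleaving above (hypothetical, not used by
# the module): ops grouped per device, e.g.
#   ["gpu_0/conv", "gpu_0/relu", "gpu_1/conv", "gpu_1/relu"],
# become ["gpu_0/conv", "gpu_1/conv", "gpu_0/relu", "gpu_1/relu"].
def _example_interleave(grouped_ops, num_devices):
    per_dev = len(grouped_ops) // num_devices
    groups = [
        grouped_ops[d * per_dev:(d + 1) * per_dev] for d in range(num_devices)
    ]
    # Take the j-th op of every device before moving on to op j + 1.
    return [groups[d][j] for j in range(per_dev) for d in range(num_devices)]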
1962 
1963 def _CPUInterDeviceBatchNormalization(model):
1964  orig_ops = list(model.net.Proto().op)
1965  new_ops = []
1966  num_devices = len(model._devices)
1967  batch_norm_ops = []
1968  injected_ops = []
1969 
1970  spatial_bn_phase = False
1971  sums_blobs = []
1972  sumsq_blobs = []
1973  name = []
1974  input_blob_name = None
1975 
1976  spatial_bn_gradient_phase = False
1977  scale_grad_blobs = []
1978  bias_grad_blobs = []
1979 
1980  def _cpuReduce(param, input_blobs, destination_blobs):
1981  """
1982  Reduces results from multiple CPU devices and distributes the results back
1983  to each device. This is done by copying values to cpu_0 and summing
1984  them. The cpu_0 result is then copied back to each of the devices.
1985 
1986  param: the name of the data (blobs) to reduce
1987  input_blobs: the list of blobs to reduce
1988  destination_blobs: list of blobs to copy the result to
1989  """
1990  added_ops = []
1991  result_blob = "cpu_0/" + param + "_combined"
1992  added_ops.append(core.CreateOperator("Sum", input_blobs, result_blob))
1993  for blob in destination_blobs:
1994  added_ops.append(core.CreateOperator("Copy", result_blob, blob))
1995  return added_ops
1996 
1997  for op in orig_ops:
1998  if op.type != 'SpatialBN' and op.type != 'SpatialBNGradient':
1999  if spatial_bn_phase:
2000  new_ops.extend(injected_ops)
2001  new_ops.append(
2002  core.CreateOperator("Sum",
2003  sums_blobs,
2004  input_blob_name + "_sums_combined"))
2005  new_ops.append(
2006  core.CreateOperator("Sum",
2007  sumsq_blobs,
2008  input_blob_name + "_sumsq_combined"))
2009  new_ops.extend(batch_norm_ops)
2010  injected_ops = []
2011  batch_norm_ops = []
2012  sums_blobs = []
2013  sumsq_blobs = []
2014  spatial_bn_phase = False
2015  input_blob_name = None
2016  elif spatial_bn_gradient_phase:
2017  new_ops.extend(injected_ops)
2018  new_ops.extend(_cpuReduce(
2019  stripBlobName(scale_grad_blobs[0]),
2020  scale_grad_blobs,
2021  scale_grad_blobs))
2022  new_ops.extend(_cpuReduce(
2023  stripBlobName(bias_grad_blobs[0]),
2024  bias_grad_blobs,
2025  bias_grad_blobs))
2026  new_ops.extend(batch_norm_ops)
2027  injected_ops = []
2028  batch_norm_ops = []
2029  scale_grad_blobs = []
2030  bias_grad_blobs = []
2031  spatial_bn_gradient_phase = False
2032  new_ops.append(op)
2033  elif op.type == 'SpatialBN':
2034  spatial_bn_phase = True
2035  if input_blob_name is None:
2036  input_blob_name = op.input[0]
2037  name = op.input[0]
2038  injected_ops.append(
2039  core.CreateOperator(
2040  "ChannelStats",
2041  name,
2042  [name + "_sums", name + "_sumsq"]))
2043  sums_blobs.append(name + "_sums")
2044  sumsq_blobs.append(name + "_sumsq")
2045  op.input.append(input_blob_name + "_sums_combined")
2046  op.input.append(input_blob_name + "_sumsq_combined")
2047  op.arg.extend([utils.MakeArgument("num_batches", num_devices)])
2048  batch_norm_ops.append(op)
2049  elif op.type == 'SpatialBNGradient':
2050  spatial_bn_gradient_phase = True
2051  injected_ops.append(
2052  core.CreateOperator("ChannelBackpropStats",
2053  [op.input[0], op.input[3], op.input[4],
2054  op.input[2]],
2055  [op.output[1], op.output[2]]))
2056  scale_grad_blobs.append(op.output[1])
2057  bias_grad_blobs.append(op.output[2])
2058  op.arg.extend([utils.MakeArgument("num_batches", num_devices)])
2059  op.input.extend([op.output[1], op.output[2]])
2060  batch_norm_ops.append(op)
2061 
2062  assert not spatial_bn_phase, \
2063  "Net modification for cpu inter-device batch normalization failed"
2064  del model.net.Proto().op[:]
2065  model.net.Proto().op.extend(new_ops)
2066 
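# Illustrative sketch (hypothetical blob names): for SpatialBN ops reading
# "cpu_0/conv1", "cpu_1/conv1", ..., the pass above injects
#   ChannelStats("cpu_<d>/conv1") -> ["cpu_<d>/conv1_sums", "cpu_<d>/conv1_sumsq"]
# on every device, sums the per-device statistics into
# "cpu_0/conv1_sums_combined" / "cpu_0/conv1_sumsq_combined", and appends those
# combined blobs plus a num_batches argument to each SpatialBN op, so the
# normalization statistics are computed over all devices' batches.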
2067 
2068 def _GPUInterDeviceBatchNormalization(model):
2069  orig_ops = list(model.net.Proto().op)
2070  new_ops = []
2071  num_devices = len(model._devices)
2072  batch_norm_ops = []
2073  injected_ops = []
2074 
2075  spatial_bn_phase = False
2076  sums_blobs = []
2077  sumsq_blobs = []
2078  name = []
2079  input_blob_name = None
2080 
2081  spatial_bn_gradient_phase = False
2082  scale_grad_blobs = []
2083  bias_grad_blobs = []
2084  master_device = "cpu_0"
2085  master_device_option = core.DeviceOption(caffe2_pb2.CPU)
2086 
2087  def _gpuReduce(param, num_devices, master_device, result_blobs=None):
2088  """
2089  Reduces results from multiple GPUs and distributes the results back
2090  to each device. This is done by copying values to the master device
2091  and summing them. The master device result is then copied back to
2092  each of the devices.
2093 
2094  param: the name of the data (blobs) to reduce
2095  num_devices: the number of devices
2096  master_device: the device to copy/compute values on
2097  result_blobs: optional list of result blobs to copy to
2098  """
2099  added_ops = []
2100  source_blobs = []
2101  destination_blobs = []
2102  if result_blobs is None:
2103  result_blobs = [
2104  "gpu_{}/{}_combined".format(i, param) for i in range(num_devices)
2105  ]
2106  for i in range(num_devices):
2107  device_option = core.DeviceOption(model._device_type, i)
2108  source_blobs.append("gpu_{}/{}".format(i, param))
2109  destination_blobs.append(
2110  "{}/{}_gpu_{}_copy".format(master_device, param, i))
2111  added_ops.append(
2112  core.CreateOperator(
2113  "CopyGPUToCPU",
2114  source_blobs[i],
2115  destination_blobs[i],
2116  device_option=device_option))
2117  added_ops.append(
2118  core.CreateOperator(
2119  "Sum",
2120  destination_blobs,
2121  "{}/{}_combined".format(master_device, param),
2122  device_option=master_device_option))
2123  for i in range(num_devices):
2124  device_option = core.DeviceOption(model._device_type, i)
2125  added_ops.append(
2126  core.CreateOperator(
2127  "CopyCPUToGPU",
2128  "{}/{}_combined".format(master_device, param),
2129  result_blobs[i],
2130  device_option=device_option))
2131  return added_ops
2132 
2133  for op in orig_ops:
2134  if op.type != 'SpatialBN' and op.type != 'SpatialBNGradient':
2135  if spatial_bn_phase:
2136  new_ops.extend(injected_ops)
2137  new_ops.extend(_gpuReduce(
2138  stripBlobName(input_blob_name) + "_sums",
2139  num_devices,
2140  master_device,
2141  ))
2142  new_ops.extend(_gpuReduce(
2143  stripBlobName(input_blob_name) + "_sumsq",
2144  num_devices,
2145  master_device,
2146  ))
2147  new_ops.extend(batch_norm_ops)
2148  injected_ops = []
2149  batch_norm_ops = []
2150  sums_blobs = []
2151  sumsq_blobs = []
2152  spatial_bn_phase = False
2153  input_blob_name = None
2154  elif spatial_bn_gradient_phase:
2155  new_ops.extend(injected_ops)
2156  new_ops.extend(_gpuReduce(
2157  stripBlobName(scale_grad_blobs[0]),
2158  num_devices,
2159  master_device,
2160  scale_grad_blobs,
2161  ))
2162  new_ops.extend(_gpuReduce(
2163  stripBlobName(bias_grad_blobs[0]),
2164  num_devices,
2165  master_device,
2166  bias_grad_blobs,
2167  ))
2168  new_ops.extend(batch_norm_ops)
2169  injected_ops = []
2170  batch_norm_ops = []
2171  scale_grad_blobs = []
2172  bias_grad_blobs = []
2173  spatial_bn_gradient_phase = False
2174  new_ops.append(op)
2175  elif op.type == 'SpatialBN':
2176  spatial_bn_phase = True
2177  if input_blob_name is None:
2178  input_blob_name = op.input[0]
2179  name = op.input[0]
2180  device_option = core.DeviceOption(
2181  model._device_type,
2182  op.device_option.device_id,
2183  )
2184  injected_ops.append(
2185  core.CreateOperator(
2186  "ChannelStats",
2187  name,
2188  [name + "_sums", name + "_sumsq"],
2189  device_option=device_option))
2190  sums_blobs.append(name + "_sums")
2191  sumsq_blobs.append(name + "_sumsq")
2192  op.input.append(name + "_sums_combined")
2193  op.input.append(name + "_sumsq_combined")
2194  op.arg.extend([utils.MakeArgument("num_batches", num_devices)])
2195  batch_norm_ops.append(op)
2196  elif op.type == 'SpatialBNGradient':
2197  spatial_bn_gradient_phase = True
2198  device_option = core.DeviceOption(
2199  model._device_type,
2200  op.device_option.device_id,
2201  )
2202  injected_ops.append(
2203  core.CreateOperator("ChannelBackpropStats",
2204  [op.input[0], op.input[3], op.input[4],
2205  op.input[2]],
2206  [op.output[1], op.output[2]],
2207  device_option=device_option))
2208  scale_grad_blobs.append(op.output[1])
2209  bias_grad_blobs.append(op.output[2])
2210  op.arg.extend([utils.MakeArgument("num_batches", num_devices)])
2211  op.input.extend([op.output[1], op.output[2]])
2212  batch_norm_ops.append(op)
2213 
2214  assert not spatial_bn_phase, \
2215  "Net modification for gpu inter-device batch normalization failed"
2216  del model.net.Proto().op[:]
2217  model.net.Proto().op.extend(new_ops)