Caffe2 - Python API
A deep learning, cross-platform ML framework
data_parallel_model.py
1 # Copyright (c) 2016-present, Facebook, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 ##############################################################################
15 
16 ## @package data_parallel_model
17 # Module caffe2.python.data_parallel_model
18 from __future__ import absolute_import
19 from __future__ import division
20 from __future__ import print_function
21 
22 from collections import OrderedDict
23 from future.utils import viewitems, viewkeys, viewvalues
24 import logging
25 import copy
26 
27 from caffe2.python import \
28  model_helper, dyndep, scope, workspace, core, memonger, utils
29 from caffe2.proto import caffe2_pb2
30 
31 import numpy as np
32 
33 dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/nccl:nccl_ops")
34 dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/gloo:gloo_ops")
35 dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/gloo:gloo_ops_gpu")
36 
37 log = logging.getLogger("data_parallel_model")
38 log.setLevel(logging.INFO)
39 
40 _DEFAULT_TIMEOUT_SEC = 30
41 
42 
43 def Parallelize_GPU(*args, **kwargs):
44  kwargs['cpu_device'] = False
45  Parallelize(*args, **kwargs)
46 
47 
48 def Parallelize_CPU(*args, **kwargs):
49  kwargs['cpu_device'] = True
50  Parallelize(*args, **kwargs)
51 
52 
53 def Parallelize(
54  model_helper_obj,
55  input_builder_fun,
56  forward_pass_builder_fun,
57  param_update_builder_fun=None,
58  optimizer_builder_fun=None,
59  post_sync_builder_fun=None,
60  devices=None,
61  rendezvous=None,
62  net_type='dag',
63  broadcast_computed_params=True,
64  optimize_gradient_memory=False,
65  dynamic_memory_management=False,
66  blobs_to_keep=None,
67  use_nccl=False,
68  max_concurrent_distributed_ops=16,
69  cpu_device=False,
70  num_threads_per_device=4,
71  shared_model=False,
72  combine_spatial_bn=False,
73 ):
74  '''
75  Function to create a model that can run on many GPUs or CPUs.
76  model_helper_obj: an object of ModelHelper
77  input_builder_fun:
78  Function that adds the input operators
79  Note: Remember to instantiate the reader outside of this
80  function so that all devices share the same reader object.
81  Signature: input_builder_fun(model)
82  forward_pass_builder_fun:
83  Function to add the operators to the model.
84  Must return list of loss-blob references that
85  are used to build the gradient. A loss_scale parameter
86  is passed in; you should scale the loss of your model
87  by 1.0 / the total number of devices.
88  Signature: forward_pass_builder_fun(model, loss_scale)
89  param_update_builder_fun:
90  Function that adds the parameter update operators, which are
91  run after the gradients have been computed, such as the weight
92  update and weight decay. This is called for each GPU separately.
93  Signature: param_update_builder_fun(model)
94  optimizer_builder_fun:
95  Alternative to param_update_builder_fun, allows one
96  to add an optimizer for the whole model. Called only
97  once, without name or device scope.
98  post_sync_builder_fun:
99  Function applied after initial parameter sync has been
100  completed, such as keeping multi-precision parameters
101  in sync.
102  Signature: post_sync_builder_fun(model)
103  devices: List of GPU ids, such as [0, 1, 2, 3],
104  rendezvous: used for rendezvous in distributed computation, if None
105  then only one node is used. To create rendezvous,
106  use <TBD>.
107  net_type: Network type
108  optimize_gradient_memory: whether to apply 'memonger' to share blobs
109  shared_model (only for CPU): use the same parameters on each device
110  in gradient computation to reduce memory footprint.
111  dynamic_memory_management: Whether to apply dynamic memory optimization
112  by freeing unused blobs. The underlying (de)allocation
113  uses a cached allocator. For GPU training, PLEASE MAKE SURE that
114  caffe2_cuda_memory_pool is set.
115  blobs_to_keep: A list of blob names to keep and not free during
116  dynamic memory optimization (for example, the loss blob).
117  cpu_device: Use CPU instead of GPU.
118  combine_spatial_bn:
119  When set to True, applies batch normalization across
120  all devices within the node. If False, batch
121  normalization will be done separately for each device.
122  This option is currently only supported on the CPU.
123  '''
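# Illustrative usage sketch (not part of the original module): one way to wire
# the builder functions together, in the spirit of the caffe2 example trainers.
# The model, blob, and function names below (train_model, add_inputs,
# create_model, add_optimizer, "data", "label") are placeholders, and the
# fill ops stand in for a real reader-based input pipeline.
#
#   from caffe2.python import brew, core, model_helper, optimizer, workspace
#   from caffe2.python import data_parallel_model
#
#   train_model = model_helper.ModelHelper(name="train")
#
#   def add_inputs(model):
#       # placeholder inputs; normally read from a reader shared by all devices
#       model.net.GaussianFill([], "data", shape=[32, 16])
#       model.net.ConstantFill([], "label", shape=[32], value=0,
#                              dtype=core.DataType.INT32)
#
#   def create_model(model, loss_scale):
#       fc = brew.fc(model, "data", "fc", dim_in=16, dim_out=10)
#       softmax, loss = model.net.SoftmaxWithLoss(
#           [fc, "label"], ["softmax", "loss"])
#       loss = model.Scale(loss, scale=loss_scale)
#       return [loss]
#
#   def add_optimizer(model):
#       optimizer.build_sgd(model, base_learning_rate=0.1)
#
#   data_parallel_model.Parallelize_GPU(
#       train_model,
#       input_builder_fun=add_inputs,
#       forward_pass_builder_fun=create_model,
#       optimizer_builder_fun=add_optimizer,
#       devices=[0, 1],
#   )
#   workspace.RunNetOnce(train_model.param_init_net)
#   workspace.CreateNet(train_model.net)
#   workspace.RunNet(train_model.net.Proto().name)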
124  assert scope.CurrentDeviceScope() is None \
125  or scope.CurrentDeviceScope().device_type == caffe2_pb2.CPU, \
126  "Parallelize must be called without device-scope, \
127  device scope was: {}".format(scope.CurrentDeviceScope())
128 
129  if devices is None:
130  devices = list(range(0, workspace.NumCudaDevices()))
131 
132  if not cpu_device:
133  for gpu in devices:
134  if gpu >= workspace.NumCudaDevices():
135  log.warning("** Only {} GPUs available, GPUs {} requested".format(
136  workspace.NumCudaDevices(), devices))
137  break
138  model_helper_obj._device_type = caffe2_pb2.CUDA
139  model_helper_obj._device_prefix = "gpu"
140  model_helper_obj._shared_model = False
141  device_name = "GPU"
142  assert shared_model is False, "Shared model only supported on CPU"
143  else:
144  model_helper_obj._device_type = caffe2_pb2.CPU
145  model_helper_obj._device_prefix = "cpu"
146  device_name = "CPU"
147  model_helper_obj._shared_model = shared_model
148  assert not (shared_model and rendezvous is not None), \
149  "Shared model only supported on single-node currently"
150 
151  log.info("Parallelizing model for devices: {}".format(devices))
152  extra_workers = 8 if rendezvous is not None else 0 # best-guess
153  num_workers = len(devices) * num_threads_per_device + extra_workers
154  max_concurrent_distributed_ops =\
155  min(max_concurrent_distributed_ops, num_workers - 1)
156  model_helper_obj.net.Proto().num_workers = num_workers
157  model_helper_obj.net.Proto().type = net_type
158 
159  # Store some information in the model -- a bit ugly
160  model_helper_obj._devices = devices
161  model_helper_obj._rendezvous = rendezvous
162  model_helper_obj._broadcast_context = None
163  model_helper_obj._grad_names = []
164 
165  assert isinstance(model_helper_obj, model_helper.ModelHelper)
166 
167  # Keep track of params that were in the model before: they are not
168  # data parallel, so we need to handle them separately
169  non_datapar_params = copy.copy(model_helper_obj.params)
170 
171  # Add input and model
172  log.info("Create input and model training operators")
173 
174  losses_by_gpu = {}
175  num_shards = 1 if rendezvous is None else rendezvous['num_shards']
176  loss_scale = 1.0 / (len(devices) * num_shards)
177 
178  has_parameter_updates = param_update_builder_fun is not None or \
179  optimizer_builder_fun is not None
180  assert not (
181  param_update_builder_fun is not None and
182  optimizer_builder_fun is not None
183  ), 'Can only specify one of param_update_builder_fun, optimizer_builder_fun'
184 
185  # Check that a model that is used for validation/testing has
186  # init_params False, otherwise running the param init net will overwrite
187  # values synchronized by the training net
188  if not has_parameter_updates and model_helper_obj.init_params:
189  log.warning('')
190  log.warning("############# WARNING #############")
191  log.warning("Model {}/{} is used for testing/validation but".format(
192  model_helper_obj.name, model_helper_obj))
193  log.warning("has init_params=True!")
194  log.warning("This can conflict with model training.")
195  log.warning("Please ensure model = ModelHelper(init_params=False)")
196  log.warning('####################################')
197  log.warning('')
198  # TODO: make into assert
199 
200  for device in devices:
201  device_opt = core.DeviceOption(model_helper_obj._device_type, device)
202  with core.DeviceScope(device_opt):
203  with core.NameScope("{}_{}".format(model_helper_obj._device_prefix,
204  device)):
205  log.info("Model for {} : {}".format(device_name, device))
206  input_builder_fun(model_helper_obj)
207  losses = forward_pass_builder_fun(model_helper_obj, loss_scale)
208  # Losses are not needed for test net
209  if has_parameter_updates:
210  assert isinstance(losses, list), \
211  'Model builder function must return list of loss blobs'
212  for loss in losses:
213  assert isinstance(loss, core.BlobReference), \
214  'Model builder func must return list of loss blobs'
215 
216  losses_by_gpu[device] = losses
217  _ValidateParams(model_helper_obj.params)
218 
219  # Create parameter map
220  model_helper_obj._device_grouped_blobs =\
221  _GroupByDevice(model_helper_obj, devices,
222  model_helper_obj.params, non_datapar_params)
223 
224  # computed params
225  computed_params_grouped =\
226  _GroupByDevice(model_helper_obj, devices,
227  model_helper_obj.GetComputedParams(''), [])
228  model_helper_obj._device_grouped_blobs.update(computed_params_grouped)
229 
230  model_helper_obj._param_names =\
231  list(viewkeys(model_helper_obj._device_grouped_blobs))
232  model_helper_obj._computed_param_names =\
233  list(viewkeys(computed_params_grouped))
234 
235  if not has_parameter_updates:
236  log.info("Parameter update function not defined --> only forward")
237  _InferBlobDevice(model_helper_obj)
238  return
239 
240  log.info("Adding gradient operators")
241  _AddGradientOperators(devices, model_helper_obj, losses_by_gpu)
242 
243  if combine_spatial_bn:
244  assert(cpu_device), \
245  'combine_spatial_bn is currently only supported on the CPU'
246  assert(has_parameter_updates), \
247  'combine_spatial_bn should only be used for train model'
248  _InterleaveOps(model_helper_obj)
249  _InterDeviceBatchNormalization(model_helper_obj)
250 
251  _ValidateParams(model_helper_obj.params)
252 
253  # Group gradients by device and register to blob lookup
254  param_to_grad = model_helper_obj.param_to_grad
255  grads_ordered = [param_to_grad[p] for p in
256  model_helper_obj.params if p in param_to_grad]
257  non_datapar_grads = [param_to_grad[p] for p in non_datapar_params]
258 
259  gradients_grouped = _GroupByDevice(
260  model_helper_obj,
261  devices,
262  grads_ordered,
263  non_datapar_grads
264  )
265  model_helper_obj._device_grouped_blobs.update(gradients_grouped)
266  model_helper_obj._grad_names = list(viewkeys(gradients_grouped))
267  model_helper_obj._losses_by_gpu = losses_by_gpu
268 
269  _InferBlobDevice(model_helper_obj)
270 
271  log.info("Add gradient all-reduces for SyncSGD")
272  if broadcast_computed_params:
273  _BroadcastComputedParams(devices, model_helper_obj, rendezvous, use_nccl)
274 
275  if len(model_helper_obj._grad_names) > 0:
276  # Gradients in reverse order
277  reverse_ordered_grads = _GetReverseOrderedGrads(model_helper_obj)
278  assert(len(reverse_ordered_grads) > 0)
279  _AllReduceBlobs(
280  reverse_ordered_grads,
281  devices,
282  model_helper_obj,
283  model_helper_obj.net,
284  rendezvous,
285  use_nccl,
286  max_concurrent_distributed_ops,
287  )
288  else:
289  log.info("NOTE: Param builder function did not create any parameters.")
290 
291  log.info("Post-iteration operators for updating params")
292  num_shards = 1 if rendezvous is None else rendezvous['num_shards']
293 
294  all_params = set(model_helper_obj.GetParams(''))
295  if shared_model:
296  _PruneParametersForSharing(model_helper_obj)
297 
298  if param_update_builder_fun is not None:
299  for device in devices:
300  device_opt = core.DeviceOption(model_helper_obj._device_type, device)
301  with core.DeviceScope(device_opt):
302  with core.NameScope(
303  "{}_{}".format(model_helper_obj._device_prefix, device)
304  ):
305  param_update_builder_fun(model_helper_obj)
306  else:
307  log.info("Calling optimizer builder function")
308  optimizer = optimizer_builder_fun(model_helper_obj)
309  model_helper_obj._optimizer = optimizer
310 
311  (sync_blobs, sync_names) = _ComputeBlobsToSync(model_helper_obj)
312  sync_blobs_grouped = _GroupByDevice(
313  model_helper_obj,
314  devices,
315  sync_blobs,
316  [],
317  )
318  model_helper_obj._device_grouped_blobs.update(sync_blobs_grouped)
319 
320  _InferBlobDevice(model_helper_obj)
321  _AnalyzeOperators(model_helper_obj)
322 
323  # Configure dagnet to run with only one worker on the first iteration,
324  # to prevent concurrency problems with allocs and nccl.
325  arg = model_helper_obj.Proto().arg.add()
326  arg.name = "first_iter_only_one_worker"
327  arg.i = 1
328 
329  # Add initial parameter syncs
330  log.info("Add initial parameter sync")
331  _SyncAllParams(
332  devices,
333  model_helper_obj,
334  model_helper_obj.param_init_net,
335  model_helper_obj.param_init_net,
336  rendezvous,
337  sync_names,
338  max_concurrent_distributed_ops=1
339  )
340 
341  # Handle any operations that need to be done after parameter sync
342  # i.e. making sure multi-precision copies of parameters are up-to-date
343  if post_sync_builder_fun is not None:
344  for device in devices:
345  device_opt = core.DeviceOption(model_helper_obj._device_type, device)
346  with core.DeviceScope(device_opt):
347  with core.NameScope(
348  "{}_{}".format(model_helper_obj._device_prefix, device)
349  ):
350  post_sync_builder_fun(model_helper_obj)
351 
352  assert not (optimize_gradient_memory and dynamic_memory_management), \
353  """It is not advised to use gradient optimization ('memonger')
354  with dynamic memory management."""
355 
356  if optimize_gradient_memory:
357  _OptimizeGradientMemorySimple(model_helper_obj, losses_by_gpu, devices)
358 
359  if dynamic_memory_management:
360  _AddDynamicMemoryOptimization(model_helper_obj, blobs_to_keep, devices)
361 
362  model_helper_obj._data_parallel_model_init_nets = [
363  model_helper_obj.param_init_net,
364  ]
365  model_helper_obj._data_parallel_model_nets = [model_helper_obj.net]
366 
367  if shared_model:
368  _RemapParameterBlobsForSharedModel(model_helper_obj, all_params)
369 
370 
371 def Parallelize_GPU_BMUF(*args, **kwargs):
372  kwargs['cpu_device'] = False
373  Parallelize_BMUF(*args, **kwargs)
374 
375 
376 def Parallelize_CPU_BMUF(*args, **kwargs):
377  kwargs['cpu_device'] = True
378  Parallelize_BMUF(*args, **kwargs)
379 
380 
381 def Parallelize_BMUF(
382  model_helper_obj,
383  input_builder_fun,
384  forward_pass_builder_fun,
385  param_update_builder_fun,
386  block_learning_rate=1.0,
387  block_momentum=None,
388  devices=None,
389  rendezvous=None,
390  net_type='dag',
391  master_device=None,
392  use_nccl=False,
393  nesterov=False,
394  optimize_gradient_memory=False,
395  reset_momentum_sgd=False,
396  warmup_iterations=None,
397  max_concurrent_distributed_ops=4,
398  add_blobs_to_sync=None,
399  num_threads_per_device=4,
400  cpu_device=False
401 ):
402  '''
403  Function to create a model that runs on many GPUs. It creates a net for
404  parameter updates that can be run independently for a number of iterations,
405  followed by another net that runs once to compute the final parameter
406  updates according to the block-wise model update filtering rule described
407  in: Scalable Training of Deep Learning Machines by Incremental Block
408  Training with Intra-block Parallel Optimization and Blockwise Model-Update
409  Filtering (ICASSP 2016).
410  '''
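# Illustrative usage sketch (not part of the original module): the BMUF flow
# pairs this function with RunInitNet / RunWarmup / RunNet defined below.
# The builder functions and the iteration counts (num_blocks,
# num_local_iterations) are placeholders.
#
#   data_parallel_model.Parallelize_BMUF(
#       train_model,
#       input_builder_fun=add_inputs,
#       forward_pass_builder_fun=create_model,
#       param_update_builder_fun=add_parameter_update,
#       devices=[0, 1],
#       warmup_iterations=100,
#   )
#   data_parallel_model.RunInitNet(train_model)
#   data_parallel_model.RunWarmup(train_model)  # only when warmup_iterations is set
#   for _ in range(num_blocks):
#       # runs `num_local_iterations` local steps, then the global block
#       # update net once (see _data_parallel_model_nets below)
#       data_parallel_model.RunNet(train_model, num_local_iterations)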
411  assert scope.CurrentDeviceScope() is None \
412  or scope.CurrentDeviceScope().device_type == caffe2_pb2.CPU, \
413  "Parallelize must be called without device-scope, \
414  device scope was: {}".format(scope.CurrentDeviceScope())
415 
416  assert isinstance(model_helper_obj, model_helper.ModelHelper)
417 
418  if devices is None:
419  devices = list(range(0, workspace.NumCudaDevices()))
420  if master_device is None:
421  master_device = devices[0]
422 
423  if not cpu_device:
424  for gpu in devices:
425  if gpu >= workspace.NumCudaDevices():
426  log.warning("** Only {} GPUs available, GPUs {} requested".format(
427  workspace.NumCudaDevices(), devices))
428  break
429  model_helper_obj._device_type = caffe2_pb2.CUDA
430  model_helper_obj._device_prefix = "gpu"
431  else:
432  model_helper_obj._device_type = caffe2_pb2.CPU
433  model_helper_obj._device_prefix = "cpu"
434 
435  model_helper_obj._devices = devices
436  model_helper_obj._rendezvous = rendezvous
437  model_helper_obj._broadcast_context = None
438  model_helper_obj._shared_model = False
439  master_dev_opt = core.DeviceOption(model_helper_obj._device_type, master_device)
440 
441  # question: rendezvous structure
442  num_shards = rendezvous['num_shards'] if rendezvous else 1
443  # num_devices is #devices across all machines
444  num_devices = len(devices) * num_shards
445  # num_workers is #threads to execute the DAG per shard
446  num_workers = num_threads_per_device * len(devices)
447  if rendezvous:
448  num_workers += 8
449 
450  loss_scale = 1.0 / num_devices
451  if block_momentum is None:
452  block_momentum = 1.0 - 1.0 / num_devices
453 
454  max_concurrent_distributed_ops = min(
455  max_concurrent_distributed_ops,
456  num_workers - 1
457  )
458 
459  model_helper_obj.net.Proto().num_workers = num_workers
460  model_helper_obj.net.Proto().type = net_type
461 
462  # A net for initializing global model parameters. It is called once in the
463  # same step as the net parameter initialization.
464  model_helper_obj._global_model_init_net = core.Net('global_model_init')
465  model_helper_obj._global_model_init_net.Proto().type = net_type
466  model_helper_obj._global_model_init_net.Proto().num_workers = \
467  num_workers
468 
469  # A net for computing final parameter updates. It will run once after
470  # running the net (local model updates) for `num_local_iterations` iterations.
471  model_helper_obj._global_model_param_updates_net = core.Net('global_model')
472  model_helper_obj._global_model_param_updates_net.Proto().type = net_type
473  model_helper_obj._global_model_param_updates_net.Proto().num_workers = \
474  num_workers
475 
476  def _v(param):
477  return "{}_v".format(param)
478 
479  def _g(param):
480  return "{}_g".format(param)
481 
482  def _v_prev(param):
483  return "{}_prev".format(param)
484 
485  # Keep track of params that were in the model before: they are not
486  # data parallel, so we need to handle them separately
487  non_datapar_params = copy.copy(model_helper_obj.params)
488  model_helper_obj._losses_by_gpu = {}
489 
490  def _InitializeModels(gpu_id):
491  input_builder_fun(model_helper_obj)
492  loss = forward_pass_builder_fun(model_helper_obj, loss_scale)
493  model_helper_obj._losses_by_gpu[gpu_id] = loss
494  _ForEachDevice(
495  devices,
496  _InitializeModels,
497  device_type=model_helper_obj._device_type,
498  device_prefix=model_helper_obj._device_prefix,
499  scoped=True
500  )
501  _ValidateParams(model_helper_obj.params)
502 
503  model_helper_obj._device_grouped_blobs =\
504  _GroupByDevice(model_helper_obj, devices,
505  model_helper_obj.params, non_datapar_params)
506 
507  model_helper_obj._param_names =\
508  list(viewkeys(model_helper_obj._device_grouped_blobs))
509 
510  _AddGradientOperators(
511  devices, model_helper_obj, model_helper_obj._losses_by_gpu
512  )
513  _ValidateParams(model_helper_obj.params)
514 
515  _InferBlobDevice(model_helper_obj)
516 
517  def _InitializeParamUpdate(gpu_id):
518  param_update_builder_fun(model_helper_obj)
519  _ForEachDevice(
520  devices,
521  _InitializeParamUpdate,
522  device_type=model_helper_obj._device_type,
523  device_prefix=model_helper_obj._device_prefix,
524  scoped=True
525  )
526 
527  model_parameter_names = list(
528  viewkeys(model_helper_obj._device_grouped_blobs)
529  )
530  if warmup_iterations is not None:
531  model_helper_obj._warmup_iterations = warmup_iterations
532  # A net for broadcasting gpu-0 (master shard) parameters after
533  # running the net for `warmup_iterations`.
534  model_helper_obj._warmup_broadcast = core.Net('warmup-broadcast')
535  model_helper_obj._warmup_broadcast.Proto().type = net_type
536  model_helper_obj._warmup_broadcast.Proto().num_workers = \
537  num_workers
538 
539  _SyncAllParams(
540  devices,
541  model_helper_obj,
542  model_helper_obj.param_init_net,
543  model_helper_obj._warmup_broadcast,
544  rendezvous,
545  model_parameter_names,
546  max_concurrent_distributed_ops
547  )
548  for param_name in viewkeys(model_helper_obj._device_grouped_blobs):
549  param = model_helper_obj._device_grouped_blobs[param_name][master_device]
550  with core.DeviceScope(master_dev_opt):
551  model_helper_obj._warmup_broadcast.Copy(param, _g(param))
552 
553  # (Step-0) Initialize momentum parameters on master device.
554  for param_name in viewkeys(model_helper_obj._device_grouped_blobs):
555  param = model_helper_obj._device_grouped_blobs[param_name][master_device]
556  with core.DeviceScope(master_dev_opt):
557  model_helper_obj._global_model_init_net.ConstantFill(
558  param, _v(param), value=0.0
559  )
560  model_helper_obj._global_model_init_net.Copy(param, _g(param))
561  if nesterov:
562  model_helper_obj._global_model_init_net.ConstantFill(
563  param, _v_prev(param), value=0.0
564  )
565 
566  # (Step-1) Update models for num_local_iterations.
567 
568  # (Step-2) Compute post-local-updates average of the params.
569  # Sum model params across GPUs and store results in the param_avg blob.
570  _AllReduceBlobs(
571  model_parameter_names,
572  devices,
573  model_helper_obj,
574  model_helper_obj._global_model_param_updates_net,
575  rendezvous,
576  use_nccl,
577  max_concurrent_distributed_ops
578  )
579 
580  # (Step-3) Update momentum params :
581  # param_v = block_momentum * param_v
582  # + block_learning_rate * (param_avg - param)
583  # if nesterov momentum:
584  # param = param + param_v
585  # - block_momentum * (param_v - param_v_prev)
586  # param_v_prev = param_v
587  # else:
588  # param = param + param_v
589  for param_name in model_parameter_names:
590  param = model_helper_obj._device_grouped_blobs[param_name][master_device]
591  with core.DeviceScope(master_dev_opt):
592  # TODO(ataei) : Stop building the graph here to get model average ?
593  model_helper_obj._global_model_param_updates_net.Scale(
594  param, param, scale=1.0 / num_devices
595  )
596  model_helper_obj._global_model_param_updates_net.Sub(
597  [param, _g(param)], param
598  )
599  model_helper_obj._global_model_param_updates_net.Scale(
600  param, param, scale=block_learning_rate
601  )
602  model_helper_obj._global_model_param_updates_net.Scale(
603  _v(param), _v(param), scale=block_momentum
604  )
605  model_helper_obj._global_model_param_updates_net.Add(
606  [_v(param), param], _v(param)
607  )
608  model_helper_obj._global_model_param_updates_net.Add(
609  [_g(param), _v(param)], _g(param)
610  )
611  if nesterov:
612  model_helper_obj._global_model_param_updates_net.Sub(
613  [_v(param), _v_prev(param)], _v_prev(param)
614  )
615  model_helper_obj._global_model_param_updates_net.Scale(
616  _v_prev(param), _v_prev(param), scale=block_momentum
617  )
618  model_helper_obj._global_model_param_updates_net.Sub(
619  [_g(param), _v_prev(param)], _g(param)
620  )
621  model_helper_obj._global_model_param_updates_net.Copy(
622  _v(param), _v_prev(param)
623  )
624  model_helper_obj._global_model_param_updates_net.Copy(
625  _g(param), param
626  )
627 
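# A rough scalar recap of the block update built above (illustrative only):
# with `avg` the cross-device average from Step-2, `g` the block-level copy
# of the parameter and `v` its momentum buffer,
#
#   v = block_momentum * v + block_learning_rate * (avg - g)
#   if nesterov:
#       g = g + v - block_momentum * (v - v_prev); v_prev = v
#   else:
#       g = g + v
#   param = g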
628 
629  _SyncAllParams(
630  devices,
631  model_helper_obj,
632  model_helper_obj.param_init_net,
633  model_helper_obj._global_model_param_updates_net,
634  rendezvous,
635  model_parameter_names,
636  max_concurrent_distributed_ops
637  )
638 
639  # Add additional syncs
640  if add_blobs_to_sync is not None:
641  AddBlobSync(
642  model_helper_obj,
643  add_blobs_to_sync,
644  net=model_helper_obj._global_model_param_updates_net)
645 
646  # Reset momentum-SGD parameters
647  if reset_momentum_sgd:
648  momentum_ops = [op for op in model_helper_obj.net.Proto().op
649  if op.type == 'MomentumSGDUpdate']
650  for op in momentum_ops:
651  momentum_blob = op.input[1]
652  with core.DeviceScope(op.device_option):
653  model_helper_obj._global_model_param_updates_net.ConstantFill(
654  [momentum_blob], momentum_blob, value=0.0
655  )
656 
657  if optimize_gradient_memory:
658  _OptimizeGradientMemorySimple(
659  model_helper_obj, model_helper_obj._losses_by_gpu, devices
660  )
661 
662  model_helper_obj._data_parallel_model_init_nets = [
663  model_helper_obj.param_init_net,
664  model_helper_obj._global_model_init_net
665  ]
666 
667  model_helper_obj._data_parallel_model_nets = [
668  model_helper_obj.net,
669  (model_helper_obj._global_model_param_updates_net, 1)
670  ]
671 
672 
673 def RunInitNet(model):
674  for init_net in model._data_parallel_model_init_nets:
675  workspace.RunNetOnce(init_net)
676  for net_iters in model._data_parallel_model_nets:
677  if isinstance(net_iters, tuple):
678  workspace.CreateNet(net_iters[0])
679  else:
680  workspace.CreateNet(net_iters)
681 
682 
683 def RunWarmup(model):
684  workspace.RunNet(model.net, model._warmup_iterations)
685  workspace.RunNetOnce(model._warmup_broadcast)
686 
687 
688 def RunNet(model, num_iterations):
689  for net_iter in model._data_parallel_model_nets:
690  if isinstance(net_iter, tuple):
691  workspace.RunNet(net_iter[0].Proto().name, net_iter[1])
692  else:
693  workspace.RunNet(net_iter, num_iterations)
694 
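# Typical driver loop (illustrative sketch; epoch and iteration counts are
# placeholders): after Parallelize() or Parallelize_BMUF(), run the init nets
# once and then the training net(s) repeatedly.
#
#   data_parallel_model.RunInitNet(train_model)
#   for epoch in range(num_epochs):
#       data_parallel_model.RunNet(train_model, iters_per_epoch)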
695 
696 barrier_instance = 0
697 
698 
699 def Synchronize(model, timeout_sec=_DEFAULT_TIMEOUT_SEC):
700  log.info("Creating synchronization barrier net")
701  assert model._rendezvous is not None, "Missing rendezvous"
702  assert model._rendezvous['engine'] == 'GLOO', "Engine does not support barrier"
703  assert model._rendezvous['num_shards'] > 1, \
704  "synchronization barrier requires multiple shards"
705  global barrier_instance
706  instance = barrier_instance
707  barrier_instance += 1
708  barrier_net = core.Net("sync_barrier_net_" + str(instance))
709  comm_world = _CreateOrCloneCommonWorld(
710  barrier_net,
711  "sync_barrier_cw_" + str(instance),
712  rendezvous=model._rendezvous,
713  status_blob="sync_barrier_cw_status_" + str(instance),
714  timeout_sec=timeout_sec,
715  )
716  barrier_net.Barrier(
717  inputs=[comm_world],
718  outputs=[],
719  engine=model._rendezvous['engine'],
720  status_blob="sync_barrier_status_" + str(instance),
721  )
722  workspace.RunNetOnce(barrier_net)
723 
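# Example sketch: in multi-shard training, the barrier can be used to make all
# shards wait for each other, e.g. between epochs or around checkpoint writes.
#
#   if train_model._rendezvous is not None and \
#           train_model._rendezvous['num_shards'] > 1:
#       data_parallel_model.Synchronize(train_model)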
724 
725 def ConvertNetForDevice(net, device=None):
726  '''
727  Converts all blobs in the net to have namescope gpu_X, and correct
728  device scope. You can use this to enable AppendNet with a
729  forward_pass_builder_fun:
730 
731  def builder_fun(model):
732  ...
733  model.net.AppendNet(
734  data_parallel_model.ConvertNetForDevice(othermodel.net))
735  model.param_init_net.AppendNet(
736  data_parallel_model.ConvertNetForDevice(othermodel.param_init_net))
737  '''
738  mnet = copy.deepcopy(net)
739 
740  if device is None:
741  device = scope.CurrentDeviceScope()
742 
743  device_prefix = "gpu" if device.device_type == caffe2_pb2.CUDA else "cpu"
744 
745  namescope = "{}_{}/".format(device_prefix, device.cuda_gpu_id)
746  for op in mnet.Proto().op:
747  if "RecurrentNetwork" in op.type:
748  raise NotImplementedError("RecurrentNetwork conversion not yet supported")
749  for i, inputb in enumerate(op.input):
750  op.input[i] = namescope + inputb
751  for i, outputb in enumerate(op.output):
752  op.output[i] = namescope + outputb
753  for i, blob in enumerate(op.control_input):
754  op.control_input[i] = namescope + blob
755  op.device_option.CopyFrom(device)
756  for i, einp in enumerate(mnet.Proto().external_input):
757  mnet.Proto().external_input[i] = namescope + einp
758  for i, eoutp in enumerate(mnet.Proto().external_output):
759  mnet.Proto().external_output[i] = namescope + eoutp
760  return mnet
761 
762 
763 def _ForEachDevice(devices, f, device_type, device_prefix, scoped=False,
764  *args, **kwargs):
765  for device in devices:
766  device_opt = core.DeviceOption(device_type, device)
767  with core.DeviceScope(device_opt):
768  if scoped:
769  with core.NameScope("{}_{}".format(device_prefix, device)):
770  f(device, *args, **kwargs)
771  else:
772  f(device, *args, **kwargs)
773 
774 
775 def _AddGradientOperators(devices, model, losses_by_gpu):
776  def create_grad(lossp):
777  return model.ConstantFill(lossp, str(lossp) + "_grad", value=1.0)
778 
779  loss_grad = {}
780  # Explicitly need to create gradients on each GPU
781  for gpu_id in devices:
782  device = core.DeviceOption(model._device_type, gpu_id)
783  with core.DeviceScope(device):
784  for l in losses_by_gpu[gpu_id]:
785  lg = create_grad(l)
786  loss_grad[str(l)] = str(lg)
787 
788  model.AddGradientOperators(loss_grad)
789 
790 
791 def ExtractPredictorNet(model, inputs, outputs, device):
792  '''
793  Returns (net, params) that can be exported to be used as a prediction
794  net.
795  '''
796  master_device = model._devices[0]
797  prefix = "{}_{}/".format(model._device_prefix, master_device)
798  prefix_inputs = [prefix + str(b) for b in inputs]
799  prefix_outputs = [prefix + str(b) for b in outputs]
800  (predictor_net, export_blobs) = model_helper.ExtractPredictorNet(
801  net_proto=model.net.Proto(),
802  input_blobs=prefix_inputs,
803  output_blobs=prefix_outputs,
804  device=device,
805  renames={
806  a: b
807  for (a, b) in zip(prefix_inputs + prefix_outputs, inputs + outputs)
808  },
809  )
810 
811  return (predictor_net, export_blobs)
812 
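# Usage sketch (the "data" / "softmax" blob names are placeholders): extract a
# single-device prediction net from the parallelized training model.
#
#   (predictor_net, export_blobs) = data_parallel_model.ExtractPredictorNet(
#       train_model,
#       inputs=["data"],
#       outputs=["softmax"],
#       device=core.DeviceOption(caffe2_pb2.CPU, 0),
#   )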
813 
814 def GetCheckpointParams(model):
815  '''
816  Returns a set of blobs that are needed for a complete checkpoint.
817  They are the blobs for the first GPU plus the iteration blobs.
818  '''
819  (all_blobs, _) = _ComputeBlobsToSync(model)
820  first_gpu_blobs = {
821  b
822  for b in all_blobs
823  if str(b)
824  .startswith("{}_{}/".format(model._device_prefix, model._devices[0]))
825  }
826 
827  # Add iteration blobs that do not have namescope separately, since
828  # it is important to checkpoint iteration counter
829  iteration_blobs = set()
830  for op in model.net.Proto().op:
831  if op.type == 'Iter' or op.type == 'AtomicIter':
832  if not op.output[0].startswith("{}_".format(model._device_prefix)):
833  iteration_blobs.add(op.output[0])
834 
835  return first_gpu_blobs.union(iteration_blobs)
836 
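# Sketch of saving a checkpoint from the returned blob set (the serialization
# step is left to the caller and is not part of this module):
#
#   ckpt_blobs = data_parallel_model.GetCheckpointParams(train_model)
#   ckpt = {str(b): workspace.FetchBlob(str(b)) for b in ckpt_blobs}
#   # persist `ckpt` with a serializer of your choice (pickle, npz, ...)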
837 
838 def FinalizeAfterCheckpoint(model, blobs=None):
839  '''
840  This function should be called after loading parameters from a
841  checkpoint / initial parameters file.
842  '''
843 
844  if not hasattr(model, "_checkpoint_net"):
845  if blobs is None:
846  (_, uniq_blob_names) = _ComputeBlobsToSync(model)
847  else:
848  uniq_blob_names = [stripBlobName(p) for p in blobs]
849 
850  # Add the provided blobs to the blob lookup map, as they
851  # might include non-parameters, such as momentum blobs.
852  log.info("Creating checkpoint synchronization net")
853  devices = model.GetDevices()
854  for name in uniq_blob_names:
855  if name not in model._device_grouped_blobs:
856  grouped = {
857  d:
858  core.BlobReference("{}_{}{}{}".format(
859  model._device_prefix,
860  d,
861  scope._NAMESCOPE_SEPARATOR,
862  name)
863  ) for d in devices}
864  model._device_grouped_blobs[name] = grouped
865 
866  model._checkpoint_net = core.Net("checkpoint_sync_net")
867  model._checkpoint_net.RunAllOnGPU()
868 
869  checkpoint_init_net = None
870  if (model._rendezvous is not None and model._rendezvous['num_shards'] > 1):
871  checkpoint_init_net = core.Net("checkpoint_init_net")
872  checkpoint_init_net.RunAllOnGPU()
873 
874  _SyncAllParams(
875  devices,
876  model,
877  checkpoint_init_net,
878  model._checkpoint_net,
879  model._rendezvous,
880  uniq_blob_names,
881  max_concurrent_distributed_ops=1
882  )
883  if (checkpoint_init_net):
884  workspace.RunNetOnce(checkpoint_init_net)
885 
886  workspace.CreateNet(model._checkpoint_net)
887 
888  # Run the sync
889  log.info("Run checkpoint net")
890  workspace.RunNet(model._checkpoint_net.Proto().name)
891 
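# Sketch of the restore path (mirrors the save sketch above; `ckpt` is the
# placeholder dict of saved arrays): feed the values back into the workspace
# and then broadcast them to every device and shard.
#
#   for name, value in ckpt.items():
#       # for GPU blobs, pass the matching device_option to FeedBlob
#       workspace.FeedBlob(name, value)
#   data_parallel_model.FinalizeAfterCheckpoint(train_model)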
892 
893 def GetLearningRateBlobNames(model):
894  '''
895  Returns a list of learning rate blob names used in the optimizer.
896  '''
897  if model._optimizer is not None:
898  if model._device_type == caffe2_pb2.CPU:
899  return [model._optimizer.get_cpu_blob_name('lr')]
900  elif model._device_type == caffe2_pb2.CUDA:
901  return [model._optimizer.get_gpu_blob_name('lr', gpu, '')
902  for gpu in model._devices]
903  else:
904  raise Exception(
905  "Unsupported device type : {}".format(model._device_type)
906  )
907  else:
908  lr_blob_names = []
909  for op in model.net.Proto().op:
910  if op.type == "LearningRate":
911  lr_blob_names.append(op.output[0])
912  return lr_blob_names
913 
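# Example sketch: read back the current learning rate of the first device
# after running the net.
#
#   lr_blobs = data_parallel_model.GetLearningRateBlobNames(train_model)
#   current_lr = workspace.FetchBlob(lr_blobs[0])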
914 
915 def _Broadcast(devices, model, net, param, use_nccl=False):
916  # Copy params from gpu_0 to the other devices
917  master_dev = devices[0]
918 
919  if use_nccl:
920  if _IsGPUBlob(model, param):
921  master_device_opt = core.DeviceOption(model._device_type, master_dev)
922  with core.DeviceScope(master_device_opt):
923  # Note that the root is the root _rank_ and not the root
924  # _device_. Thus we always use root=0, regardless of the
925  # devices used.
926  net.NCCLBroadcast(
927  list(viewvalues(model._device_grouped_blobs[param])),
928  list(viewvalues(model._device_grouped_blobs[param])),
929  root=0,
930  )
931  return
932 
933  for dev_idx in devices[1:]:
934  if _IsGPUBlob(model, param):
935  device_opt = core.DeviceOption(caffe2_pb2.CUDA, dev_idx)
936  else:
937  device_opt = core.DeviceOption(caffe2_pb2.CPU, 0)
938  with core.DeviceScope(device_opt):
939  net.Copy(
940  model._device_grouped_blobs[param][master_dev],
941  model._device_grouped_blobs[param][dev_idx]
942  )
943 
944 
945 def _AllReduce(devices, model, net, param, use_nccl=False, control_input=None):
946  blobs_group = list(viewvalues(model._device_grouped_blobs[param]))
947  if model._device_type == caffe2_pb2.CUDA and use_nccl:
948  # TODO: for _shared_model, do only NCCLReduce
949  model.NCCLAllreduce(
950  blobs_group, blobs_group, control_input=control_input
951  )
952  return
953 
954  if model._device_type == caffe2_pb2.CUDA:
955  p2p_access_pattern = workspace.GetCudaPeerAccessPattern()
956  else:
957  p2p_access_pattern = None
958 
959  def sumN(*dev_indices):
960  """Create a Sum op for 2 or more blobs on different devices.
961  Saves the result on the first device.
962 
963  Arguments:
964  dev_indices -- a list of device indices, which can be translated into
965  CUDA identifiers with model._devices
966  """
967  devices = [model._devices[idx] for idx in dev_indices]
968  blobs = [blobs_group[idx] for idx in dev_indices]
969  for i, peer in enumerate(devices):
970  if i == 0:
971  continue # Skip the first device
972  if p2p_access_pattern is not None and not p2p_access_pattern[
973  devices[0], peer
974  ]:
975  # Copy from peer to d0
976  blobs[i] = model.Copy(
977  blobs[i],
978  'gpu_{}/{}_gpu{}_copy'.format(devices[0], param, peer)
979  )
980  device_opt = core.DeviceOption(model._device_type, devices[0])
981  with core.DeviceScope(device_opt):
982  net.Sum(blobs, [blobs[0]], name='dpm')
983 
984  if len(devices) == 16:
985  # Special tree reduction for 16 gpus, TODO generalize like in muji.py
986  for j in range(8):
987  sumN(j * 2, j * 2 + 1)
988  for j in range(4):
989  sumN(j * 4, j * 4 + 2)
990  for j in range(2):
991  sumN(j * 8, j * 8 + 4)
992  sumN(0, 8)
993  elif len(devices) == 8:
994  for j in range(4):
995  sumN(j * 2, j * 2 + 1)
996  for j in range(2):
997  sumN(j * 4, j * 4 + 2)
998  sumN(0, 4)
999  elif len(devices) == 4:
1000  sumN(0, 1)
1001  sumN(2, 3)
1002  sumN(0, 2)
1003  else:
1004  sumN(*range(len(devices)))
1005  # TODO: for _shared_model, no need to broadcast
1006  _Broadcast(devices, model, net, param)
1007 
1008 
1009 def _SyncAllParams(
1010  devices,
1011  model,
1012  init_net,
1013  net,
1014  rendezvous,
1015  unique_param_names,
1016  max_concurrent_distributed_ops=4
1017 ):
1018  if rendezvous is None or rendezvous['num_shards'] <= 1:
1019  _SyncAllParamsSingleHost(devices, model, net, unique_param_names)
1020  else:
1021  _SyncAllParamsDistributed(
1022  devices,
1023  model,
1024  init_net,
1025  net,
1026  rendezvous,
1027  unique_param_names,
1028  max_concurrent_distributed_ops
1029  )
1030 
1031 
1032 def AddBlobSync(model, blobs, net=None):
1033  '''
1034  Sync blobs across devices and hosts
1035  '''
1036  if len(blobs) == 0:
1037  return
1038  net = model.net if net is None else net
1039  for b in blobs:
1040  assert not b.startswith(model._device_prefix), \
1041  "Provide unprefixed blob name: {}".format(b)
1042  model._device_grouped_blobs[b] = {
1043  d: core.BlobReference("{}_{}/{}".format(model._device_prefix, d, b))
1044  for d in model._devices
1045  }
1046 
1047  _SyncAllParams(
1048  model._devices,
1049  model,
1050  model.param_init_net,
1051  net,
1052  model._rendezvous,
1053  set(blobs))
1054 
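# Usage sketch: keep an extra, unprefixed blob (the name "custom_stat" is a
# placeholder) identical across all devices and hosts as part of the main net.
#
#   data_parallel_model.AddBlobSync(train_model, ["custom_stat"])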
1055 
1056 def AddDistributedBlobSync(model, blobs):
1057  '''
1058  Sync blobs across machines (but not across devices)
1059  '''
1060  if model._rendezvous is None:
1061  return
1062  synth_name = "_".join([str(b) for b in blobs])
1063  comm_world = _CreateOrCloneCommonWorld(
1064  model.param_init_net,
1065  "blob_sync_cw_" + synth_name,
1066  rendezvous=model._rendezvous,
1067  status_blob="create_blob_sync_cw_{}_cw_status".format(
1068  synth_name,
1069  ),
1070  )
1071 
1072  model.net.Allreduce(
1073  inputs=[comm_world] + blobs,
1074  outputs=blobs,
1075  engine=model._rendezvous['engine'],
1076  status_blob="blob_sync_allred_{}_status".format(synth_name),
1077  )
1078 
1079 
1080 def _SyncAllParamsDistributed(
1081  devices,
1082  model,
1083  init_net,
1084  net,
1085  rendezvous,
1086  unique_param_names,
1087  max_concurrent_distributed_ops
1088 ):
1089  assert rendezvous['num_shards'] > 1
1090 
1091  gpu_device_opt = core.DeviceOption(model._device_type, devices[0])
1092  cpu_device_opt = core.DeviceOption(caffe2_pb2.CPU)
1093 
1094  if model._broadcast_context is None:
1095  model._broadcast_context = CollectivesConcurrencyControl(
1096  "broadcast",
1097  max_concurrent_distributed_ops,
1098  init_net,
1099  rendezvous
1100  )
1101  context = model._broadcast_context
1102 
1103  for param_name in sorted(unique_param_names):
1104  master_param = model._device_grouped_blobs[param_name][devices[0]]
1105  params_group = list(viewvalues(model._device_grouped_blobs[param_name]))
1106 
1107  def broadcast(params):
1108  comm_world, control_input = context.get_control_and_context(params)
1109  net.Broadcast(
1110  inputs=[comm_world] + params,
1111  outputs=params,
1112  name=param_name,
1113  engine=rendezvous['engine'],
1114  status_blob="broadcast_{}_status".format(str(param_name)),
1115  control_input=control_input
1116  )
1117 
1118  device_opt = gpu_device_opt if _IsGPUBlob(
1119  model, param_name
1120  ) else cpu_device_opt
1121 
1122  if rendezvous['engine'] == 'GLOO':
1123  with core.DeviceScope(device_opt):
1124  broadcast(params_group)
1125  else:
1126  # Copy between GPU and CPU
1127  with core.DeviceScope(device_opt):
1128  param_cpu = net.CopyGPUToCPU(
1129  master_param,
1130  str(master_param) + "cpu"
1131  )
1132  with core.DeviceScope(cpu_device_opt):
1133  broadcast([param_cpu])
1134  with core.DeviceScope(device_opt):
1135  net.CopyCPUToGPU(param_cpu, master_param)
1136 
1137  # Broadcast locally
1138  _Broadcast(devices, model, net, param_name)
1139 
1140 
1141 def _SyncAllParamsSingleHost(devices, model, net, unique_param_names):
1142  for param in unique_param_names:
1143  _Broadcast(devices, model, net, param)
1144 
1145 
1146 def _AllReduceBlobs(blob_names, devices, model, net, rendezvous, use_nccl,
1147  max_concurrent_distributed_ops):
1148  if rendezvous is None or rendezvous['num_shards'] <= 1:
1149  _AllReduceBlobsSingleHost(
1150  blob_names,
1151  devices,
1152  model,
1153  net,
1154  use_nccl
1155  )
1156  else:
1157  _AllReduceBlobsDistributed(
1158  blob_names,
1159  devices,
1160  model,
1161  net,
1162  rendezvous,
1163  max_concurrent_distributed_ops,
1164  )
1165 
1166 
1167 def _PruneParametersForSharing(model):
1168  assert model._shared_model
1169  master_prefix = "{}_{}/".format(model._device_prefix, model._devices[0])
1170 
1171  # Remove non-master parameters so that they will not receive parameter
1172  # update operators.
1173  model.params = model.GetParams(master_prefix)
1174  paramset = set(model.params)
1175 
1176  model.param_to_grad = {
1177  p: model.param_to_grad[p]
1178  for p in model.param_to_grad if p in paramset
1179  }
1180  model.weights = [w for w in model.weights if w in paramset]
1181  model.biases = [w for w in model.biases if w in paramset]
1182 
1183 
1184 def _RemapParameterBlobsForSharedModel(model, all_params):
1185  assert model._shared_model
1186  master_prefix = "{}_{}/".format(
1187  model._device_prefix, model._devices[0])
1188  log.info("Remapping param blobs to master -> {}".format(master_prefix))
1189  master_params = set(model.GetParams())
1190 
1191  # Remove all but master params
1192  def modify_ops(net):
1193  ops = []
1194  for op in net.Proto().op:
1195  delete_op = False
1196  # Delete ops that output non-master version of parameter
1197  for outp in op.output:
1198  if outp in all_params and outp not in master_params:
1199  delete_op = True
1200  log.debug("Delete b/c {}: {}".format(outp, str(op)))
1201  break
1202  if delete_op:
1203  continue
1204  # Remap inputs to point to the master param
1205  for j, inp in enumerate(op.input):
1206  if inp in all_params and inp not in master_params:
1207  op.input[j] = master_prefix + stripBlobName(inp)
1208  ops.append(op)
1209  del net.Proto().op[:]
1210  net.Proto().op.extend(ops)
1211 
1212  modify_ops(model.param_init_net)
1213  modify_ops(model.net)
1214 
1215 
1217  """
1218  Creates common worlds (up to max_concurrent_context) and manage the
1219  sequential execution of collectives that shares the same context with
1220  cyclic control inputs.
1221  """
1222  def __init__(
1223  self,
1224  name,
1225  max_concurrent_context,
1226  param_init_net,
1227  rendezvous
1228  ):
1229  self.name = name
1230  self.param_init_net = param_init_net
1231  self.max_concurrent_context = max_concurrent_context
1232  self.counter = 0
1233  self.common_worlds = []
1234  self.control_inputs = []
1235  self.rendezvous = rendezvous
1236 
1237  def get_control_and_context(self, control_output_blob):
1238  common_world, control_input = [None, None]
1239  current_slot = self.counter % self.max_concurrent_context
1240  if len(self.common_worlds) < self.max_concurrent_context:
1241  common_world = _CreateOrCloneCommonWorld(
1242  self.param_init_net,
1243  "{}_{}_cw".format(self.name, current_slot),
1244  rendezvous=self.rendezvous,
1245  status_blob="create_{}_cw_{}_status".format(
1246  self.name,
1247  current_slot
1248  ),
1249  )
1250  self.common_worlds.append(common_world)
1251  self.control_inputs.append(control_output_blob)
1252  else:
1253  common_world = self.common_worlds[current_slot]
1254  control_input = self.control_inputs[current_slot]
1255  self.control_inputs[current_slot] = control_output_blob
1256  self.counter += 1
1257  return common_world, control_input
1258 
1259 
1260 def _AllReduceBlobsDistributed(
1261  blob_names,
1262  devices,
1263  model,
1264  net,
1265  rendezvous,
1266  max_concurrent_distributed_ops,
1267 ):
1268  num_workers = model.net.Proto().num_workers
1269  assert num_workers > 1, "Please specify more than 1 worker"
1270  all_reduce_engine = rendezvous['engine']
1271 
1272  master_device_opt = core.DeviceOption(model._device_type, devices[0])
1273 
1274  reducing_device_opt = master_device_opt
1275 
1277  "allreduce",
1278  max_concurrent_distributed_ops,
1279  model.param_init_net,
1280  rendezvous
1281  )
1282 
1283  nccl_control_blob = None
1284 
1285  for blob_name in blob_names:
1286  master_blob = model._device_grouped_blobs[blob_name][devices[0]]
1287  blobs_group = list(viewvalues(model._device_grouped_blobs[blob_name]))
1288 
1289  assert master_blob in blobs_group
1290 
1291  # Remark: NCCLReduce does not support in-place modifications
1292  # so we need a temporary blob
1293  reduced_blob = str(master_blob) + "_red"
1294 
1295  def allreduce(blobs, **kwargs):
1296  with core.DeviceScope(reducing_device_opt):
1297  comm_world, control_input = \
1298  context.get_control_and_context(blobs[0])
1299  net.Allreduce(
1300  inputs=[comm_world] + blobs,
1301  outputs=blobs,
1302  name=blob_name,
1303  engine=all_reduce_engine,
1304  control_input=control_input,
1305  status_blob="allreduce_{}_status".format(blob_name),
1306  **kwargs
1307  )
1308 
1309  if rendezvous['engine'] == 'GLOO':
1310  # With Gloo cross GPU and cross machine allreduce
1311  # can be executed in a single operation.
1312  # Try to use GPUDirect if transport == ibverbs.
1313  allreduce(
1314  blobs_group,
1315  gpu_direct=(rendezvous.get("transport", None) == "ibverbs"),
1316  )
1317  else:
1318  # Step 1: sum blobs from local GPUs to master GPU
1319  with core.DeviceScope(master_device_opt):
1320  model.ConstantFill(master_blob, reduced_blob, value=0.0)
1321 
1322  # Temp fix since NCCLReduce does not work
1323  net.NCCLAllreduce(
1324  blobs_group,
1325  blobs_group,
1326  control_input=nccl_control_blob,
1327  )
1328  nccl_control_blob = blobs_group[0]
1329  net.Copy(master_blob, reduced_blob)
1330 
1331  # Step 2: allreduce between all hosts, between master GPUs
1332  allreduce([reduced_blob])
1333 
1334  with core.DeviceScope(master_device_opt):
1335  net.Copy(reduced_blob, master_blob)
1336 
1337  # Step 3: broadcast locally
1338  _Broadcast(devices, model, net, blob_name)
1339 
1340 
1341 def _AllReduceBlobsSingleHost(blob_names, devices, model, net, use_nccl):
1342  """Performs NCCL AllReduce to distribute blobs to all the GPUs."""
1343 
1344  if len(devices) == 1:
1345  return
1346 
1347  # Now we need to Allreduce blobs on all the GPUs.
1348  # Pick GPU #0 as a master GPU.
1349  master_device_opt = core.DeviceOption(model._device_type, devices[0])
1350  last_out = None
1351  concatenated_idx = set()
1352 
1353  for blob_name in blob_names:
1354  # Group by blob_name for reduce.
1355  blobs_group = list(viewvalues(model._device_grouped_blobs[blob_name]))
1356  if len(blobs_group) == 1:
1357  # Non-reducible
1358  continue
1359  assert len(blobs_group) == len(devices), \
1360  "Each GPU from {}, should have a copy of {}.".format(
1361  devices, blob_name)
1362 
1363  if _IsGPUBlob(model, blob_name):
1364  with core.DeviceScope(master_device_opt):
1365  if not isinstance(blobs_group[0], core.GradientSlice):
1366  _AllReduce(
1367  devices, model, net, blob_name, use_nccl, last_out
1368  )
1369  # last_out is used to serialize the execution of nccls
1370  last_out = blobs_group[0]
1371 
1372  else:
1373  # Sparse gradients: all-gather for indices and values
1374  master_ns = "{}_{}".format(model._device_prefix, devices[0])
1375  '''
1376  Skip if we have already copied concatenated indices
1377  to the indices of GradientSlice. This happens when two
1378  or more grad blobs are gathered with the same indices
1379  blob
1380  '''
1381  skip_idx_concat = False
1382  for g in blobs_group:
1383  if g.indices in concatenated_idx:
1384  skip_idx_concat = True
1385 
1386  if not skip_idx_concat:
1387  grad_idx_concat, _ = net.Concat(
1388  [g.indices for g in blobs_group],
1389  ["{}/{}_index_concat".format(master_ns, blob_name),
1390  "{}/{}_index_splitinfo".format(master_ns, blob_name)],
1391  axis=0,
1392  name="note:data_parallel_model")
1393 
1394  for gpu, g in viewitems(model._device_grouped_blobs[blob_name]):
1395  device_opt = core.DeviceOption(model._device_type, gpu)
1396  with core.DeviceScope(device_opt):
1397  model.Copy(grad_idx_concat, g.indices)
1398  concatenated_idx.add(g.indices)
1399 
1400  grad_val_concat, _ = net.Concat(
1401  [g.values for g in blobs_group],
1402  ["{}/{}_val_concat".format(master_ns, blob_name),
1403  "{}/{}_val_splitinfo".format(master_ns, blob_name)],
1404  axis=0, name="note:data_parallel_model")
1405 
1406  for gpu, g in viewitems(model._device_grouped_blobs[blob_name]):
1407  device_opt = core.DeviceOption(model._device_type, gpu)
1408  with core.DeviceScope(device_opt):
1409  model.Copy(grad_val_concat, g.values)
1410 
1411  else:
1412  assert not isinstance(blobs_group[0], core.GradientSlice), \
1413  "Synchronizing gradient slices not supported"
1414  with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
1415  # Poor man's allreduce
1416  net.Sum(blobs_group, [blobs_group[0]])
1417  if not model._shared_model:
1418  _Broadcast(devices, model, net, blob_name)
1419 
1420 
1421 def _BroadcastComputedParams(devices, model, rendezvous, use_nccl=False):
1422  if rendezvous is None:
1423  _BroadcastComputedParamsSingleHost(devices, model, use_nccl)
1424  else:
1425  _BroadcastComputedParamsDistributed(devices, model, rendezvous, use_nccl)
1426 
1427 
1428 def _BroadcastComputedParamsDistributed(
1429  devices,
1430  model,
1431  rendezvous,
1432  use_nccl=False
1433 ):
1434  _BroadcastComputedParamsSingleHost(devices, model, use_nccl)
1435  log.warn("Distributed broadcast of computed params is not implemented yet")
1436 
1437 
1438 def _BroadcastComputedParamsSingleHost(devices, model, use_nccl=False):
1439  '''
1440  Average computed params over all devices
1441  '''
1442  if len(devices) == 1:
1443  return
1444 
1445  for param_name in model._computed_param_names:
1446  # Copy from master to others -- averaging would be perhaps better,
1447  # but currently NCCLAllReduce is too prone to deadlock
1448  _Broadcast(devices, model, model.net, param_name, use_nccl)
1449 
1450 
1451 def _GetReverseOrderedGrads(model):
1452  '''
1453  Returns the gradients in reverse order (namespace stripped),
1454  for the optimal synchronization order.
1455  '''
1456  return list(reversed(model._grad_names))
1457 
1458 
1459 # A helper function to extract a parameter's name
1460 def stripBlobName(param):
1461  # Format is "a/b/c/d" -> "b/c/d"
1462  if isinstance(param, core.GradientSlice):
1463  return stripBlobName(param.indices) + ":" + stripBlobName(param.values)
1464  else:
1465  name = str(param)
1466  return name[name.index(scope._NAMESCOPE_SEPARATOR) + 1:]
1467 
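# For example (with the default "/" namescope separator):
#   stripBlobName("gpu_0/conv1_w") -> "conv1_w"
#   stripBlobName("gpu_1/fc/w") -> "fc/w"  (only the first scope is removed)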
1468 
1469 def _AnalyzeOperators(model):
1470  '''
1471  Look at all the operators and check that they do not cross device scopes
1472  '''
1473  for op in model.Proto().op:
1474  if "NCCL" in op.type or "Copy" in op.type or "Concat" in op.type:
1475  continue
1476  if "Sum" == op.type and op.name == "dpm":
1477  continue
1478  if "Allreduce" in op.type and "GLOO" in op.engine:
1479  continue
1480 
1481  op_dev = op.device_option
1482  op_gpu = op_dev.cuda_gpu_id
1483 
1484  # This avoids failing on operators that are only for CPU
1485  if op_dev.device_type != caffe2_pb2.CUDA:
1486  continue
1487 
1488  namescope = "{}_{}/".format(model._device_prefix, op_gpu)
1489  for inp in list(op.input) + list(op.output):
1490  if inp.startswith("{}_".format(model._device_prefix)
1491  ) and not inp.startswith(namescope):
1492  raise Exception(
1493  "Blob {} of op {}, should have namescope {}. Op: {}".format(
1494  inp,
1495  op.type,
1496  "{}_{}/".format(model._device_prefix, op_gpu),
1497  str(op),
1498  )
1499  )
1500 
1501 
1502 def _InferBlobDevice(model):
1503  '''
1504  Assign each blob to a device option based on the operator outputting it
1505  '''
1506  mapping = {}
1507 
1508  def map_ops(proto):
1509  for op in proto.op:
1510  device_option = op.device_option
1511  if op.type == "Iter":
1512  # Hack for Iters which have blob in CPU context
1513  device_option = caffe2_pb2.DeviceOption()
1514  device_option.device_type = caffe2_pb2.CPU
1515  for b in list(op.input) + list(op.output):
1516  if b not in mapping:
1517  mapping[b] = device_option
1518  if op.type.startswith('RecurrentNetwork'):
1519  step_args = [a for a in op.arg if a.name.endswith("step_net")]
1520  for step_arg in step_args:
1521  map_ops(step_arg.n)
1522  map_ops(model.param_init_net.Proto())
1523  map_ops(model.net.Proto())
1524  model._blob_to_device = mapping
1525 
1526 def _IsGPUBlob(model, blob_name):
1527  if blob_name in model._blob_to_device:
1528  return model._blob_to_device[blob_name].device_type == caffe2_pb2.CUDA
1529  else:
1530  blob_name = "{}_{}/{}".format(
1531  model._device_prefix, model._devices[0], blob_name
1532  )
1533  if blob_name not in model._blob_to_device:
1534  return model._device_type == caffe2_pb2.CUDA
1535  return model._blob_to_device[blob_name].device_type == caffe2_pb2.CUDA
1536 
1537 
1538 def _GroupByDevice(model, devices, params, non_data_params):
1539  '''
1540  Groups blobs by device, returning a map of [blobname] = {0: BlobRef, 1: ..}.
1541  Returns an ordered dictionary, preserving the original order.
1542  '''
1543  grouped = OrderedDict()
1544  # Only consider params that were created to be "data parallel"
1545  params = params[len(non_data_params):]
1546 
1547  for _i, p in enumerate(params):
1548  assert isinstance(p, core.BlobReference) or \
1549  isinstance(p, core.GradientSlice), \
1550  "Param {} is not BlobReference or GradientSlice".format(p)
1551 
1552  name = stripBlobName(p)
1553  gpuid = None
1554 
1555  if isinstance(p, core.BlobReference):
1556  gpuid = int(p.GetNameScope().split("_")[1].split("/")[0])
1557  assert "{}_{}/".format(model._device_prefix, gpuid) in p.GetNameScope(),\
1558  "Param {} expected to have namescope '{}_{}'".format(str(p), model._device_prefix, gpuid)
1559  else:
1560  gpuid = int(p.indices.GetNameScope().split("_")[1].split("/")[0])
1561  assert "{}_{}/".format(model._device_prefix, gpuid) in p.indices.GetNameScope(),\
1562  "Indices {} expected to have namescope '{}_{}'".format(str(p), model._device_prefix, gpuid)
1563  assert "{}_{}/".format(model._device_prefix, gpuid) in p.values.GetNameScope(),\
1564  "Values {} expected to have namescope '{}_{}'".format(str(p), model._device_prefix, gpuid)
1565 
1566  if name not in grouped:
1567  grouped[name] = {}
1568  grouped[name][gpuid] = p
1569 
1570  return grouped
1571 
1572 
1573 def _ValidateParams(params):
1574  set_params = set(params)
1575  if len(params) > len(set_params):
1576  dupes = []
1577  sp = sorted(params)
1578  for j, p in enumerate(sp):
1579  if j > 0 and sp[j - 1] == p:
1580  dupes.append(p)
1581 
1582  assert len(params) == len(set_params), \
1583  "Duplicate entries in params: {}".format(dupes)
1584 
1585 
1586 def _ComputeBlobsToSync(model):
1587  '''
1588  We sync all blobs that are generated by the param init net and
1589  are 'data parallel', i.e. assigned to a device
1590  '''
1591  sync_names = set()
1592 
1593  # We don't sync params if the model is shared
1594  if model._shared_model:
1595  blobs_to_sync = [str(p) for p in model.GetComputedParams('')]
1596  sync_names = [stripBlobName(p) for p in blobs_to_sync]
1597  else:
1598  blobs_to_sync = []
1599 
1600  for op in model.param_init_net.Proto().op:
1601  dp_outputs = [
1602  o for o in op.output
1603  if o.startswith("{}_".format(model._device_prefix))
1604  ]
1605  sync_names.update([stripBlobName(o) for o in dp_outputs])
1606  blobs_to_sync.extend(dp_outputs)
1607 
1608  # Sanity check
1609  diff = set(model._param_names) - sync_names
1610  assert diff == set(), \
1611  "Some params not instantiated in param init net: {}".format(diff)
1612 
1613  # Remove duplicates and sort
1614  prefixlen = len(model._device_prefix) + 1
1615 
1616  def extract_sort_key(b):
1617  # Sort first based on device id, and then by whole string
1618  deviceid = int(b[prefixlen:b.index(scope._NAMESCOPE_SEPARATOR)])
1619  return (deviceid, b)
1620 
1621  blobs_to_sync = sorted(
1622  list(set(blobs_to_sync)),
1623  key=extract_sort_key)
1624 
1625  blobs_to_sync = [core.BlobReference(b) for b in blobs_to_sync]
1626  return (blobs_to_sync, sync_names)
1627 
1628 
1629 def _OptimizeGradientMemorySimple(model, losses_by_gpu, devices):
1630  log.warning("------- DEPRECATED API, please use " +
1631  "data_parallel_model.OptimizeGradientMemory() ----- ")
1632  for device in devices:
1633  namescope = "{}_{}/".format(model._device_prefix, device)
1634  model.net._net = memonger.share_grad_blobs(
1635  model.net,
1636  losses_by_gpu[device],
1637  set(viewvalues(model.param_to_grad)),
1638  namescope,
1639  share_activations=False,
1640  )
1641 
1642 
1643 def _AddDynamicMemoryOptimization(model, blobs_to_keep, devices):
1644  blobs_to_keep_all_devices = set()
1645  if blobs_to_keep is not None:
1646  for device in devices:
1647  for blob_name in blobs_to_keep:
1648  blobs_to_keep_all_devices.add(
1649  "{}_{}/{}".format(model._device_prefix, device, blob_name)
1650  )
1651 
1652  if model._rendezvous is not None:
1653  # GLOO operators expect the tensor addresses to remain the same over
1654  # iterations, so param grads must be excluded from dynamic memory
1655  # management.
1656  blobs_to_keep_all_devices.update(
1657  [str(b) for b in viewvalues(model.param_to_grad)]
1658  )
1659 
1660  model.net._net = memonger.release_blobs_when_used(
1661  model.net.Proto(),
1662  blobs_to_keep_all_devices
1663  )
1664 
1665 
1666 def OptimizeGradientMemory(model,
1667  input_shapes,
1668  excluded_blobs,
1669  recycle_activations):
1670  """
1671  Optimize memory usage of the backward pass by recycling blobs for gradient
1672  inputs that have been 'used'.
1673  input_shapes: dict of blob name to shape for the inputs of the model.
1674  Pass empty dictionary if not known.
1675  excluded_blobs: list of blobs that cannot be recycled. These are blobs
1676  that you will access externally.
1677  recycle_activations: whether to also recycle forward pass activations
1678  """
1679  if input_shapes is not None:
1680  input_shapes_all_devices = {}
1681  for b, shp in viewitems(input_shapes):
1682  for d in model._devices:
1683  input_shapes_all_devices["{}_{}/{}".
1684  format(model._device_prefix, d, b)] = shp
1685 
1686  (shapes, types) = workspace.InferShapesAndTypes(
1687  [model.param_init_net, model.net],
1688  input_shapes_all_devices,
1689  )
1690  else:
1691  shapes = None
1692 
1693  for device in model._devices:
1694  namescope = "{}_{}/".format(model._device_prefix, device)
1695  excluded_blobs_by_device = set(namescope + b for b in excluded_blobs)
1696  model.net._net = memonger.share_grad_blobs(
1697  model.net,
1698  model._losses_by_gpu[device],
1699  set(viewvalues(model.param_to_grad)),
1700  namescope,
1701  dont_share_blobs=excluded_blobs_by_device,
1702  share_activations=recycle_activations,
1703  blob_shapes=shapes,
1704  )
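

# Hedged usage sketch (not from the original source): how OptimizeGradientMemory
# might be called once Parallelize(...) has built a data-parallel model. The
# model object and the blob names below ("data", "fc_w") are assumptions for
# the example, not names defined by this module.
def _optimize_gradient_memory_usage_sketch(train_model):
    OptimizeGradientMemory(
        train_model,
        input_shapes={"data": (32, 3, 224, 224)},  # per-device input shape
        excluded_blobs=["fc_w"],                   # blobs accessed externally
        recycle_activations=False,                 # keep forward activations
    )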
1705 
1706 
1707 def _CreateOrCloneCommonWorld(
1708  net,
1709  common_world_blob,
1710  rendezvous,
1711  name=None,
1712  status_blob=None,
1713  timeout_sec=None):
1714 
1715  if timeout_sec is None:
1716  timeout_sec = _DEFAULT_TIMEOUT_SEC
1717 
1718  timeout_ms = timeout_sec * 1000
1719 
1720  # Check if there is an existing CreateCommonWorld
1721  # with the same timeout we're looking for. If so,
1722  # we can clone it instead of creating a new one.
1723  existing = None
1724  for op in net.Proto().op:
1725  if op.type != "CreateCommonWorld":
1726  continue
1727 
1728  # Find common world timeout
1729  op_timeout_ms = -1
1730  for arg in op.arg:
1731  if arg.name == 'timeout_ms':
1732  op_timeout_ms = arg.i
1733  break
1734  if op_timeout_ms != timeout_ms:
1735  continue
1736 
1737  # This common world was created with the same timeout we're
1738  # looking for, so we can clone it
1739  existing = op.output[0]
1740  break
1741 
1742  if name is None:
1743  name = "{}_op".format(common_world_blob)
1744 
1745  if existing is not None:
1746  comm_world = net.CloneCommonWorld(
1747  [existing],
1748  common_world_blob,
1749  name=name,
1750  engine=rendezvous['engine'],
1751  status_blob=status_blob,
1752  )
1753  else:
1754  kwargs=dict()
1755  if 'transport' in rendezvous:
1756  kwargs['transport'] = rendezvous['transport']
1757  if 'interface' in rendezvous:
1758  kwargs['interface'] = rendezvous['interface']
1759  if 'mpi_rendezvous' in rendezvous:
1760  kwargs['mpi_rendezvous'] = rendezvous['mpi_rendezvous']
1761  comm_world = net.CreateCommonWorld(
1762  rendezvous['kv_handler'] or [],
1763  common_world_blob,
1764  name=name,
1765  size=rendezvous['num_shards'],
1766  rank=rendezvous['shard_id'],
1767  engine=rendezvous['engine'],
1768  status_blob=status_blob,
1769  timeout_ms=timeout_ms,
1770  **kwargs
1771  )
1772 
1773  return comm_world
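

# Illustrative sketch (not part of the library): the rendezvous dict fields
# that _CreateOrCloneCommonWorld reads above. Only the keys are taken from the
# code; all values are placeholders assumed for the example.
_example_rendezvous = {
    'kv_handler': None,      # key/value store handler blob (or None)
    'num_shards': 2,         # total number of participating hosts
    'shard_id': 0,           # rank of this host
    'engine': 'GLOO',        # collective engine to use
    # Optional keys, forwarded only when present:
    # 'transport': 'tcp',
    # 'interface': 'eth0',
    # 'mpi_rendezvous': True,
}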
1774 
1775 
1776 def _RunComparison(model, blob_name, device=None):
1777  if device is None:
1778  device = model._blob_to_device[blob_name]
1779  with core.DeviceScope(device):
1780  rendezvous = model._rendezvous
1781  if rendezvous is None or rendezvous['num_shards'] == 1:
1782  return True
1783 
1784  test_data_arr = np.zeros(rendezvous['num_shards']).astype(np.float32)
1785  test_data_arr[rendezvous['shard_id']] = 1
1786  workspace.FeedBlob("compare_arr", test_data_arr)
1787 
1788  comparison_net = core.Net("allcompare_net")
1789 
1790  kwargs=dict()
1791  if 'mpi_rendezvous' in rendezvous:
1792  kwargs['mpi_rendezvous'] = rendezvous['mpi_rendezvous']
1793  comm_world = comparison_net.CreateCommonWorld(
1794  rendezvous['kv_handler'] or [],
1795  "initial_sync",
1796  name=model.net.Proto().name + ".cw_master_select",
1797  size=rendezvous['num_shards'],
1798  rank=rendezvous['shard_id'],
1799  engine=rendezvous['engine'],
1800  status_blob="cw_master_select",
1801  **kwargs
1802  )
1803 
1804  blob_name_checksum = blob_name + "_checksum"
1805  comparison_net.SumSqrElements(
1806  [blob_name], [blob_name_checksum], average=False
1807  )
1808 
1809  blob_name_gather = blob_name + "_gather"
1810  comparison_net.Mul(
1811  inputs=["compare_arr", blob_name_checksum],
1812  outputs=blob_name_gather,
1813  broadcast=1
1814  )
1815 
1816  comparison_net.Allreduce(
1817  inputs=[comm_world, blob_name_gather],
1818  outputs=[blob_name_gather],
1819  engine=rendezvous['engine'],
1820  status_blob="all_reduce_master_select_status",
1821  )
1822 
1823  workspace.RunNetOnce(comparison_net)
1824  gather_arr = workspace.FetchBlob(blob_name_gather)
1825 
1826  baseline = gather_arr[0]
1827  for i in range(rendezvous['num_shards']):
1828  assert gather_arr[i] == baseline, \
1829  "allcompare failed on shard {}.".format(rendezvous['shard_id'])
1830 
1831  return True
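

# Illustrative sketch (not part of the library): the "allcompare" check above,
# simulated with numpy for a handful of hypothetical shards. Each shard
# contributes its checksum at its own index of a zero vector; after the
# (simulated) Allreduce the gathered vector holds one checksum per shard and
# all entries must be equal.
def _allcompare_sketch(checksums):
    num_shards = len(checksums)
    gathered = np.zeros(num_shards, dtype=np.float32)
    for shard_id, checksum in enumerate(checksums):
        contribution = np.zeros(num_shards, dtype=np.float32)
        contribution[shard_id] = 1.0
        gathered += contribution * checksum  # stands in for Allreduce(sum)
    return bool(np.all(gathered == gathered[0]))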
1832 
1833 
1834 def _InterleaveOps(model):
1835  '''
1836  Data Parallel Model creates a net with the ops for each device grouped
1837  together. This interleaves the ops so that the corresponding ops for all
1838  devices sit next to each other in the net, like riffling together decks of
1839  cards. This ensures that progress is made along the critical path roughly
1840  concurrently for each device, which is important because of the extra
1841  intra-node synchronization required for multi-device batch normalization.
1842  '''
1843  orig_ops = list(model.net.Proto().op)
1844  num_devices = len(model._devices)
1845  num_ops_per_dev = len(orig_ops) // num_devices
1846  assert num_devices * num_ops_per_dev == len(orig_ops), \
1847  'Number of ops per device in original net is not uniform'
1848  new_ops = []
1849  ops = {d: [] for d in range(num_devices)}
1850  for op in orig_ops:
1851  ops[op.device_option.cuda_gpu_id].append(op)
1852 
1853  for j in range(num_ops_per_dev):
1854  tp = None
1855  for d in model._devices:
1856  if tp is None:
1857  tp = ops[d][j].type
1858  new_ops.append(ops[d][j])
1859  # Sanity
1860  assert ops[d][j].type == tp, \
1861  "Type mismatch {} / {}".format(tp, ops[d][j].type)
1862 
1863  del model.net.Proto().op[:]
1864  model.net.Proto().op.extend(new_ops)
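

# Illustrative sketch (not part of the library): the "deck of cards"
# interleaving described above, using plain strings instead of OperatorDef
# protos. The op names here are made up for the example.
def _interleave_sketch():
    per_device_ops = {
        0: ["Conv_0", "Relu_0", "FC_0"],
        1: ["Conv_1", "Relu_1", "FC_1"],
    }
    num_ops_per_dev = len(per_device_ops[0])
    interleaved = [
        per_device_ops[d][j]
        for j in range(num_ops_per_dev)
        for d in sorted(per_device_ops)
    ]
    # interleaved == ["Conv_0", "Conv_1", "Relu_0", "Relu_1", "FC_0", "FC_1"]
    return interleaved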
1865 
1866 
1867 def _InterDeviceBatchNormalization(model):
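    '''
    Rewrites the net so that SpatialBN and SpatialBNGradient ops use
    statistics combined across all devices: per-device ChannelStats /
    ChannelBackpropStats ops are injected, their outputs are summed across
    devices with Sum ops, and the combined results are fed back into the
    batch-norm ops together with a "num_batches" argument equal to the
    number of devices.
    '''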
1868  orig_ops = list(model.net.Proto().op)
1869  new_ops = []
1870  num_devices = len(model._devices)
1871  batch_norm_ops = []
1872  injected_ops = []
1873 
1874  spatial_bn_phase = False
1875  sums_blobs = []
1876  sumsq_blobs = []
1877  name = []
1878  input_blob_name = None
1879 
1880  spatial_bn_gradient_phase = False
1881  scale_grad_blobs = []
1882  bias_grad_blobs = []
1883 
1884  for op in orig_ops:
1885  if op.type != 'SpatialBN' and op.type != 'SpatialBNGradient':
1886  if spatial_bn_phase:
1887  new_ops.extend(injected_ops)
1888  new_ops.append(
1889  core.CreateOperator("Sum",
1890  sums_blobs,
1891  input_blob_name + "_sums_combined"))
1892  new_ops.append(
1893  core.CreateOperator("Sum",
1894  sumsq_blobs,
1895  input_blob_name + "_sumsq_combined"))
1896  new_ops.extend(batch_norm_ops)
1897  injected_ops = []
1898  batch_norm_ops = []
1899  sums_blobs = []
1900  sumsq_blobs = []
1901  spatial_bn_phase = False
1902  input_blob_name = None
1903  elif spatial_bn_gradient_phase:
1904  new_ops.extend(injected_ops)
1905  scale_blob = \
1906  "cpu_0/" + stripBlobName(scale_grad_blobs[0]) + "_combined"
1907  bias_blob = \
1908  "cpu_0/" + stripBlobName(bias_grad_blobs[0]) + "_combined"
1909  new_ops.append(
1910  core.CreateOperator("Sum", scale_grad_blobs, scale_blob))
1911  new_ops.append(
1912  core.CreateOperator("Sum", bias_grad_blobs, bias_blob))
1913  for blob in scale_grad_blobs:
1914  new_ops.append(
1915  core.CreateOperator("Copy", scale_blob, blob))
1916  for blob in bias_grad_blobs:
1917  new_ops.append(core.CreateOperator("Copy", bias_blob, blob))
1918  new_ops.extend(batch_norm_ops)
1919  injected_ops = []
1920  batch_norm_ops = []
1921  scale_grad_blobs = []
1922  bias_grad_blobs = []
1923  spatial_bn_gradient_phase = False
1924  new_ops.append(op)
1925  elif op.type == 'SpatialBN':
1926  spatial_bn_phase = True
1927  if input_blob_name is None:
1928  input_blob_name = op.input[0]
1929  name = op.input[0]
1930  injected_ops.append(
1931  core.CreateOperator(
1932  "ChannelStats",
1933  name,
1934  [name + "_sums", name + "_sumsq"]))
1935  sums_blobs.append(name + "_sums")
1936  sumsq_blobs.append(name + "_sumsq")
1937  op.input.append(input_blob_name + "_sums_combined")
1938  op.input.append(input_blob_name + "_sumsq_combined")
1939  op.arg.extend([utils.MakeArgument("num_batches", num_devices)])
1940  batch_norm_ops.append(op)
1941  elif op.type == 'SpatialBNGradient':
1942  spatial_bn_gradient_phase = True
1943  injected_ops.append(
1944  core.CreateOperator("ChannelBackpropStats",
1945  [op.input[0], op.input[3], op.input[4],
1946  op.input[2]],
1947  [op.output[1], op.output[2]]))
1948  scale_grad_blobs.append(op.output[1])
1949  bias_grad_blobs.append(op.output[2])
1950  op.arg.extend([utils.MakeArgument("num_batches", num_devices)])
1951  op.input.extend([op.output[1], op.output[2]])
1952  batch_norm_ops.append(op)
1953 
1954  assert not spatial_bn_phase, \
1955  "Net modification for inter-device batch normalization failed"
1956  del model.net.Proto().op[:]
1957  model.net.Proto().op.extend(new_ops)