Caffe2 - Python API
A deep learning, cross-platform ML framework
distributed.py
1 import copy
2 
3 import torch
4 
5 from torch.cuda.comm import broadcast_coalesced
6 from torch.cuda import nccl
7 import torch.distributed as dist
8 
9 if dist.is_available():
10  from torch.distributed.distributed_c10d import _get_default_group
11 
12 from ..modules import Module
13 from .replicate import replicate
14 from .scatter_gather import scatter_kwargs, gather
15 from .parallel_apply import parallel_apply
16 from torch.cuda._utils import _get_device_index
17 
18 
19 class DistributedDataParallel(Module):
20  r"""Implements distributed data parallelism that is based on
21  ``torch.distributed`` package at the module level.
22 
23  This container parallelizes the application of the given module by
24  splitting the input across the specified devices by chunking in the batch
25  dimension. The module is replicated on each machine and each device, and
26  each such replica handles a portion of the input. During the backwards
27  pass, gradients from each node are averaged.
28 
29  The batch size should be larger than the number of GPUs used locally.
30 
31  See also: :ref:`distributed-basics` and :ref:`cuda-nn-dataparallel-instead`.
32  The same constraints on input as in :class:`torch.nn.DataParallel` apply.
33 
34  Creation of this class requires that ``torch.distributed`` be already
35  initialized, by calling :func:`torch.distributed.init_process_group`.
36 
37  ``DistributedDataParallel`` can be used in the following two ways:
38 
39  (1) Single-Process Multi-GPU
40 
41  In this case, a single process will be
42  spawned on each host/node and each process will operate on all the GPUs
43  of the node where it's running. To use ``DistributedDataParallel`` in
44  this way, you can simply construct the model as follows:
45 
46  >>> torch.distributed.init_process_group(backend="nccl")
47  >>> model = DistributedDataParallel(model) # device_ids will include all GPU devices by default
48 
49  (2) Multi-Process Single-GPU
50 
51  This is the highly recommended way to use ``DistributedDataParallel``, with
52  multiple processes, each of which operates on a single GPU. This is
53  currently the fastest approach to data parallel training with PyTorch
54  and applies to both single-node (multi-GPU) and multi-node data
55  parallel training. It has proven to be significantly faster than
56  :class:`torch.nn.DataParallel` for single-node multi-GPU data
57  parallel training.
58 
59  Here is how to use it: on each host with N GPUs, you should spawn N
60  processes, while ensuring that each process individually works on a single GPU
61  from 0 to N-1. Therefore, it is your job to ensure that your training script
62  operates on a single given GPU by calling:
63 
64  >>> torch.cuda.set_device(i)
65 
66  where i is from 0 to N-1. In each process, you should refer to the following
67  to construct this module:
68 
69  >>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
70  >>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)
71 
72  In order to spawn multiple processes per node, you can use either
73  ``torch.distributed.launch`` or ``torch.multiprocessing.spawn``, as sketched below.
74 
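For example, a minimal sketch of launching with ``torch.multiprocessing.spawn``
(the names ``run_worker`` and ``train.py`` are illustrative, not part of this
module; ``run_worker`` would contain the per-process calls shown above):

>>> # or, from the shell: python -m torch.distributed.launch --nproc_per_node=N train.py
>>> torch.multiprocessing.spawn(run_worker, nprocs=torch.cuda.device_count())
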
75  .. note:: The ``nccl`` backend is currently the fastest and the most
76  highly recommended backend to be used with Multi-Process Single-GPU
77  distributed training, and this applies to both single-node and multi-node
78  distributed training.
79 
80  .. note:: This module also supports mixed-precision distributed training.
81  This means that your model can have parameters of different types, such
82  as a mix of fp16 and fp32; the gradient reduction on these
83  mixed types of parameters will still work correctly.
84  Also note that the ``nccl`` backend is currently the fastest and most
85  highly recommended backend for fp16/fp32 mixed-precision training.
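For example, a minimal sketch (``feature_extractor`` is an illustrative
submodule name): part of the model can run in fp16 while the rest stays
in fp32, and gradient reduction still works:

>>> model.feature_extractor.half()  # hypothetical fp16 submodule
>>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)
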
86 
87  .. warning::
88  This module works only with the ``gloo`` and ``nccl`` backends.
89 
90  .. warning::
91  The constructor, the forward method, and differentiation of the output (or a
92  function of the output of this module) are distributed synchronization
93  points. Take that into account in case different processes might be
94  executing different code.
95 
96  .. warning::
97  This module assumes all parameters are registered in the model by the
98  time it is created. No parameters should be added or removed later.
99  The same applies to buffers.
100 
101  .. warning::
102  This module assumes that the parameters registered in the model of each
103  distributed process are in the same order. The module itself will
104  conduct gradient all-reduction following the reverse order of the
105  registered parameters of the model. In other words, it is the user's
106  responsibility to ensure that each distributed process has the exact
107  same model and thus the exact same parameter registration order.
108 
109  .. warning::
110  This module assumes all buffers and gradients are dense.
111 
112  .. warning::
113  This module doesn't work with :func:`torch.autograd.grad` (i.e. it will
114  only work if gradients are to be accumulated in ``.grad`` attributes of
115  parameters).
116 
117  .. warning::
118 
119  If you plan on using this module with the ``nccl`` backend or the ``gloo``
120  backend (when it uses Infiniband), together with a DataLoader that uses
121  multiple workers, please change the multiprocessing start method to
122  ``forkserver`` (Python 3 only) or ``spawn``, as in the sketch below.
123  Unfortunately Gloo (when it uses Infiniband) and NCCL2 are not fork safe,
124  and you will likely experience deadlocks if you don't change this setting.
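For example, one way to do this (a sketch; it must run once at program start,
before any DataLoader workers are created):

>>> torch.multiprocessing.set_start_method('forkserver')  # or 'spawn'
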
125 
126  .. warning::
127  Forward and backward hooks defined on :attr:`module` and its submodules
128  won't be invoked anymore, unless the hooks are initialized in the
129  :meth:`forward` method.
130 
131  .. warning::
132  You should never try to change your model's parameters after wrapping
133  your model with DistributedDataParallel. This is because, when
134  wrapping your model with DistributedDataParallel, the constructor of
135  DistributedDataParallel registers the additional gradient
136  reduction functions on all the parameters of the model itself at the
137  time of construction. If you change the model's parameters after
138  the DistributedDataParallel construction, this is not supported and
139  unexpected behaviors can happen, since some parameters' gradient
140  reduction functions might not get called.
141 
142  .. note::
143  Parameters are never broadcast between processes. The module performs
144  an all-reduce step on gradients and assumes that they will be modified
145  by the optimizer in all processes in the same way. Buffers
146  (e.g. BatchNorm stats) are broadcast from the module in the process of
147  rank 0 to all other replicas in the system in every iteration.
148 
149  Args:
150  module (Module): module to be parallelized
151  device_ids (list of int or torch.device): CUDA devices (default: all devices)
152  output_device (int or torch.device): device location of output (default: device_ids[0])
153  broadcast_buffers (bool): flag that enables syncing (broadcasting) buffers of
154  the module at the beginning of the forward function.
155  (default: ``True``)
156  process_group: the process group to be used for distributed data
157  all-reduction. If ``None``, the default process group, which
158  is created by ``torch.distributed.init_process_group``,
159  will be used. (default: ``None``)
160  bucket_cap_mb: DistributedDataParallel will bucket parameters into
161  multiple buckets so that gradient reduction of each
162  bucket can potentially overlap with backward computation.
163  :attr:`bucket_cap_mb` controls the bucket size in MegaBytes (MB)
164  (default: 25)
165  check_reduction: when set to ``True``, it enables DistributedDataParallel
166  to automatically check if the previous iteration's
167  backward reductions were successfully issued at the
168  beginning of every iteration's forward function.
169  You normally don't need this option enabled unless you
170  are observing unexpected behaviors, such as different ranks
171  getting different gradients, which should not
172  happen if DistributedDataParallel is correctly used.
173  (default: ``False``)
174 
175  Attributes:
176  module (Module): the module to be parallelized
177 
178  Example::
179 
180  >>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
181  >>> net = torch.nn.parallel.DistributedDataParallel(model)
182  """
183  def __init__(self, module, device_ids=None,
184  output_device=None, dim=0, broadcast_buffers=True,
185  process_group=None, bucket_cap_mb=25,
186  check_reduction=False):
187 
188  super(DistributedDataParallel, self).__init__()
189 
190  # Use all devices by default
191  if device_ids is None:
192  device_ids = list(range(torch.cuda.device_count()))
193 
194  if output_device is None:
195  output_device = device_ids[0]
196 
197  if process_group is None:
198  self.process_group = _get_default_group()
199  else:
200  self.process_group = process_group
201 
202  self.dim = dim
203  self.module = module
204  self.device_ids = list(map(lambda x: _get_device_index(x, True), device_ids))
205  self.output_device = _get_device_index(output_device, True)
206  self.broadcast_buffers = broadcast_buffers
207  self.check_reduction = check_reduction
208 
209  MB = 1024 * 1024
210 
211  # used for intra-node param sync and inter-node sync as well
212  self.broadcast_bucket_size = 250 * MB
213 
214  # reduction bucket size
215  self.bucket_bytes_cap = bucket_cap_mb * MB
216 
217  # Sync params and buffers
218  module_states = list(self.module.state_dict().values())
219  if len(module_states) > 0:
220  self._dist_broadcast_coalesced(module_states,
221  self.broadcast_bucket_size)
222 
223  self._ddp_init_helper()
224 
225  def _ddp_init_helper(self):
226  """
227  Initialization helper function that does the following:
228 
229  (1) replicating the module from device[0] to the other devices
230  (2) bucketing the parameters for reductions
231  (3) resetting the bucketing states
232  (4) registering the grad hooks
233  (5) passing a handle of DDP to SyncBatchNorm Layer
234  """
235  if len(self.device_ids) > 1:
236  # TODO: we don't need to replicate params in here. they're always going to
237  # be broadcasted using larger blocks in broadcast_coalesced, so it might be
238  # better to not pollute the caches with these small blocks
239  self._module_copies = replicate(self.module, self.device_ids, detach=True)
240  self._module_copies[0] = self.module
241 
242  for module_copy in self._module_copies[1:]:
243  for param, copy_param in zip(self.module.parameters(), module_copy.parameters()):
244  copy_param.requires_grad = param.requires_grad
245 
246  else:
247  self._module_copies = [self.module]
248 
249  self.modules_params_data = [[] for _ in range(len(self.device_ids))]
250  self.modules_buffers_data = [[] for _ in range(len(self.device_ids))]
251 
252  for dev_idx, module in enumerate(self._module_copies):
253  self.modules_params_data[dev_idx] = [p.data for p in module.parameters()]
254  self.modules_buffers_data[dev_idx] = [b.data for b in module.buffers()]
255 
256  # This is a triply-nested list where the "dimensions" are: devices, buckets, bucket_elems
257  param_buckets = []
258 
259  # Split the parameters into buckets and by types as well
260  # We only need to bucket and reduce parameters that require grad and
261  # this is also true for backward since only the backward hooks for
262  # parameters that require grad will be registered with gradient
263  # reduction functions
264  params_to_bucket = [[] for _ in self._module_copies]
265  for dev_idx, m in enumerate(self._module_copies):
266  for p in m.parameters():
267  if p.requires_grad:
268  params_to_bucket[dev_idx].append(p)
269 
270  param_buckets = [dist._dist_bucket_tensors(dev_params_to_bucket,
271  int(self.bucket_bytes_cap),
272  fine_grained=False)
273  for dev_params_to_bucket in params_to_bucket]
274 
275  self.bucket_sizes = []
276  self.bucket_map = {}
277 
278  # We transpose param_buckets, so the loop is over buckets.
279  # param_buckets_tuple is a doubly-nested list with "dims": devices, bucket_elems
280  for bucket_idx, param_buckets_tuple in enumerate(zip(*param_buckets)):
281  self.bucket_sizes.append(0)
282  # Now, we transpose again, so we iterate over bucket_elems, but getting tuples
283  # of params from each device.
284  for param_tuple in zip(*param_buckets_tuple):
285  if not param_tuple[0].requires_grad:
286  continue
287  for p in param_tuple:
288  self.bucket_map[p] = (bucket_idx, self.bucket_sizes[bucket_idx])
289  self.bucket_sizes[bucket_idx] += 1
290 
291  self.buckets = [[[None for _ in range(self.bucket_sizes[i])]
292  for _ in range(len(self.device_ids))] for i in range(len(self.bucket_sizes))]
293  # The number of params ready in each bucket
294  self.buckets_ready_size = [[0 for _ in range(len(self.device_ids))] for i in range(len(self.bucket_sizes))]
295 
296  # coalesced bucket for only device 0
297  self.buckets_coalesced = [[] for _ in range(len(self.bucket_sizes))]
298  # We will always reduce the buckets following the reverse order,
299  # that is, always reducing in the order: n - 1, n - 2, ..., 0
300  self.next_bucket = len(self.bucket_sizes) - 1
301  # When all buckets are reduced, this will be set to True. This flag is
302  # useful for sanity checks to ensure that each iteration's backward has
303  # always reduced all buckets
304  self.all_buckets_reduced = False
305  self.check_previous_reduction = False
306  self.ready_buckets_not_reduced = set()
307  self.reduction_works = [None for _ in range(len(self.bucket_sizes))]
308  self.devs_ready = [0 for _ in range(len(self.bucket_sizes))]
309  self._register_grad_hooks()
310 
311  # passing a handle to torch.nn.SyncBatchNorm layer
312  self._passing_sync_batchnorm_handle(self._module_copies)
313 
314  def __getstate__(self):
315  self._check_default_group()
316  attrs = copy.copy(self.__dict__)
317  del attrs['process_group'], \
318  attrs['default_streams'], \
319  attrs['_grad_accs']
320  return attrs
321 
322  def __setstate__(self, state):
323  # If serializable, then the process group should be the default one
324  self.process_group = _get_default_group()
325  self.check_previous_reduction = False
326  super(DistributedDataParallel, self).__setstate__(state)
327  self._ddp_init_helper()
328 
329  def _check_default_group(self):
330  pickle_not_supported = False
331  try:
332  if self.process_group != _get_default_group():
333  pickle_not_supported = True
334  except RuntimeError:
335  pickle_not_supported = True
336 
337  if pickle_not_supported:
338  raise RuntimeError("DDP Pickling/Unpickling are only supported "
339  "when using DDP with the default process "
340  "group. That is, when you have called "
341  "init_process_group and have not passed "
342  "process_group argument to DDP constructor")
343 
344  def _check_previous_reduction(self):
345  if not self.training:
346  return
347  # self.check_previous_reduction will be False in the first iteration
348  # and is then toggled to True for all future iterations.
349  if self.check_previous_reduction is False:
350  self.check_previous_reduction = True
351  else:
352  if not self.all_buckets_reduced:
353  raise RuntimeError("Not all gradients have been reduced from "
354  "the backward of the previous iteration. "
355  "This is an unexpected and fatal error. "
356  "Please check and ensure that the model's "
357  "parameters are not changed after you wrap "
358  "up the model with DistributedDataParallel.")
359  self.all_buckets_reduced = False
360 
361  def forward(self, *inputs, **kwargs):
362  if self.check_reduction:
363  self._check_previous_reduction()
364  inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
365  self._sync_params()
366  if len(self.device_ids) == 1:
367  return self.module(*inputs[0], **kwargs[0])
368  outputs = self.parallel_apply(self._module_copies[:len(inputs)], inputs, kwargs)
369  return self.gather(outputs, self.output_device)
370 
371  def scatter(self, inputs, kwargs, device_ids):
372  return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)
373 
374  def parallel_apply(self, replicas, inputs, kwargs):
375  return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])
376 
377  def gather(self, outputs, output_device):
378  return gather(outputs, output_device, dim=self.dim)
379 
380  def train(self, mode=True):
381  self.check_previous_reduction = False
382  super(DistributedDataParallel, self).train(mode)
383  for module in self._module_copies[1:]:
384  module.train(mode)
385 
386  def _dist_broadcast_coalesced(self, tensors, buffer_size):
387  dist._dist_broadcast_coalesced(self.process_group, tensors, buffer_size, False)
388 
389  def _sync_params(self):
390  if len(self.device_ids) > 1:
391  # intra-node parameter sync
392  result = broadcast_coalesced(self.modules_params_data[0],
393  self.device_ids,
394  self.broadcast_bucket_size)
395  for tensors, module_params_data in zip(result[1:], self.modules_params_data[1:]):
396  for tensor, param_data in zip(tensors, module_params_data):
397  param_data.set_(tensor)
398 
399  # module buffer sync
400  if self.broadcast_buffers:
401  if len(self.modules_buffers_data[0]) > 0:
402  # cross-node buffer sync
403  self._dist_broadcast_coalesced(self.modules_buffers_data[0],
404  self.broadcast_bucket_size)
405  if len(self.device_ids) > 1:
406  # intra-node buffer sync
407  result = broadcast_coalesced(self.modules_buffers_data[0],
408  self.device_ids,
409  self.broadcast_bucket_size)
410  for tensors, module_buffers_data in zip(result[1:], self.modules_buffers_data[1:]):
411  for tensor, buffer_data in zip(tensors, module_buffers_data):
412  buffer_data.set_(tensor)
413 
414  def _passing_sync_batchnorm_handle(self, module_copies):
415  for dev_idx, module in enumerate(module_copies):
416  for layer in module.modules():
417  if isinstance(layer, torch.nn.modules.SyncBatchNorm):
418  layer._specify_ddp_gpu_num(len(self.device_ids))
419 
420  def _register_grad_hooks(self):
421  self._grad_accs = [] # need to keep them in scope
422 
423  # default stream tracking to launch nccl reduce kernels
424  self.default_streams = []
425  for dev_id in self.device_ids:
426  with torch.cuda.device(dev_id):
427  self.default_streams.append(torch.cuda.current_stream())
428 
429  for device_idx, module in enumerate(self._module_copies):
430  for p in module.parameters():
431  if p.requires_grad:
432  p_tmp = p.expand_as(p)
433  grad_acc = p_tmp.grad_fn.next_functions[0][0]
434  grad_acc.register_hook(self._make_param_hook(p, device_idx))
435  self._grad_accs.append(grad_acc)
436 
437  def _make_param_hook(self, param, device_idx):
438  bucket_idx, bucket_offset = self.bucket_map[param]
439 
440  def distributed_data_parallel_hook(*unused):
441  if param.grad.requires_grad:
442  raise RuntimeError("DistributedDataParallel only works "
443  "with gradients that don't require grad")
444  bucket = self.buckets[bucket_idx][device_idx]
445  bucket[bucket_offset] = param.grad.data
446  self.buckets_ready_size[bucket_idx][device_idx] += 1
447 
448  # We can flush these and save memory for replicas
449  if device_idx > 0:
450  param.grad = None
451  param.data.set_()
452 
453  # Current device's bucket is full
454  if self.buckets_ready_size[bucket_idx][device_idx] == self.bucket_sizes[bucket_idx]:
455  self.devs_ready[bucket_idx] += 1
456  if self.devs_ready[bucket_idx] < len(self.device_ids):
457  return
458 
459  # Now all devices' buckets with index bucket_idx are ready
460  if bucket_idx == self.next_bucket:
461  self._queue_reduction(bucket_idx)
462  self.next_bucket -= 1
463  # Now reduce anything that is ready but not yet reduced
464  if len(self.ready_buckets_not_reduced) > 0:
465  sorted_todo = sorted(self.ready_buckets_not_reduced, reverse=True)
466  for i in sorted_todo:
467  # Nothing can be reduced now
468  if i < self.next_bucket:
469  break
470  self._queue_reduction(i)
471  self.ready_buckets_not_reduced.remove(i)
472  if i == self.next_bucket:
473  self.next_bucket -= 1
474  else:
475  self.ready_buckets_not_reduced.add(bucket_idx)
476 
477  # When all devices' buckets have been reduced
478  if self.next_bucket == -1:
479  # A final sync for all the reduction works
480  self._sync_reduction_works()
481  self.all_buckets_reduced = True
482 
483  return distributed_data_parallel_hook
484 
485  def _queue_reduction(self, bucket_idx):
486  # _queue_reduction will use a separate CUDA stream to coalesce
487  # the small tensors to achieve more parallelism, before passing the
488  # coalesced tensor into the c10d CUDA stream for reduction
489  result = dist._queue_reduction(self.process_group,
490  self.buckets[bucket_idx],
491  self.device_ids)
492  self.reduction_works[bucket_idx] = result[0]
493  self.buckets_coalesced[bucket_idx] = result[1]
494 
495  def _sync_reduction_works(self):
496  # Now only work on the first GPU of self.device_ids
497  # _sync_reduction will use a separate CUDA stream to uncoalesce
498  # the coalesced tensors to achieve more parallelism
499  for bucket_idx, grads_batch in enumerate(self.buckets):
500  dist._sync_reduction(self.reduction_works[bucket_idx],
501  grads_batch[0],
502  self.buckets_coalesced[bucket_idx])
503 
504  # Reset the module states
505  self.next_bucket = len(self.bucket_sizes) - 1
506  self.ready_buckets_not_reduced = set()
507  self.reduction_works = [None for _ in range(len(self.bucket_sizes))]
508  self.devs_ready = [0 for _ in range(len(self.bucket_sizes))]
509 
510  self.buckets = [[[None for _ in range(self.bucket_sizes[i])]
511  for _ in range(len(self.device_ids))] for i in range(len(self.bucket_sizes))]
512  self.buckets_coalesced = [[] for _ in range(len(self.bucket_sizes))]
513  self.buckets_ready_size = [[0 for _ in range(len(self.device_ids))] for i in range(len(self.bucket_sizes))]
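For reference, a minimal, self-contained sketch (not part of distributed.py) of the
multi-process single-GPU recipe described in the class docstring above, assuming a
single node whose GPU count determines the world size; the names run_worker and
WORLD_SIZE, the TCP rendezvous address, and the toy Linear model are illustrative:

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel

WORLD_SIZE = torch.cuda.device_count()

def run_worker(rank):
    # Each process drives exactly one GPU.
    torch.cuda.set_device(rank)
    dist.init_process_group(backend='nccl',
                            init_method='tcp://127.0.0.1:23456',
                            world_size=WORLD_SIZE, rank=rank)
    model = torch.nn.Linear(10, 10).cuda(rank)
    ddp_model = DistributedDataParallel(model, device_ids=[rank], output_device=rank)
    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01)
    inputs = torch.randn(20, 10).cuda(rank)  # each rank feeds its own shard of the batch
    loss = ddp_model(inputs).sum()
    loss.backward()       # gradients are all-reduced across processes here
    optimizer.step()
    dist.destroy_process_group()

if __name__ == '__main__':
    mp.spawn(run_worker, nprocs=WORLD_SIZE)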