Caffe2 - Python API
A deep learning, cross-platform ML framework
distributed.py
import sys
import math
import threading
import copy

import torch
from torch.autograd import Variable
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors, \
    _take_tensors
import torch.utils.hooks

from torch.cuda.comm import broadcast_coalesced
from torch.cuda import nccl
import torch.distributed.deprecated as dist

from ...modules import Module
from ..replicate import replicate
from ..scatter_gather import scatter_kwargs, gather
from ..parallel_apply import parallel_apply

if sys.version_info[0] == 3:
    import queue
else:
    import Queue as queue


class DistributedDataParallel(Module):
    r"""Implements distributed data parallelism at the module level.

    This container parallelizes the application of the given module by
    splitting the input across the specified devices by chunking in the batch
    dimension. The module is replicated on each machine and each device, and
    each such replica handles a portion of the input. During the backwards
    pass, gradients from each node are averaged.

    The batch size should be larger than the number of GPUs used locally. It
    should also be an integer multiple of the number of GPUs so that each chunk
    is the same size (so that each GPU processes the same number of samples).

    See also: :ref:`distributed-basics` and :ref:`cuda-nn-dataparallel-instead`.
    The same constraints on input as in :class:`torch.nn.DataParallel` apply.

    Creation of this class requires the distributed package to be already
    initialized in the process group mode
    (see :func:`torch.distributed.deprecated.init_process_group`).

    .. warning::
        This module works only with the ``nccl`` and ``gloo`` backends.

    .. warning::
        The constructor, the forward method, and differentiation of the output
        (or of a function of the output of this module) are distributed
        synchronization points. Take that into account in case different
        processes might be executing different code.

    .. warning::
        This module assumes all parameters are registered in the model by the
        time it is created. No parameters should be added nor removed later.
        The same applies to buffers.

    .. warning::
        This module assumes all buffers and gradients are dense.

    .. warning::
        This module doesn't work with :func:`torch.autograd.grad` (i.e. it will
        only work if gradients are to be accumulated in ``.grad`` attributes of
        parameters).

    .. warning::
        If you plan on using this module with a ``nccl`` backend or a ``gloo``
        backend (that uses Infiniband), together with a DataLoader that uses
        multiple workers, please change the multiprocessing start method to
        ``forkserver`` (Python 3 only) or ``spawn``. Unfortunately, Gloo (when
        it uses Infiniband) and NCCL2 are not fork-safe, and you will likely
        experience deadlocks if you don't change this setting.

    .. note::
        Parameters are never broadcast between processes. The module performs
        an all-reduce step on gradients and assumes that they will be modified
        by the optimizer in all processes in the same way. Buffers
        (e.g. BatchNorm stats) are broadcast from the module in the process of
        rank 0 to all other replicas in the system in every iteration.

    .. warning::
        Forward and backward hooks defined on :attr:`module` and its submodules
        won't be invoked anymore, unless the hooks are initialized in the
        :meth:`forward` method.

    Args:
        module: module to be parallelized
        device_ids: CUDA devices (default: all devices)
        output_device: device location of output (default: device_ids[0])
        broadcast_buffers: flag that enables syncing (broadcasting) buffers of
            the module at the beginning of the forward function.
            (default: True)

    Attributes:
        module (Module): the module to be parallelized

    Example::

        >>> torch.distributed.deprecated.init_process_group(world_size=4, init_method='...')
        >>> net = torch.nn.parallel.deprecated.DistributedDataParallel(model)
    """
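
    # A minimal end-to-end usage sketch (not part of this class), assuming a
    # 4-process job, the ``gloo`` backend, and a shared-filesystem init method;
    # ``MyModel``, ``criterion``, ``loader``, and ``rank`` are placeholders:
    #
    #   torch.distributed.deprecated.init_process_group(
    #       backend='gloo', init_method='file:///tmp/ddp_init',
    #       world_size=4, rank=rank)
    #   model = MyModel().cuda()
    #   net = torch.nn.parallel.deprecated.DistributedDataParallel(model)
    #   optimizer = torch.optim.SGD(net.parameters(), lr=0.01)
    #   for inputs, targets in loader:            # each rank reads its own shard
    #       optimizer.zero_grad()
    #       loss = criterion(net(inputs.cuda()), targets.cuda())
    #       loss.backward()                       # gradients are all-reduced here
    #       optimizer.step()                      # identical update on every rank
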
    def __init__(self, module, device_ids=None, output_device=None, dim=0,
                 broadcast_buffers=True):
        super(DistributedDataParallel, self).__init__()
        if dist._backend not in (dist.dist_backend.NCCL, dist.dist_backend.GLOO):
            raise ValueError('Invalid backend, only NCCL and GLOO backends are supported by DistributedDataParallel')

        if device_ids is None:
            device_ids = list(range(torch.cuda.device_count()))
        if output_device is None:
            output_device = device_ids[0]
        self.dim = dim
        self.module = module
        self.device_ids = device_ids
        self.output_device = output_device
        self.broadcast_buffers = broadcast_buffers

        # Flag used by the NCCL backend to make sure we only reduce gradients
        # one time in the execution engine
        self.need_reduction = False

        MB = 1024 * 1024
        # used for intra-node param sync and inter-node sync as well
        self.broadcast_bucket_size = 10 * MB
        self.nccl_reduce_bucket_size = 256 * MB

        # Sync params and buffers
        module_states = list(self.module.state_dict().values())
        if len(module_states) > 0:
            self._dist_broadcast_coalesced(module_states,
                                           self.broadcast_bucket_size)

        if len(device_ids) > 1:
            # TODO: we don't need to replicate params in here. they're always going to
            # be broadcasted using larger blocks in broadcast_coalesced, so it might be
            # better to not pollute the caches with these small blocks
            self._module_copies = replicate(self.module, self.device_ids, detach=True)
            self._module_copies[0] = self.module

            for module_copy in self._module_copies[1:]:
                for param, copy_param in zip(self.module.parameters(), module_copy.parameters()):
                    copy_param.requires_grad = param.requires_grad

        else:
            self._module_copies = [self.module]

        # For the NCCL backend, since every single NCCL call is asynchronous, we
        # directly enqueue all the NCCL reduction calls to the default CUDA
        # stream without spawning up other reduction threads.
        # This achieves the best performance.
        if dist._backend == dist.dist_backend.NCCL:
            self._register_nccl_grad_hook()
            return

        bucket_bytes_cap = 1 * MB

        # This is a triply-nested list where the "dimensions" are: devices, buckets, bucket_elems
        param_buckets = []
        # Split the parameters into buckets and by types as well
        for dev_idx, module in enumerate(self._module_copies):
            param_buckets.append(list(_take_tensors(module.parameters(), bucket_bytes_cap)))

        self.bucket_sizes = []
        self.bucket_map = {}

        # We transpose param_buckets, so the loop is over buckets.
        # param_buckets_tuple is a doubly-nested list with "dims": devices, bucket_elems
        for bucket_idx, param_buckets_tuple in enumerate(zip(*param_buckets)):
            self.bucket_sizes.append(0)
            # Now, we transpose again, so we iterate over bucket_elems, but getting tuples
            # of params from each device.
            for idx, param_tuple in enumerate(zip(*param_buckets_tuple)):
                if idx == 0:
                    # Bucket parameter type tracking
                    bucket_param_type = param_tuple[0].type()
                    # Only gloo and nccl support half-precision
                    if bucket_param_type == torch.cuda.HalfTensor and \
                            dist._backend != dist.dist_backend.GLOO:
                        raise RuntimeError("DistributedDataParallel currently only "
                                           "supports half precision parameters "
                                           "with Nccl and Gloo backend")
                if not param_tuple[0].requires_grad:
                    continue
                for p in param_tuple:
                    self.bucket_map[p] = bucket_idx
                self.bucket_sizes[bucket_idx] += 1

        self.buckets = [[[] for _ in range(len(self.device_ids))] for _ in range(len(self.bucket_sizes))]
        self.bucket_events = [[None] * len(self.device_ids) for _ in range(len(self.bucket_sizes))]
        self.reduced = [False] * len(self.bucket_sizes)

        self._register_grad_hooks()
        self.dispatch_lock = threading.Lock()
        self._start_reduction_threads()

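    # A small sketch (illustration only) of how ``_take_tensors`` forms the
    # ~1MB buckets used above; the tensor sizes and counts are made up:
    #
    #   from torch._utils import _take_tensors
    #   params = [torch.randn(256, 256) for _ in range(20)]   # ~256KB each
    #   buckets = list(_take_tensors(params, 1024 * 1024))    # 1MB cap per bucket
    #   print([len(bucket) for bucket in buckets])            # e.g. [4, 4, 4, 4, 4]
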
    def __getstate__(self):
        attrs = copy.copy(self.__dict__)
        if dist._backend != dist.dist_backend.NCCL:
            del attrs['_grad_accs'], attrs['_reduction_queues'], \
                attrs['_reduction_streams'], attrs['_reduction_threads'], \
                attrs['_nccl_streams'], attrs['_default_streams'], \
                attrs['dispatch_lock']
        return attrs

    def __setstate__(self, state):
        super(DistributedDataParallel, self).__setstate__(state)
        if dist._backend == dist.dist_backend.NCCL:
            self._register_nccl_grad_hook()
        else:
            self._register_grad_hooks()
            self.dispatch_lock = threading.Lock()
            self._start_reduction_threads()

    def forward(self, *inputs, **kwargs):
        self.need_reduction = True
        inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
        self._sync_params()
        if len(self.device_ids) == 1:
            return self.module(*inputs[0], **kwargs[0])
        outputs = self.parallel_apply(self._module_copies[:len(inputs)], inputs, kwargs)
        return self.gather(outputs, self.output_device)

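    # Sketch of what the scatter step in ``forward`` does (assumes two visible
    # GPUs; the input tensor is arbitrary): every tensor argument is chunked
    # along ``dim`` and each chunk is moved to its target device.
    #
    #   from torch.nn.parallel.scatter_gather import scatter_kwargs
    #   x = torch.randn(8, 10)
    #   inputs, kwargs = scatter_kwargs((x,), {}, [0, 1], dim=0)
    #   # inputs[0][0] is x[:4] on cuda:0, inputs[1][0] is x[4:] on cuda:1
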
    def scatter(self, inputs, kwargs, device_ids):
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)

    def parallel_apply(self, replicas, inputs, kwargs):
        return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])

    def gather(self, outputs, output_device):
        return gather(outputs, output_device, dim=self.dim)

    def train(self, mode=True):
        super(DistributedDataParallel, self).train(mode)
        for module in self._module_copies[1:]:
            module.train(mode)

    def _dist_broadcast_coalesced(self, tensors, buffer_size):
        """
        Broadcast a sequence of tensors to the default group from rank 0.
        Small tensors are first coalesced into a buffer to reduce the number of
        broadcasts.

        tensors (sequence): tensors to broadcast. Each tensor needs to be on the
            same GPU.
        buffer_size (int): maximum size of the buffer for coalescing
        """
        for tensors in _take_tensors(tensors, buffer_size):
            flat_tensors = _flatten_dense_tensors(tensors)
            dist.broadcast(flat_tensors, 0)
            for tensor, synced in zip(tensors,
                                      _unflatten_dense_tensors(flat_tensors, tensors)):
                tensor.copy_(synced)

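    # Sketch of the coalescing pattern behind ``_dist_broadcast_coalesced``
    # (single-process illustration, no process group required): flatten several
    # tensors into one buffer, communicate that buffer once, then unflatten and
    # copy the results back in place.
    #
    #   a, b = torch.randn(2, 3), torch.randn(5)
    #   flat = _flatten_dense_tensors([a, b])      # a single 11-element buffer
    #   # ... one broadcast/all-reduce would operate on ``flat`` here ...
    #   for t, synced in zip([a, b], _unflatten_dense_tensors(flat, [a, b])):
    #       t.copy_(synced)
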
    def _sync_params(self):
        if len(self.device_ids) > 1:
            # intra-node parameter sync
            params = [p.data for p in self.module.parameters()]
            result = broadcast_coalesced(params, self.device_ids, self.broadcast_bucket_size)
            for tensors, module in zip(result[1:], self._module_copies[1:]):
                for tensor, param in zip(tensors, module.parameters()):
                    param.data.set_(tensor)

        # module buffer sync
        if self.broadcast_buffers:
            buffers = [b.data for b in self.module.buffers()]
            if len(buffers) > 0:
                # cross-node buffer sync
                self._dist_broadcast_coalesced(buffers, self.broadcast_bucket_size)

                if len(self.device_ids) > 1:
                    # intra-node buffer sync
                    result = broadcast_coalesced(buffers, self.device_ids, self.broadcast_bucket_size)
                    for tensors, module in zip(result[1:], self._module_copies[1:]):
                        for tensor, buf in zip(tensors, module.buffers()):
                            buf.data.set_(tensor)

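    # Sketch of the intra-node half of ``_sync_params`` (needs at least two
    # GPUs; tensors and bucket size are arbitrary): ``broadcast_coalesced``
    # returns one list of tensors per requested device.
    #
    #   src = [torch.randn(4, 4).cuda(0), torch.randn(8).cuda(0)]
    #   per_device = broadcast_coalesced(src, [0, 1], 10 * 1024 * 1024)
    #   # per_device[0] corresponds to cuda:0 (the source device);
    #   # per_device[1] holds the copies living on cuda:1
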
    def _register_grad_hooks(self):
        self._grad_accs = []  # need to keep them in scope
        for device_idx, module in enumerate(self._module_copies):
            for p in module.parameters():
                if p.requires_grad:
                    p_tmp = p.expand_as(p)
                    grad_acc = p_tmp.grad_fn.next_functions[0][0]
                    grad_acc.register_hook(self._make_param_hook(p, device_idx))
                    self._grad_accs.append(grad_acc)

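    # Sketch of the gradient-accumulator trick used in ``_register_grad_hooks``
    # (toy example; it relies on autograd internals): ``p.expand_as(p)`` builds
    # a view whose ``grad_fn``'s first next-function is the AccumulateGrad node
    # for ``p``, and a hook on that node fires once ``p.grad`` has been written.
    #
    #   w = torch.nn.Parameter(torch.randn(3))
    #   acc = w.expand_as(w).grad_fn.next_functions[0][0]
    #   acc.register_hook(lambda *unused: print("grad for w is ready"))
    #   (w * 2).sum().backward()
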
    def _register_nccl_grad_hook(self):
        """
        This function registers the callback all-reduction function for the
        NCCL backend. All gradients will be all-reduced in a single step.
        The NCCL reduction will directly be enqueued into the
        default CUDA stream. Therefore, no synchronization is needed.
        """
        # Creating a new group
        self.nccl_reduction_group_id = dist.new_group()

        def reduction_fn_nccl():
            # This function only needs to be called once
            if not self.need_reduction:
                return

            self.need_reduction = False
            all_grads = [[] for _ in range(len(self._module_copies))]
            all_grads_buckets_iters = []

            # Bucketing all the gradients
            for dev_idx, module in enumerate(self._module_copies):
                for param in module.parameters():
                    if not param.requires_grad or param.grad is None:
                        continue
                    if param.grad.requires_grad:
                        raise RuntimeError("DistributedDataParallel only works "
                                           "with gradients that don't require "
                                           "grad")
                    # Adding the gradients for reduction
                    all_grads[dev_idx].append(param.grad.data)

                # Now bucketing the parameters
                dev_grads_buckets = _take_tensors(all_grads[dev_idx],
                                                  self.nccl_reduce_bucket_size)

                all_grads_buckets_iters.append(dev_grads_buckets)

            # Now reduce each bucket one after another
            for grads_batch in zip(*all_grads_buckets_iters):
                grads_batch_coalesced = []
                # Coalesce each bucket
                for dev_idx, dev_grads_batch in enumerate(grads_batch):
                    dev_id = self.device_ids[dev_idx]
                    with torch.cuda.device(dev_id):
                        dev_grads_batch_coalesced = _flatten_dense_tensors(dev_grads_batch)
                        grads_batch_coalesced.append(dev_grads_batch_coalesced)

                # We will only use device 0's results, but this single op should be
                # faster than doing the following two operations sequentially:
                # (1) intra-node reduce to lead GPU, followed by
                # (2) inter-node allreduce for all the first lead GPUs in all nodes
                dist.all_reduce_multigpu(grads_batch_coalesced,
                                         group=self.nccl_reduction_group_id)

                # Now only work on the first device of self.device_ids, uncoalesce
                # the gradients for each bucket
                grads_batch_coalesced[0] /= dist.get_world_size()
                grads_batch_reduced = _unflatten_dense_tensors(grads_batch_coalesced[0], grads_batch[0])
                for grad, reduced in zip(grads_batch[0], grads_batch_reduced):
                    grad.copy_(reduced)

            # clear the gradients and save memory for replicas
            for module in self._module_copies[1:]:
                for param in module.parameters():
                    if param.requires_grad:
                        param.grad = None
                        param.data.set_()

        # Now register the reduction hook on the parameters
        for p in self.module.parameters():
            if not p.requires_grad:
                continue

            @torch.utils.hooks.unserializable_hook
            def allreduce_hook(*unused):
                Variable._execution_engine.queue_callback(reduction_fn_nccl)

            p.register_hook(allreduce_hook)

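    # Sketch of the ``queue_callback`` mechanism used above (internal autograd
    # API; its behaviour here is assumed from how this file uses it): a callback
    # queued from inside a backward hook runs once, after the whole backward
    # pass has finished.
    #
    #   def _after_backward():
    #       print("backward done; gradients are final")
    #   x = torch.randn(4, requires_grad=True)
    #   x.register_hook(
    #       lambda grad: Variable._execution_engine.queue_callback(_after_backward))
    #   x.sum().backward()
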
    def _make_param_hook(self, param, device_idx):

        bucket_idx = self.bucket_map[param]

        def distributed_data_parallel_hook(*unused):
            if param.grad.requires_grad:
                raise RuntimeError("DistributedDataParallel only works with "
                                   "gradients that don't require grad")
            bucket = self.buckets[bucket_idx][device_idx]
            bucket.append(param.grad.data)

            # We can flush these and save memory for replicas
            if device_idx > 0:
                param.grad = None
                param.data.set_()

            # Current device's bucket is full
            if len(bucket) == self.bucket_sizes[bucket_idx]:
                with torch.cuda.device(self.device_ids[device_idx]):
                    event = torch.cuda.Event()
                    event.record()
                with self.dispatch_lock:
                    self.bucket_events[bucket_idx][device_idx] = event
                    self._queue_reduction(bucket_idx)

        return distributed_data_parallel_hook

    def _queue_reduction(self, bucket_idx):
        dev_buckets = self.buckets[bucket_idx]
        dev_events = self.bucket_events[bucket_idx]

        # Check if it's ready
        if any(evt is None for evt in dev_events):
            return

        # Queue the reduction and make sure backward waits for it
        event = threading.Event()
        self._reduction_queues[bucket_idx].put((dev_buckets, dev_events, event))
        Variable._execution_engine.queue_callback(lambda: event.wait())

        # Reset bucket state
        self.buckets[bucket_idx] = [[] for _ in range(len(self.device_ids))]
        self.bucket_events[bucket_idx] = [None] * len(self.device_ids)
        self.reduced[bucket_idx] = True
        if all(self.reduced):
            self.reduced = [False] * len(self.bucket_sizes)

            def sync_reduction_streams():
                # We only have to sync with the first one, but it's safer to do
                # it this way in case we change the way in which we parallelize
                # work
                r_streams = zip(*self._reduction_streams)
                for dev_id, default_stream, dev_r_streams in zip(self.device_ids, self._default_streams, r_streams):
                    with torch.cuda.device(dev_id):
                        for reduction_stream in dev_r_streams:
                            default_stream.wait_stream(reduction_stream)
            Variable._execution_engine.queue_callback(sync_reduction_streams)

    def _start_reduction_threads(self):
        num_buckets = len(self.bucket_sizes)
        self._reduction_queues = [queue.Queue() for _ in range(num_buckets)]
        self._reduction_threads = []
        self._reduction_streams = [[] for _ in range(num_buckets)]
        self._nccl_streams = []
        self._default_streams = []
        for dev_id in self.device_ids:
            with torch.cuda.device(dev_id):
                # TODO: don't assume we're on a default stream
                self._default_streams.append(torch.cuda.current_stream())
                self._nccl_streams.append(torch.cuda.Stream())
        for reduction_queue, reduction_streams in zip(self._reduction_queues, self._reduction_streams):
            for dev_id in self.device_ids:
                with torch.cuda.device(dev_id):
                    reduction_streams.append(torch.cuda.Stream())
            # We only use the first device for distributed reductions
            dist._register_stream(reduction_streams[0])

            group_id = dist.new_group()

            self._reduction_threads.append(threading.Thread(
                target=self._reduction_thread_fn,
                args=(reduction_queue, group_id, self.device_ids, reduction_streams, self._nccl_streams)))
            self._reduction_threads[-1].daemon = True
            self._reduction_threads[-1].start()

    @staticmethod
    def _reduction_thread_fn(queue, group_id, device_ids, reduction_streams, nccl_streams):

        def _process_batch():
            dev_grad_batch, dev_events, job_event = queue.get()
            dev_coalesced = []
            # Coalesce the tensors on all devices and start a local reduction
            for dev_id, grad_batch, event, stream in zip(device_ids, dev_grad_batch, dev_events, reduction_streams):
                with torch.cuda.device(dev_id), torch.cuda.stream(stream):
                    stream.wait_event(event)
                    coalesced = _flatten_dense_tensors(grad_batch)
                    dev_coalesced.append(coalesced)
            # Wait for all copies to complete before starting the NCCL kernel
            for stream in reduction_streams:
                stream.synchronize()
            nccl.reduce(dev_coalesced, root=0, streams=nccl_streams)

            # From now on we're only going to work on the first device (from device_ids)
            grad_batch = dev_grad_batch[0]
            coalesced = dev_coalesced[0]
            reduce_stream = reduction_streams[0]
            with torch.cuda.stream(reduce_stream):
                reduce_stream.wait_stream(nccl_streams[0])
                coalesced /= dist.get_world_size()
                dist.all_reduce(coalesced, group=group_id)
                for grad, reduced in zip(grad_batch, _unflatten_dense_tensors(coalesced, grad_batch)):
                    grad.copy_(reduced)
            job_event.set()

        with torch.cuda.device(device_ids[0]):
            while True:
                _process_batch()  # just to have a clear scope
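

# Note on the averaging in both reduction paths above: every process divides its
# coalesced gradients by the world size *before* the all-reduce, so the summed
# result is the mean across processes:
#
#   sum_i (g_i / N) = (1 / N) * sum_i g_i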