import sys
import copy
import threading

import torch
from torch.autograd import Variable
from torch.cuda import nccl
from torch.cuda.comm import broadcast_coalesced
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors, \
    _take_tensors

import torch.distributed.deprecated as dist
from ...modules import Module
from ..replicate import replicate
from ..scatter_gather import scatter_kwargs, gather
from ..parallel_apply import parallel_apply
if sys.version_info[0] == 3:
    import queue
else:
    import Queue as queue


class DistributedDataParallel(Module):
    r"""Implements distributed data parallelism at the module level.

    This container parallelizes the application of the given module by
    splitting the input across the specified devices by chunking in the batch
    dimension. The module is replicated on each machine and each device, and
    each such replica handles a portion of the input. During the backwards
    pass, gradients from each node are averaged.

    The batch size should be larger than the number of GPUs used locally. It
    should also be an integer multiple of the number of GPUs so that each chunk
    is the same size (so that each GPU processes the same number of samples).

    See also: :ref:`distributed-basics` and :ref:`cuda-nn-dataparallel-instead`.
    The same constraints on input as in :class:`torch.nn.DataParallel` apply.

    Creation of this class requires the distributed package to be already
    initialized in the process group mode
    (see :func:`torch.distributed.deprecated.init_process_group`).

    .. warning::
        This module works only with the ``nccl`` and ``gloo`` backends.

    .. warning::
        Constructor, forward method, and differentiation of the output (or a
        function of the output of this module) is a distributed synchronization
        point. Take that into account in case different processes might be
        executing different code.

    .. warning::
        This module assumes all parameters are registered in the model by the
        time it is created. No parameters should be added nor removed later.
        Same applies to buffers.

    .. warning::
        This module assumes all buffers and gradients are dense.

    .. warning::
        This module doesn't work with :func:`torch.autograd.grad` (i.e. it will
        only work if gradients are to be accumulated in ``.grad`` attributes of
        parameters).

    .. warning::
        If you plan on using this module with a ``nccl`` backend or a ``gloo``
        backend (that uses Infiniband), together with a DataLoader that uses
        multiple workers, please change the multiprocessing start method to
        ``forkserver`` (Python 3 only) or ``spawn``. Unfortunately
        Gloo (that uses Infiniband) and NCCL2 are not fork safe, and you will
        likely experience deadlocks if you don't change this setting.

    .. note::
        Parameters are never broadcast between processes. The module performs
        an all-reduce step on gradients and assumes that they will be modified
        by the optimizer in all processes in the same way. Buffers
        (e.g. BatchNorm stats) are broadcast from the module in process of rank
        0, to all other replicas in the system in every iteration.

    .. warning::
        Forward and backward hooks defined on :attr:`module` and its submodules
        won't be invoked anymore, unless the hooks are initialized in the
        :meth:`forward` method.

    Args:
        module: module to be parallelized
        device_ids: CUDA devices (default: all devices)
        output_device: device location of output (default: device_ids[0])
        broadcast_buffers: flag that enables syncing (broadcasting) buffers of
            the module at the beginning of the forward function (default: True)

    Attributes:
        module (Module): the module to be parallelized

    Example::

        >>> torch.distributed.deprecated.init_process_group(world_size=4, init_method='...')
        >>> net = torch.nn.DistributedDataParallel(model)

    A fuller usage sketch appears after the class definition below.
    """

    def __init__(self, module, device_ids=None, output_device=None, dim=0,
                 broadcast_buffers=True):
        super(DistributedDataParallel, self).__init__()
        if dist._backend not in (dist.dist_backend.NCCL, dist.dist_backend.GLOO):
            raise ValueError('Invalid backend, only NCCL and GLOO backends are '
                             'supported by DistributedDataParallel')
        if device_ids is None:
            device_ids = list(range(torch.cuda.device_count()))
        if output_device is None:
            output_device = device_ids[0]
        self.dim = dim
        self.module = module
        self.device_ids = device_ids
        self.output_device = output_device
        self.broadcast_buffers = broadcast_buffers

        # Flag used by the NCCL backend to make sure gradients are only
        # reduced once per backward pass
        self.need_reduction = False

        MB = 1024 * 1024
        # bucket sizes used for coalesced broadcasts and NCCL reductions
        self.broadcast_bucket_size = 10 * MB
        self.nccl_reduce_bucket_size = 256 * MB

        # Sync the initial parameters and buffers across all processes
        module_states = list(self.module.state_dict().values())
        if len(module_states) > 0:
            self._dist_broadcast_coalesced(module_states,
                                           self.broadcast_bucket_size)

        if len(device_ids) > 1:
            # Replicate the module onto every local device; replica 0 is the
            # original module and the replicas mirror its requires_grad flags.
            self._module_copies = replicate(self.module, self.device_ids)
            self._module_copies[0] = self.module
            for module_copy in self._module_copies[1:]:
                for param, copy_param in zip(self.module.parameters(), module_copy.parameters()):
                    copy_param.requires_grad = param.requires_grad
        else:
            self._module_copies = [self.module]

        # Every NCCL call is asynchronous, so for the NCCL backend all
        # reductions are enqueued on the default CUDA stream by a single
        # callback instead of dedicated reduction threads.
        if dist._backend == dist.dist_backend.NCCL:
            self._register_nccl_grad_hook()
            return

        bucket_bytes_cap = 1 * MB

        # Split the parameters into buckets of at most bucket_bytes_cap bytes;
        # the resulting structure is indexed as devices x buckets x bucket_elems.
        param_buckets = []
        for dev_idx, module in enumerate(self._module_copies):
            param_buckets.append(list(_take_tensors(module.parameters(), bucket_bytes_cap)))

        self.bucket_sizes = []
        self.bucket_map = {}

        # Transpose param_buckets so the outer loop runs over buckets and the
        # inner loop yields, for each element, the tuple holding that parameter
        # on every device.
        for bucket_idx, param_buckets_tuple in enumerate(zip(*param_buckets)):
            self.bucket_sizes.append(0)
            for idx, param_tuple in enumerate(zip(*param_buckets_tuple)):
                bucket_param_type = param_tuple[0].type()
                # Half-precision parameters are only supported with Gloo
                if bucket_param_type == torch.cuda.HalfTensor and \
                        dist._backend != dist.dist_backend.GLOO:
                    raise RuntimeError("DistributedDataParallel currently only "
                                       "supports half precision parameters "
                                       "with the Gloo backend")
                if not param_tuple[0].requires_grad:
                    continue
                for p in param_tuple:
                    self.bucket_map[p] = bucket_idx
                self.bucket_sizes[bucket_idx] += 1

        self.buckets = [[[] for _ in range(len(self.device_ids))]
                        for _ in range(len(self.bucket_sizes))]
        self.bucket_events = [[None] * len(self.device_ids)
                              for _ in range(len(self.bucket_sizes))]
        self.reduced = [False] * len(self.bucket_sizes)

        self._register_grad_hooks()
        self.dispatch_lock = threading.Lock()
        self._start_reduction_threads()

    def __getstate__(self):
        attrs = copy.copy(self.__dict__)
        if dist._backend != dist.dist_backend.NCCL:
            # The reduction machinery holds threads, queues and CUDA streams,
            # none of which can be pickled.
            del attrs['_grad_accs'], attrs['_reduction_queues'], \
                attrs['_reduction_streams'], attrs['_reduction_threads'], \
                attrs['_nccl_streams'], attrs['_default_streams'], \
                attrs['dispatch_lock']
        return attrs

    def __setstate__(self, state):
        super(DistributedDataParallel, self).__setstate__(state)
        if dist._backend == dist.dist_backend.NCCL:
            self._register_nccl_grad_hook()
        else:
            self._register_grad_hooks()
            self.dispatch_lock = threading.Lock()
            self._start_reduction_threads()

    def forward(self, *inputs, **kwargs):
        self.need_reduction = True
        inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
        self._sync_params()
        if len(self.device_ids) == 1:
            return self.module(*inputs[0], **kwargs[0])
        outputs = self.parallel_apply(self._module_copies[:len(inputs)], inputs, kwargs)
        return self.gather(outputs, self.output_device)

    def scatter(self, inputs, kwargs, device_ids):
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)

    def parallel_apply(self, replicas, inputs, kwargs):
        return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])

    def gather(self, outputs, output_device):
        return gather(outputs, output_device, dim=self.dim)

    def train(self, mode=True):
        super(DistributedDataParallel, self).train(mode)
        for module in self._module_copies[1:]:
            module.train(mode)

    def _dist_broadcast_coalesced(self, tensors, buffer_size):
        """
        Broadcast a sequence of tensors to the default group from rank 0.
        Small tensors are first coalesced into a buffer to reduce the number
        of broadcasts. (A CPU-only sketch of this coalescing round trip
        appears after the class definition.)

        tensors (sequence): tensors to broadcast. Each tensor needs to be on
                            the same GPU.
        buffer_size (int): maximum size of the buffer for coalescing
        """
        for tensors in _take_tensors(tensors, buffer_size):
            flat_tensors = _flatten_dense_tensors(tensors)
            dist.broadcast(flat_tensors, 0)
            for tensor, synced in zip(tensors,
                                      _unflatten_dense_tensors(flat_tensors, tensors)):
                tensor.copy_(synced)

    def _sync_params(self):
        if len(self.device_ids) > 1:
            # intra-node parameter sync: copy the lead module's parameters
            # onto every local replica
            params = [p.data for p in self.module.parameters()]
            result = broadcast_coalesced(params, self.device_ids, self.broadcast_bucket_size)
            for tensors, module in zip(result[1:], self._module_copies[1:]):
                for tensor, param in zip(tensors, module.parameters()):
                    param.data.set_(tensor)

        if self.broadcast_buffers:
            # buffer sync (e.g. BatchNorm running stats): rank 0 broadcasts to
            # every process, then the result is copied onto the local replicas
            buffers = [b.data for b in self.module.buffers()]
            if len(buffers) > 0:
                self._dist_broadcast_coalesced(buffers, self.broadcast_bucket_size)
                if len(self.device_ids) > 1:
                    result = broadcast_coalesced(buffers, self.device_ids, self.broadcast_bucket_size)
                    for tensors, module in zip(result[1:], self._module_copies[1:]):
                        for tensor, buf in zip(tensors, module.buffers()):
                            buf.data.set_(tensor)

    def _register_grad_hooks(self):
        self._grad_accs = []  # need to keep the gradient accumulators in scope
        for device_idx, module in enumerate(self._module_copies):
            for p in module.parameters():
                if p.requires_grad:
                    # Hook the gradient accumulator so we know when this
                    # parameter's gradient is ready.
                    p_tmp = p.expand_as(p)
                    grad_acc = p_tmp.grad_fn.next_functions[0][0]
                    grad_acc.register_hook(self._make_param_hook(p, device_idx))
                    self._grad_accs.append(grad_acc)

    def _register_nccl_grad_hook(self):
        """
        This function registers the callback all-reduction function for the
        NCCL backend. All gradients will be all reduced in one single step.
        The NCCL reduction will directly be enqueued into the
        default CUDA stream. Therefore, no synchronization is needed.
        """
        # Create a communication group used solely for these reductions
        self.nccl_reduction_group_id = dist.new_group()

        def reduction_fn_nccl():
            # This callback only needs to run once per backward pass
            if not self.need_reduction:
                return
            self.need_reduction = False

            all_grads = [[] for _ in range(len(self._module_copies))]
            all_grads_buckets_iters = []

            # Bucket all the gradients, per device
            for dev_idx, module in enumerate(self._module_copies):
                for param in module.parameters():
                    if not param.requires_grad or param.grad is None:
                        continue
                    if param.grad.requires_grad:
                        raise RuntimeError("DistributedDataParallel only works "
                                           "with gradients that don't require "
                                           "grad")
                    all_grads[dev_idx].append(param.grad.data)

                dev_grads_buckets = _take_tensors(all_grads[dev_idx],
                                                  self.nccl_reduce_bucket_size)
                all_grads_buckets_iters.append(dev_grads_buckets)

            # Reduce each bucket with a single coalesced all-reduce across all
            # local devices and all nodes, then scatter the averaged result
            # back into the gradients on device 0.
            for grads_batch in zip(*all_grads_buckets_iters):
                grads_batch_coalesced = []
                for dev_idx, dev_grads_batch in enumerate(grads_batch):
                    dev_id = self.device_ids[dev_idx]
                    with torch.cuda.device(dev_id):
                        dev_grads_batch_coalesced = _flatten_dense_tensors(dev_grads_batch)
                    grads_batch_coalesced.append(dev_grads_batch_coalesced)

                dist.all_reduce_multigpu(grads_batch_coalesced,
                                         group=self.nccl_reduction_group_id)

                grads_batch_coalesced[0] /= dist.get_world_size()
                grads_batch_reduced = _unflatten_dense_tensors(grads_batch_coalesced[0], grads_batch[0])
                for grad, reduced in zip(grads_batch[0], grads_batch_reduced):
                    grad.copy_(reduced)

            # Free the replicas' gradients and storages to save memory
            for module in self._module_copies[1:]:
                for param in module.parameters():
                    if param.requires_grad:
                        param.grad = None
                        param.data.set_()

        # Register the reduction callback on every parameter that requires
        # a gradient
        for p in self.module.parameters():
            if not p.requires_grad:
                continue

            @torch.utils.hooks.unserializable_hook
            def allreduce_hook(*unused):
                # Defer the actual reduction until the whole backward pass
                # has finished
                Variable._execution_engine.queue_callback(reduction_fn_nccl)

            p.register_hook(allreduce_hook)

    def _make_param_hook(self, param, device_idx):
        bucket_idx = self.bucket_map[param]

        def distributed_data_parallel_hook(*unused):
            if param.grad.requires_grad:
                raise RuntimeError("DistributedDataParallel only works with "
                                   "gradients that don't require grad")
            bucket = self.buckets[bucket_idx][device_idx]
            bucket.append(param.grad.data)

            # Once this device's bucket is full, record an event on the
            # current stream and queue the bucket for reduction.
            if len(bucket) == self.bucket_sizes[bucket_idx]:
                with torch.cuda.device(self.device_ids[device_idx]):
                    event = torch.cuda.Event()
                    event.record()
                with self.dispatch_lock:
                    self.bucket_events[bucket_idx][device_idx] = event
                    self._queue_reduction(bucket_idx)

        return distributed_data_parallel_hook

    def _queue_reduction(self, bucket_idx):
        dev_buckets = self.buckets[bucket_idx]
        dev_events = self.bucket_events[bucket_idx]

        # Only reduce once every device has filled this bucket
        if any(evt is None for evt in dev_events):
            return

        # Queue the bucket for the reduction threads and make the backward
        # pass wait for the reduction to finish
        event = threading.Event()
        self._reduction_queues[bucket_idx].put((dev_buckets, dev_events, event))
        Variable._execution_engine.queue_callback(lambda: event.wait())

        # Reset the bucket state
        self.buckets[bucket_idx] = [[] for _ in range(len(self.device_ids))]
        self.bucket_events[bucket_idx] = [None] * len(self.device_ids)
        self.reduced[bucket_idx] = True
        if all(self.reduced):
            self.reduced = [False] * len(self.bucket_sizes)

            def sync_reduction_streams():
                # Make every device's default stream wait on its reduction
                # streams before the next iteration starts.
                r_streams = zip(*self._reduction_streams)
                for dev_id, default_stream, dev_r_streams in zip(self.device_ids,
                                                                 self._default_streams,
                                                                 r_streams):
                    with torch.cuda.device(dev_id):
                        for reduction_stream in dev_r_streams:
                            default_stream.wait_stream(reduction_stream)
            Variable._execution_engine.queue_callback(sync_reduction_streams)

    def _start_reduction_threads(self):
        num_buckets = len(self.bucket_sizes)
        self._reduction_queues = [queue.Queue() for _ in range(num_buckets)]
        self._reduction_threads = []
        self._reduction_streams = [[] for _ in range(num_buckets)]
        self._nccl_streams = []
        self._default_streams = []
        for dev_id in self.device_ids:
            with torch.cuda.device(dev_id):
                self._default_streams.append(torch.cuda.current_stream())
                self._nccl_streams.append(torch.cuda.Stream())
        for reduction_queue, reduction_streams in zip(self._reduction_queues,
                                                      self._reduction_streams):
            for dev_id in self.device_ids:
                with torch.cuda.device(dev_id):
                    reduction_streams.append(torch.cuda.Stream())
            # Only the first device takes part in the distributed reduction
            dist._register_stream(reduction_streams[0])

            group_id = dist.new_group()

            self._reduction_threads.append(threading.Thread(
                target=self._reduction_thread_fn,
                args=(reduction_queue, group_id, self.device_ids,
                      reduction_streams, self._nccl_streams)))
            self._reduction_threads[-1].daemon = True
            self._reduction_threads[-1].start()

    @staticmethod
    def _reduction_thread_fn(queue, group_id, device_ids, reduction_streams, nccl_streams):

        def _process_batch():
            dev_grad_batch, dev_events, job_event = queue.get()
            dev_coalesced = []
            # Coalesce each device's bucket on its own reduction stream once
            # its gradients are ready
            for dev_id, grad_batch, event, stream in zip(device_ids, dev_grad_batch,
                                                         dev_events, reduction_streams):
                with torch.cuda.device(dev_id), torch.cuda.stream(stream):
                    stream.wait_event(event)
                    coalesced = _flatten_dense_tensors(grad_batch)
                    dev_coalesced.append(coalesced)
            # Wait for all copies to complete before starting the NCCL kernel
            for stream in reduction_streams:
                stream.synchronize()
            nccl.reduce(dev_coalesced, root=0, streams=nccl_streams)

            # From now on only the first device's result is used: average it,
            # reduce it across nodes, and copy it back into the gradients.
            grad_batch = dev_grad_batch[0]
            coalesced = dev_coalesced[0]
            reduce_stream = reduction_streams[0]
            with torch.cuda.stream(reduce_stream):
                reduce_stream.wait_stream(nccl_streams[0])
                coalesced /= dist.get_world_size()
                dist.all_reduce(coalesced, group=group_id)
                for grad, reduced in zip(grad_batch,
                                         _unflatten_dense_tensors(coalesced, grad_batch)):
                    grad.copy_(reduced)
            job_event.set()

        with torch.cuda.device(device_ids[0]):
            while True:
                _process_batch()
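

# ---------------------------------------------------------------------------
# Illustrative usage sketch, not part of the upstream module: roughly how one
# process would wrap a model with the deprecated DistributedDataParallel
# defined above. The backend name, init_method URL, and the RANK/WORLD_SIZE
# environment variables are placeholder assumptions supplied by whatever
# launches the processes (one process per node, each seeing its local GPUs).
def _example_usage():
    import os
    import torch.nn as nn
    import torch.optim as optim

    dist.init_process_group(backend='gloo',
                            init_method='tcp://127.0.0.1:23456',
                            world_size=int(os.environ.get('WORLD_SIZE', '1')),
                            rank=int(os.environ.get('RANK', '0')))

    model = nn.Linear(10, 10).cuda()          # any nn.Module works here
    net = DistributedDataParallel(model)      # replicates onto all local GPUs
    optimizer = optim.SGD(net.parameters(), lr=0.1)

    inputs = torch.randn(20, 10).cuda()       # batch larger than the GPU count
    optimizer.zero_grad()
    loss = net(inputs).sum()
    loss.backward()                           # gradients are all-reduced here
    optimizer.step()                          # identical update in every process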
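

# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the upstream module, of the coalescing
# round trip that _dist_broadcast_coalesced and the reduction threads rely on:
# many small tensors are flattened into one contiguous buffer, a single
# collective moves the buffer, and the result is unflattened and copied back.
# The in-place fill_ stands in for dist.broadcast so the sketch runs on CPU
# without init_process_group.
def _coalescing_round_trip_demo():
    tensors = [torch.zeros(3), torch.zeros(2, 2)]
    flat = _flatten_dense_tensors(tensors)    # one contiguous 1-D buffer
    flat.fill_(1.0)                           # stand-in for dist.broadcast(flat, 0)
    for tensor, synced in zip(tensors, _unflatten_dense_tensors(flat, tensors)):
        tensor.copy_(synced)                  # write the received values back in place
    assert all(bool((t == 1).all()) for t in tensors)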