import copy

import torch
import torch.distributed as dist

from torch.cuda.comm import broadcast_coalesced
from torch.cuda._utils import _get_device_index

if dist.is_available():
    from torch.distributed.distributed_c10d import _get_default_group

from ..modules import Module
from .replicate import replicate
from .scatter_gather import scatter_kwargs, gather
from .parallel_apply import parallel_apply
20 r"""Implements distributed data parallelism that is based on 21 ``torch.distributed`` package at the module level. 23 This container parallelizes the application of the given module by 24 splitting the input across the specified devices by chunking in the batch 25 dimension. The module is replicated on each machine and each device, and 26 each such replica handles a portion of the input. During the backwards 27 pass, gradients from each node are averaged. 29 The batch size should be larger than the number of GPUs used locally. 31 See also: :ref:`distributed-basics` and :ref:`cuda-nn-dataparallel-instead`. 32 The same constraints on input as in :class:`torch.nn.DataParallel` apply. 34 Creation of this class requires that ``torch.distributed`` to be already 35 initialized, by calling :func:`torch.distributed.init_process_group`. 37 ``DistributedDataParallel`` can be used in the following two ways: 39 (1) Single-Process Multi-GPU 41 In this case, a single process will be 42 spawned on each host/node and each process will operate on all the GPUs 43 of the node where it's running. To use ``DistributedDataParallel`` in 44 this way, you can simply construct the model as the following: 46 >>> torch.distributed.init_process_group(backend="nccl") 47 >>> model = DistributedDataParallel(model) # device_ids will include all GPU devices by default 49 (2) Multi-Process Single-GPU 51 This is the highly recommended way to use ``DistributedDataParallel``, with 52 multiple processes, each of which operates on a single GPU. This is 53 currently the fastest approach to do data parallel training using PyTorch 54 and applies to both single-node(multi-GPU) and multi-node data 55 parallel training. It is proven to be significantly faster than 56 :class:`torch.nn.DataParallel` for single-node multi-GPU data 59 Here is how to use it: on each host with N GPUs, you should spawn up N 60 processes, while ensuring that each process individually works on a single GPU 61 from 0 to N-1. Therefore, it is your job to ensure that your training script 62 operates on a single given GPU by calling: 64 >>> torch.cuda.set_device(i) 66 where i is from 0 to N-1. In each process, you should refer the following 67 to construct this module: 69 >>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...') 70 >>> model = DistributedDataParallel(model, device_ids=[i], output_device=i) 72 In order to spawn up multiple processes per node, you can use either 73 ``torch.distributed.launch`` or ``torch.multiprocessing.spawn`` 75 .. note:: ``nccl`` backend is currently the fastest and 76 highly recommended backend to be used with Multi-Process Single-GPU 77 distributed training and this applies to both single-node and multi-node 80 .. note:: This module also supports mixed-precision distributed training. 81 This means that your model can have different types of parameters such 82 as mixed types of fp16 and fp32, the gradient reduction on these 83 mixed types of parameters will just work fine. 84 Also note that ``nccl`` backend is currently the fastest and highly 85 recommended backend for fp16/fp32 mixed-precision training. 88 This module works only with the ``gloo`` and ``nccl`` backends. 91 Constructor, forward method, and differentiation of the output (or a 92 function of the output of this module) is a distributed synchronization 93 point. Take that into account in case different processes might be 94 executing different code. 
97 This module assumes all parameters are registered in the model by the 98 time it is created. No parameters should be added nor removed later. 99 Same applies to buffers. 102 This module assumes all parameters are registered in the model of each 103 distributed processes are in the same order. The module itself will 104 conduct gradient all-reduction following the reverse order of the 105 registered parameters of the model. In other words, it is users' 106 responsibility to ensure that each distributed process has the exact 107 same model and thus the exact same parameter registration order. 110 This module assumes all buffers and gradients are dense. 113 This module doesn't work with :func:`torch.autograd.grad` (i.e. it will 114 only work if gradients are to be accumulated in ``.grad`` attributes of 119 If you plan on using this module with a ``nccl`` backend or a ``gloo`` 120 backend (that uses Infiniband), together with a DataLoader that uses 121 multiple workers, please change the multiprocessing start method to 122 ``forkserver`` (Python 3 only) or ``spawn``. Unfortunately 123 Gloo (that uses Infiniband) and NCCL2 are not fork safe, and you will 124 likely experience deadlocks if you don't change this setting. 127 Forward and backward hooks defined on :attr:`module` and its submodules 128 won't be invoked anymore, unless the hooks are initialized in the 129 :meth:`forward` method. 132 You should never try to change your model's parameters after wrapping 133 up your model with DistributedDataParallel. In other words, when 134 wrapping up your model with DistributedDataParallel, the constructor of 135 DistributedDataParallel will register the additional gradient 136 reduction functions on all the parameters of the model itself at the 137 time of construction. If you change the model's parameters after 138 the DistributedDataParallel construction, this is not supported and 139 unexpected behaviors can happen, since some parameters' gradient 140 reduction functions might not get called. 143 Parameters are never broadcast between processes. The module performs 144 an all-reduce step on gradients and assumes that they will be modified 145 by the optimizer in all processes in the same way. Buffers 146 (e.g. BatchNorm stats) are broadcast from the module in process of rank 147 0, to all other replicas in the system in every iteration. 150 module (Module): module to be parallelized 151 device_ids (list of int or torch.device): CUDA devices (default: all devices) 152 output_device (int or torch.device): device location of output (default: device_ids[0]) 153 broadcast_buffers (bool): flag that enables syncing (broadcasting) buffers of 154 the module at beginning of the forward function. 156 process_group: the process group to be used for distributed data 157 all-reduction. If ``None``, the default process group, which 158 is created by ```torch.distributed.init_process_group```, 159 will be used. (default: ``None``) 160 bucket_cap_mb: DistributedDataParallel will bucket parameters into 161 multiple buckets so that gradient reduction of each 162 bucket can potentially overlap with backward computation. 163 :attr:`bucket_cap_mb` controls the bucket size in MegaBytes (MB) 165 check_reduction: when setting to ``True``, it enables DistributedDataParallel 166 to automatically check if the previous iteration's 167 backward reductions were successfully issued at the 168 beginning of every iteration's forward function. 
169 You normally don't need this option enabled unless you 170 are observing weird behaviors such as different ranks 171 are getting different gradients, which should not 172 happen if DistributedDataParallel is correctly used. 176 module (Module): the module to be parallelized 180 >>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...') 181 >>> net = torch.nn.DistributedDataParallel(model, pg) 183 def __init__(self, module, device_ids=None,
184 output_device=
None, dim=0, broadcast_buffers=
True,
185 process_group=
None, bucket_cap_mb=25,
186 check_reduction=
False):
        super(DistributedDataParallel, self).__init__()

        # Use all visible CUDA devices by default
        if device_ids is None:
            device_ids = list(range(torch.cuda.device_count()))

        if output_device is None:
            output_device = device_ids[0]

        # Fall back to the default group created by init_process_group
        if process_group is None:
            self.process_group = _get_default_group()
        else:
            self.process_group = process_group

        self.dim = dim
        self.module = module
        self.device_ids = list(map(lambda x: _get_device_index(x, True), device_ids))
        self.output_device = _get_device_index(output_device, True)
        self.broadcast_buffers = broadcast_buffers
        self.check_reduction = check_reduction

        MB = 1024 * 1024
        self.broadcast_bucket_size = 25 * MB        # param/buffer sync bucket size
        self.bucket_bytes_cap = bucket_cap_mb * MB  # gradient reduction bucket size

        # Sync the initial parameters and buffers across all processes
        module_states = list(self.module.state_dict().values())
        if len(module_states) > 0:
            self._dist_broadcast_coalesced(module_states,
                                           self.broadcast_bucket_size)

        self._ddp_init_helper()
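    # --- Example (editor's sketch, not part of the original source) -------------
    # A minimal sketch of the "Multi-Process Single-GPU" recipe described in the
    # class docstring, launched with ``torch.multiprocessing.spawn``. ``MyModel``
    # and the ``env://`` rendezvous settings are placeholder assumptions.
    #
    #   import os
    #   import torch
    #   import torch.distributed as dist
    #   import torch.multiprocessing as mp
    #
    #   def worker(rank, world_size):
    #       os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    #       os.environ.setdefault("MASTER_PORT", "29500")
    #       dist.init_process_group("nccl", rank=rank, world_size=world_size)
    #       torch.cuda.set_device(rank)
    #       model = MyModel().cuda(rank)  # placeholder model
    #       ddp = torch.nn.parallel.DistributedDataParallel(
    #           model, device_ids=[rank], output_device=rank)
    #       # ... run the usual training loop on ``ddp`` ...
    #
    #   if __name__ == "__main__":
    #       n_gpus = torch.cuda.device_count()
    #       mp.spawn(worker, args=(n_gpus,), nprocs=n_gpus)
    # ---------------------------------------------------------------------------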
    def _ddp_init_helper(self):
        """
        Initialization helper function that does the following:

        (1) replicating the module from device[0] to the other devices
        (2) bucketing the parameters for reductions
        (3) resetting the bucketing states
        (4) registering the grad hooks
        (5) passing a handle of DDP to SyncBatchNorm Layer
        """
        # (1) replica 0 is the wrapped module itself; other local devices get detached copies
        if len(self.device_ids) > 1:
            self._module_copies = replicate(self.module, self.device_ids, detach=True)
            self._module_copies[0] = self.module
            for module_copy in self._module_copies[1:]:
                for param, copy_param in zip(self.module.parameters(), module_copy.parameters()):
                    copy_param.requires_grad = param.requires_grad
        else:
            self._module_copies = [self.module]

        self.modules_params_data = [[p.data for p in m.parameters()] for m in self._module_copies]
        self.modules_buffers_data = [[b.data for b in m.buffers()] for m in self._module_copies]

        # (2) only parameters that require grad take part in gradient reduction
        params_to_bucket = [[] for _ in self._module_copies]
        for dev_idx, m in enumerate(self._module_copies):
            for p in m.parameters():
                if p.requires_grad:
                    params_to_bucket[dev_idx].append(p)

        param_buckets = [dist._dist_bucket_tensors(dev_params_to_bucket,
                                                   int(self.bucket_bytes_cap),
                                                   fine_grained=False)
                         for dev_params_to_bucket in params_to_bucket]

        # Map each parameter to its (bucket index, offset within the bucket)
        self.bucket_sizes = []
        self.bucket_map = {}
        for bucket_idx, param_buckets_tuple in enumerate(zip(*param_buckets)):
            self.bucket_sizes.append(0)
            for param_tuple in zip(*param_buckets_tuple):
                if not param_tuple[0].requires_grad:
                    continue
                for p in param_tuple:
                    self.bucket_map[p] = (bucket_idx, self.bucket_sizes[bucket_idx])
                self.bucket_sizes[bucket_idx] += 1

        # (3) reset the bucketing state
        n_buckets, n_devs = len(self.bucket_sizes), len(self.device_ids)
        self.buckets = [[[None] * self.bucket_sizes[i] for _ in range(n_devs)]
                        for i in range(n_buckets)]
        self.buckets_ready_size = [[0] * n_devs for _ in range(n_buckets)]
        self.buckets_coalesced = [[] for _ in range(n_buckets)]
        self.reduction_works = [None] * n_buckets
        self.devs_ready = [0] * n_buckets
        self.ready_buckets_not_reduced = set()
        self.next_bucket = n_buckets - 1   # buckets are reduced in reverse registration order
        self.check_previous_reduction = False

        # (4) register the per-parameter gradient hooks
        self._register_grad_hooks()

        # (5) give torch.nn.SyncBatchNorm layers a handle to this DDP instance
        self._passing_sync_batchnorm_handle(self._module_copies)
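    # --- Example (editor's sketch, not part of the original source) -------------
    # The bucketing above groups parameters so that each all-reduce moves roughly
    # ``bucket_cap_mb`` megabytes at once. The same idea, illustrated with a
    # hypothetical ``bucket_by_size`` helper (the real code uses the internal
    # ``dist._dist_bucket_tensors``):
    #
    #   def bucket_by_size(tensors, bytes_cap):
    #       buckets, current, current_bytes = [], [], 0
    #       for t in tensors:
    #           size = t.numel() * t.element_size()
    #           if current and current_bytes + size > bytes_cap:
    #               buckets.append(current)
    #               current, current_bytes = [], 0
    #           current.append(t)
    #           current_bytes += size
    #       if current:
    #           buckets.append(current)
    #       return buckets
    #
    #   params = [torch.randn(1024, 1024) for _ in range(10)]   # ~4 MB each
    #   print([len(b) for b in bucket_by_size(params, 25 * 1024 * 1024)])  # [6, 4]
    # ---------------------------------------------------------------------------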
    def __getstate__(self):
        self._check_default_group()
        attrs = copy.copy(self.__dict__)
        # These attributes are not picklable and are rebuilt in __setstate__
        del attrs['process_group'], \
            attrs['default_streams'], \
            attrs['_grad_accs']
        return attrs
    def __setstate__(self, state):
        # Unpickling is only supported with the default process group
        self.process_group = _get_default_group()
        super(DistributedDataParallel, self).__setstate__(state)
        self._ddp_init_helper()
    def _check_default_group(self):
        pickle_not_supported = False
        try:
            if self.process_group != _get_default_group():
                pickle_not_supported = True
        except RuntimeError:
            pickle_not_supported = True

        if pickle_not_supported:
            raise RuntimeError("DDP Pickling/Unpickling are only supported "
                               "when using DDP with the default process "
                               "group. That is, when you have called "
                               "init_process_group and have not passed "
                               "process_group argument to DDP constructor")
    def _check_previous_reduction(self):
        if not self.training:
            return
        # Skip the check on the very first iteration; afterwards, every backward
        # pass is expected to have reduced all buckets.
        if self.check_previous_reduction and not self.all_buckets_reduced:
            raise RuntimeError("Not all gradients have been reduced from "
                               "the backward of the previous iteration. "
                               "This is an unexpected and fatal error. "
                               "Please check and ensure that the model's "
                               "parameters are not changed after you wrap "
                               "up the model with DistributedDataParallel.")
        self.check_previous_reduction = True
        self.all_buckets_reduced = False
    def forward(self, *inputs, **kwargs):
        if self.check_reduction:
            self._check_previous_reduction()
        # Scatter the inputs across the local devices and sync params/buffers first
        inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
        self._sync_params()
        if len(self.device_ids) == 1:
            return self.module(*inputs[0], **kwargs[0])
        outputs = self.parallel_apply(self._module_copies[:len(inputs)], inputs, kwargs)
        return self.gather(outputs, self.output_device)
    def scatter(self, inputs, kwargs, device_ids):
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)
    def parallel_apply(self, replicas, inputs, kwargs):
        return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])
    def gather(self, outputs, output_device):
        return gather(outputs, output_device, dim=self.dim)
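    # --- Example (editor's sketch, not part of the original source) -------------
    # ``scatter``/``parallel_apply``/``gather`` above wrap the public helpers in
    # ``torch.nn.parallel``. A standalone sketch of the same single-node flow,
    # assuming two visible GPUs and a placeholder ``model`` living on GPU 0:
    #
    #   from torch.nn.parallel import replicate, scatter, parallel_apply, gather
    #
    #   device_ids = [0, 1]
    #   inputs = scatter(torch.randn(8, 16).cuda(0), device_ids)  # chunk the batch
    #   replicas = replicate(model, device_ids)                   # one replica per GPU
    #   outputs = parallel_apply(replicas, inputs)                # run them in parallel
    #   result = gather(outputs, 0)                               # collect on device 0
    # ---------------------------------------------------------------------------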
    def train(self, mode=True):
        self.check_previous_reduction = False
        super(DistributedDataParallel, self).train(mode)
        # Keep every local replica in the same train/eval mode
        for module in self._module_copies[1:]:
            module.train(mode)
    def _dist_broadcast_coalesced(self, tensors, buffer_size):
        # Broadcast ``tensors`` from rank 0 to every process in the group,
        # coalescing them into ``buffer_size``-byte chunks.
        dist._dist_broadcast_coalesced(self.process_group, tensors, buffer_size, False)
    def _sync_params(self):
        if len(self.device_ids) > 1:
            # Intra-node parameter sync: broadcast replica 0's params to the other local replicas
            result = broadcast_coalesced(self.modules_params_data[0],
                                         self.device_ids, self.broadcast_bucket_size)
            for tensors, module_params_data in zip(result[1:], self.modules_params_data[1:]):
                for tensor, param_data in zip(tensors, module_params_data):
                    param_data.set_(tensor)

        # Buffer sync: rank 0 broadcasts its buffers (e.g. BatchNorm stats) to all processes
        if self.broadcast_buffers and len(self.modules_buffers_data[0]) > 0:
            self._dist_broadcast_coalesced(self.modules_buffers_data[0],
                                           self.broadcast_bucket_size)
            if len(self.device_ids) > 1:
                # Intra-node buffer sync across the local replicas
                result = broadcast_coalesced(self.modules_buffers_data[0],
                                             self.device_ids, self.broadcast_bucket_size)
                for tensors, module_buffers_data in zip(result[1:], self.modules_buffers_data[1:]):
                    for tensor, buffer_data in zip(tensors, module_buffers_data):
                        buffer_data.set_(tensor)
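    # --- Example (editor's sketch, not part of the original source) -------------
    # The buffer sync above is what keeps e.g. BatchNorm running statistics
    # identical across ranks: rank 0's buffers win every iteration. The same
    # effect for a single buffer, spelled out with the public API:
    #
    #   bn = torch.nn.BatchNorm2d(16).cuda()
    #   dist.broadcast(bn.running_mean, src=0)  # every rank now holds rank 0's value
    # ---------------------------------------------------------------------------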
    def _passing_sync_batchnorm_handle(self, module_copies):
        for dev_idx, module in enumerate(module_copies):
            for layer in module.modules():
                if isinstance(layer, torch.nn.modules.SyncBatchNorm):
                    # Tell SyncBatchNorm how many GPUs this DDP process manages
                    layer._specify_ddp_gpu_num(len(self.device_ids))
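    # --- Example (editor's sketch, not part of the original source) -------------
    # This handle is what lets ``torch.nn.SyncBatchNorm`` layers run their
    # cross-process statistics reduction under DDP. Assuming your build ships
    # ``SyncBatchNorm.convert_sync_batchnorm``, a typical setup looks like:
    #
    #   model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
    #   ddp = torch.nn.parallel.DistributedDataParallel(
    #       model.cuda(rank), device_ids=[rank], output_device=rank)
    # ---------------------------------------------------------------------------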
    def _register_grad_hooks(self):
        self._grad_accs = []  # need to keep the AccumulateGrad nodes in scope

        # Default stream tracking, used when launching the nccl reduce kernels
        self.default_streams = []
        for dev_id in self.device_ids:
            with torch.cuda.device(dev_id):
                self.default_streams.append(torch.cuda.current_stream())

        for device_idx, module in enumerate(self._module_copies):
            for p in module.parameters():
                if p.requires_grad:
                    # A zero-cost view gives us the parameter's gradient
                    # accumulation node; its hook fires once p's grad is ready.
                    p_tmp = p.expand_as(p)
                    grad_acc = p_tmp.grad_fn.next_functions[0][0]
                    grad_acc.register_hook(self._make_param_hook(p, device_idx))
                    self._grad_accs.append(grad_acc)
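    # --- Example (editor's sketch, not part of the original source) -------------
    # The ``expand_as`` trick above yields the parameter's AccumulateGrad node,
    # whose hook fires exactly when the ``.grad`` for that parameter has been
    # accumulated during backward. A standalone illustration:
    #
    #   w = torch.nn.Parameter(torch.randn(3))
    #   acc = w.expand_as(w).grad_fn.next_functions[0][0]
    #   acc.register_hook(lambda *unused: print("grad for w is ready"))
    #   (w * 2).sum().backward()   # prints once w.grad has been accumulated
    # ---------------------------------------------------------------------------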
    def _make_param_hook(self, param, device_idx):
        bucket_idx, bucket_offset = self.bucket_map[param]

        def distributed_data_parallel_hook(*unused):
            if param.grad.requires_grad:
                raise RuntimeError("DistributedDataParallel only works "
                                   "with gradients that don't require grad")
            # Stash this parameter's gradient in its bucket slot
            bucket = self.buckets[bucket_idx][device_idx]
            bucket[bucket_offset] = param.grad.data
            self.buckets_ready_size[bucket_idx][device_idx] += 1

            # The bucket is full on this device
            if self.buckets_ready_size[bucket_idx][device_idx] == self.bucket_sizes[bucket_idx]:
                self.devs_ready[bucket_idx] += 1
                if self.devs_ready[bucket_idx] < len(self.device_ids):
                    return

                # The bucket is now full on every local device. Buckets are reduced
                # in reverse registration order, so either queue it now or remember it.
                if bucket_idx == self.next_bucket:
                    self._queue_reduction(bucket_idx)
                    self.next_bucket -= 1
                    # Flush any later buckets that became ready ahead of order
                    if len(self.ready_buckets_not_reduced) > 0:
                        sorted_todo = sorted(self.ready_buckets_not_reduced, reverse=True)
                        for i in sorted_todo:
                            if i < self.next_bucket:
                                break
                            self._queue_reduction(i)
                            self.ready_buckets_not_reduced.remove(i)
                            if i == self.next_bucket:
                                self.next_bucket -= 1
                else:
                    self.ready_buckets_not_reduced.add(bucket_idx)

                # Every bucket has been queued: wait for all reductions to finish
                if self.next_bucket == -1:
                    self._sync_reduction_works()
                    self.all_buckets_reduced = True

        return distributed_data_parallel_hook
    def _queue_reduction(self, bucket_idx):
        # Kick off an asynchronous c10d reduction for this bucket on a side stream;
        # keep the work handle and the coalesced tensor so we can wait on it later.
        result = dist._queue_reduction(self.process_group,
                                       self.buckets[bucket_idx],
                                       self.device_ids)
        self.reduction_works[bucket_idx] = result[0]
        self.buckets_coalesced[bucket_idx] = result[1]

    def _sync_reduction_works(self):
        # Wait on every queued reduction and copy the reduced, uncoalesced
        # gradients back into the parameters of replica 0.
        for bucket_idx, grads_batch in enumerate(self.buckets):
            dist._sync_reduction(self.reduction_works[bucket_idx],
                                 grads_batch[0],
                                 self.buckets_coalesced[bucket_idx])

        # Reset the bucketing state for the next iteration
        self._ddp_init_helper()
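# --- Example (editor's sketch, not part of the original source) -----------------
# ``_queue_reduction`` / ``_sync_reduction_works`` rely on private c10d helpers,
# but the queue-then-wait pattern they implement can be written with the public
# collective API (plus the flatten helpers PyTorch itself uses): launch an
# asynchronous all-reduce per bucket, keep the work handles, and wait on them
# once backward has produced all gradients.
#
#   works = []
#   for bucket in buckets:                      # ``buckets``: lists of grad tensors
#       flat = torch._utils._flatten_dense_tensors(bucket)
#       works.append((dist.all_reduce(flat, async_op=True), flat, bucket))
#   for work, flat, bucket in works:
#       work.wait()
#       flat /= dist.get_world_size()           # average instead of sum
#       for g, synced in zip(bucket, torch._utils._unflatten_dense_tensors(flat, bucket)):
#           g.copy_(synced)
# ---------------------------------------------------------------------------------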