"""
torch.distributed.deprecated provides an MPI-like interface for exchanging tensor
data across multi-machine networks. It supports a few different backends
and initialization methods.
"""
import torch
import atexit
import warnings
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors


_backend = dist_backend.UNDEFINED
_scope = locals()


def _extend_scope(module):
    _scope.update({k: getattr(module, k) for k in dir(module) if not k.startswith('_')})


def is_available():
    return torch._C._has_distributed()


def destroy_process_group():
    r"""Destroy the initialized distributed package."""
    global _backend
    global _initialized
    torch._C._dist_destroy_process_group()
    _backend = dist_backend.UNDEFINED
    _initialized = 0
46 r"""Checking if the process group has been initialized 48 return _initialized == _INITIALIZED_PG


def init_process_group(backend, init_method='env://', **kwargs):
    r"""Initializes the distributed package.

    Arguments:
        backend (str): Name of the backend to use. Depending on build-time configuration,
            valid values include: ``tcp``, ``mpi``, ``gloo`` and ``nccl``.
        init_method (str, optional): URL specifying how to initialize the package.
        world_size (int, optional): Number of processes participating in the job.
        rank (int, optional): Rank of the current process.
        group_name (str, optional): Group name. See description of init methods.

    To enable ``backend == mpi``, PyTorch needs to be built from source on a system that
    supports MPI. If you want to use Open MPI with CUDA-aware support, please use
    Open MPI major version 2 and above.

    .. note::
        This method initializes the CUDA context. Therefore, if multiple processes
        run on a single machine but use different GPUs, make sure to call
        :func:`torch.cuda.set_device` before this method to avoid unnecessarily
        creating a context on the first visible device.
    """
    world_size = kwargs.pop('world_size', -1)
    group_name = kwargs.pop('group_name', '')
    rank = kwargs.pop('rank', -1)
    assert len(kwargs) == 0, \
        "got unexpected keyword arguments: %s" % ",".join(kwargs.keys())

    if not is_available():
        raise RuntimeError("PyTorch built without distributed support")

    global _initialized
    if _initialized:
        raise RuntimeError("trying to initialize torch.distributed.deprecated twice!")

    global _backend

    backend = backend.lower()
    if backend == "tcp":
        _backend = dist_backend.TCP
    elif backend == "mpi":
        _backend = dist_backend.MPI
    elif backend == "gloo":
        _backend = dist_backend.GLOO
    elif backend == "nccl":
        _backend = dist_backend.NCCL
    else:
        raise RuntimeError("Invalid distributed backend name: " + backend)

    torch._C._dist_init_process_group(backend, init_method, world_size,
                                      group_name, rank)
    _initialized = _INITIALIZED_PG

    if _backend == dist_backend.NCCL:
        atexit.register(destroy_process_group)

    if not torch._C._dist_init_extension(False, reduce_op, group):
        raise RuntimeError("distributed module initialization failed")


def init_master_worker(backend, init_method='env://', **kwargs):
    warnings.warn("""
    ================================================================================
                                        WARNING
    ================================================================================
    Master-worker mode is still experimental. The API will change without
    notice and we do not guarantee full correctness and expected performance yet.
    We'll announce it once it's ready.
    """)
    world_size = kwargs.pop('world_size', -1)
    group_name = kwargs.pop('group_name', '')
    rank = kwargs.pop('rank', -1)
    assert len(kwargs) == 0, \
        "got unexpected keyword arguments: %s" % ",".join(kwargs.keys())

    if not is_available():
        raise RuntimeError("PyTorch built without distributed support")

    global _initialized
    if _initialized:
        raise RuntimeError("trying to initialize torch.distributed.deprecated twice!")

    torch._C._dist_init_master_worker(backend, init_method, world_size,
                                      group_name, rank)
    _initialized = _INITIALIZED_MW
    import torch.distributed.deprecated.collectives as collectives
    import torch.distributed.deprecated.remote_types as remote_types
    _extend_scope(collectives)
    _extend_scope(remote_types)
    if not torch._C._dist_init_extension(True, reduce_op, group):
        raise RuntimeError("distributed module initialization failed")


class _DistributedRequest(object):
    def __init__(self, request):
        self.request = request

    def is_completed(self):
        return torch._C._dist_request_is_completed(self.request)

    def wait(self):
        torch._C._dist_request_wait(self.request)
165 r"""Returns the rank of current process. 167 Rank is a unique identifier assigned to each process within a distributed 168 group. They are always consecutive integers ranging from ``0`` to 169 ``world_size - 1`` (inclusive). 171 assert torch.distributed.deprecated._initialized
172 return torch._C._dist_get_rank()


def get_world_size():
    r"""Returns the number of processes in the distributed group."""
    assert torch.distributed.deprecated._initialized
    return torch._C._dist_get_num_processes()


def isend(tensor, dst):
    r"""Sends a tensor asynchronously.

    Arguments:
        tensor (Tensor): Tensor to send.
        dst (int): Destination rank.

    Returns:
        A distributed request object.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    return _DistributedRequest(torch._C._dist_isend(tensor, dst))


def irecv(tensor, src):
    r"""Receives a tensor asynchronously.

    Arguments:
        tensor (Tensor): Tensor to fill with received data.
        src (int): Source rank.

    Returns:
        A distributed request object.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    return _DistributedRequest(torch._C._dist_irecv(tensor, src))


def send(tensor, dst):
    r"""Sends a tensor synchronously.

    Arguments:
        tensor (Tensor): Tensor to send.
        dst (int): Destination rank.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    return torch._C._dist_send(tensor, dst)


def recv(tensor, src=None):
    r"""Receives a tensor synchronously.

    Arguments:
        tensor (Tensor): Tensor to fill with received data.
        src (int, optional): Source rank. Will receive from any
            process if unspecified.

    Returns:
        Sender rank.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    if src is None:
        return torch._C._dist_recv_any_source(tensor)
    return torch._C._dist_recv(tensor, src)
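

# Illustrative point-to-point sketch (comment only). Assumes an already-initialized
# two-process group; ranks and tensor sizes are made-up placeholders.
#
#     import torch
#     import torch.distributed.deprecated as dist
#
#     t = torch.zeros(4)
#     if dist.get_rank() == 0:
#         t += 1
#         dist.send(t, dst=1)           # blocking send
#         req = dist.isend(t, dst=1)    # non-blocking send ...
#         req.wait()                    # ... completed here
#     else:
#         dist.recv(t, src=0)           # blocking receive
#         req = dist.irecv(t, src=0)
#         req.wait()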


def broadcast_multigpu(tensor_list, src, group=group.WORLD):
    r"""Broadcasts the tensor to the whole group with multiple GPU tensors
    per node.

    :attr:`tensor` must have the same number of elements in all the GPUs from
    all processes participating in the collective. Each tensor in the list must
    be on a different GPU.

    .. note::
        Only the NCCL backend is currently supported. :attr:`tensor_list` should only
        contain GPU tensors.

    Arguments:
        tensor_list (List[Tensor]): Tensors that participate in the collective
            operation. If ``src`` is the rank of the current process, then the first
            element of ``tensor_list`` (``tensor_list[0]``) will be broadcast to all
            other tensors (on different GPUs) in the src process and to all tensors
            in ``tensor_list`` of the other, non-src processes. You also need to make
            sure that ``len(tensor_list)`` is the same for all the distributed
            processes calling this function.
        src (int): Source rank.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"

    return torch._C._dist_broadcast_multigpu(tensor_list, src, group)


def broadcast(tensor, src, group=group.WORLD):
    r"""Broadcasts the tensor to the whole group.

    :attr:`tensor` must have the same number of elements in all processes
    participating in the collective.

    Arguments:
        tensor (Tensor): Data to be sent if :attr:`src` is the rank of the
            current process, and tensor to be used to save received data
            otherwise.
        src (int): Source rank.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    return torch._C._dist_broadcast(tensor, src, group)


def all_reduce_multigpu(tensor_list, op=reduce_op.SUM, group=group.WORLD):
    r"""Reduces the tensor data across all machines in such a way that all get
    the final result. This function reduces a number of tensors on every node,
    while each tensor resides on a different GPU.
    Therefore, the input tensors in the tensor list need to be GPU tensors.
    Also, each tensor in the tensor list needs to reside on a different GPU.

    After the call, all tensors in :attr:`tensor_list` will be bitwise identical
    in all processes.

    .. note::
        Only the NCCL backend is currently supported. :attr:`tensor_list` should only
        contain GPU tensors.

    Arguments:
        tensor_list (List[Tensor]): List of input and output tensors of
            the collective. The function operates in-place and requires each
            tensor to be a GPU tensor on a different GPU.
            You also need to make sure that ``len(tensor_list)`` is the same for
            all the distributed processes calling this function.
        op (optional): One of the values from ``torch.distributed.deprecated.reduce_op``
            enum. Specifies an operation used for element-wise reductions.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"

    return torch._C._dist_all_reduce_multigpu(tensor_list, op, group)


def all_reduce(tensor, op=reduce_op.SUM, group=group.WORLD):
    r"""Reduces the tensor data across all machines in such a way that all get
    the final result.

    After the call :attr:`tensor` will be bitwise identical in all processes.

    Arguments:
        tensor (Tensor): Input and output of the collective. The function
            operates in-place.
        op (optional): One of the values from ``torch.distributed.deprecated.reduce_op``
            enum. Specifies an operation used for element-wise reductions.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    return torch._C._dist_all_reduce(tensor, op, group)


def reduce_multigpu(tensor_list, dst, op=reduce_op.SUM, group=group.WORLD):
    r"""Reduces the tensor data on multiple GPUs across all machines. Each tensor
    in :attr:`tensor_list` should reside on a separate GPU.

    Only the GPU of ``tensor_list[0]`` on the process with rank :attr:`dst` is
    going to receive the final result.

    .. note::
        Only the NCCL backend is currently supported. :attr:`tensor_list` should only
        contain GPU tensors.

    Arguments:
        tensor_list (List[Tensor]): Input and output GPU tensors of the
            collective. The function operates in-place.
            You also need to make sure that ``len(tensor_list)`` is the same for
            all the distributed processes calling this function.
        dst (int): Destination rank.
        op (optional): One of the values from ``torch.distributed.deprecated.reduce_op``
            enum. Specifies an operation used for element-wise reductions.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"

    return torch._C._dist_reduce_multigpu(tensor_list, dst, op, group)


def reduce(tensor, dst, op=reduce_op.SUM, group=group.WORLD):
    r"""Reduces the tensor data across all machines.

    Only the process with rank :attr:`dst` is going to receive the final result.

    Arguments:
        tensor (Tensor): Input and output of the collective. The function
            operates in-place.
        dst (int): Destination rank.
        op (optional): One of the values from ``torch.distributed.deprecated.reduce_op``
            enum. Specifies an operation used for element-wise reductions.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    return torch._C._dist_reduce(tensor, dst, op, group)


def all_gather_multigpu(output_tensor_lists,
                        input_tensor_list,
                        group=group.WORLD):
    r"""Gathers tensors from the whole group in a list.
    Each tensor in :attr:`input_tensor_list` should reside on a separate GPU.

    .. note::
        Only the NCCL backend is currently supported. :attr:`output_tensor_lists` and
        :attr:`input_tensor_list` should only contain GPU tensors.

    Arguments:
        output_tensor_lists (List[List[Tensor]]): Output lists. Each should
            contain correctly-sized tensors on each GPU to be used for the output
            of the collective.
            E.g. ``output_tensor_lists[i]`` contains the all_gather
            result that resides on the GPU of ``input_tensor_list[i]``.
            Note that each element of ``output_tensor_lists[i]`` has the size of
            ``world_size * len(input_tensor_list)``, since the function all-gathers
            the result from every single GPU in the group. To interpret
            each element of ``output_tensor_lists[i]``, note that
            ``input_tensor_list[j]`` of rank ``k`` will appear in
            ``output_tensor_lists[i][k * world_size + j]``.
            Also note that ``len(output_tensor_lists)``, and the size of each
            element in ``output_tensor_lists`` (each element is a list,
            therefore ``len(output_tensor_lists[i])``) need to be the same
            for all the distributed processes calling this function.
        input_tensor_list (List[Tensor]): List of tensors (on different GPUs) to
            be broadcast from the current process.
            Note that ``len(input_tensor_list)`` needs to be the same for
            all the distributed processes calling this function.
        group (optional): Group of the collective.
    """
418 "collective only supported in process-group mode" 420 flatten_tensor_list = []
421 for output_tensor_list
in output_tensor_lists:
422 flatten_tensor_list.append(_flatten_dense_tensors(output_tensor_list))
424 ret = torch._C._dist_all_gather_multigpu(flatten_tensor_list,
428 for output_tensor_list, flatten_tensor
in zip(output_tensor_lists,
429 flatten_tensor_list):
430 for tensor, value
in zip(output_tensor_list,
431 _unflatten_dense_tensors(flatten_tensor,
432 output_tensor_list)):
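

# Illustrative multi-GPU all_gather sketch (comment only). Assumes the NCCL backend,
# an initialized group, and that each process drives ``ngpus`` local GPUs; all shapes
# below are made-up placeholders.
#
#     import torch
#     import torch.distributed.deprecated as dist
#
#     ngpus = torch.cuda.device_count()
#     world_size = dist.get_world_size()
#     input_tensor_list = [torch.ones(4).cuda(i) for i in range(ngpus)]
#     output_tensor_lists = [[torch.zeros(4).cuda(i)
#                             for _ in range(world_size * ngpus)]
#                            for i in range(ngpus)]
#     dist.all_gather_multigpu(output_tensor_lists, input_tensor_list)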


def all_gather(tensor_list, tensor, group=group.WORLD):
    r"""Gathers tensors from the whole group in a list.

    Arguments:
        tensor_list (list[Tensor]): Output list. It should contain
            correctly-sized tensors to be used for output of the collective.
        tensor (Tensor): Tensor to be broadcast from the current process.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    if _backend != dist_backend.NCCL:
        return torch._C._dist_all_gather(tensor_list, tensor, group)
    else:
        return all_gather_multigpu([tensor_list], [tensor], group)
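

# Illustrative all_gather sketch (comment only). Assumes an initialized group; the
# tensor shape is a made-up placeholder.
#
#     import torch
#     import torch.distributed.deprecated as dist
#
#     world_size = dist.get_world_size()
#     t = torch.ones(2) * dist.get_rank()
#     gathered = [torch.zeros(2) for _ in range(world_size)]
#     dist.all_gather(gathered, t)
#     # gathered[k] now holds rank k's tensor on every process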


def gather(tensor, **kwargs):
    r"""Gathers a list of tensors in a single process.

    Arguments:
        tensor (Tensor): Input tensor.
        dst (int): Destination rank. Required in all processes except the one that
            is receiving the data.
        gather_list (list[Tensor]): List of appropriately-sized tensors to
            use for received data. Required only in the receiving process.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"

    my_rank = get_rank()
    dst = kwargs.pop('dst', my_rank)
    gather_list = kwargs.pop('gather_list', None)
    _group = kwargs.pop('group', group.WORLD)
    if kwargs:
        raise RuntimeError("got unexpected kwargs")

    if dst == my_rank:
        if gather_list is None:
            raise RuntimeError("gather_list is a required argument in gather destination")
        return torch._C._dist_gather_recv(gather_list, tensor, _group)
    else:
        if gather_list:
            raise RuntimeError("non-empty gather_list can be given only to gather destination")
        return torch._C._dist_gather_send(tensor, dst, _group)
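

# Illustrative gather sketch (comment only). Assumes an initialized group; only the
# destination rank passes ``gather_list``.
#
#     import torch
#     import torch.distributed.deprecated as dist
#
#     t = torch.ones(2) * dist.get_rank()
#     if dist.get_rank() == 0:
#         gather_list = [torch.zeros(2) for _ in range(dist.get_world_size())]
#         dist.gather(t, gather_list=gather_list)
#         # gather_list[k] holds rank k's tensor
#     else:
#         dist.gather(t, dst=0)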


def scatter(tensor, **kwargs):
    r"""Scatters a list of tensors to all processes in a group.

    Each process will receive exactly one tensor and store its data in the
    :attr:`tensor` argument.

    Arguments:
        tensor (Tensor): Output tensor.
        src (int): Source rank. Required in all processes except the one that
            is sending the data.
        scatter_list (list[Tensor]): List of tensors to scatter. Required only
            in the process that is sending the data.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"

    my_rank = get_rank()
    src = kwargs.pop('src', my_rank)
    scatter_list = kwargs.pop('scatter_list', None)
    _group = kwargs.pop('group', group.WORLD)
    if kwargs:
        raise RuntimeError("got unexpected kwargs: {}".format(", ".join(kwargs.keys())))

    if src == my_rank:
        if scatter_list is None:
            raise RuntimeError("scatter_list is a required argument in scatter source")
        return torch._C._dist_scatter_send(scatter_list, tensor, _group)
    else:
        if scatter_list:
            raise RuntimeError("non-empty scatter_list can be given only to scatter source")
        return torch._C._dist_scatter_recv(tensor, src, _group)
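

# Illustrative scatter sketch (comment only). Assumes an initialized group; only the
# source rank passes ``scatter_list``.
#
#     import torch
#     import torch.distributed.deprecated as dist
#
#     out = torch.zeros(2)
#     if dist.get_rank() == 0:
#         chunks = [torch.ones(2) * k for k in range(dist.get_world_size())]
#         dist.scatter(out, scatter_list=chunks)
#     else:
#         dist.scatter(out, src=0)
#     # every rank k now has out == torch.ones(2) * k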


def barrier(group=group.WORLD):
    r"""Synchronizes all processes.

    This collective blocks processes until the whole group enters this function.

    Arguments:
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    return torch._C._dist_barrier(group)


def new_group(ranks=None):
    r"""Creates a new distributed group.

    This function requires that all processes in the main group (i.e., all
    processes that are part of the distributed job) enter this function, even
    if they are not going to be members of the group. Additionally, groups
    should be created in the same order in all processes.

    Arguments:
        ranks (list[int]): List of ranks of group members.

    Returns:
        A handle of the distributed group that can be given to collective calls.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"

    if ranks is None:
        ranks = list(range(get_world_size()))
    return torch._C._dist_new_group(ranks)
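

# Illustrative group sketch (comment only). Assumes an initialized job with at least
# two processes; every rank must call new_group, even ranks not in the new group.
#
#     import torch
#     import torch.distributed.deprecated as dist
#
#     pair = dist.new_group(ranks=[0, 1])
#     t = torch.ones(1)
#     if dist.get_rank() in (0, 1):
#         dist.all_reduce(t, op=dist.reduce_op.SUM, group=pair)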


def _clear_group_cache(group=group.WORLD):
    r"""Clears the created distributed group's cached resources.

    Only the NCCL backend is currently supported.

    Cached resources include NCCL communicators and CUDA events.

    Arguments:
        group (optional): Group of the collective.
    """
    return torch._C._dist_clear_group_cache(group)


def _register_stream(stream):
    if not _initialized:
        raise RuntimeError("torch.distributed.deprecated needs to be initialized first")
    return torch._C._dist_register_stream(stream)