__init__.py
1 """
2 torch.distributed.deprecated provides an MPI-like interface for exchanging tensor
3 data across multi-machine networks. It supports a few different backends
4 and initialization methods.
5 """
6 import torch
7 import atexit
8 import warnings
9 from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
10 
11 
class dist_backend(object):
    UNDEFINED = -1
    TCP = 0
    MPI = 1
    GLOO = 2
    NCCL = 3


_INITIALIZED_PG = 1
_INITIALIZED_MW = 2
_initialized = 0
_backend = dist_backend.UNDEFINED
_scope = locals()


def _extend_scope(module):
    _scope.update({k: getattr(module, k) for k in dir(module) if not k.startswith('_')})


def is_available():
    return torch._C._has_distributed()


def destroy_process_group():
    r"""Destroys the initialized distributed package.
    """
    global _backend
    global _initialized
    torch._C._dist_destroy_process_group()
    _backend = dist_backend.UNDEFINED
    _initialized = 0


def is_initialized():
    r"""Checks whether the process group has been initialized.
    """
    return _initialized == _INITIALIZED_PG


def init_process_group(backend, init_method='env://', **kwargs):
    r"""Initializes the distributed package.

    Arguments:
        backend (str): Name of the backend to use. Depending on build-time configuration
            valid values include: ``tcp``, ``mpi``, ``gloo`` and ``nccl``.
        init_method (str, optional): URL specifying how to initialize the package.
        world_size (int, optional): Number of processes participating in the job.
        rank (int, optional): Rank of the current process.
        group_name (str, optional): Group name. See description of init methods.

    To enable ``backend == mpi``, PyTorch needs to be built from source on a system that
    supports MPI. If you want to use Open MPI with CUDA-aware support, please use
    Open MPI major version 2 and above.

    .. note::
        This method initializes CUDA context. Therefore, if multiple processes
        run on a single machine but use different GPUs, make sure to use
        :func:`torch.cuda.set_device` before this method to avoid unnecessarily
        creating context on the first visible device.

    """
    world_size = kwargs.pop('world_size', -1)
    group_name = kwargs.pop('group_name', '')
    rank = kwargs.pop('rank', -1)
    assert len(kwargs) == 0, "got unexpected keyword arguments: %s" % ",".join(kwargs.keys())

    if not is_available():
        raise RuntimeError("PyTorch built without distributed support")

    global _initialized
    if _initialized:
        raise RuntimeError("trying to initialize torch.distributed.deprecated twice!")

    # Checking and assigning the distributed backend
    global _backend

    backend = backend.lower()
    if backend == "tcp":
        _backend = dist_backend.TCP
    elif backend == "mpi":
        _backend = dist_backend.MPI
    elif backend == "gloo":
        _backend = dist_backend.GLOO
    elif backend == "nccl":
        _backend = dist_backend.NCCL
    else:
        raise RuntimeError("Invalid distributed backend name: " + backend)

    torch._C._dist_init_process_group(backend, init_method, world_size,
                                      group_name, rank)
    _initialized = _INITIALIZED_PG

    if _backend == dist_backend.NCCL:
        atexit.register(destroy_process_group)

    if not torch._C._dist_init_extension(False, reduce_op, group):
        raise RuntimeError("distributed module initialization failed")


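# Usage sketch (illustrative only, not part of the original file): how a
# two-process job might initialize this package. The address, port, and world
# size below are placeholder values; each process passes its own rank.
#
#     import torch
#     import torch.distributed.deprecated as dist
#
#     dist.init_process_group(backend='gloo',
#                             init_method='tcp://10.1.1.20:23456',
#                             world_size=2,
#                             rank=0)          # the other process passes rank=1
#     assert dist.is_initialized()
#     print(dist.get_rank(), dist.get_world_size())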
def init_master_worker(backend, init_method='env://', **kwargs):
    warnings.warn("""
    ================================================================================
                                        WARNING
    ================================================================================
    Master-worker mode is still experimental. The API will change without
    notice and we do not guarantee full correctness and expected performance yet.
    We'll announce it once it's ready.
    """)
    world_size = kwargs.pop('world_size', -1)
    group_name = kwargs.pop('group_name', '')
    rank = kwargs.pop('rank', -1)
    assert len(kwargs) == 0, "got unexpected keyword arguments: %s" % ",".join(kwargs.keys())

    if not is_available():
        raise RuntimeError("PyTorch built without distributed support")

    global _initialized
    if _initialized:
        raise RuntimeError("trying to initialize torch.distributed.deprecated twice!")
    torch._C._dist_init_master_worker(backend, init_method, world_size,
                                      group_name, rank)
    _initialized = _INITIALIZED_MW
    import torch.distributed.deprecated.collectives as collectives
    import torch.distributed.deprecated.remote_types as remote_types
    _extend_scope(collectives)
    _extend_scope(remote_types)
    if not torch._C._dist_init_extension(True, reduce_op, group):
        raise RuntimeError("distributed module initialization failed")


class reduce_op(object):
    SUM = object()
    PRODUCT = object()
    MAX = object()
    MIN = object()


class group(object):
    WORLD = object()


class _DistributedRequest(object):
    def __init__(self, request):
        self.request = request

    def is_completed(self):
        return torch._C._dist_request_is_completed(self.request)

    def wait(self):
        torch._C._dist_request_wait(self.request)


def get_rank():
    r"""Returns the rank of the current process.

    Rank is a unique identifier assigned to each process within a distributed
    group. Ranks are always consecutive integers ranging from ``0`` to
    ``world_size - 1`` (inclusive).
    """
    assert torch.distributed.deprecated._initialized
    return torch._C._dist_get_rank()


def get_world_size():
    r"""Returns the number of processes in the distributed group."""
    assert torch.distributed.deprecated._initialized
    return torch._C._dist_get_num_processes()


def isend(tensor, dst):
    r"""Sends a tensor asynchronously.

    Arguments:
        tensor (Tensor): Tensor to send.
        dst (int): Destination rank.

    Returns:
        A distributed request object.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    return _DistributedRequest(torch._C._dist_isend(tensor, dst))


def irecv(tensor, src):
    r"""Receives a tensor asynchronously.

    Arguments:
        tensor (Tensor): Tensor to fill with received data.
        src (int): Source rank.

    Returns:
        A distributed request object.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    return _DistributedRequest(torch._C._dist_irecv(tensor, src))


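# Usage sketch (illustrative only): a non-blocking exchange between ranks 0 and 1,
# assuming init_process_group() has already been called with world_size == 2.
#
#     buf = torch.zeros(10)
#     if get_rank() == 0:
#         req = isend(torch.ones(10), dst=1)
#     else:
#         req = irecv(buf, src=0)
#     req.wait()                 # block until the transfer has finished
#     assert req.is_completed()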
def send(tensor, dst):
    r"""Sends a tensor synchronously.

    Arguments:
        tensor (Tensor): Tensor to send.
        dst (int): Destination rank.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    return torch._C._dist_send(tensor, dst)


def recv(tensor, src=None):
    r"""Receives a tensor synchronously.

    Arguments:
        tensor (Tensor): Tensor to fill with received data.
        src (int, optional): Source rank. Will receive from any
            process if unspecified.

    Returns:
        Sender rank.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    if src is None:
        return torch._C._dist_recv_any_source(tensor)
    return torch._C._dist_recv(tensor, src)


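# Usage sketch (illustrative only): blocking point-to-point transfer, assuming a
# process group of two ranks has been initialized.
#
#     buf = torch.zeros(5)
#     if get_rank() == 0:
#         send(torch.arange(5.), dst=1)
#     else:
#         sender = recv(buf)     # src omitted: accept from any rank
#         assert sender == 0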
def broadcast_multigpu(tensor_list, src, group=group.WORLD):
    r"""Broadcasts the tensor to the whole group with multiple GPU tensors
    per node.

    :attr:`tensor` must have the same number of elements in all the GPUs from
    all processes participating in the collective. Each tensor in the list must
    be on a different GPU.

    .. note::
        Only the NCCL backend is currently supported. :attr:`tensor_list` should only
        contain GPU tensors.

    Arguments:
        tensor_list (List[Tensor]): Tensors that participate in the collective
            operation. If ``src`` is the rank, then the first element of
            ``tensor_list`` (``tensor_list[0]``) will be broadcast to all
            other tensors (on different GPUs) in the src process and to all tensors
            in ``tensor_list`` of the other non-src processes. You also need to make
            sure that ``len(tensor_list)`` is the same for all the distributed
            processes calling this function.

        src (int): Source rank.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"

    return torch._C._dist_broadcast_multigpu(tensor_list, src, group)


def broadcast(tensor, src, group=group.WORLD):
    r"""Broadcasts the tensor to the whole group.

    :attr:`tensor` must have the same number of elements in all processes
    participating in the collective.

    Arguments:
        tensor (Tensor): Data to be sent if :attr:`src` is the rank of
            the current process, and tensor to be used to save received data
            otherwise.
        src (int): Source rank.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    return torch._C._dist_broadcast(tensor, src, group)


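# Usage sketch (illustrative only): after the call every rank holds rank 0's data,
# assuming an initialized process group.
#
#     t = torch.zeros(4)
#     if get_rank() == 0:
#         t += 1
#     broadcast(t, src=0)        # t is now all ones on every rank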
def all_reduce_multigpu(tensor_list, op=reduce_op.SUM, group=group.WORLD):
    r"""Reduces the tensor data across all machines in such a way that all get
    the final result. This function reduces a number of tensors on every node,
    while each tensor resides on a different GPU.
    Therefore, the input tensors in the tensor list need to be GPU tensors,
    and each tensor in the tensor list needs to reside on a different GPU.

    After the call, all tensors in :attr:`tensor_list` will be bitwise identical
    in all processes.

    .. note::
        Only the NCCL backend is currently supported. :attr:`tensor_list` should only
        contain GPU tensors.

    Arguments:
        tensor_list (List[Tensor]): List of input and output tensors of
            the collective. The function operates in-place and requires each
            tensor to be a GPU tensor on a different GPU.
            You also need to make sure that ``len(tensor_list)`` is the same for
            all the distributed processes calling this function.

        op (optional): One of the values from ``torch.distributed.deprecated.reduce_op``
            enum. Specifies an operation used for element-wise reductions.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"

    return torch._C._dist_all_reduce_multigpu(tensor_list, op, group)


def all_reduce(tensor, op=reduce_op.SUM, group=group.WORLD):
    r"""Reduces the tensor data across all machines in such a way that all get
    the final result.

    After the call :attr:`tensor` will be bitwise identical in all processes.

    Arguments:
        tensor (Tensor): Input and output of the collective. The function
            operates in-place.
        op (optional): One of the values from ``torch.distributed.deprecated.reduce_op``
            enum. Specifies an operation used for element-wise reductions.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    return torch._C._dist_all_reduce(tensor, op, group)


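# Usage sketch (illustrative only): element-wise sum across all ranks, assuming
# an initialized process group.
#
#     t = torch.ones(3) * (get_rank() + 1)
#     all_reduce(t, op=reduce_op.SUM)
#     # With world_size == 2, every rank now holds [3., 3., 3.].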
def reduce_multigpu(tensor_list, dst, op=reduce_op.SUM, group=group.WORLD):
    r"""Reduces the tensor data on multiple GPUs across all machines. Each tensor
    in :attr:`tensor_list` should reside on a separate GPU.

    Only the GPU of ``tensor_list[0]`` on the process with rank :attr:`dst` is
    going to receive the final result.

    .. note::
        Only the NCCL backend is currently supported. :attr:`tensor_list` should only
        contain GPU tensors.

    Arguments:
        tensor_list (List[Tensor]): Input and output GPU tensors of the
            collective. The function operates in-place.
            You also need to make sure that ``len(tensor_list)`` is the same for
            all the distributed processes calling this function.

        dst (int): Destination rank.
        op (optional): One of the values from ``torch.distributed.deprecated.reduce_op``
            enum. Specifies an operation used for element-wise reductions.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"

    return torch._C._dist_reduce_multigpu(tensor_list, dst, op, group)


def reduce(tensor, dst, op=reduce_op.SUM, group=group.WORLD):
    r"""Reduces the tensor data across all machines.

    Only the process with rank :attr:`dst` is going to receive the final result.

    Arguments:
        tensor (Tensor): Input and output of the collective. The function
            operates in-place.
        dst (int): Destination rank.
        op (optional): One of the values from ``torch.distributed.deprecated.reduce_op``
            enum. Specifies an operation used for element-wise reductions.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    return torch._C._dist_reduce(tensor, dst, op, group)


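# Usage sketch (illustrative only): like all_reduce, but only the destination
# rank is guaranteed to hold the final result. Assumes an initialized process group.
#
#     t = torch.ones(2) * (get_rank() + 1)
#     reduce(t, dst=0, op=reduce_op.SUM)
#     # Only rank 0 is guaranteed to hold the summed result in t.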
def all_gather_multigpu(output_tensor_lists,
                        input_tensor_list,
                        group=group.WORLD):
    r"""Gathers tensors from the whole group in a list.
    Each tensor in :attr:`input_tensor_list` should reside on a separate GPU.

    .. note::
        Only the NCCL backend is currently supported. :attr:`output_tensor_lists` and
        :attr:`input_tensor_list` should only contain GPU tensors.

    Arguments:
        output_tensor_lists (List[List[Tensor]]): Output lists. They should
            contain correctly-sized tensors on each GPU to be used for output of
            the collective.
            e.g. ``output_tensor_lists[i]`` contains the all_gather
            result that resides on the GPU of ``input_tensor_list[i]``.
            Note that each element of ``output_tensor_lists`` has the size of
            ``world_size * len(input_tensor_list)``, since the function all-gathers
            the result from every single GPU in the group. To interpret
            each element of ``output_tensor_lists[i]``, note that
            ``input_tensor_list[j]`` of rank k will appear in
            ``output_tensor_lists[i][k * world_size + j]``.
            Also note that ``len(output_tensor_lists)``, and the size of each
            element in ``output_tensor_lists`` (each element is a list,
            therefore ``len(output_tensor_lists[i])``) need to be the same
            for all the distributed processes calling this function.

        input_tensor_list (List[Tensor]): List of tensors (on different GPUs) to
            be broadcast from the current process.
            Note that ``len(input_tensor_list)`` needs to be the same for
            all the distributed processes calling this function.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"

    flatten_tensor_list = []
    for output_tensor_list in output_tensor_lists:
        flatten_tensor_list.append(_flatten_dense_tensors(output_tensor_list))

    ret = torch._C._dist_all_gather_multigpu(flatten_tensor_list,
                                             input_tensor_list,
                                             group)

    for output_tensor_list, flatten_tensor in zip(output_tensor_lists,
                                                  flatten_tensor_list):
        for tensor, value in zip(output_tensor_list,
                                 _unflatten_dense_tensors(flatten_tensor,
                                                          output_tensor_list)):
            tensor.copy_(value)

    return ret


def all_gather(tensor_list, tensor, group=group.WORLD):
    r"""Gathers tensors from the whole group in a list.

    Arguments:
        tensor_list (list[Tensor]): Output list. It should contain
            correctly-sized tensors to be used for output of the collective.
        tensor (Tensor): Tensor to be broadcast from current process.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    if _backend != dist_backend.NCCL:
        return torch._C._dist_all_gather(tensor_list, tensor, group)
    else:
        return all_gather_multigpu([tensor_list], [tensor], group)


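# Usage sketch (illustrative only): collect one tensor from every rank into a
# list, assuming an initialized process group.
#
#     out = [torch.zeros(2) for _ in range(get_world_size())]
#     mine = torch.ones(2) * get_rank()
#     all_gather(out, mine)
#     # out[k] now equals the tensor contributed by rank k.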
def gather(tensor, **kwargs):
    r"""Gathers a list of tensors in a single process.

    Arguments:
        tensor (Tensor): Input tensor.
        dst (int): Destination rank. Required in all processes except the one that
            is receiving the data.
        gather_list (list[Tensor]): List of appropriately-sized tensors to
            use for received data. Required only in the receiving process.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    my_rank = get_rank()
    dst = kwargs.pop('dst', my_rank)
    gather_list = kwargs.pop('gather_list', None)
    _group = kwargs.pop('group', group.WORLD)
    if kwargs:
        raise RuntimeError("got unexpected kwargs")
    if dst == my_rank:
        if gather_list is None:
            raise RuntimeError("gather_list is a required argument in gather destination")
        return torch._C._dist_gather_recv(gather_list, tensor, _group)
    else:
        if gather_list:
            raise RuntimeError("non-empty gather_list can be given only to gather destination")
        return torch._C._dist_gather_send(tensor, dst, _group)


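# Usage sketch (illustrative only): rank 0 collects a tensor from every rank;
# the other ranks only send. Assumes an initialized process group.
#
#     mine = torch.ones(3) * get_rank()
#     if get_rank() == 0:
#         bucket = [torch.zeros(3) for _ in range(get_world_size())]
#         gather(mine, gather_list=bucket)   # dst defaults to this rank
#         # bucket[k] now holds rank k's tensor
#     else:
#         gather(mine, dst=0)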
def scatter(tensor, **kwargs):
    r"""Scatters a list of tensors to all processes in a group.

    Each process will receive exactly one tensor and store its data in the
    :attr:`tensor` argument.

    Arguments:
        tensor (Tensor): Output tensor.
        src (int): Source rank. Required in all processes except the one that
            is sending the data.
        scatter_list (list[Tensor]): List of tensors to scatter. Required only
            in the process that is sending the data.
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    my_rank = get_rank()
    src = kwargs.pop('src', my_rank)
    scatter_list = kwargs.pop('scatter_list', None)
    _group = kwargs.pop('group', group.WORLD)
    if kwargs:
        raise RuntimeError("got unexpected kwargs: {}".format(", ".join(kwargs.keys())))
    if src == my_rank:
        if scatter_list is None:
            raise RuntimeError("scatter_list is a required argument in scatter source")
        return torch._C._dist_scatter_send(scatter_list, tensor, _group)
    else:
        if scatter_list:
            raise RuntimeError("non-empty scatter_list can be given only to scatter source")
        return torch._C._dist_scatter_recv(tensor, src, _group)


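# Usage sketch (illustrative only): rank 0 hands each rank one chunk of a list,
# assuming an initialized process group.
#
#     out = torch.zeros(4)
#     if get_rank() == 0:
#         chunks = [torch.full((4,), float(r)) for r in range(get_world_size())]
#         scatter(out, scatter_list=chunks)  # src defaults to this rank
#     else:
#         scatter(out, src=0)
#     # Rank r now holds a tensor filled with the value r.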
def barrier(group=group.WORLD):
    r"""Synchronizes all processes.

    This collective blocks processes until the whole group enters this function.

    Arguments:
        group (optional): Group of the collective.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    return torch._C._dist_barrier(group)


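# Usage sketch (illustrative only): make every rank wait until rank 0 has
# written a file before anyone reads it. The file name and the `state` object
# below are hypothetical.
#
#     if get_rank() == 0:
#         torch.save(state, 'checkpoint.pt')
#     barrier()
#     loaded = torch.load('checkpoint.pt')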
def new_group(ranks=None):
    r"""Creates a new distributed group.

    This function requires that all processes in the main group (i.e., all
    processes that are part of the distributed job) enter this function, even
    if they are not going to be members of the group. Additionally, groups
    should be created in the same order in all processes.

    Arguments:
        ranks (list[int]): List of ranks of group members.

    Returns:
        A handle of the distributed group that can be given to collective calls.
    """
    assert torch.distributed.deprecated._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    if ranks is None:
        ranks = list(range(get_world_size()))
    return torch._C._dist_new_group(ranks)


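# Usage sketch (illustrative only): run a collective over the even ranks only.
# Every process, including the odd ranks, must still call new_group().
#
#     evens = new_group(ranks=list(range(0, get_world_size(), 2)))
#     t = torch.ones(1)
#     if get_rank() % 2 == 0:
#         all_reduce(t, op=reduce_op.SUM, group=evens)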
def _clear_group_cache(group=group.WORLD):
    r"""Clears the created distributed group's cached resources.

    Only the NCCL backend is currently supported.

    Cached resources include NCCL communicators and CUDA events.

    Arguments:
        group (optional): Group of the collective.
    """
    return torch._C._dist_clear_group_cache(group)


def _register_stream(stream):
    if not _initialized:
        raise RuntimeError("torch.distributed.deprecated needs to be initialized first")
    return torch._C._dist_register_stream(stream)