Caffe2 - Python API
A deep learning, cross-platform ML framework
comm.py
import torch
from . import nccl
from torch._utils import _accumulate, _take_tensors, _flatten_dense_tensors, \
    _flatten_sparse_tensors, _unflatten_dense_tensors, \
    _unflatten_sparse_tensors, _reorder_tensors_as


def broadcast(tensor, devices):
    """Broadcasts a tensor to a number of GPUs.

    Arguments:
        tensor (Tensor): tensor to broadcast.
        devices (Iterable): an iterable of devices among which to broadcast.
            Note that it should be like (src, dst1, dst2, ...), the first
            element of which is the source device to broadcast from.

    Returns:
        A tuple containing copies of the ``tensor``, placed on devices
        corresponding to indices from ``devices``.
    """
    return torch._C._broadcast(tensor, devices)
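
# Example (a minimal sketch, assuming at least two visible CUDA devices):
# broadcast a tensor from GPU 0 to GPUs 0 and 1; each returned copy lives on
# the corresponding entry of ``devices``.
#
#     >>> x = torch.randn(4, 4).cuda(0)
#     >>> copies = broadcast(x, (0, 1))
#     >>> [t.get_device() for t in copies]
#     [0, 1]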


def broadcast_coalesced(tensors, devices, buffer_size=10485760):
    """Broadcasts a sequence of tensors to the specified GPUs.

    Small tensors are first coalesced into a buffer to reduce the number
    of synchronizations.

    Arguments:
        tensors (sequence): tensors to broadcast.
        devices (Iterable): an iterable of devices among which to broadcast.
            Note that it should be like (src, dst1, dst2, ...), the first
            element of which is the source device to broadcast from.
        buffer_size (int): maximum size of the buffer used for coalescing.

    Returns:
        A tuple containing copies of each tensor in ``tensors``, placed on
        devices corresponding to indices from ``devices``.
    """
    return torch._C._broadcast_coalesced(tensors, devices, buffer_size)
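
# Example (a minimal sketch, assuming at least two visible CUDA devices):
# many small tensors are packed into buffers of at most ``buffer_size`` bytes,
# so only a few large copies cross the interconnect instead of one per tensor.
# The result is one tuple of copies per device.
#
#     >>> params = [torch.randn(16, 16).cuda(0) for _ in range(100)]
#     >>> copies = broadcast_coalesced(params, (0, 1))
#     >>> len(copies), len(copies[1])
#     (2, 100)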


def reduce_add(inputs, destination=None):
    """Sums tensors from multiple GPUs.

    All inputs should have matching shapes.

    Arguments:
        inputs (Iterable[Tensor]): an iterable of tensors to add.
        destination (int, optional): a device on which the output will be
            placed (default: current device).

    Returns:
        A tensor containing an elementwise sum of all inputs, placed on the
        ``destination`` device.
    """
    # TODO: try to find an input on another gpu, copy it,
    # and accumulate into the copy
    if destination is None:
        destination = torch.cuda.current_device()
    input_size = inputs[0].size()
    nccl_root = None
    for i, inp in enumerate(inputs):
        assert inp.is_cuda, "reduce_add expects all inputs to be on GPUs"
        if inp.get_device() == destination:
            nccl_root = i
        if inp.size() != input_size:
            got = 'x'.join(str(x) for x in inp.size())
            expected = 'x'.join(str(x) for x in input_size)
            raise ValueError("input {} has invalid size: got {}, but expected "
                             "{}".format(i, got, expected))
    if nccl_root is None:
        raise RuntimeError("reduce_add expects destination to be on the same "
                           "GPU as one of the tensors")
    # All inputs have the same size, so the last `inp` serves as a template
    # for the zero-initialized result on the destination device.
    result = inp.new(device=destination).resize_as_(inp).zero_()

    if nccl.is_available(inputs) and inputs[0].get_device() == destination:
        outputs = [result] + [t.new(t.size()) for t in inputs[1:]]
        nccl.reduce(inputs, outputs, root=nccl_root)
        return result
    # Fallback: copy every input to the destination device and accumulate.
    for inp in inputs:
        input_correct_gpu = inp.cuda(result.get_device())
        result.add_(input_correct_gpu)
    return result
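
# Example (a minimal sketch, assuming at least two visible CUDA devices):
# sum two same-shaped tensors living on different GPUs onto GPU 0. When NCCL
# is available and the first input sits on ``destination``, the fast path is
# taken; otherwise each input is copied over and accumulated.
#
#     >>> a = torch.ones(3, 3).cuda(0)
#     >>> b = torch.ones(3, 3).cuda(1)
#     >>> s = reduce_add([a, b], destination=0)
#     >>> s.get_device()
#     0
#     >>> # every entry of s equals 2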


def reduce_add_coalesced(inputs, destination=None, buffer_size=10485760):
    """Sums tensors from multiple GPUs.

    Small tensors are first coalesced into a buffer to reduce the number
    of synchronizations.

    Arguments:
        inputs (Iterable[Iterable[Tensor]]): iterable of iterables that
            contain tensors from a single device.
        destination (int, optional): a device on which the output will be
            placed (default: current device).
        buffer_size (int): maximum size of the buffer used for coalescing.

    Returns:
        A tuple of tensors containing an elementwise sum of each group of
        inputs, placed on the ``destination`` device.
    """
    # TODO: When `len(inputs) == 1` and all inputs are on `destination`, just
    # return `inputs`.
    dense_tensors = [[] for _ in inputs]  # shape (num_gpus, num_tensors)
    output = []
    ref_order = []
    # process sparse ones first since they may have different sizes on different gpus
    for tensor_at_gpus in zip(*inputs):
        if all(t.is_sparse for t in tensor_at_gpus):
            result = reduce_add(tensor_at_gpus, destination)
            output.append(result)
            ref_order.append(tensor_at_gpus[0])
        else:
            for coll, t in zip(dense_tensors, tensor_at_gpus):
                coll.append(t.to_dense() if t.is_sparse else t)
            ref_order.append(dense_tensors[0][-1])
    itrs = [_take_tensors(tensors, buffer_size) for tensors in dense_tensors]
    # now the dense ones, which have consistent sizes
    for chunks in zip(*itrs):
        flat_tensors = [_flatten_dense_tensors(chunk) for chunk in chunks]
        flat_result = reduce_add(flat_tensors, destination)
        for t in _unflatten_dense_tensors(flat_result, chunks[0]):
            # The unflattened tensors do not share storage, and we don't expose
            # base flat tensor anyways, so give them different version counters.
            # See NOTE [ Version Counter in comm.*_coalesced ]
            output.append(t.data)
    return tuple(_reorder_tensors_as(output, ref_order))
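
# Example (a minimal sketch, assuming at least two visible CUDA devices):
# each inner list holds one replica of the same set of tensors (e.g. per-GPU
# gradients); the result is one summed tensor per position, all placed on the
# destination device.
#
#     >>> grads_gpu0 = [torch.ones(8).cuda(0), torch.ones(2, 2).cuda(0)]
#     >>> grads_gpu1 = [torch.ones(8).cuda(1), torch.ones(2, 2).cuda(1)]
#     >>> sums = reduce_add_coalesced([grads_gpu0, grads_gpu1], destination=0)
#     >>> [tuple(t.size()) for t in sums]
#     [(8,), (2, 2)]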


def scatter(tensor, devices, chunk_sizes=None, dim=0, streams=None):
    """Scatters tensor across multiple GPUs.

    Arguments:
        tensor (Tensor): tensor to scatter.
        devices (Iterable[int]): iterable of ints, specifying among which
            devices the tensor should be scattered.
        chunk_sizes (Iterable[int], optional): sizes of chunks to be placed on
            each device. It should match ``devices`` in length and sum to
            ``tensor.size(dim)``. If not specified, the tensor will be divided
            into equal chunks.
        dim (int, optional): a dimension along which to chunk the tensor.
        streams (Iterable[Stream], optional): an iterable of Streams on which
            to execute the copies. If not specified, the default stream is
            used.

    Returns:
        A tuple containing chunks of the ``tensor``, spread across given
        ``devices``.
    """
    return tuple(torch._C._scatter(tensor, devices, chunk_sizes, dim, streams))
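
# Example (a minimal sketch, assuming at least two visible CUDA devices):
# split a batch of 6 rows into uneven chunks of 4 and 2 along dim 0.
#
#     >>> batch = torch.randn(6, 10)
#     >>> chunks = scatter(batch, (0, 1), chunk_sizes=(4, 2), dim=0)
#     >>> [tuple(c.size()) for c in chunks]
#     [(4, 10), (2, 10)]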


def gather(tensors, dim=0, destination=None):
    """Gathers tensors from multiple GPUs.

    Tensor sizes in all dimensions other than ``dim`` have to match.

    Arguments:
        tensors (Iterable[Tensor]): iterable of tensors to gather.
        dim (int): a dimension along which the tensors will be concatenated.
        destination (int, optional): output device (-1 means CPU, default:
            current device).

    Returns:
        A tensor located on the ``destination`` device that is the result of
        concatenating ``tensors`` along ``dim``.
    """
    return torch._C._gather(tensors, dim, destination)
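
# Example (a minimal sketch, assuming at least two visible CUDA devices):
# the inverse of ``scatter``: concatenate per-GPU chunks back into a single
# tensor, here on the CPU (``destination=-1``).
#
#     >>> parts = [torch.randn(4, 10).cuda(0), torch.randn(2, 10).cuda(1)]
#     >>> whole = gather(parts, dim=0, destination=-1)
#     >>> whole.size()
#     torch.Size([6, 10])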