3 from torch._utils import _accumulate, _take_tensors, _flatten_dense_tensors, \
4 _flatten_sparse_tensors, _unflatten_dense_tensors, \
5 _unflatten_sparse_tensors, _reorder_tensors_as
def broadcast(tensor, devices):
    """Broadcasts a tensor to a number of GPUs.

    Arguments:
        tensor (Tensor): tensor to broadcast.
        devices (Iterable): an iterable of devices among which to broadcast.
            Note that it should be like (src, dst1, dst2, ...), the first
            element of which is the source device to broadcast from.

    Returns:
        A tuple containing copies of the ``tensor``, placed on devices
        corresponding to indices from ``devices``.
    """
    # The whole operation is delegated to ``torch._C._broadcast``.
    copies = torch._C._broadcast(tensor, devices)
    return copies
def broadcast_coalesced(tensors, devices, buffer_size=10485760):
    """Broadcasts a sequence tensors to the specified GPUs.

    Small tensors are first coalesced into a buffer to reduce the number
    of synchronizations.

    Arguments:
        tensors (sequence): tensors to broadcast.
        devices (Iterable): an iterable of devices among which to broadcast.
            Note that it should be like (src, dst1, dst2, ...), the first
            element of which is the source device to broadcast from.
        buffer_size (int): maximum size of the buffer used for coalescing.

    Returns:
        A tuple containing copies of the ``tensor``, placed on devices
        corresponding to indices from ``devices``.
    """
    # The whole operation is delegated to ``torch._C._broadcast_coalesced``.
    copies = torch._C._broadcast_coalesced(tensors, devices, buffer_size)
    return copies
def reduce_add(inputs, destination=None):
    """Sums tensors from multiple GPUs.

    All inputs should have matching shapes.

    Arguments:
        inputs (Iterable[Tensor]): an iterable of tensors to add. It is
            indexed and sliced below, so in practice it must be a sequence.
        destination (int, optional): a device on which the output will be
            placed (default: current device).

    Returns:
        A tensor containing an elementwise sum of all inputs, placed on the
        ``destination`` device.

    Raises:
        AssertionError: if any input is not a CUDA tensor.
        ValueError: if an input's size differs from the first input's size.
        RuntimeError: if none of the inputs lives on ``destination``.
    """
    # NOTE(review): the default for ``destination``, the ``nccl_root``
    # bookkeeping and its ``is None`` guard, the fallback loop header and both
    # ``return result`` statements were restored while repairing this block --
    # confirm against upstream.
    if destination is None:
        destination = torch.cuda.current_device()
    input_size = inputs[0].size()
    # Index of an input that already lives on ``destination``; it is handed
    # to NCCL as the reduction root.
    nccl_root = None
    for i, inp in enumerate(inputs):
        assert inp.is_cuda, "reduce_add expects all inputs to be on GPUs"
        if inp.get_device() == destination:
            nccl_root = i
        if inp.size() != input_size:
            got = 'x'.join(str(x) for x in inp.size())
            expected = 'x'.join(str(x) for x in input_size)
            raise ValueError("input {} has invalid size: got {}, but expected "
                             "{}".format(i, got, expected))
    if nccl_root is None:
        raise RuntimeError("reduce_add expects destination to be on the same GPU with one of the tensors")
    # ``inp`` is the last tensor seen by the loop above; it only serves as a
    # type/shape template for the zero-initialised accumulator.
    result = inp.new(device=destination).resize_as_(inp).zero_()

    if nccl.is_available(inputs) and inputs[0].get_device() == destination:
        # Only ``outputs[0]`` (the accumulator) is returned; the remaining
        # entries are scratch buffers matching each other input.
        outputs = [result] + [t.new(t.size()) for t in inputs[1:]]
        nccl.reduce(inputs, outputs, root=nccl_root)
        return result

    # Fallback: copy every input to the destination device and accumulate.
    for inp in inputs:
        input_correct_gpu = inp.cuda(result.get_device())
        result.add_(input_correct_gpu)
    return result
def reduce_add_coalesced(inputs, destination=None, buffer_size=10485760):
    """Sums tensors from multiple GPUs.

    Small tensors are first coalesced into a buffer to reduce the number
    of synchronizations.

    Arguments:
        inputs (Iterable[Iterable[Tensor]]): iterable of iterables that
            contain tensors from a single device.
        destination (int, optional): a device on which the output will be
            placed (default: current device).
        buffer_size (int): maximum size of the buffer used for coalescing.

    Returns:
        A tuple of tensors containing an elementwise sum of each group of
        inputs, placed on the ``destination`` device.
    """
    # NOTE(review): the initialisation of ``output`` / ``ref_order`` and the
    # ``else:`` separating the sparse and dense paths were restored while
    # repairing this block -- confirm against upstream.
    # One list of dense tensors per entry of ``inputs``.
    dense_tensors = [[] for _ in inputs]
    output = []
    # Records the original tensor order so the results can be put back in it.
    ref_order = []
    # First pass: each ``tensor_at_gpus`` holds the same logical tensor taken
    # from every entry of ``inputs``.
    for tensor_at_gpus in zip(*inputs):
        if all(t.is_sparse for t in tensor_at_gpus):
            # Tensors that are sparse everywhere are reduced on their own.
            result = reduce_add(tensor_at_gpus, destination)
            output.append(result)
            ref_order.append(tensor_at_gpus[0])
        else:
            # Otherwise densify any sparse copies and queue them for the
            # coalesced reduction below.
            for coll, t in zip(dense_tensors, tensor_at_gpus):
                coll.append(t.to_dense() if t.is_sparse else t)
            ref_order.append(dense_tensors[0][-1])
    itrs = [_take_tensors(tensors, buffer_size) for tensors in dense_tensors]
    # Second pass: flatten each chunk, reduce the flat buffers, then split the
    # result back into per-tensor views.
    for chunks in zip(*itrs):
        flat_tensors = [_flatten_dense_tensors(chunk) for chunk in chunks]
        flat_result = reduce_add(flat_tensors, destination)
        for t in _unflatten_dense_tensors(flat_result, chunks[0]):
            # ``.data`` hands out the tensor detached from the view created
            # by unflattening the shared flat buffer.
            output.append(t.data)
    return tuple(_reorder_tensors_as(output, ref_order))
def scatter(tensor, devices, chunk_sizes=None, dim=0, streams=None):
    """Scatters tensor across multiple GPUs.

    Arguments:
        tensor (Tensor): tensor to scatter.
        devices (Iterable[int]): iterable of ints, specifying among which
            devices the tensor should be scattered.
        chunk_sizes (Iterable[int], optional): sizes of chunks to be placed on
            each device. It should match ``devices`` in length and sum to
            ``tensor.size(dim)``. If not specified, the tensor will be divided
            into equal chunks.
        dim (int, optional): A dimension along which to chunk the tensor.
        streams (optional): forwarded unchanged to ``torch._C._scatter``.

    Returns:
        A tuple containing chunks of the ``tensor``, spread across given
        ``devices``.
    """
    # The whole operation is delegated to ``torch._C._scatter``; its result is
    # normalised to a tuple.
    chunks = torch._C._scatter(tensor, devices, chunk_sizes, dim, streams)
    return tuple(chunks)
def gather(tensors, dim=0, destination=None):
    """Gathers tensors from multiple GPUs.

    Tensor sizes in all dimension different than ``dim`` have to match.

    Arguments:
        tensors (Iterable[Tensor]): iterable of tensors to gather.
        dim (int): a dimension along which the tensors will be concatenated.
        destination (int, optional): output device (-1 means CPU, default:
            current device).

    Returns:
        A tensor located on ``destination`` device, that is a result of
        concatenating ``tensors`` along ``dim``.
    """
    # The whole operation is delegated to ``torch._C._gather``.
    gathered = torch._C._gather(tensors, dim, destination)
    return gathered