Caffe2 - C++ API
A deep learning, cross-platform ML framework
comm.cpp
1 #include <torch/csrc/cuda/comm.h>
2 
3 #include <torch/csrc/cuda/device_set.h>
4 #include <torch/csrc/utils/tensor_flatten.h>
5 
6 #ifdef USE_NCCL
7 #include <torch/csrc/cuda/nccl.h>
8 #endif
9 
10 #include <ATen/ATen.h>
11 #include <ATen/cuda/CUDAContext.h>
12 #include <c10/cuda/CUDAGuard.h>
13 #include <c10/util/Optional.h>
14 #include <torch/csrc/autograd/variable.h>
15 
16 #include <cstddef>
17 #include <vector>
18 
19 
20 // The following code is used to ensure torch is linked against caffe2_gpu.
21 #ifdef _MSC_VER
22 namespace {
23 #pragma optimize("", off)
24  int warp_size() {
25  return at::cuda::warp_size();
26  }
27 #pragma optimize("", on)
28 }
29 #endif
30 
31 namespace torch { namespace cuda {
32 using namespace at;
33 using namespace torch::autograd;
34 
35 // Some operations can be performed more efficiently if we're handling tensors
36 // of a single type only. Adding this logic directly in the loop makes it a bit
37 // ugly, so here's a helper for it.
38 struct unique_type_checker {
39  void show(const at::Type& t) {
40  if (!unique) return;
41  if (!type) type = &t;
42  unique = (type == &t);
43  }
44 
45  const at::Type *type = nullptr;
46  bool unique = true;
47 };
48 
49 std::vector<Tensor> broadcast(const Tensor& tensor, IntArrayRef devices) {
50  auto & type = tensor.type();
51  if (type.is_cuda() && tensor.get_device() != devices[0])
52  throw std::runtime_error("device of broadcasted tensor must appear as the "
53  "first on devices list");
54  std::vector<Tensor> tensors;
55  tensors.reserve(devices.size());
56  at::cuda::OptionalCUDAGuard _device_guard;
57 #ifdef USE_NCCL
58  if (nccl::is_available({tensor})) {
59  tensors.push_back(tensor);
60  for (auto device : devices.slice(1)) {
61  _device_guard.set_index(device);
62  tensors.push_back(at::empty(tensor.sizes(), type.options()));
63  }
64  nccl::broadcast(tensors);
65  } else {
66 #else
67  {
68 #endif
69  auto & gpu_type = type.toBackend(type.is_sparse() ? at::Backend::SparseCUDA : at::Backend::CUDA);
70  if (type.is_cuda()) {
71  tensors.push_back(tensor);
72  }
73  IntArrayRef loop_devices = type.is_cuda() ? devices.slice(1) : devices;
74  for (auto device : loop_devices) {
75  _device_guard.set_index(device);
76  tensors.push_back(gpu_type.copy(tensor, true));
77  }
78  }
79  return tensors;
80 }
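
// Illustrative usage sketch (hypothetical helper; assumes at least two visible
// CUDA devices): a CPU source is copied onto every listed device, while a CUDA
// source must already live on devices[0] and is reused there.
void broadcast_example() {
  at::Tensor src = at::randn({4, 4});  // CPU tensor
  std::vector<at::Tensor> copies = torch::cuda::broadcast(src, {0, 1});
  // copies[0] now lives on cuda:0 and copies[1] on cuda:1.
}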
81 
82 // NOTE [ Version Counter in comm.*_coalesced ]
83 //
84 // broadcast_coalesced
85 // ~~~~~~~~~~~~~~~~~~~
86 //
87 // In broadcast_coalesced, multiple variables may be coalesced into a single
88 // large one, broadcast to other devices, and then get split according to the
89 // original shapes.
90 //
91 // When splitting, the view operations will make all Variables broadcast
92 // together share a single version counter, because they are all views of the
93 // large Variable. However, that large Variable is immediately discarded and all
94 // these Variables do not share storage at all.
95 //
96 // For example, when two buffers are broadcast together in `DataParallel` and
97 // one of them is modified in-place during `forward` but the other is needed in
98 // backward, autograd engine will complain.
99 //
100 // We thus re-wrap these Variables after broadcasting (i.e., effectively doing
101 // what is equivalent to .data in Python), and give them individual version
102 // counters.
103 //
104 // NB: For `devices[0]` in broadcast_coalesced, the input Variables are always
105 // returned as-is, so **do not** re-wrap them.
106 //
107 // reduce_add_coalesced
108 // ~~~~~~~~~~~~~~~~~~~~
109 //
110 // Similarly for reduce_add_coalesced, where the outputs are newly created
111 // Variables.
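
// Concretely, the re-wrapping below is the C++ analogue of taking `.data` in
// Python. An illustrative sketch (`t` stands in for one of the unflattened
// views):
//
//   Variable t = ...;             // view of the flat buffer; shares its counter
//   Variable rewrapped =
//       make_variable(t.data(), /*requires_grad=*/false);
//                                 // same storage, fresh version counter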
112 tensor_list2d broadcast_coalesced(TensorList tensors, IntArrayRef devices, size_t buffer_size) {
113  if (!std::all_of(tensors.begin(), tensors.end(),
114  [&](const at::Tensor& t) { return t.get_device() == devices[0]; })) {
115  throw std::runtime_error("all tensors must be on devices[0]");
116  }
117 #ifdef USE_NCCL
118  buffer_size = std::min(torch::cuda::nccl::get_max_count(), buffer_size);
119 #endif
120 
121  tensor_list2d outputs(devices.size());
122  outputs[0] = tensors.vec();
123  for (auto & o : outputs)
124  o.reserve(tensors.size());
125 
126  unique_type_checker type_checker;
127  at::cuda::CUDAGuard device_guard(devices[0]);
128  for (auto & chunk : utils::take_tensors(tensors, buffer_size)) {
129  auto & type = chunk.type();
130  type_checker.show(type);
131  std::vector<at::Tensor> results;
132  if (chunk.type().is_sparse()) {
133  auto flat_tuple = utils::flatten_sparse_tensors(chunk.tensors);
134  std::vector<at::Tensor> broadcast_indices = broadcast(flat_tuple.first, devices);
135  std::vector<at::Tensor> broadcast_values = broadcast(flat_tuple.second, devices);
136  results.reserve(devices.size());
137  for (size_t i = 1, num_devices = devices.size(); i < num_devices; ++i) {
138  device_guard.set_index(devices[i]);
139  auto & device_outputs = outputs[i];
140  auto & inds = broadcast_indices[i];
141  auto & vals = broadcast_values[i];
142  for (auto & t : utils::unflatten_sparse_tensors(inds, vals, chunk.tensors)) {
143  // See NOTE [ Version Counter in comm.*_coalesced ]
144  AT_ASSERT(t.is_variable());
145  Variable var = t;
146  device_outputs.push_back(make_variable(var.data(), false));
147  }
148  }
149  } else {
150  std::vector<Tensor> results = broadcast(utils::flatten_dense_tensors(chunk.tensors),
151  devices);
152  for (size_t i = 1, num_devices = devices.size(); i < num_devices; ++i) {
153  device_guard.set_index(devices[i]);
154  auto & device_outputs = outputs[i];
155  for (auto & t : utils::unflatten_dense_tensors(results[i], chunk.tensors)) {
156  // See NOTE [ Version Counter in comm.*_coalesced ]
157  AT_ASSERT(t.is_variable());
158  Variable var = t;
159  device_outputs.push_back(make_variable(var.data(), false));
160  }
161  }
162  }
163  }
164 
165  // If we only saw a single tensor type, then we can skip expensive reordering
166  if (!type_checker.unique) {
167  for (auto & o : outputs)
168  utils::reorder_tensors_like(o, tensors);
169  }
170  return outputs;
171 }
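
// Illustrative usage sketch (hypothetical helper; assumes devices 0 and 1 are
// visible and that every input already lives on devices[0], as required by the
// check above). Per the note, the replicas returned for device 1 are re-wrapped
// and carry their own version counters.
void broadcast_coalesced_example() {
  auto opts = at::TensorOptions().device(at::Device(at::kCUDA, 0));
  std::vector<at::Tensor> bufs = {
      make_variable(at::randn({8}, opts), /*requires_grad=*/false),
      make_variable(at::randn({16}, opts), /*requires_grad=*/false)};
  tensor_list2d replicas = torch::cuda::broadcast_coalesced(
      bufs, /*devices=*/{0, 1}, /*buffer_size=*/10 * 1024 * 1024);
  // replicas[0] returns the inputs as-is; replicas[1] holds copies on cuda:1.
}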
172 
173 std::vector<at::Tensor> scatter(
174  const at::Tensor& tensor,
175  at::IntArrayRef devices,
176  const c10::optional<std::vector<int64_t>>& chunk_sizes,
177  int64_t dim,
178  const c10::optional<std::vector<c10::optional<at::cuda::CUDAStream>>>& streams) {
179  std::vector<at::Tensor> chunks;
180  if (chunk_sizes) {
181  const int64_t chunk_size_sum =
182  std::accumulate(chunk_sizes->begin(), chunk_sizes->end(), int64_t{0});
183  AT_CHECK(
184  chunk_size_sum == tensor.size(dim),
185  "given chunk sizes don't sum up to the tensor's size ",
186  "(sum(chunk_sizes) == ", chunk_size_sum,
187  ", but expected ", tensor.size(dim), ")");
188  chunks.reserve(chunk_sizes->size());
189  int64_t chunk_start = 0;
190  for (size_t chunk = 0; chunk < chunk_sizes->size(); ++chunk) {
191  const int64_t chunk_size = (*chunk_sizes)[chunk];
192  AT_CHECK(chunk_size > 0, "Chunk size must be positive");
193  chunks.push_back(tensor.narrow(dim, chunk_start, chunk_size));
194  chunk_start += chunk_size;
195  }
196  AT_ASSERT(chunks.size() == chunk_sizes->size());
197  } else {
198  chunks = tensor.chunk(/*chunks=*/devices.size(), /*dim=*/dim);
199  }
200  at::cuda::OptionalCUDAStreamGuard cuda_guard;
201  for (size_t chunk = 0; chunk < chunks.size(); ++chunk) {
202  const auto device_index = static_cast<int16_t>(devices[chunk]);
203  if (streams && (*streams)[chunk]) {
204  AT_CHECK(
205  (*streams)[chunk]->device_index() == device_index,
206  "Expected the device associated with the stream at index ",
207  chunk, " (was ", (*streams)[chunk]->device_index(), ") ",
208  "to match the device supplied at that index ",
209  "(expected ", device_index, ")");
210  cuda_guard.reset_stream(*(*streams)[chunk]);
211  }
212  chunks[chunk] = chunks[chunk].contiguous().to(
213  {at::DeviceType::CUDA, device_index}, /*non_blocking=*/true);
214  }
215  return chunks;
216 }
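
// Illustrative usage sketch (hypothetical helper; assumes devices 0 and 1):
// split a batch evenly along dim 0, one piece per device. c10::nullopt for
// chunk_sizes requests an even split; c10::nullopt for streams copies on each
// device's current stream.
void scatter_example() {
  at::Tensor batch = at::randn({8, 32});  // CPU tensor
  std::vector<at::Tensor> pieces = torch::cuda::scatter(
      batch, /*devices=*/{0, 1}, /*chunk_sizes=*/c10::nullopt,
      /*dim=*/0, /*streams=*/c10::nullopt);
  // pieces[0] holds rows 0-3 on cuda:0, pieces[1] holds rows 4-7 on cuda:1.
}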
217 
218 at::Tensor gather(
219  at::TensorList tensors,
220  int64_t dim,
221  c10::optional<int32_t> destination_index) {
222  AT_CHECK(!tensors.empty(), "Expected at least one tensor to gather from");
223  at::Tensor result;
224  int64_t total_size = 0;
225  auto& first = tensors.front();
226  const auto first_size = first.sizes();
227  std::vector<int64_t> expected_size(first_size.begin(), first_size.end());
228  for (const auto& tensor : tensors) {
229  AT_CHECK(
230  tensor.is_cuda(), "Gather expects all inputs to have CUDA type");
231  AT_ASSERT(tensor.ndimension() == static_cast<int64_t>(expected_size.size()));
232  expected_size[dim] = tensor.size(dim);
233  for (size_t dimension = 0; dimension < expected_size.size(); ++dimension) {
234  AT_CHECK(
235  expected_size[dimension] == tensor.size(dimension),
236  "Gather got an input of invalid size: got ",
237  tensor.sizes(), ", but expected ", at::IntArrayRef(expected_size));
238  }
239  total_size += tensor.size(dim);
240  }
241  expected_size[dim] = total_size;
242  at::Device device(at::DeviceType::CPU);
243  if (!destination_index || *destination_index != -1) {
244  device = at::Device(at::DeviceType::CUDA, destination_index ? *destination_index : -1);
245  }
246  result = at::empty(expected_size, first.options().device(device));
247 
248  int64_t chunk_start = 0;
249  for (const auto& tensor : tensors) {
250  result.narrow(dim, chunk_start, tensor.size(dim))
251  .copy_(tensor, /*non_blocking=*/true);
252  chunk_start += tensor.size(dim);
253  }
254  return result;
255 }
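
// Illustrative usage sketch (hypothetical helper): concatenate per-device
// results along dim 0 onto CUDA device 0. Passing -1 instead gathers onto the
// CPU, and c10::nullopt targets the current CUDA device.
at::Tensor gather_example(at::TensorList per_device_results) {
  return torch::cuda::gather(per_device_results, /*dim=*/0,
                             /*destination_index=*/0);
}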
256 }} // namespace torch::cuda