Caffe2 - C++ API
A deep learning, cross-platform ML framework
nccl.cpp
#include <torch/csrc/cuda/nccl.h>
#include <torch/csrc/cuda/device_set.h>
#include <ATen/core/functional.h>
#include <torch/csrc/utils/hash.h>

#include <ATen/ATen.h>
#include <c10/cuda/CUDAGuard.h>
#include <c10/util/Exception.h>

#include <THC/THC.h>

#include <limits>
#include <sstream>
#include <type_traits>
#include <unordered_map>

namespace torch {
namespace cuda {
namespace nccl {

using namespace at;

namespace detail {

void throw_nccl_error(ncclResult_t status) {
  std::ostringstream err;
  err << "NCCL Error " << status << ": " << ncclGetErrorString(status);
  throw std::runtime_error(err.str());
}

struct NcclCommList {
  std::unique_ptr<ncclComm_t[]> comms;
  int ndevices;
  NcclCommList(const std::vector<int>& devices)
      : comms(new ncclComm_t[devices.size()]), ndevices(devices.size()) {
    NCCL_CHECK(ncclCommInitAll(comms.get(), devices.size(), devices.data()));
  }
  NcclCommList(NcclCommList&& foo) = default;
  ~NcclCommList() {
    /*
     * TODO(T30279827) Temporarily disable calling ncclCommDestroy
     * Calling ncclCommDestroy while the program is exiting is undefined
     * according to NVIDIA, and leads to a segfault in NCCL 2
     * (whether it is called before or after the CUDA runtime destructor).
     * Temporarily disable it in the destructor to avoid the segfault.
     * Following up with NVIDIA for a long-term solution.
     */
    return;

    if (comms) {
      for (int i = 0; i < ndevices; i++) {
        int dummy_var;
        if (cudaGetDevice(&dummy_var) != cudaSuccess) {
          /* There are cases when this destructor is called after the
             CUDA driver has already been unloaded from the process.
             In those cases, skip ncclCommDestroy. */
          return;
        }
        ncclCommDestroy(comms[i]);
      }
    }
  }
  ArrayRef<ncclComm_t> ref() const {
    return ArrayRef<ncclComm_t>(comms.get(), ndevices);
  }
};

using device_list = std::vector<int>;
// accesses to this object have to be guarded by THC's CudaFreeMutex
static std::unordered_map<device_list, NcclCommList, torch::hash<device_list>>
    _communicators;

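// Looks up (and lazily creates) the communicator clique for the set of
// devices the given tensors live on, keyed by the ordered device list.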
ArrayRef<ncclComm_t> _get_communicators(TensorList inputs) {
  static auto get_device = [](const at::Tensor& t) -> int {
    return t.get_device();
  };
  device_list devices = fmap(inputs, get_device);
  auto it = _communicators.find(devices);
  if (it == _communicators.end())
    std::tie(it, std::ignore) = _communicators.emplace(devices, devices);
  return it->second.ref();
}

ncclDataType_t _get_data_type(const Tensor& t) {
  if (t.type().backend() != Backend::CUDA) {
    throw std::runtime_error("Unconvertible NCCL type");
  }
  switch (t.scalar_type()) {
    case at::kFloat:
      return ncclFloat;
    case at::kHalf:
      return ncclHalf;
    case at::kDouble:
      return ncclDouble;
    case at::kLong:
      return ncclInt64;
    case at::kInt:
      return ncclInt;
    case at::kChar:
      return ncclChar;
    case at::kByte:
      return ncclChar;
    default:
      throw std::runtime_error("Unconvertible NCCL type");
  }
}

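// Validates a collective's arguments: inputs and outputs must be non-empty,
// equally long sequences of dense, contiguous CUDA tensors of the same type,
// each input/output pair on the same (and otherwise unused) device, with all
// inputs equally sized and output sizes consistent with the given multipliers.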
void _check_inputs(
    TensorList inputs,
    TensorList outputs,
    int input_multiplier,
    int output_multiplier) {
  // len(inputs) == len(outputs)
  size_t len = inputs.size();

  if (len <= 0) {
    throw std::runtime_error("input sequence can't be empty");
  }

  if (len != outputs.size()) {
    std::stringstream err;
    err << "inputs and outputs sequences have to be of the same length, but got input of length "
        << len << " and output of length " << outputs.size();
    throw std::runtime_error(err.str());
  }

  device_set devices;
  int64_t numel = inputs[0].numel();
  auto& type = inputs[0].type();

  for (size_t i = 0; i < len; i++) {
    auto input = inputs[i];
    auto output = outputs[i];

    if (!(input.is_cuda() && !input.is_sparse() &&
          output.is_cuda() && !output.is_sparse())) {
      throw std::runtime_error(
          "input and output elements have to be cuda dense Tensors");
    }

    if (!(type == input.type() && type == output.type())) {
      throw std::runtime_error(
          "all inputs and outputs must be of the same Tensor type");
    }

    if (!input.is_contiguous() || !output.is_contiguous()) {
      throw std::runtime_error("all inputs and outputs have to be contiguous");
    }

    auto input_device = input.get_device();
    // inputs must be on unique devices
    if (devices.test(input_device)) {
      throw std::runtime_error("inputs must be on unique devices");
    }
    devices.set(input_device);

    // each input and its corresponding output must be on the same device
    if (input_device != output.get_device()) {
      throw std::runtime_error("input and output must be on the same device");
    }

    // all inputs must be the same size
    if (input.numel() != numel) {
      throw std::runtime_error(
          "all inputs must have the same number of elements");
    }

    if (output.numel() * output_multiplier != numel * input_multiplier) {
      throw std::runtime_error(
          "output must be of size input_size * size_multiplier");
    }
  }
}

} // namespace detail

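// Returns true only if every tensor is a dense, contiguous CUDA tensor and no
// two tensors share a device (always false when built without NCCL support).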
bool is_available(TensorList tensors) {
#ifdef USE_NCCL
  device_set devices;
  for (auto& tensor : tensors) {
    auto& type = tensor.type();
    if (!type.is_cuda() || type.is_sparse())
      return false;
    if (!tensor.is_contiguous())
      return false;
    auto device = tensor.get_device();
    if (devices[device])
      return false;
    devices[device] = true;
  }
  return true;
#else
  return false;
#endif
}

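// Encodes the NCCL version as MAJOR * 1000 + MINOR * 100 + PATCH; NCCL 1.x
// builds (which do not define NCCL_MAJOR) report 1000, and 0 means NCCL is
// unavailable.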
std::uint64_t version() {
#if defined(NCCL_MAJOR)
  return NCCL_MAJOR * 1000 + NCCL_MINOR * 100 + NCCL_PATCH;
#elif defined(USE_NCCL)
  return 1000;
#else
  return 0;
#endif
}

namespace {
// NCCL changed the numerical type used for count between NCCL 1 and NCCL 2.
// The struct below extracts the type of the second argument of a function
// type T; instantiating it with decltype(ncclBcast) lets us obtain the count
// type statically, whichever NCCL version we are built against.

template <typename T>
struct GetSecondArgType;

template <typename R, typename Arg0, typename Arg1, typename... Args>
struct GetSecondArgType<R(Arg0, Arg1, Args...)> {
  typedef typename std::decay<Arg1>::type type;
};

constexpr auto count_max =
    std::numeric_limits<GetSecondArgType<decltype(ncclBcast)>::type>::max();
} // namespace

size_t get_max_count() {
  return count_max;
}

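// In-place broadcast: the tensor on the first device in the list acts as the
// source (root rank 0 of the clique) and its contents are replicated into
// every other tensor, each copy enqueued on that device's current (or
// user-supplied) stream.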
void broadcast(
    TensorList tensors,
    const stream_list& streams,
    const comm_list& user_comms) {
#ifdef USE_NCCL
  using namespace torch::cuda::nccl::detail;
  _check_inputs(tensors, tensors, 1, 1);
  ncclDataType_t data_type = _get_data_type(tensors[0]);
  int64_t numel = tensors[0].numel();

  std::lock_guard<std::mutex> free_mutex(
      *(c10::cuda::CUDACachingAllocator::getFreeMutex()));
  const auto comms = user_comms.empty() ? _get_communicators(tensors)
                                        : ArrayRef<ncclComm_t>(user_comms);

  at::cuda::OptionalCUDAGuard device_guard;
  AutoNcclGroup nccl_group_guard;
  for (size_t i = 0, num_tensors = tensors.size(); i < num_tensors; i++) {
    int device = tensors[i].get_device();
    device_guard.set_index(device);
    // Default to the current stream
    const auto stream = (streams.empty() || !streams[i])
        ? at::cuda::getCurrentCUDAStream(device).stream()
        : streams[i]->stream();
    AT_CHECK(
        static_cast<uint64_t>(numel) <= static_cast<uint64_t>(count_max),
        "Broadcast tensor has ",
        numel,
        " elements, which exceeds the "
        "maximum NCCL supports (",
        count_max,
        ")");
    NCCL_CHECK(ncclBcast(
        tensors[i].data_ptr(), numel, data_type, 0, comms[i], stream));
  }
#else
  AT_ERROR("PyTorch built without NCCL support");
#endif
}

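// Element-wise reduction across devices with the given ncclRedOp_t; NCCL only
// guarantees the reduced result in the root's output buffer, so the other
// outputs should not be relied upon.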
void reduce(
    const std::vector<at::Tensor>& inputs,
    std::vector<at::Tensor>& outputs,
    int32_t root,
    int32_t op,
    const stream_list& streams,
    const comm_list& user_comms) {
#ifdef USE_NCCL
  using namespace torch::cuda::nccl::detail;
  AT_CHECK(
      root >= 0 && static_cast<size_t>(root) < inputs.size(), "invalid root");

  _check_inputs(inputs, outputs, 1, 1);
  const auto len = inputs.size();

  ncclDataType_t data_type = _get_data_type(inputs[0]);

  const auto count = inputs[0].numel();
  std::lock_guard<std::mutex> lock(
      *(c10::cuda::CUDACachingAllocator::getFreeMutex()));
  auto comms_ref = user_comms.empty() ? _get_communicators(inputs)
                                      : ArrayRef<ncclComm_t>(user_comms);

  at::cuda::OptionalCUDAGuard device_guard;
  AutoNcclGroup nccl_group_guard;
  for (size_t i = 0; i < len; i++) {
    int device = inputs[i].device().index();
    device_guard.set_index(device);
    // Default to the current stream
    const auto stream = (streams.empty() || !streams[i])
        ? at::cuda::getCurrentCUDAStream(device).stream()
        : streams[i]->stream();

    NCCL_CHECK(ncclReduce(
        inputs[i].data_ptr(),
        outputs[i].data_ptr(),
        count,
        data_type,
        (ncclRedOp_t)op,
        root,
        comms_ref[i],
        stream));
  }
#else
  AT_ERROR("PyTorch built without NCCL support");
#endif
}

void reduce(
    std::vector<at::Tensor>& inputs,
    int32_t root,
    int32_t op,
    const stream_list& streams,
    const comm_list& user_comms) {
  reduce(inputs, /*outputs=*/inputs, root, op, streams, user_comms);
}
} // namespace nccl
} // namespace cuda
} // namespace torch
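
For context, here is a minimal, hypothetical usage sketch of the in-place broadcast defined above (it is not part of nccl.cpp). It assumes a build with USE_NCCL and at least two visible CUDA devices, and it passes empty stream and communicator lists so each device's current stream and the cached communicator clique are used; the helper name broadcast_example and the tensor shape are illustrative only.

// Illustrative only -- not part of nccl.cpp.
#include <torch/csrc/cuda/nccl.h>
#include <ATen/ATen.h>

void broadcast_example() {
  // One contiguous, equally sized tensor per device; device 0 holds the data
  // to be replicated (it becomes root rank 0 of the communicator clique).
  at::Tensor src = at::ones(
      {1024},
      at::TensorOptions().dtype(at::kFloat).device(at::Device(at::kCUDA, 0)));
  at::Tensor dst = at::empty(
      {1024},
      at::TensorOptions().dtype(at::kFloat).device(at::Device(at::kCUDA, 1)));

  // Empty stream and communicator lists select the current stream on each
  // device and the communicator clique cached by _get_communicators.
  torch::cuda::nccl::broadcast({src, dst}, /*streams=*/{}, /*user_comms=*/{});
  // Once the work enqueued on each device's stream completes, dst on device 1
  // holds the same values as src on device 0.
}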