Caffe2 - C++ API
A deep learning, cross platform ML framework
Module.cpp
1 #include <torch/csrc/python_headers.h>
2 
3 #include <memory>
4 #include <unordered_map>
5 #include <vector>
6 
7 #include <torch/csrc/utils/python_strings.h>
8 #include <torch/csrc/distributed/THDP.h>
9 #include <torch/csrc/PythonTypes.h>
10 #include <torch/csrc/autograd/python_variable.h>
11 
12 #ifdef USE_CUDA
13 #include <torch/csrc/cuda/Stream.h>
14 #endif
15 
16 
// Maps the Python-facing backend name (the string passed to
// init_process_group) to the THD channel type used to build the group.
static std::unordered_map<std::string, THDChannelType> name2channel_type = {
  {"mpi", THDChannelMPI},
  {"tcp", THDChannelTCP},
  {"gloo", THDChannelGloo},
  {"nccl", THDChannelNccl},
};

// Translation tables from Python constant objects (the reduce_op / group
// attributes registered in THDPModule_initExtension) to their THD enum
// values. Keys are raw PyObject pointers; the owning module objects keep
// them alive — TODO confirm they are never re-created after registration.
static std::unordered_map<PyObject*, THDReduceOp> obj2reduceop;
static std::unordered_map<PyObject*, THDGroup> obj2group;

#ifdef USE_CUDA
// Global THC state defined elsewhere; handed to THD in initProcessGroup.
extern THCState* state;
#endif
30 
31 PyObject* THDPModule_initProcessGroup(PyObject *_unused, PyObject *args)
32 {
33  HANDLE_TH_ERRORS
34  if (PyTuple_GET_SIZE(args) != 5 || !THPUtils_checkString(PyTuple_GET_ITEM(args, 0)) ||
35  !THPUtils_checkString(PyTuple_GET_ITEM(args, 1)) ||
36  !THPUtils_checkLong(PyTuple_GET_ITEM(args, 2)) ||
37  !THPUtils_checkString(PyTuple_GET_ITEM(args, 3)) ||
38  !THPUtils_checkLong(PyTuple_GET_ITEM(args, 4))) {
39  THPUtils_invalidArguments(args, nullptr, "init_process_group", 1, "(string backend, string init_method, int world_size, string group_name, int rank)");
40  return nullptr;
41  }
42 
43  std::string backend_name = THPUtils_unpackString(PyTuple_GET_ITEM(args, 0));
44  std::string init_method = THPUtils_unpackString(PyTuple_GET_ITEM(args, 1));
45  int world_size = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 2));
46  std::string group_name = THPUtils_unpackString(PyTuple_GET_ITEM(args, 3));
47  int rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 4));
48 
49  THDChannelType channel_type = name2channel_type.at(backend_name);
50  {
51  AutoNoGIL nogil;
52  THDProcessGroupInit(channel_type, init_method, world_size, group_name, rank);
53  }
54 #ifdef USE_CUDA
55  THDSetCudaStatePtr(&state);
56 #endif
57  Py_RETURN_NONE;
58  END_HANDLE_TH_ERRORS
59 }
60 
61 PyObject* THDPModule_destroyProcessGroup(PyObject *_unused) {
62  HANDLE_TH_ERRORS
63  {
64  AutoNoGIL nogil;
65  THDProcessGroupDestroy();
66  }
67  Py_RETURN_NONE;
68  END_HANDLE_TH_ERRORS
69 }
70 
#ifdef USE_CUDA
// Python binding: registers a torch.cuda.Stream with THD. Expects a single
// torch.cuda.Stream argument (METH_O).
PyObject* THDPModule_registerStream(PyObject *_unused, PyObject *_stream)
{
  HANDLE_TH_ERRORS
  THPUtils_assert(THCPStream_Check(_stream), "_register_stream expects a "
      "torch.cuda.Stream object");
  auto* stream_obj = reinterpret_cast<THCPStream*>(_stream);
  THDRegisterCudaStream(stream_obj->cuda_stream);
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}
#endif
83 
84 PyObject* THDPModule_getRank(PyObject *_unused)
85 {
86  HANDLE_TH_ERRORS
87  return PyInt_FromLong(THDGetRank());
88  END_HANDLE_TH_ERRORS
89 }
90 
91 PyObject* THDPModule_getNumProcesses(PyObject *_unused)
92 {
93  HANDLE_TH_ERRORS
94  return PyInt_FromLong(THDGetNumProcesses());
95  END_HANDLE_TH_ERRORS
96 }
97 
#ifdef USE_CUDA
// Python class objects for the legacy CUDA tensor types, defined in the
// CUDA Python bindings. NOTE(review): none of these are referenced in the
// visible code of this file — they look like leftovers from an earlier
// descriptor implementation; confirm before removing.
extern PyObject* THCPDoubleTensorClass;
extern PyObject* THCPFloatTensorClass;
extern PyObject* THCPHalfTensorClass;
extern PyObject* THCPLongTensorClass;
extern PyObject* THCPIntTensorClass;
extern PyObject* THCPShortTensorClass;
extern PyObject* THCPCharTensorClass;
extern PyObject* THCPByteTensorClass;
#endif
108 
109 THDTensorDescriptor THDPModule_makeDescriptor(PyObject *obj) {
110  auto var = (THPVariable*)obj;
111  return var->cdata.data();
112 }
113 
114 static THDRequest* _unpackRequest(PyObject *obj)
115 {
116  return static_cast<THDRequest*>(THPWrapper_get(obj));
117 }
118 
119 static THDReduceOp _getReduceOp(PyObject *obj)
120 {
121  auto it = obj2reduceop.find(obj);
122  if (it == obj2reduceop.end()) {
123  throw std::runtime_error("op should be a constant from "
124  "torch.distributed.deprecated.reduce_op");
125  }
126  return it->second;
127 }
128 
129 static THDGroup _getGroup(PyObject *obj)
130 {
131  auto it = obj2group.find(obj);
132  if (it == obj2group.end()) {
133  if (!THPUtils_checkLong(obj))
134  throw std::runtime_error("group should be an int or one of the values "
135  "from torch.distributed.deprecated.group");
136  return THPUtils_unpackLong(obj);
137  }
138  return it->second;
139 }
140 
141 PyObject* THDPModule_clearGroupCache(PyObject *_unused, PyObject *args) {
142  HANDLE_TH_ERRORS
143  if (PyTuple_GET_SIZE(args) != 1) {
144  THPUtils_invalidArguments(args, nullptr, "clear_group_cache", 1, "(group gr)");
145  return nullptr;
146  }
147 
148  THDGroup group = _getGroup(PyTuple_GET_ITEM(args, 0));
149 
150  {
151  AutoNoGIL nogil;
152  THDClearGroupCache(group);
153  }
154  Py_RETURN_NONE;
155  END_HANDLE_TH_ERRORS
156 }
157 
158 PyObject* THDPModule_isend(PyObject *_unused, PyObject *args)
159 {
160  HANDLE_TH_ERRORS
161  if (PyTuple_GET_SIZE(args) != 2 || !THPVariable_Check(PyTuple_GET_ITEM(args, 0)) ||
162  !THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
163  THPUtils_invalidArguments(args, nullptr, "isend", 1, "(tensor input, int dst_rank)");
164  return nullptr;
165  }
166 
167  auto desc = THDPModule_makeDescriptor(PyTuple_GET_ITEM(args, 0));
168  int dst_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
169  THDRequest* req;
170  {
171  AutoNoGIL guard;
172  req = THDIsend(desc, dst_rank);
173  }
174  return THPWrapper_New(req, (void(*)(void*))THDRequest_free);
175  END_HANDLE_TH_ERRORS
176 }
177 
178 PyObject* THDPModule_irecv(PyObject *_unused, PyObject *args)
179 {
180  HANDLE_TH_ERRORS
181  if (PyTuple_GET_SIZE(args) != 2 || !THPVariable_Check(PyTuple_GET_ITEM(args, 0)) ||
182  !THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
183  THPUtils_invalidArguments(args, nullptr, "irecv", 1, "(tensor output, int src_rank)");
184  return nullptr;
185  }
186 
187  auto desc = THDPModule_makeDescriptor(PyTuple_GET_ITEM(args, 0));
188  int src_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
189  THDRequest* req;
190  {
191  AutoNoGIL guard;
192  req = THDIrecv(desc, src_rank);
193  }
194  return THPWrapper_New(req, (void(*)(void*))THDRequest_free);
195  END_HANDLE_TH_ERRORS
196 }
197 
198 PyObject* THDPModule_send(PyObject *_unused, PyObject *args)
199 {
200  HANDLE_TH_ERRORS
201  if (PyTuple_GET_SIZE(args) != 2 || !THPVariable_Check(PyTuple_GET_ITEM(args, 0)) ||
202  !THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
203  THPUtils_invalidArguments(args, nullptr, "send", 1, "(tensor input, int dst_rank)");
204  return nullptr;
205  }
206 
207  auto desc = THDPModule_makeDescriptor(PyTuple_GET_ITEM(args, 0));
208  int dst_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
209  {
210  AutoNoGIL guard;
211  THDSend(desc, dst_rank);
212  }
213  Py_RETURN_NONE;
214  END_HANDLE_TH_ERRORS
215 }
216 
217 PyObject* THDPModule_recvAnySource(PyObject *_unused, PyObject *_tensor)
218 {
219  HANDLE_TH_ERRORS
220  if (!THPVariable_Check(_tensor)) {
221  THPUtils_invalidArguments(_tensor, nullptr, "recv", 1, "(tensor output)");
222  return nullptr;
223  }
224 
225  auto desc = THDPModule_makeDescriptor(_tensor);
226  int sender;
227  {
228  AutoNoGIL guard;
229  sender = THDRecvAnySource(desc);
230  }
231  return PyInt_FromLong(sender);
232  END_HANDLE_TH_ERRORS
233 }
234 
235 PyObject* THDPModule_recv(PyObject *_unused, PyObject *args)
236 {
237  HANDLE_TH_ERRORS
238  if (PyTuple_GET_SIZE(args) != 2 || !THPVariable_Check(PyTuple_GET_ITEM(args, 0)) ||
239  !THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
240  THPUtils_invalidArguments(args, nullptr, "recv", 1, "(tensor output, int src_rank)");
241  return nullptr;
242  }
243 
244  auto desc = THDPModule_makeDescriptor(PyTuple_GET_ITEM(args, 0));
245  int src_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
246  {
247  AutoNoGIL guard;
248  THDRecv(desc, src_rank);
249  }
250  // Return sender rank
251  Py_INCREF(PyTuple_GET_ITEM(args, 1));
252  return PyTuple_GET_ITEM(args, 1);
253  END_HANDLE_TH_ERRORS
254 }
255 
256 
257 PyObject* THDPModule_allReduceMultiGPU(PyObject *_unused, PyObject *args)
258 {
259  HANDLE_TH_ERRORS
260  std::vector<at::Tensor> descriptors;
261  size_t length;
262  THDGroup group;
263  THDReduceOp op;
264  THPObjectPtr sequence;
265 
266  if (PyTuple_GET_SIZE(args) != 3) {
267  goto invalid_arguments;
268  }
269 
270  if (!PySequence_Check(PyTuple_GET_ITEM(args, 0))) {
271  goto invalid_arguments;
272  }
273 
274  sequence = THPObjectPtr(PySequence_Fast(PyTuple_GET_ITEM(args, 0),
275  "expected a sequence"));
276  if (!sequence.get()) {
277  goto invalid_arguments;
278  }
279 
280  length = static_cast<size_t>(PySequence_Fast_GET_SIZE(sequence.get()));
281 
282  descriptors.reserve(length);
283 
284  for (size_t i = 0; i < length; ++i) {
285  if (!THPVariable_Check(PySequence_Fast_GET_ITEM(sequence.get(), i))) {
286  goto invalid_arguments;
287  }
288 
289  descriptors.push_back(
290  THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence.get(), i))
291  );
292  }
293 
294  group = _getGroup(PyTuple_GET_ITEM(args, 2));
295  op = _getReduceOp(PyTuple_GET_ITEM(args, 1));
296 
297  {
298  AutoNoGIL guard;
299  THDAllReduceMultiGPU(descriptors.data(), length, op, group);
300  }
301  Py_RETURN_NONE;
302 
303 invalid_arguments:
304  THPUtils_invalidArguments(args, nullptr, "all_reduce_multigpu", 1,
305  "(list[tensor] in_out, reduce_op op, group gr)");
306  Py_RETURN_NONE;
307  END_HANDLE_TH_ERRORS
308 }
309 
310 
311 PyObject* THDPModule_reduceMultiGPU(PyObject *_unused, PyObject *args)
312 {
313  HANDLE_TH_ERRORS
314  THPObjectPtr sequence;
315  size_t length;
316  std::vector<at::Tensor> descriptors;
317  THDGroup group;
318  THDReduceOp op;
319  int dst_rank;
320 
321  if (PyTuple_GET_SIZE(args) != 4) {
322  goto invalid_arguments;
323  }
324 
325  if (!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
326  !THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
327  goto invalid_arguments;
328  }
329 
330  sequence = THPObjectPtr(PySequence_Fast(PyTuple_GET_ITEM(args, 0),
331  "expected a sequence"));
332  if (!sequence.get()) {
333  goto invalid_arguments;
334  }
335 
336  length = static_cast<size_t>(PySequence_Fast_GET_SIZE(sequence.get()));
337 
338  descriptors.reserve(length);
339 
340  for (size_t i = 0; i < length; ++i) {
341  if (!THPVariable_Check(PySequence_Fast_GET_ITEM(sequence.get(), i))) {
342  goto invalid_arguments;
343  }
344 
345  descriptors.push_back(
346  THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence.get(), i))
347  );
348  }
349 
350  group = _getGroup(PyTuple_GET_ITEM(args, 3));
351  op = _getReduceOp(PyTuple_GET_ITEM(args, 2));
352  dst_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
353 
354  {
355  AutoNoGIL guard;
356  THDReduceMultiGPU(descriptors.data(), length, op, dst_rank, group);
357  }
358  Py_RETURN_NONE;
359 
360 invalid_arguments:
361  THPUtils_invalidArguments(args, nullptr, "reduce_multigpu", 1,
362  "(list[tensor] in_out, int dst_rank, "
363  "reduce_op op, group gr)");
364  Py_RETURN_NONE;
365  END_HANDLE_TH_ERRORS
366 }
367 
368 
369 PyObject* THDPModule_broadcastMultiGPU(PyObject *_unused, PyObject *args)
370 {
371  HANDLE_TH_ERRORS
372  THPObjectPtr sequence;
373  size_t length;
374  std::vector<at::Tensor> descriptors;
375  THDGroup group;
376  int src_rank;
377 
378  if (PyTuple_GET_SIZE(args) != 3) {
379  goto invalid_arguments;
380  }
381 
382  if (!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
383  !THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
384  goto invalid_arguments;
385  }
386 
387  sequence = THPObjectPtr(PySequence_Fast(PyTuple_GET_ITEM(args, 0),
388  "expected a sequence"));
389  if (!sequence.get()) {
390  goto invalid_arguments;
391  }
392 
393  length = static_cast<size_t>(PySequence_Fast_GET_SIZE(sequence.get()));
394 
395  descriptors.reserve(length);
396 
397  for (size_t i = 0; i < length; ++i) {
398  if (!THPVariable_Check(PySequence_Fast_GET_ITEM(sequence.get(), i))) {
399  goto invalid_arguments;
400  }
401 
402  descriptors.push_back(
403  THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence.get(), i))
404  );
405  }
406 
407  group = _getGroup(PyTuple_GET_ITEM(args, 2));
408  src_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
409 
410  {
411  AutoNoGIL guard;
412  THDBroadcastMultiGPU(descriptors.data(), length, src_rank, group);
413  }
414  Py_RETURN_NONE;
415 
416 invalid_arguments:
417  THPUtils_invalidArguments(args, nullptr, "broadcast_multigpu", 1,
418  "(list[tensor] in_out, int src_rank, group gr)");
419  Py_RETURN_NONE;
420  END_HANDLE_TH_ERRORS
421 }
422 
423 
424 PyObject* THDPModule_allGatherMultiGPU(PyObject *_unused, PyObject *args)
425 {
426  HANDLE_TH_ERRORS
427  THPObjectPtr sequence_one;
428  THPObjectPtr sequence_two;
429 
430  size_t length_one;
431  size_t length_two;
432 
433  std::vector<at::Tensor> output_descriptors;
434  std::vector<at::Tensor> input_descriptors;
435 
436  THDGroup group;
437 
438  if (PyTuple_GET_SIZE(args) != 3) {
439  goto invalid_arguments;
440  }
441 
442  if (!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
443  !PySequence_Check(PyTuple_GET_ITEM(args, 1))) {
444  goto invalid_arguments;
445  }
446 
447  sequence_one = THPObjectPtr(PySequence_Fast(PyTuple_GET_ITEM(args, 0),
448  "expected a sequence"));
449  sequence_two = THPObjectPtr(PySequence_Fast(PyTuple_GET_ITEM(args, 1),
450  "expected a sequence"));
451 
452  if (!sequence_one.get() || !sequence_two.get()) {
453  goto invalid_arguments;
454  }
455 
456  length_one = static_cast<size_t>(
457  PySequence_Fast_GET_SIZE(sequence_one.get()));
458 
459  length_two = static_cast<size_t>(
460  PySequence_Fast_GET_SIZE(sequence_two.get()));
461 
462  if (length_one != length_two) {
463  goto invalid_arguments;
464  }
465 
466  output_descriptors.reserve(length_one);
467  input_descriptors.reserve(length_two);
468 
469  // Get the input list
470  for (size_t i = 0; i < length_two; ++i) {
471  if (!THPVariable_Check(PySequence_Fast_GET_ITEM(sequence_two.get(), i)) ||
472  !THPVariable_Check(PySequence_Fast_GET_ITEM(sequence_one.get(), i))) {
473  goto invalid_arguments;
474  }
475 
476  input_descriptors.push_back(
477  THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence_two.get(), i))
478  );
479 
480  output_descriptors.push_back(
481  THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence_one.get(), i))
482  );
483  }
484 
485  group = _getGroup(PyTuple_GET_ITEM(args, 2));
486 
487  {
488  AutoNoGIL guard;
489  THDAllGatherMultiGPU(output_descriptors.data(),
490  length_one,
491  input_descriptors.data(),
492  length_two,
493  group);
494  }
495 
496  Py_RETURN_NONE;
497 
498 invalid_arguments:
499  THPUtils_invalidArguments(args, nullptr, "all_gather_multigpu", 1,
500  "(list[list[tensor]] output, list[tensor] input, group gr)");
501  Py_RETURN_NONE;
502  END_HANDLE_TH_ERRORS
503 }
504 
505 
506 PyObject* THDPModule_allReduce(PyObject *_unused, PyObject *args)
507 {
508  HANDLE_TH_ERRORS
509  if (PyTuple_GET_SIZE(args) != 3 || !THPVariable_Check(PyTuple_GET_ITEM(args, 0))) {
510  THPUtils_invalidArguments(args, nullptr, "all_reduce", 1, "(tensor in_out, reduce_op op, group gr)");
511  return nullptr;
512  }
513 
514  THDGroup group = _getGroup(PyTuple_GET_ITEM(args, 2));
515  THDReduceOp op = _getReduceOp(PyTuple_GET_ITEM(args, 1));
516  auto desc = THDPModule_makeDescriptor(PyTuple_GET_ITEM(args, 0));
517  {
518  AutoNoGIL guard;
519  THDAllReduce(desc, op, group);
520  }
521  Py_RETURN_NONE;
522  END_HANDLE_TH_ERRORS
523 }
524 
525 PyObject* THDPModule_reduce(PyObject *_unused, PyObject *args)
526 {
527  HANDLE_TH_ERRORS
528  if (PyTuple_GET_SIZE(args) != 4 || !THPVariable_Check(PyTuple_GET_ITEM(args, 0)) ||
529  !THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
530  THPUtils_invalidArguments(args, nullptr, "reduce", 1,
531  "(tensor reduced, int dst_rank, reduce_op op, group gr)");
532  return nullptr;
533  }
534 
535  THDGroup group = _getGroup(PyTuple_GET_ITEM(args, 3));
536  THDReduceOp op = _getReduceOp(PyTuple_GET_ITEM(args, 2));
537  auto desc = THDPModule_makeDescriptor(PyTuple_GET_ITEM(args, 0));
538  int dst_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
539  {
540  AutoNoGIL guard;
541  THDReduce(desc, op, dst_rank, group);
542  }
543  Py_RETURN_NONE;
544  END_HANDLE_TH_ERRORS
545 }
546 
547 PyObject* THDPModule_broadcast(PyObject *_unused, PyObject *args)
548 {
549  HANDLE_TH_ERRORS
550  if (PyTuple_GET_SIZE(args) != 3 || !THPVariable_Check(PyTuple_GET_ITEM(args, 0)) ||
551  !THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
552  THPUtils_invalidArguments(args, nullptr, "broadcast", 1,
553  "(tensor src_dst, int src_rank, group gr)");
554  return nullptr;
555  }
556 
557  THDGroup group = _getGroup(PyTuple_GET_ITEM(args, 2));
558  auto desc = THDPModule_makeDescriptor(PyTuple_GET_ITEM(args, 0));
559  int src_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
560  {
561  AutoNoGIL guard;
562  THDBroadcast(desc, src_rank, group);
563  }
564  Py_RETURN_NONE;
565  END_HANDLE_TH_ERRORS
566 }
567 
568 PyObject* THDPModule_allGather(PyObject *_unused, PyObject *args)
569 {
570  HANDLE_TH_ERRORS
571  THPObjectPtr sequence;
572  size_t length;
573  std::vector<at::Tensor> descriptors;
574  THDGroup group;
575  at::Tensor desc;
576 
577  if (PyTuple_GET_SIZE(args) != 3 ||
578  !PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
579  !THPVariable_Check(PyTuple_GET_ITEM(args, 1))) {
580 
581  goto invalid_arguments;
582  }
583 
584  sequence = THPObjectPtr(PySequence_Fast(PyTuple_GET_ITEM(args, 0),
585  "expected a sequence"));
586  if (!sequence.get()) {
587  goto invalid_arguments;
588  }
589 
590  length = static_cast<size_t>(PySequence_Fast_GET_SIZE(sequence.get()));
591 
592  descriptors.reserve(length);
593 
594  for (size_t i = 0; i < length; ++i) {
595  if (!THPVariable_Check(PySequence_Fast_GET_ITEM(sequence.get(), i)))
596  goto invalid_arguments;
597 
598  descriptors.push_back(
599  THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence.get(), i))
600  );
601  }
602 
603  group = _getGroup(PyTuple_GET_ITEM(args, 2));
604  desc = THDPModule_makeDescriptor(PyTuple_GET_ITEM(args, 1));
605  {
606  AutoNoGIL guard;
607  THDAllGather(descriptors.data(), length, desc, group);
608  }
609  Py_RETURN_NONE;
610 
611 invalid_arguments:
612  THPUtils_invalidArguments(args, nullptr, "allGather", 1,
613  "(list[tensor] output, tensor input, group gr)");
614  Py_RETURN_NONE;
615  END_HANDLE_TH_ERRORS
616 }
617 
618 PyObject* THDPModule_gatherSend(PyObject *_unused, PyObject *args)
619 {
620  HANDLE_TH_ERRORS
621  if (PyTuple_GET_SIZE(args) != 3 || !THPVariable_Check(PyTuple_GET_ITEM(args, 0))) {
622  THPUtils_invalidArguments(args, nullptr, "gatherSend", 1,
623  "(tensor input, int dst_rank, group gr)");
624  return nullptr;
625  }
626 
627  THDGroup group = _getGroup(PyTuple_GET_ITEM(args, 2));
628  auto desc = THDPModule_makeDescriptor(PyTuple_GET_ITEM(args, 0));
629  int dst_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
630  {
631  AutoNoGIL guard;
632  THDGatherSend(desc, dst_rank, group);
633  }
634  Py_RETURN_NONE;
635  END_HANDLE_TH_ERRORS
636 }
637 
// Python binding: root side of gather — receives one tensor per process
// into the `output` list, with `input` as this process' own contribution.
// Args tuple: (list[tensor] output, tensor input, group gr).
// Returns None on success, nullptr with a Python exception set on error.
PyObject* THDPModule_gatherRecv(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  // Declarations precede the gotos below (C++ forbids jumping over
  // non-trivial initializations).
  THPObjectPtr sequence;
  size_t length;
  std::vector<at::Tensor> descriptors;
  THDGroup group;
  at::Tensor desc;

  if (PyTuple_GET_SIZE(args) != 3 ||
      !PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
      !THPVariable_Check(PyTuple_GET_ITEM(args, 1))) {
    goto invalid_arguments;
  }

  sequence = THPObjectPtr(PySequence_Fast(PyTuple_GET_ITEM(args, 0),
                                          "expected a sequence"));
  if (!sequence.get()) {
    goto invalid_arguments;
  }

  length = static_cast<size_t>(PySequence_Fast_GET_SIZE(sequence.get()));

  descriptors.reserve(length);

  // Every element of the output list must be a Variable.
  for (size_t i = 0; i < length; ++i) {
    if (!THPVariable_Check(PySequence_Fast_GET_ITEM(sequence.get(), i)))
      goto invalid_arguments;

    descriptors.push_back(
      THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence.get(), i))
    );
  }

  desc = THDPModule_makeDescriptor(PyTuple_GET_ITEM(args, 1));
  group = _getGroup(PyTuple_GET_ITEM(args, 2));
  {
    AutoNoGIL guard;
    THDGatherRecv(descriptors.data(), length, desc, group);
  }
  Py_RETURN_NONE;

invalid_arguments:
  THPUtils_invalidArguments(args, nullptr, "gatherRecv", 1,
                            "(list[tensor] output, tensor input, group gr)");
  return nullptr;
  END_HANDLE_TH_ERRORS
}
686 
// Python binding: root side of scatter — sends one tensor per process
// from the `input` list and keeps `output` as this process' own slice.
// Args tuple: (list[tensor] input, tensor output, group gr).
// Returns None on success, nullptr with a Python exception set on error.
PyObject* THDPModule_scatterSend(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  // Declarations precede the gotos below (C++ forbids jumping over
  // non-trivial initializations).
  THPObjectPtr sequence;
  size_t length;
  std::vector<at::Tensor> descriptors;
  THDGroup group;
  at::Tensor desc;

  if (PyTuple_GET_SIZE(args) != 3 ||
      !PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
      !THPVariable_Check(PyTuple_GET_ITEM(args, 1))) {
    goto invalid_arguments;
  }

  sequence = THPObjectPtr(PySequence_Fast(PyTuple_GET_ITEM(args, 0),
                                          "expected a sequence"));
  if (!sequence.get()) {
    goto invalid_arguments;
  }

  length = static_cast<size_t>(PySequence_Fast_GET_SIZE(sequence.get()));

  descriptors.reserve(length);

  // Every element of the input list must be a Variable.
  for (size_t i = 0; i < length; ++i) {
    if (!THPVariable_Check(PySequence_Fast_GET_ITEM(sequence.get(), i)))
      goto invalid_arguments;

    descriptors.push_back(
      THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence.get(), i))
    );
  }

  desc = THDPModule_makeDescriptor(PyTuple_GET_ITEM(args, 1));
  group = _getGroup(PyTuple_GET_ITEM(args, 2));
  {
    AutoNoGIL guard;
    THDScatterSend(descriptors.data(), length, desc, group);
  }
  Py_RETURN_NONE;

invalid_arguments:
  THPUtils_invalidArguments(args, nullptr, "scatterSend", 1,
                            "(list[tensor] input, tensor output, group gr)");
  return nullptr;
  END_HANDLE_TH_ERRORS
}
735 
736 PyObject* THDPModule_scatterRecv(PyObject *_unused, PyObject *args)
737 {
738  HANDLE_TH_ERRORS
739  if (PyTuple_GET_SIZE(args) != 3 || !THPVariable_Check(PyTuple_GET_ITEM(args, 0)) ||
740  !THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
741  THPUtils_invalidArguments(args, nullptr, "scatterRecv", 1,
742  "(tensor output, int src_rank, group gr)");
743  return nullptr;
744  }
745 
746  THDGroup group = _getGroup(PyTuple_GET_ITEM(args, 2));
747  auto desc = THDPModule_makeDescriptor(PyTuple_GET_ITEM(args, 0));
748  int src_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
749  {
750  AutoNoGIL guard;
751  THDScatterRecv(desc, src_rank, group);
752  }
753  Py_RETURN_NONE;
754  END_HANDLE_TH_ERRORS
755 }
756 
757 PyObject* THDPModule_barrier(PyObject *_unused, PyObject *_group)
758 {
759  HANDLE_TH_ERRORS
760  {
761  AutoNoGIL guard;
762  THDBarrier(_getGroup(_group));
763  }
764  Py_RETURN_NONE;
765  END_HANDLE_TH_ERRORS
766 }
767 
// Python binding: creates a new THD group from a list of ranks.
// Args tuple: (list[int] ranks); ranks must be unique.
// Returns the new group id as a Python int, or nullptr with a Python
// exception set on invalid arguments.
PyObject* THDPModule_newGroup(PyObject *_unused, PyObject *args)
{
  HANDLE_TH_ERRORS
  // Declarations precede the gotos below (C++ forbids jumping over
  // non-trivial initializations).
  THPObjectPtr sequence;
  size_t length;
  std::vector<int> ranks;

  if (PyTuple_GET_SIZE(args) != 1 ||
      !PySequence_Check(PyTuple_GET_ITEM(args, 0))) {
    goto invalid_arguments;
  }

  sequence = THPObjectPtr(PySequence_Fast(PyTuple_GET_ITEM(args, 0),
                                          "expected a sequence"));
  if (!sequence.get()) {
    goto invalid_arguments;
  }

  length = static_cast<size_t>(PySequence_Fast_GET_SIZE(sequence.get()));

  ranks.reserve(length);

  for (size_t i = 0; i < length; ++i) {
    if (!THPUtils_checkLong(PySequence_Fast_GET_ITEM(sequence.get(), i)))
      goto invalid_arguments;

    ranks.push_back(THPUtils_unpackLong(
        PySequence_Fast_GET_ITEM(sequence.get(), i)));

    // O(n^2) duplicate check; group sizes are expected to be small.
    for (size_t j = 0; j < i; ++j)
      THPUtils_assert(ranks[i] != ranks[j], "ranks should be unique");
  }

  THDGroup group;
  {
    AutoNoGIL guard;
    group = THDNewGroup(ranks.data(), length);
  }
  return PyInt_FromLong(group);

invalid_arguments:
  THPUtils_invalidArguments(args, nullptr, "newGroup", 1, "(list[int] ranks)");
  return nullptr;
  END_HANDLE_TH_ERRORS
}
813 
814 PyObject* THDPModule_requestIsCompleted(PyObject *_unused, PyObject *_req)
815 {
816  HANDLE_TH_ERRORS
817  if (!THPWrapper_check(_req)) {
818  THPUtils_invalidArguments(_req, nullptr, "requestIsCompleted", 1, "(request req)");
819  return nullptr;
820  }
821 
822  return PyBool_FromLong(THDRequest_isCompleted(_unpackRequest(_req)));
823  END_HANDLE_TH_ERRORS
824 }
825 
826 PyObject* THDPModule_requestWait(PyObject *_unused, PyObject *_req)
827 {
828  HANDLE_TH_ERRORS
829  if (!THPWrapper_check(_req)) {
830  THPUtils_invalidArguments(_req, nullptr, "requestWait", 1, "(request req)");
831  return nullptr;
832  }
833 
834  {
835  AutoNoGIL guard;
836  THDRequest_wait(_unpackRequest(_req));
837  }
838  Py_RETURN_NONE;
839  END_HANDLE_TH_ERRORS
840 }
841 
842 PyObject* THDPModule_initExtension(PyObject *_unused, PyObject *args) {
843  if (PyTuple_GET_SIZE(args) != 3) {
844  THPUtils_invalidArguments(args, nullptr, "initExtension", 1, "(bool is_master_worker, reduce_op obj, group obj)");
845  return nullptr;
846  }
847 
848  PyObject* is_master_worker_obj = PyTuple_GET_ITEM(args, 0);
849  PyObject* reduce_op_obj = PyTuple_GET_ITEM(args, 1);
850  PyObject* group_obj = PyTuple_GET_ITEM(args, 2);
851 
852  THPUtils_assert(PyBool_Check(is_master_worker_obj), "first argument should be a bool");
853  bool is_master_worker = is_master_worker_obj == Py_True;
854 
855  THPObjectPtr reduce_op;
856 #define REGISTER_REDUCE_OP(NAME) \
857  reduce_op = PyObject_GetAttrString(reduce_op_obj, #NAME); \
858  THPUtils_assert(reduce_op, "Missing object for reduce op " #NAME); \
859  obj2reduceop.emplace(reduce_op.get(), THDReduce##NAME);
860  REGISTER_REDUCE_OP(SUM);
861  REGISTER_REDUCE_OP(PRODUCT);
862  REGISTER_REDUCE_OP(MIN);
863  REGISTER_REDUCE_OP(MAX);
864 #undef REGISTER_REDUCE_OP
865 
866  THPObjectPtr group;
867 #define REGISTER_GROUP(NAME) \
868  group = PyObject_GetAttrString(group_obj, #NAME); \
869  THPUtils_assert(group, "Missing object for group " #NAME); \
870  obj2group.emplace(group.get(), THDGroup##NAME);
871  REGISTER_GROUP(WORLD);
872 #undef REGISTER_GROUP
873 
874  if (is_master_worker) {
875  throw std::runtime_error("THD master_worker no longer supported");
876  }
877  Py_RETURN_TRUE;
878 }
879 
// Method table exported to the Python `torch._C` layer; each entry binds a
// `_dist_*` name to the corresponding THDPModule_* function. METH_O entries
// take exactly one argument; METH_VARARGS take a tuple; METH_NOARGS none.
static struct PyMethodDef _THDPModule_methods[] = {
  {"_dist_init_extension", (PyCFunction)THDPModule_initExtension, METH_VARARGS, nullptr},
  {"_dist_init_process_group", (PyCFunction)THDPModule_initProcessGroup, METH_VARARGS, nullptr},
  {"_dist_destroy_process_group", (PyCFunction)THDPModule_destroyProcessGroup, METH_NOARGS, nullptr},
  {"_dist_clear_group_cache", (PyCFunction)THDPModule_clearGroupCache, METH_VARARGS, nullptr},
#ifdef USE_CUDA
  {"_dist_register_stream", (PyCFunction)THDPModule_registerStream, METH_O, nullptr},
#endif
  {"_dist_get_rank", (PyCFunction)THDPModule_getRank, METH_NOARGS, nullptr},
  {"_dist_get_num_processes", (PyCFunction)THDPModule_getNumProcesses, METH_NOARGS, nullptr},
  {"_dist_isend", (PyCFunction)THDPModule_isend, METH_VARARGS, nullptr},
  {"_dist_irecv", (PyCFunction)THDPModule_irecv, METH_VARARGS, nullptr},
  {"_dist_send", (PyCFunction)THDPModule_send, METH_VARARGS, nullptr},
  {"_dist_recv_any_source", (PyCFunction)THDPModule_recvAnySource, METH_O, nullptr},
  {"_dist_recv", (PyCFunction)THDPModule_recv, METH_VARARGS, nullptr},
  {"_dist_all_reduce", (PyCFunction)THDPModule_allReduce, METH_VARARGS, nullptr},
  {"_dist_all_reduce_multigpu", (PyCFunction)THDPModule_allReduceMultiGPU, METH_VARARGS, nullptr},
  {"_dist_reduce", (PyCFunction)THDPModule_reduce, METH_VARARGS, nullptr},
  {"_dist_reduce_multigpu", (PyCFunction)THDPModule_reduceMultiGPU, METH_VARARGS, nullptr},
  {"_dist_broadcast", (PyCFunction)THDPModule_broadcast, METH_VARARGS, nullptr},
  {"_dist_broadcast_multigpu", (PyCFunction)THDPModule_broadcastMultiGPU, METH_VARARGS, nullptr},
  {"_dist_all_gather", (PyCFunction)THDPModule_allGather, METH_VARARGS, nullptr},
  {"_dist_all_gather_multigpu", (PyCFunction)THDPModule_allGatherMultiGPU, METH_VARARGS, nullptr},
  {"_dist_gather_send", (PyCFunction)THDPModule_gatherSend, METH_VARARGS, nullptr},
  {"_dist_gather_recv", (PyCFunction)THDPModule_gatherRecv, METH_VARARGS, nullptr},
  {"_dist_scatter_send", (PyCFunction)THDPModule_scatterSend, METH_VARARGS, nullptr},
  {"_dist_scatter_recv", (PyCFunction)THDPModule_scatterRecv, METH_VARARGS, nullptr},
  {"_dist_barrier", (PyCFunction)THDPModule_barrier, METH_O, nullptr},
  {"_dist_new_group", (PyCFunction)THDPModule_newGroup, METH_VARARGS, nullptr},
  {"_dist_request_is_completed", (PyCFunction)THDPModule_requestIsCompleted, METH_O, nullptr},
  {"_dist_request_wait", (PyCFunction)THDPModule_requestWait, METH_O, nullptr},
  {nullptr}  // sentinel terminating the table
};
913 
// Accessor used during module initialization to splice the table above
// into the main extension module's method list.
PyMethodDef* THDPModule_methods() {
  return _THDPModule_methods;
}