Caffe2 - C++ API
A deep learning, cross-platform ML framework
data_channel_collectives.cpp
1 #ifdef WITH_GLOO
2 #include <THD/base/data_channels/DataChannelGloo.hpp>
3 #endif // WITH_GLOO
4 #ifdef WITH_MPI
5 #include <THD/base/data_channels/DataChannelMPI.hpp>
6 #endif // WITH_MPI
7 #include <THD/base/data_channels/DataChannelTCP.hpp>
8 #include <THD/test/TestUtils.hpp>
9 
10 #include <THPP/tensors/THTensor.hpp>
11 
12 #include <unistd.h>
13 #include <array>
14 #include <cassert>
15 #include <chrono>
16 #include <cmath>
17 #include <iostream>
18 #include <memory>
19 #include <mutex>
20 #include <set>
21 #include <thread>
22 
23 constexpr std::array<int, 4> WORKERS_NUM = {2, 4, 7, 13};
24 constexpr int MASTER_PORT = 45678;
25 constexpr int BARRIER_WAIT_TIME = 200; // milliseconds
26 
27 std::vector<std::thread> g_all_workers;
28 std::mutex g_mutex;
29 std::string g_data_channel_type;
30 std::unique_ptr<Barrier> g_barrier;
31 
32 void test_send_recv_tensor(std::shared_ptr<thd::DataChannel> data_channel) {
33  if (g_data_channel_type == "gloo") {
34  return; // XXX: Gloo does not support send/recv
35  }
36 
37  if (data_channel->getRank() == 0) {
38  auto float_tensor = buildTensor<float>({1, 2, 3}, 4.2);
39  data_channel->send(*float_tensor, 1);
40  } else if (data_channel->getRank() == 1) {
41  auto float_tensor = buildTensor<float>({1, 2, 3}, -1.0);
42  data_channel->receive(*float_tensor, 0);
43  ASSERT_TENSOR_VALUE(float, *float_tensor, 4.2);
44  }
45 }
46 
47 void test_send_recv_tensor_any_source(
48  std::shared_ptr<thd::DataChannel> data_channel,
49  int workers) {
50  if (g_data_channel_type == "gloo") {
51  return; // XXX: Gloo does not support send/recv from any source
52  }
53 
54  if (data_channel->getRank() == 0) {
55  std::set<int> ranks;
56  for (int i = 0; i < workers; i++) {
57  auto int_tensor = buildTensor<int>({1, 2, 3}, -1);
58  data_channel->receive(*int_tensor);
59  ranks.insert(static_cast<int*>(int_tensor->data())[0]);
60  }
61 
62  assert(ranks.size() == workers);
63  } else {
64  auto int_tensor = buildTensor<int>({1, 2, 3}, data_channel->getRank());
65  data_channel->send(*int_tensor, 0);
66  }
67 }
68 
69 void test_send_recv_scalar(std::shared_ptr<thd::DataChannel> data_channel) {
70  if (g_data_channel_type == "gloo") {
71  return; // XXX: Gloo does not support send/recv
72  }
73 
74  if (data_channel->getRank() == 0) {
75  thd::ScalarWrapper<int> scalar((int)1232);
76  data_channel->send(scalar, 1);
77  } else if (data_channel->getRank() == 1) {
78  thd::ScalarWrapper<int> scalar((int)-1);
79  data_channel->receive(scalar, 0);
80  assert(scalar.value() == 1232);
81  }
82 }
83 
84 void test_broadcast(std::shared_ptr<thd::DataChannel> data_channel) {
85  for (size_t dest = 0; dest < data_channel->getNumProcesses(); ++dest) {
86  if (data_channel->getRank() == dest) {
87  auto float_tensor = buildTensor<float>({1, 2, 3, 4, 5}, 10.123);
88  data_channel->broadcast(*float_tensor, dest);
89  } else {
90  auto float_tensor = buildTensor<float>({1, 2, 3, 4, 5}, -1.0);
91  data_channel->broadcast(*float_tensor, dest);
92  ASSERT_TENSOR_VALUE(float, *float_tensor, 10.123)
93  }
94  }
95 }
96 
97 void _test_reduce_helper(
98  std::shared_ptr<thd::DataChannel> data_channel,
99  THDReduceOp op_type,
100  int64_t init_value,
101  int64_t expected_value) {
102  if (data_channel->getRank() == 0) {
103  auto int_tensor = buildTensor<int>({1, 2, 3, 4, 5}, init_value);
104  data_channel->reduce(*int_tensor, op_type, 0);
105  ASSERT_TENSOR_VALUE(int, *int_tensor, expected_value)
106  } else {
107  auto int_tensor =
108  buildTensor<int>({1, 2, 3, 4, 5}, data_channel->getRank());
109  data_channel->reduce(*int_tensor, op_type, 0);
110  }
111 }
112 
113 void test_reduce(std::shared_ptr<thd::DataChannel> data_channel, int workers) {
114  if (g_data_channel_type == "gloo") {
115  return; // XXX: Gloo does not support reduce
116  }
117 
118  _test_reduce_helper(
119  data_channel,
120  THDReduceOp::THDReduceSUM,
121  2,
122  2 + (workers * (workers + 1) / 2));
123  _test_reduce_helper(
124  data_channel, THDReduceOp::THDReducePRODUCT, 2, 2 * factorial(workers));
125  _test_reduce_helper(data_channel, THDReduceOp::THDReduceMIN, 10010, 1);
126  _test_reduce_helper(
127  data_channel,
128  THDReduceOp::THDReduceMAX,
129  -1,
130  data_channel->getNumProcesses() - 1);
131 }
132 
133 void _test_allReduce_helper(
134  std::shared_ptr<thd::DataChannel> data_channel,
135  THDReduceOp op_type,
136  int64_t init_value,
137  int64_t expected_value) {
138  if (data_channel->getRank() == 0) {
139  auto int_tensor = buildTensor<int>({1, 2, 3, 4, 5, 6, 7, 100}, init_value);
140  data_channel->allReduce(*int_tensor, op_type, 0);
141  ASSERT_TENSOR_VALUE(int, *int_tensor, expected_value)
142  } else {
143  auto int_tensor =
144  buildTensor<int>({1, 2, 3, 4, 5, 6, 7, 100}, data_channel->getRank());
145  data_channel->allReduce(*int_tensor, op_type, 0);
146  ASSERT_TENSOR_VALUE(int, *int_tensor, expected_value)
147  }
148 }
149 
150 void test_allReduce(
151  std::shared_ptr<thd::DataChannel> data_channel,
152  int workers) {
153  _test_allReduce_helper(
154  data_channel,
155  THDReduceOp::THDReduceSUM,
156  2,
157  2 + (workers * (workers + 1) / 2));
158  _test_allReduce_helper(
159  data_channel, THDReduceOp::THDReducePRODUCT, 2, 2 * factorial(workers));
160  _test_allReduce_helper(data_channel, THDReduceOp::THDReduceMIN, 10010, 1);
161  _test_allReduce_helper(
162  data_channel,
163  THDReduceOp::THDReduceMAX,
164  -1,
165  data_channel->getNumProcesses() - 1);
166 }
167 
168 void test_scatter(std::shared_ptr<thd::DataChannel> data_channel) {
169  if (g_data_channel_type == "gloo") {
170  return; // XXX: Gloo does not support scatter
171  }
172 
173  std::vector<std::shared_ptr<thpp::IntTensor>> tensors;
174  std::vector<thpp::Tensor*> raw_tensors;
175  if (data_channel->getRank() == 0) {
176  for (size_t i = 0; i < data_channel->getNumProcesses(); ++i) {
177  tensors.push_back(buildTensor<int>({1, 2, 3, 4, 5}, i));
178  raw_tensors.push_back(tensors.back().get());
179  }
180  }
181 
182  auto int_tensor = buildTensor<int>({1, 2, 3, 4, 5}, -1);
183  data_channel->scatter(raw_tensors, *int_tensor, 0);
184  ASSERT_TENSOR_VALUE(int, *int_tensor, data_channel->getRank())
185 }
186 
187 void test_gather(std::shared_ptr<thd::DataChannel> data_channel) {
188  if (g_data_channel_type == "gloo") {
189  return; // XXX: Gloo does not support gather
190  }
191 
192  std::vector<std::shared_ptr<thpp::IntTensor>> tensors;
193  std::vector<thpp::Tensor*> raw_tensors;
194  auto int_tensor = buildTensor<int>({1, 2, 3, 4, 5}, data_channel->getRank());
195  if (data_channel->getRank() == 0) {
196  for (size_t i = 0; i < data_channel->getNumProcesses(); ++i) {
197  tensors.push_back(buildTensor<int>({1, 2, 3, 4, 5}, -1));
198  raw_tensors.push_back(tensors.back().get());
199  }
200 
201  data_channel->gather(raw_tensors, *int_tensor, 0);
202  for (size_t i = 0; i < tensors.size(); ++i)
203  ASSERT_TENSOR_VALUE(int, *(tensors[i]), i)
204  } else {
205  data_channel->gather(raw_tensors, *int_tensor, 0);
206  }
207 }
208 
209 void test_allGather(std::shared_ptr<thd::DataChannel> data_channel) {
210  std::vector<std::shared_ptr<thpp::IntTensor>> tensors;
211  std::vector<thpp::Tensor*> raw_tensors;
212  auto int_tensor = buildTensor<int>({1, 2, 3, 4, 5}, data_channel->getRank());
213  for (size_t i = 0; i < data_channel->getNumProcesses(); ++i) {
214  tensors.push_back(buildTensor<int>({1, 2, 3, 4, 5}, -1));
215  raw_tensors.push_back(tensors.back().get());
216  }
217 
218  data_channel->allGather(raw_tensors, *int_tensor, 0);
219  for (size_t i = 0; i < tensors.size(); ++i)
220  ASSERT_TENSOR_VALUE(int, *(tensors[i]), i)
221 }
222 
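// Each rank in turn broadcasts a deadline (now + BARRIER_WAIT_TIME), sleeps
// past it, and only then enters the barrier; the remaining ranks assert that
// the barrier released them no earlier than that deadline.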
223 void test_barrier(std::shared_ptr<thd::DataChannel> data_channel) {
224  for (size_t i = 0; i < data_channel->getNumProcesses(); ++i) {
225  if (data_channel->getRank() == i) {
226  int64_t time_after_barrier = nowInMilliseconds() + BARRIER_WAIT_TIME;
227  auto time_tensor = buildTensor<int64_t>({1}, time_after_barrier);
228  data_channel->broadcast(*time_tensor, i);
229  std::this_thread::sleep_for(
230  std::chrono::milliseconds(BARRIER_WAIT_TIME + 10));
231  data_channel->barrier();
232  } else {
233  auto time_tensor = buildTensor<int64_t>({1}, -1);
234  data_channel->broadcast(
235  *time_tensor, i); // get expected time after barrier
236  data_channel->barrier();
237  assert(
238  nowInMilliseconds() >=
239  reinterpret_cast<int64_t*>(time_tensor->data())[0]);
240  }
241  }
242 }
243 
244 void test_isend(std::shared_ptr<thd::DataChannel> data_channel) {
245  if (g_data_channel_type == "gloo") {
246  return; // XXX: Gloo does not support isend
247  }
248 
249  if (data_channel->getRank() == 0) {
250  std::vector<std::shared_ptr<thd::DataChannel::Request>> requests;
251  for (size_t i = 1; i < data_channel->getNumProcesses(); ++i) {
252  auto tensor = buildTensor<int>({1, 2, 3, 4, 5}, i);
253  requests.push_back(std::shared_ptr<thd::DataChannel::Request>(
254  data_channel->isend(*tensor, i)));
255  }
256 
257  for (auto request : requests) {
258  request->wait();
259  assert(request->isCompleted());
260  }
261  } else {
262  auto int_tensor = buildTensor<int>({1, 2, 3, 4, 5}, -1);
263  data_channel->receive(*int_tensor, 0);
264  ASSERT_TENSOR_VALUE(int, *int_tensor, data_channel->getRank())
265  }
266 }
267 
268 void test_irecv(std::shared_ptr<thd::DataChannel> data_channel) {
269  if (g_data_channel_type == "gloo") {
270  return; // XXX: Gloo does not support irecv
271  }
272 
273  if (data_channel->getRank() == 0) {
274  std::vector<std::shared_ptr<thd::DataChannel::Request>> requests;
275  std::vector<std::shared_ptr<thpp::IntTensor>> tensors;
276  for (size_t i = 1; i < data_channel->getNumProcesses(); ++i) {
277  tensors.push_back(buildTensor<int>({1, 2, 3, 4, 5}, -1));
278  requests.push_back(std::shared_ptr<thd::DataChannel::Request>(
279  data_channel->ireceive(*tensors.back(), i)));
280  }
281 
282  for (size_t i = 0; i < requests.size(); ++i) {
283  requests.at(i)->wait();
284  assert(requests.at(i)->isCompleted());
285  ASSERT_TENSOR_VALUE(int, *tensors.at(i), i + 1)
286  }
287  } else {
288  auto int_tensor =
289  buildTensor<int>({1, 2, 3, 4, 5}, data_channel->getRank());
290  data_channel->send(*int_tensor, 0);
291  }
292 }
293 
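// Rank 0 posts non-blocking isends (value 10), then after a barrier performs
// blocking sends (value 20); each worker posts an ireceive before the barrier
// and a blocking receive after it, verifying that the two transfers are not
// mixed up.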
294 void test_interlaces(std::shared_ptr<thd::DataChannel> data_channel) {
295  if (g_data_channel_type == "gloo") {
296  return; // XXX: Gloo does not support isend, irecv, send, recv
297  }
298 
299  if (data_channel->getRank() == 0) {
300  std::vector<std::shared_ptr<thd::DataChannel::Request>> requests;
301  for (size_t i = 1; i < data_channel->getNumProcesses(); ++i) {
302  auto tensor = buildTensor<int>({1, 2, 3, 4, 5}, 10);
303  requests.push_back(std::shared_ptr<thd::DataChannel::Request>(
304  data_channel->isend(*tensor, i)));
305  }
306 
307  data_channel->barrier();
308 
309  for (size_t i = 1; i < data_channel->getNumProcesses(); ++i) {
310  auto int_tensor = buildTensor<int>({1, 2, 3, 4, 5}, 20);
311  data_channel->send(*int_tensor, i);
312  }
313  } else {
314  auto int_tensor1 = buildTensor<int>({1, 2, 3, 4, 5}, -1);
315  auto request = std::shared_ptr<thd::DataChannel::Request>(
316  data_channel->ireceive(*int_tensor1, 0));
317 
318  data_channel->barrier();
319 
320  auto int_tensor2 = buildTensor<int>({1, 2, 3, 4, 5}, -1);
321  data_channel->receive(*int_tensor2, 0);
322  request->wait();
323 
324  ASSERT_TENSOR_VALUE(int, *int_tensor1, 10)
325  ASSERT_TENSOR_VALUE(int, *int_tensor2, 20)
326  }
327 }
328 
329 /*
330  * In the group tests we also call the same functions in processes that do not
331  * belong to the group, to check that this does not affect any computations.
332  *
333  * Processes which do not belong to the group do not have to call those methods!
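 *
 * For example, in test_broadcast_group below the ranks outside the group fill
 * their tensors with 1000 and assert afterwards that the broadcast leaves them
 * untouched.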
334  */
335 
336 ////////////
337 // GROUPS //
338 ////////////
339 
340 void test_broadcast_group(
341  std::shared_ptr<thd::DataChannel> data_channel,
342  THDGroup group,
343  std::vector<thd::rank_type> group_ranks) {
344  if (contains(group_ranks, data_channel->getRank())) {
345  auto int_tensor = buildTensor({1, 2, 3, 4, 5}, -1);
346  if (data_channel->getRank() == group_ranks[0])
347  int_tensor->fill(2000);
348 
349  data_channel->broadcast(*int_tensor, group_ranks[0], group);
350  ASSERT_TENSOR_VALUE(int, *int_tensor, 2000)
351  } else {
352  auto int_tensor = buildTensor({1, 2, 3, 4, 5}, 1000);
353  data_channel->broadcast(*int_tensor, group_ranks[0], group);
354  ASSERT_TENSOR_VALUE(int, *int_tensor, 1000)
355  }
356 }
357 
358 void test_reduce_group(
359  std::shared_ptr<thd::DataChannel> data_channel,
360  THDGroup group,
361  std::vector<thd::rank_type> group_ranks) {
362  if (g_data_channel_type == "gloo") {
363  return; // XXX: Gloo does not support reduce
364  }
365 
366  if (contains(group_ranks, data_channel->getRank())) {
367  auto int_tensor = buildTensor({1, 2, 3, 4, 5}, 10);
368  data_channel->reduce(
369  *int_tensor, THDReduceOp::THDReduceSUM, group_ranks[0], group);
370  if (data_channel->getRank() == group_ranks[0]) {
371  ASSERT_TENSOR_VALUE(int, *int_tensor, 10 * group_ranks.size())
372  } else {
373  ASSERT_TENSOR_VALUE(int, *int_tensor, 10)
374  }
375  } else {
376  auto int_tensor = buildTensor({1, 2, 3, 4, 5}, 1000);
377  data_channel->reduce(
378  *int_tensor, THDReduceOp::THDReduceSUM, group_ranks[0], group);
379  ASSERT_TENSOR_VALUE(int, *int_tensor, 1000)
380  }
381 }
382 
383 void test_allReduce_group(
384  std::shared_ptr<thd::DataChannel> data_channel,
385  THDGroup group,
386  std::vector<thd::rank_type> group_ranks) {
387  if (contains(group_ranks, data_channel->getRank())) {
388  auto int_tensor = buildTensor({1, 2, 3, 4, 5, 6, 7, 100}, 10);
389  data_channel->allReduce(*int_tensor, THDReduceOp::THDReduceSUM, group);
390  ASSERT_TENSOR_VALUE(int, *int_tensor, 10 * group_ranks.size())
391  } else {
392  auto int_tensor = buildTensor({1, 2, 3, 4, 5, 6, 7, 100}, 1000);
393  data_channel->allReduce(*int_tensor, THDReduceOp::THDReduceSUM, group);
394  ASSERT_TENSOR_VALUE(int, *int_tensor, 1000)
395  }
396 }
397 
398 void test_scatter_group(
399  std::shared_ptr<thd::DataChannel> data_channel,
400  THDGroup group,
401  std::vector<thd::rank_type> group_ranks) {
402  if (g_data_channel_type == "gloo") {
403  return; // XXX: Gloo does not support scatter
404  }
405 
406  std::vector<std::shared_ptr<thpp::IntTensor>> tensors;
407  std::vector<thpp::Tensor*> raw_tensors;
408  if (contains(group_ranks, data_channel->getRank())) {
409  if (data_channel->getRank() == group_ranks[0]) {
410  for (size_t i = 0; i < group_ranks.size(); ++i) {
411  tensors.push_back(buildTensor<int>({1, 2, 3, 4, 5}, group_ranks[i]));
412  raw_tensors.push_back(tensors.back().get());
413  }
414  }
415 
416  auto int_tensor = buildTensor<int>({1, 2, 3, 4, 5}, -1);
417  data_channel->scatter(raw_tensors, *int_tensor, group_ranks[0], group);
418  ASSERT_TENSOR_VALUE(int, *int_tensor, data_channel->getRank())
419  } else {
420  auto int_tensor = buildTensor({1, 2, 3, 4, 5}, 1000);
421  data_channel->scatter(raw_tensors, *int_tensor, group_ranks[0], group);
422  ASSERT_TENSOR_VALUE(int, *int_tensor, 1000)
423  }
424 }
425 
426 void test_gather_group(
427  std::shared_ptr<thd::DataChannel> data_channel,
428  THDGroup group,
429  std::vector<thd::rank_type> group_ranks) {
430  if (g_data_channel_type == "gloo") {
431  return; // XXX: Gloo does not support gather
432  }
433 
434  std::vector<std::shared_ptr<thpp::IntTensor>> tensors;
435  std::vector<thpp::Tensor*> raw_tensors;
436  if (contains(group_ranks, data_channel->getRank())) {
437  auto int_tensor =
438  buildTensor<int>({1, 2, 3, 4, 5}, data_channel->getRank());
439  if (data_channel->getRank() == group_ranks[0]) {
440  for (size_t i = 0; i < group_ranks.size(); ++i) {
441  tensors.push_back(buildTensor<int>({1, 2, 3, 4, 5}, -1));
442  raw_tensors.push_back(tensors.back().get());
443  }
444 
445  data_channel->gather(raw_tensors, *int_tensor, group_ranks[0], group);
446  for (size_t i = 0; i < tensors.size(); ++i)
447  ASSERT_TENSOR_VALUE(int, *(tensors[i]), group_ranks[i])
448  } else {
449  data_channel->gather(raw_tensors, *int_tensor, group_ranks[0], group);
450  }
451  } else {
452  auto int_tensor = buildTensor({1, 2, 3, 4, 5}, 1000);
453  data_channel->gather(raw_tensors, *int_tensor, group_ranks[0], group);
454  ASSERT_TENSOR_VALUE(int, *int_tensor, 1000)
455  }
456 }
457 
458 void test_allGather_group(
459  std::shared_ptr<thd::DataChannel> data_channel,
460  THDGroup group,
461  std::vector<thd::rank_type> group_ranks) {
462  std::vector<std::shared_ptr<thpp::IntTensor>> tensors;
463  std::vector<thpp::Tensor*> raw_tensors;
464  if (contains(group_ranks, data_channel->getRank())) {
465  auto int_tensor =
466  buildTensor<int>({1, 2, 3, 4, 5}, data_channel->getRank());
467  for (size_t i = 0; i < group_ranks.size(); ++i) {
468  tensors.push_back(buildTensor<int>({1, 2, 3, 4, 5}, -1));
469  raw_tensors.push_back(tensors.back().get());
470  }
471 
472  data_channel->allGather(raw_tensors, *int_tensor, group);
473  for (size_t i = 0; i < tensors.size(); ++i)
474  ASSERT_TENSOR_VALUE(int, *(tensors[i]), group_ranks[i])
475  } else {
476  auto int_tensor = buildTensor({1, 2, 3, 4, 5}, 1000);
477  data_channel->allGather(raw_tensors, *int_tensor, group);
478  ASSERT_TENSOR_VALUE(int, *int_tensor, 1000)
479  }
480 }
481 
482 void test_barrier_group(
483  std::shared_ptr<thd::DataChannel> data_channel,
484  THDGroup group,
485  std::vector<thd::rank_type> group_ranks) {
486  if (contains(group_ranks, data_channel->getRank())) {
487  for (size_t i = 0; i < group_ranks.size(); ++i) {
488  if (data_channel->getRank() == group_ranks[i]) {
489  int64_t time_after_barrier = nowInMilliseconds() + BARRIER_WAIT_TIME;
490  auto time_tensor = buildTensor<int64_t>({1}, time_after_barrier);
491  data_channel->broadcast(*time_tensor, group_ranks[i], group);
492  std::this_thread::sleep_for(
493  std::chrono::milliseconds(BARRIER_WAIT_TIME + 10));
494  data_channel->barrier(group);
495  } else {
496  auto time_tensor = buildTensor<int64_t>({1}, -1);
497  data_channel->broadcast(
498  *time_tensor,
499  group_ranks[i],
500  group); // get expected time after barrier
501  data_channel->barrier(group);
502  assert(
503  nowInMilliseconds() >=
504  reinterpret_cast<int64_t*>(time_tensor->data())[0]);
505  }
506  }
507  } else {
508  std::this_thread::sleep_for(
509  std::chrono::milliseconds(BARRIER_WAIT_TIME + 100));
510  data_channel->barrier(group);
511  }
512 }
513 
514 ////////////////
515 // EXCEPTIONS //
516 ////////////////
517 
518 void test_send_recv_invalid_rank(
519  std::shared_ptr<thd::DataChannel> data_channel) {
520  if (g_data_channel_type == "gloo") {
521  return; // XXX: Gloo does not support send/recv
522  }
523 
524  if (g_data_channel_type == "mpi") {
525  return; // XXX: MPI does not throw exceptions
526  }
527 
528  auto rank = data_channel->getRank();
529  auto int_tensor = buildTensor({1, 2, 3, 4, 5}, -1);
530 
531  { // cannot send or receive to/from self
532  ASSERT_THROWS(std::logic_error, data_channel->send(*int_tensor, rank))
533  ASSERT_THROWS(
534  std::logic_error, data_channel->receive(*int_tensor, rank)) }
535 
536  { // cannot send or receive to/from process with rank -1
537  ASSERT_THROWS(std::out_of_range, data_channel->send(*int_tensor, -1))
538  ASSERT_THROWS(std::out_of_range, data_channel->receive(*int_tensor, -1))
539  }
540 }
541 
542 // Creating an empty group is not allowed; the group would be null
543 void test_empty_group(std::shared_ptr<thd::DataChannel> data_channel) {
544  // in MPI an empty group results in a NULL_COMM instead of an exception
545  if (g_data_channel_type == "tcp" || g_data_channel_type == "gloo") {
546  ASSERT_THROWS(std::logic_error, data_channel->newGroup({}))
547  }
548 }
549 
550 // Process with rank 0 is not part of the group, so we cannot perform operations involving it
551 void test_process_not_in_group(std::shared_ptr<thd::DataChannel> data_channel) {
552  auto int_tensor = buildTensor({1, 2, 3, 4, 5}, -1);
553 
554  THDGroup group = data_channel->newGroup({1});
555  std::vector<std::shared_ptr<thpp::IntTensor>> tensors = {
556  buildTensor<int>({1, 2, 3, 4, 5}, -1)};
557  std::vector<thpp::Tensor*> raw_tensors = {tensors.back().get()};
558 
559  if (data_channel->getRank() == 1) {
560  ASSERT_THROWS(
561  std::logic_error, data_channel->broadcast(*int_tensor, 0, group))
562 
563  if (g_data_channel_type == "gloo") {
564  return; // XXX: Gloo does not support scatter/gather/reduce
565  }
566 
567  ASSERT_THROWS(
568  std::logic_error,
569  data_channel->reduce(*int_tensor, THDReduceOp::THDReduceSUM, 0, group))
570 
571  ASSERT_THROWS(
572  std::logic_error,
573  data_channel->scatter(raw_tensors, *int_tensor, 0, group))
574 
575  ASSERT_THROWS(
576  std::logic_error,
577  data_channel->gather(raw_tensors, *int_tensor, 0, group))
578  }
579 }
580 
581 // input_tensors does not match the size of the group
582 void test_tensors_do_not_match_group_size(
583  std::shared_ptr<thd::DataChannel> data_channel) {
584  auto int_tensor = buildTensor({1, 2, 3, 4, 5}, -1);
585  THDGroup group = data_channel->newGroup({1, 2});
586  std::vector<std::shared_ptr<thpp::IntTensor>> tensors = {
587  buildTensor<int>({1, 2, 3, 4, 5}, -1)};
588  std::vector<thpp::Tensor*> raw_tensors = {tensors.back().get()};
589 
590  if (data_channel->getRank() == 1 || data_channel->getRank() == 2) {
591  ASSERT_THROWS(
592  std::logic_error,
593  data_channel->allGather(raw_tensors, *int_tensor, group))
594 
595  if (g_data_channel_type == "gloo") {
596  return; // XXX: Gloo does not support scatter/gather
597  }
598 
599  if (data_channel->getRank() == 1) {
600  ASSERT_THROWS(
601  std::logic_error,
602  data_channel->scatter(raw_tensors, *int_tensor, 1, group))
603 
604  ASSERT_THROWS(
605  std::logic_error,
606  data_channel->gather(raw_tensors, *int_tensor, 1, group))
607  }
608  }
609 }
610 
611 // input_tensors are not all the same (their shapes differ)
612 void test_tensors_are_not_the_same(
613  std::shared_ptr<thd::DataChannel> data_channel) {
614  auto int_tensor = buildTensor({1, 2, 3, 4, 5}, -1);
615  THDGroup group = data_channel->newGroup({1, 2});
616  std::vector<std::shared_ptr<thpp::IntTensor>> tensors = {
617  buildTensor<int>({1, 2, 3, 4, 5}, -1),
618  buildTensor<int>({1, 2, 3, 4}, -1)};
619  std::vector<thpp::Tensor*> raw_tensors = {tensors[0].get(), tensors[1].get()};
620 
621  if (data_channel->getRank() == 1 || data_channel->getRank() == 2) {
622  ASSERT_THROWS(
623  std::logic_error,
624  data_channel->allGather(raw_tensors, *int_tensor, group))
625 
626  if (g_data_channel_type == "gloo") {
627  return; // XXX: Gloo does not support scatter/gather
628  }
629 
630  if (data_channel->getRank() == 1) {
631  ASSERT_THROWS(
632  std::logic_error,
633  data_channel->scatter(raw_tensors, *int_tensor, 1, group))
634 
635  ASSERT_THROWS(
636  std::logic_error,
637  data_channel->gather(raw_tensors, *int_tensor, 1, group))
638  }
639  }
640 }
641 
642 void run_all_tests(
643  std::shared_ptr<thd::DataChannel> data_channel,
644  int workers) {
645  test_send_recv_tensor(data_channel);
646  test_send_recv_tensor_any_source(data_channel, workers);
647  test_send_recv_scalar(data_channel);
648  test_broadcast(data_channel);
649  test_reduce(data_channel, workers);
650  test_allReduce(data_channel, workers);
651  test_scatter(data_channel);
652  test_gather(data_channel);
653  test_allGather(data_channel);
654  test_barrier(data_channel);
655  test_isend(data_channel);
656  test_irecv(data_channel);
657  test_interlaces(data_channel);
658 
659  std::vector<thd::rank_type> group_ranks = {1, 2};
660  THDGroup group = data_channel->newGroup(group_ranks);
661  test_broadcast_group(data_channel, group, group_ranks);
662  test_reduce_group(data_channel, group, group_ranks);
663  test_allReduce_group(data_channel, group, group_ranks);
664  test_scatter_group(data_channel, group, group_ranks);
665  test_gather_group(data_channel, group, group_ranks);
666  test_allGather_group(data_channel, group, group_ranks);
667  test_barrier_group(data_channel, group, group_ranks);
668 
669  test_send_recv_invalid_rank(data_channel);
670  test_empty_group(data_channel);
671  test_process_not_in_group(data_channel);
672  test_tensors_do_not_match_group_size(data_channel);
673  test_tensors_are_not_the_same(data_channel);
674 }
675 
676 void init_tcp_master(int workers) {
677  g_mutex.lock();
678  setenv(WORLD_SIZE_ENV, std::to_string((workers + 1)).data(), 1);
679  setenv(RANK_ENV, "0", 1);
680  setenv(MASTER_PORT_ENV, std::to_string(MASTER_PORT).data(), 1);
681  auto masterChannel = std::make_shared<thd::DataChannelTCP>(
682  thd::getInitConfig("env://")); // reads all env variables
683  g_mutex.unlock();
684 
685  assert(masterChannel->init());
686  run_all_tests(masterChannel, workers);
687 
688  // wait for all workers to finish
689  for (auto& worker : g_all_workers) {
690  worker.join();
691  }
692 }
693 
694 void init_tcp_worker(unsigned int id, int workers) {
695  g_mutex.lock();
696  setenv(RANK_ENV, std::to_string(id).data(), 1);
697  setenv(
698  MASTER_ADDR_ENV,
699  std::string("127.0.0.1:" + std::to_string(MASTER_PORT)).data(),
700  1);
701  auto worker_channel = std::make_shared<thd::DataChannelTCP>(
702  thd::getInitConfig("env://")); // reads all env variables
703  g_mutex.unlock();
704 
705  assert(worker_channel->init());
706  run_all_tests(worker_channel, workers);
707 }
708 
709 #ifdef WITH_GLOO
710 void init_gloo_master(int workers) {
711  g_mutex.lock();
712  setenv(WORLD_SIZE_ENV, std::to_string((workers + 1)).data(), 1);
713  setenv(RANK_ENV, "0", 1);
714  setenv(MASTER_PORT_ENV, std::to_string(MASTER_PORT).data(), 1);
715  auto masterChannel = std::make_shared<thd::DataChannelGloo>(
716  thd::getInitConfig("env://")); // reads all env variables
717  g_mutex.unlock();
718 
719  assert(masterChannel->init());
720  run_all_tests(masterChannel, workers);
721 
722  g_barrier->wait();
723 }
724 
725 void init_gloo_worker(unsigned int id, int workers) {
726  g_mutex.lock();
727  setenv(RANK_ENV, std::to_string(id).data(), 1);
728  setenv(
729  MASTER_ADDR_ENV,
730  std::string("127.0.0.1:" + std::to_string(MASTER_PORT)).data(),
731  1);
732  auto worker_channel = std::make_shared<thd::DataChannelGloo>(
733  thd::getInitConfig("env://")); // reads all env variables
734  g_mutex.unlock();
735 
736  assert(worker_channel->init());
737  run_all_tests(worker_channel, workers);
738 
739  g_barrier->wait();
740 }
741 #endif // WITH_GLOO
742 
743 #ifdef WITH_MPI
744 void init_mpi_process() {
745  auto data_channel = std::make_shared<thd::DataChannelMPI>();
746  assert(data_channel->init());
747  run_all_tests(data_channel, WORKERS_NUM[0]);
748 
749  std::cout << "MPI OK (id: " << data_channel->getRank() << ")" << std::endl;
750 }
751 #endif // WITH_MPI
752 
753 int main(int argc, char const* argv[]) {
754 #ifdef WITH_MPI
755  if (argc == 1) {
756 #endif // WITH_MPI
757  g_data_channel_type = "tcp";
758  for (auto workers : WORKERS_NUM) {
759  std::cout << "TCP (workers: " << workers << "):" << std::endl;
760  // start tcp master
761  std::thread tcp_master_thread(init_tcp_master, workers);
762 
763  // start tcp workers
764  for (int id = 1; id <= workers; ++id) {
765  g_all_workers.push_back(std::thread(init_tcp_worker, id, workers));
766  }
767 
768  tcp_master_thread.join();
769  g_all_workers.clear();
770 
771  std::cout << "TCP - OK" << std::endl;
772  }
773 
774 #ifdef WITH_GLOO
775  g_data_channel_type = "gloo";
776  for (auto workers : WORKERS_NUM) {
777  g_barrier.reset(new Barrier(workers + 1));
778  std::cout << "Gloo (workers: " << workers << "):" << std::endl;
779  // start gloo master
780  std::thread gloo_master_thread(init_gloo_master, workers);
781 
782  // start gloo workers
783  for (int id = 1; id <= workers; ++id) {
784  g_all_workers.push_back(std::thread(init_gloo_worker, id, workers));
785  }
786 
787  // wait for all workers to finish
788  for (auto& worker : g_all_workers) {
789  worker.join();
790  }
791 
792  gloo_master_thread.join();
793  g_all_workers.clear();
794 
795  std::cout << "Gloo - OK" << std::endl;
796  }
797 #endif // WITH_GLOO
798 
799 #ifdef WITH_MPI
800  std::cout << "--------------------------" << std::endl;
801 
802  // start MPI processes
803  std::cout << "MPI:" << std::endl;
804  execlp(
805  "mpirun",
806  "mpirun",
807  "-n",
808  std::to_string(WORKERS_NUM[0] + 1).data(),
809  argv[0],
810  "1",
811  NULL);
812  } else {
813  g_data_channel_type = "mpi";
814  init_mpi_process();
815  }
816 #endif // WITH_MPI
817  return 0;
818 }