// Unit tests for THD DataChannel collectives across the TCP, Gloo and MPI
// backends.
// NOTE(review): this chunk is a mangled extract — the leading integers
// fused into many lines (e.g. "32 void ...") appear to be original source
// line numbers, not code, and several original lines are elided.
2 #include <THD/base/data_channels/DataChannelGloo.hpp> 5 #include <THD/base/data_channels/DataChannelMPI.hpp> 7 #include <THD/base/data_channels/DataChannelTCP.hpp> 8 #include <THD/test/TestUtils.hpp> 10 #include <THPP/tensors/THTensor.hpp> 23 constexpr std::array<int, 4> WORKERS_NUM = {2, 4, 7, 13};
// TCP port the master binds for the TCP/Gloo test runs (see main()).
24 constexpr
int MASTER_PORT = 45678;
// Milliseconds a barrier test sleeps to detect a barrier releasing early.
25 constexpr
int BARRIER_WAIT_TIME = 200;
// Worker threads spawned per run; joined and cleared between sweeps.
27 std::vector<std::thread> g_all_workers;
// Backend currently under test: "tcp", "gloo" or "mpi" (set in main()).
29 std::string g_data_channel_type;
// Cross-thread rendezvous, reset per Gloo worker count in main().
30 std::unique_ptr<Barrier> g_barrier;
// Point-to-point smoke test: rank 0 sends a {1,2,3} float tensor filled
// with 4.2 to rank 1, which receives into a -1.0-filled tensor and asserts
// every element equals 4.2. Other ranks do nothing.
// NOTE(review): original lines 34-36 (body of the "gloo" guard — presumably
// an early return, since the test body follows) and 44-46 (closing braces)
// are elided from this extract; confirm against the full file.
32 void test_send_recv_tensor(std::shared_ptr<thd::DataChannel> data_channel) {
33 if (g_data_channel_type ==
"gloo") {
37 if (data_channel->getRank() == 0) {
38 auto float_tensor = buildTensor<float>({1, 2, 3}, 4.2);
39 data_channel->send(*float_tensor, 1);
40 }
else if (data_channel->getRank() == 1) {
41 auto float_tensor = buildTensor<float>({1, 2, 3}, -1.0);
42 data_channel->receive(*float_tensor, 0);
43 ASSERT_TENSOR_VALUE(
float, *float_tensor, 4.2);
// Any-source receive: each worker sends a tensor stamped with its own rank
// to rank 0; rank 0 performs `workers` receives WITHOUT naming a source and
// checks it saw every distinct sender exactly once (set size == workers).
// NOTE(review): several original lines are elided from this extract — the
// `workers` parameter line (~49), the gloo guard body (~51-53), the `ranks`
// set declaration (~55), and closing braces. The identifiers `workers` and
// `ranks` referenced below are declared on those missing lines.
47 void test_send_recv_tensor_any_source(
48 std::shared_ptr<thd::DataChannel> data_channel,
50 if (g_data_channel_type ==
"gloo") {
54 if (data_channel->getRank() == 0) {
56 for (
int i = 0; i < workers; i++) {
57 auto int_tensor = buildTensor<int>({1, 2, 3}, -1);
58 data_channel->receive(*int_tensor);
59 ranks.insert(static_cast<int*>(int_tensor->data())[0]);
62 assert(ranks.size() == workers);
64 auto int_tensor = buildTensor<int>({1, 2, 3}, data_channel->getRank());
65 data_channel->send(*int_tensor, 0);
// Scalar round-trip: rank 0 sends ScalarWrapper<int>(1232) to rank 1, which
// receives into a -1 scalar and asserts the value arrived intact.
// NOTE(review): the gloo guard body (~71-73) and the trailing closing braces
// (~81-83) are elided from this extract.
69 void test_send_recv_scalar(std::shared_ptr<thd::DataChannel> data_channel) {
70 if (g_data_channel_type ==
"gloo") {
74 if (data_channel->getRank() == 0) {
75 thd::ScalarWrapper<int> scalar((
int)1232);
76 data_channel->send(scalar, 1);
77 }
else if (data_channel->getRank() == 1) {
78 thd::ScalarWrapper<int> scalar((
int)-1);
79 data_channel->receive(scalar, 0);
80 assert(scalar.value() == 1232);
84 void test_broadcast(std::shared_ptr<thd::DataChannel> data_channel) {
85 for (
size_t dest = 0; dest < data_channel->getNumProcesses(); ++dest) {
86 if (data_channel->getRank() == dest) {
87 auto float_tensor = buildTensor<float>({1, 2, 3, 4, 5}, 10.123);
88 data_channel->broadcast(*float_tensor, dest);
90 auto float_tensor = buildTensor<float>({1, 2, 3, 4, 5}, -1.0);
91 data_channel->broadcast(*float_tensor, dest);
92 ASSERT_TENSOR_VALUE(
float, *float_tensor, 10.123)
// Helper for test_reduce: rank 0 contributes a tensor filled with
// init_value, every other rank contributes its own rank; after the reduce
// rooted at rank 0, only rank 0 asserts the result equals expected_value.
// NOTE(review): the parameter lines ~99-100 (presumably the op_type and
// init_value parameters used below) and the else-branch opener around
// ~106-107 (start of the non-root tensor declaration continued at "108")
// are elided from this extract.
97 void _test_reduce_helper(
98 std::shared_ptr<thd::DataChannel> data_channel,
101 int64_t expected_value) {
102 if (data_channel->getRank() == 0) {
103 auto int_tensor = buildTensor<int>({1, 2, 3, 4, 5}, init_value);
104 data_channel->reduce(*int_tensor, op_type, 0);
105 ASSERT_TENSOR_VALUE(
int, *int_tensor, expected_value)
108 buildTensor<int>({1, 2, 3, 4, 5}, data_channel->getRank());
109 data_channel->reduce(*int_tensor, op_type, 0);
// Drives _test_reduce_helper for SUM, PRODUCT, MIN and MAX. Expected values
// follow from the contribution scheme (root contributes the init value, each
// worker its rank): SUM = init + 1+2+...+workers, PRODUCT = init * workers!,
// MIN hits a worker's rank, MAX = numProcesses - 1.
// NOTE(review): the gloo guard body and several call-argument lines
// (115-119, 121, 123, 126-127, 129, 131-132) are elided from this extract.
113 void test_reduce(std::shared_ptr<thd::DataChannel> data_channel,
int workers) {
114 if (g_data_channel_type ==
"gloo") {
120 THDReduceOp::THDReduceSUM,
122 2 + (workers * (workers + 1) / 2));
124 data_channel, THDReduceOp::THDReducePRODUCT, 2, 2 * factorial(workers));
125 _test_reduce_helper(data_channel, THDReduceOp::THDReduceMIN, 10010, 1);
128 THDReduceOp::THDReduceMAX,
130 data_channel->getNumProcesses() - 1);
// Helper for test_allReduce: same contribution scheme as
// _test_reduce_helper, but because this is allReduce EVERY rank (root and
// non-root alike) must observe expected_value afterwards.
// NOTE(review): parameter lines ~135-136 (op_type, init_value) and the
// else-branch opener ~142-143 (start of the non-root tensor declaration
// continued at "144") are elided from this extract.
133 void _test_allReduce_helper(
134 std::shared_ptr<thd::DataChannel> data_channel,
137 int64_t expected_value) {
138 if (data_channel->getRank() == 0) {
139 auto int_tensor = buildTensor<int>({1, 2, 3, 4, 5, 6, 7, 100}, init_value);
140 data_channel->allReduce(*int_tensor, op_type, 0);
141 ASSERT_TENSOR_VALUE(
int, *int_tensor, expected_value)
144 buildTensor<int>({1, 2, 3, 4, 5, 6, 7, 100}, data_channel->getRank());
145 data_channel->allReduce(*int_tensor, op_type, 0);
146 ASSERT_TENSOR_VALUE(
int, *int_tensor, expected_value)
// Drives _test_allReduce_helper for SUM, PRODUCT, MIN and MAX with the same
// expected values as test_reduce.
// NOTE(review): the function's opening line (~150) is elided from this
// extract; given the call in run_all_tests it is presumably
// `void test_allReduce(` taking the channel plus an int `workers`.
151 std::shared_ptr<thd::DataChannel> data_channel,
153 _test_allReduce_helper(
155 THDReduceOp::THDReduceSUM,
157 2 + (workers * (workers + 1) / 2));
158 _test_allReduce_helper(
159 data_channel, THDReduceOp::THDReducePRODUCT, 2, 2 * factorial(workers));
160 _test_allReduce_helper(data_channel, THDReduceOp::THDReduceMIN, 10010, 1);
161 _test_allReduce_helper(
163 THDReduceOp::THDReduceMAX,
165 data_channel->getNumProcesses() - 1);
// Scatter: rank 0 builds one tensor per process with slot i filled with i,
// then scatters; every rank (root included) asserts the tensor it received
// is filled with its own rank.
// NOTE(review): the gloo guard body (~170-172) and brace/else lines
// (~179-181, ~185-186) are elided from this extract; non-root ranks appear
// to call scatter with an empty raw_tensors list.
168 void test_scatter(std::shared_ptr<thd::DataChannel> data_channel) {
169 if (g_data_channel_type ==
"gloo") {
173 std::vector<std::shared_ptr<thpp::IntTensor>> tensors;
174 std::vector<thpp::Tensor*> raw_tensors;
175 if (data_channel->getRank() == 0) {
176 for (
size_t i = 0; i < data_channel->getNumProcesses(); ++i) {
177 tensors.push_back(buildTensor<int>({1, 2, 3, 4, 5}, i));
178 raw_tensors.push_back(tensors.back().get());
182 auto int_tensor = buildTensor<int>({1, 2, 3, 4, 5}, -1);
183 data_channel->scatter(raw_tensors, *int_tensor, 0);
184 ASSERT_TENSOR_VALUE(
int, *int_tensor, data_channel->getRank())
// Gather: every rank contributes a tensor stamped with its own rank; rank 0
// gathers into one -1-filled buffer per process and asserts buffer i ended
// up holding value i. Non-root ranks call gather with an empty destination
// list (the "205" line).
// NOTE(review): the gloo guard body (~189-191) and brace/else lines
// (~199-200, ~204, ~206-208) are elided from this extract.
187 void test_gather(std::shared_ptr<thd::DataChannel> data_channel) {
188 if (g_data_channel_type ==
"gloo") {
192 std::vector<std::shared_ptr<thpp::IntTensor>> tensors;
193 std::vector<thpp::Tensor*> raw_tensors;
194 auto int_tensor = buildTensor<int>({1, 2, 3, 4, 5}, data_channel->getRank());
195 if (data_channel->getRank() == 0) {
196 for (
size_t i = 0; i < data_channel->getNumProcesses(); ++i) {
197 tensors.push_back(buildTensor<int>({1, 2, 3, 4, 5}, -1));
198 raw_tensors.push_back(tensors.back().get());
201 data_channel->gather(raw_tensors, *int_tensor, 0);
202 for (
size_t i = 0; i < tensors.size(); ++i)
203 ASSERT_TENSOR_VALUE(
int, *(tensors[i]), i)
205 data_channel->gather(raw_tensors, *int_tensor, 0);
209 void test_allGather(std::shared_ptr<thd::DataChannel> data_channel) {
210 std::vector<std::shared_ptr<thpp::IntTensor>> tensors;
211 std::vector<thpp::Tensor*> raw_tensors;
212 auto int_tensor = buildTensor<int>({1, 2, 3, 4, 5}, data_channel->getRank());
213 for (
size_t i = 0; i < data_channel->getNumProcesses(); ++i) {
214 tensors.push_back(buildTensor<int>({1, 2, 3, 4, 5}, -1));
215 raw_tensors.push_back(tensors.back().get());
218 data_channel->allGather(raw_tensors, *int_tensor, 0);
219 for (
size_t i = 0; i < tensors.size(); ++i)
220 ASSERT_TENSOR_VALUE(
int, *(tensors[i]), i)
// Barrier timing test: each rank in turn broadcasts "now + WAIT" as a
// deadline, sleeps past it, then enters the barrier. The other ranks hit
// the barrier first and, once released, assert the current time is at or
// past the broadcast deadline — i.e. the barrier really blocked until the
// slow rank arrived.
// NOTE(review): the broadcast's remaining arguments (~235) and the assert
// opener (~237) are elided from this extract, as are the closing braces.
// NOTE(review): loop index `i` is int while getNumProcesses() likely
// returns an unsigned type — watch for sign-compare warnings.
223 void test_barrier(std::shared_ptr<thd::DataChannel> data_channel) {
224 for (
int i = 0; i < data_channel->getNumProcesses(); ++i) {
225 if (data_channel->getRank() == i) {
226 int64_t time_after_barrier = nowInMilliseconds() + BARRIER_WAIT_TIME;
227 auto time_tensor = buildTensor<int64_t>({1}, time_after_barrier);
228 data_channel->broadcast(*time_tensor, i);
229 std::this_thread::sleep_for(
230 std::chrono::milliseconds(BARRIER_WAIT_TIME + 10));
231 data_channel->barrier();
233 auto time_tensor = buildTensor<int64_t>({1}, -1);
234 data_channel->broadcast(
236 data_channel->barrier();
238 nowInMilliseconds() >=
239 reinterpret_cast<int64_t*>(time_tensor->data())[0]);
// isend: rank 0 posts one non-blocking send per worker (tensor filled with
// the destination rank) and then asserts each request completed; workers do
// a blocking receive and verify the payload equals their rank.
// NOTE(review): the gloo guard body (~246-248), the line before the
// isCompleted assert (~258 — presumably a request->wait()), and closing
// braces are elided from this extract.
// NOTE(review): `for (auto request : requests)` copies each shared_ptr
// (atomic refcount churn); a const-ref loop would avoid it.
244 void test_isend(std::shared_ptr<thd::DataChannel> data_channel) {
245 if (g_data_channel_type ==
"gloo") {
249 if (data_channel->getRank() == 0) {
250 std::vector<std::shared_ptr<thd::DataChannel::Request>> requests;
251 for (
size_t i = 1; i < data_channel->getNumProcesses(); ++i) {
252 auto tensor = buildTensor<int>({1, 2, 3, 4, 5}, i);
253 requests.push_back(std::shared_ptr<thd::DataChannel::Request>(
254 data_channel->isend(*tensor, i)));
257 for (
auto request : requests) {
259 assert(request->isCompleted());
262 auto int_tensor = buildTensor<int>({1, 2, 3, 4, 5}, -1);
263 data_channel->receive(*int_tensor, 0);
264 ASSERT_TENSOR_VALUE(
int, *int_tensor, data_channel->getRank())
// ireceive: rank 0 posts one non-blocking receive per worker, then waits on
// each request, asserts completion, and checks tensor i carries the
// sender's rank (i + 1, since workers are ranks 1..N). Workers do a
// blocking send of a rank-stamped tensor.
// NOTE(review): the gloo guard body (~270-272), the workers' tensor
// declaration opener (~288, continued at "289"), and closing braces are
// elided from this extract.
268 void test_irecv(std::shared_ptr<thd::DataChannel> data_channel) {
269 if (g_data_channel_type ==
"gloo") {
273 if (data_channel->getRank() == 0) {
274 std::vector<std::shared_ptr<thd::DataChannel::Request>> requests;
275 std::vector<std::shared_ptr<thpp::IntTensor>> tensors;
276 for (
size_t i = 1; i < data_channel->getNumProcesses(); ++i) {
277 tensors.push_back(buildTensor<int>({1, 2, 3, 4, 5}, -1));
278 requests.push_back(std::shared_ptr<thd::DataChannel::Request>(
279 data_channel->ireceive(*tensors.back(), i)));
282 for (
size_t i = 0; i < requests.size(); ++i) {
283 requests.at(i)->wait();
284 assert(requests.at(i)->isCompleted());
285 ASSERT_TENSOR_VALUE(
int, *tensors.at(i), i + 1)
289 buildTensor<int>({1, 2, 3, 4, 5}, data_channel->getRank());
290 data_channel->send(*int_tensor, 0);
// Message-ordering test: rank 0 isends value-10 tensors to every worker,
// enters a barrier, then blocking-sends value-20 tensors. Each worker posts
// its ireceive BEFORE the barrier and a blocking receive after it, then
// asserts the ireceive matched the first message (10) and the blocking
// receive the second (20) — receives match sends in posting order even when
// the receive was posted before the data arrived.
// NOTE(review): the gloo guard body (~296-298), the request wait/cleanup
// lines (~317, ~319, ~322-323), and closing braces are elided from this
// extract.
294 void test_interlaces(std::shared_ptr<thd::DataChannel> data_channel) {
295 if (g_data_channel_type ==
"gloo") {
299 if (data_channel->getRank() == 0) {
300 std::vector<std::shared_ptr<thd::DataChannel::Request>> requests;
301 for (
size_t i = 1; i < data_channel->getNumProcesses(); ++i) {
302 auto tensor = buildTensor<int>({1, 2, 3, 4, 5}, 10);
303 requests.push_back(std::shared_ptr<thd::DataChannel::Request>(
304 data_channel->isend(*tensor, i)));
307 data_channel->barrier();
309 for (
size_t i = 1; i < data_channel->getNumProcesses(); ++i) {
310 auto int_tensor = buildTensor<int>({1, 2, 3, 4, 5}, 20);
311 data_channel->send(*int_tensor, i);
314 auto int_tensor1 = buildTensor<int>({1, 2, 3, 4, 5}, -1);
315 auto request = std::shared_ptr<thd::DataChannel::Request>(
316 data_channel->ireceive(*int_tensor1, 0));
318 data_channel->barrier();
320 auto int_tensor2 = buildTensor<int>({1, 2, 3, 4, 5}, -1);
321 data_channel->receive(*int_tensor2, 0);
324 ASSERT_TENSOR_VALUE(
int, *int_tensor1, 10)
325 ASSERT_TENSOR_VALUE(
int, *int_tensor2, 20)
// Group broadcast: members of group_ranks broadcast from the group's first
// rank (which fills its tensor with 2000) and assert receipt; ranks OUTSIDE
// the group also invoke broadcast and assert their 1000-filled tensor is
// untouched — group ops must be a no-op for non-members.
// NOTE(review): the THDGroup parameter line (~342, referenced below as
// `group`) and the else-branch opener (~351) are elided from this extract.
340 void test_broadcast_group(
341 std::shared_ptr<
thd::DataChannel> data_channel,
343 std::vector<
thd::rank_type> group_ranks) {
344 if (contains(group_ranks, data_channel->getRank())) {
345 auto int_tensor = buildTensor({1, 2, 3, 4, 5}, -1);
346 if (data_channel->getRank() == group_ranks[0])
347 int_tensor->fill(2000);
349 data_channel->broadcast(*int_tensor, group_ranks[0], group);
350 ASSERT_TENSOR_VALUE(
int, *int_tensor, 2000)
352 auto int_tensor = buildTensor({1, 2, 3, 4, 5}, 1000);
353 data_channel->broadcast(*int_tensor, group_ranks[0], group);
354 ASSERT_TENSOR_VALUE(
int, *int_tensor, 1000)
// Group reduce (SUM): each member contributes 10; the group root asserts
// 10 * group size while other members' tensors stay at 10 (reduce only
// materializes at the root). Non-members invoke the op too and assert their
// 1000-filled tensor is untouched.
// NOTE(review): the gloo guard body (~363-365), the THDGroup parameter
// line (~360), and else/brace lines (~372, ~374-375) are elided from this
// extract.
358 void test_reduce_group(
359 std::shared_ptr<thd::DataChannel> data_channel,
361 std::vector<thd::rank_type> group_ranks) {
362 if (g_data_channel_type ==
"gloo") {
366 if (contains(group_ranks, data_channel->getRank())) {
367 auto int_tensor = buildTensor({1, 2, 3, 4, 5}, 10);
368 data_channel->reduce(
369 *int_tensor, THDReduceOp::THDReduceSUM, group_ranks[0], group);
370 if (data_channel->getRank() == group_ranks[0]) {
371 ASSERT_TENSOR_VALUE(
int, *int_tensor, 10 * group_ranks.size())
373 ASSERT_TENSOR_VALUE(
int, *int_tensor, 10)
376 auto int_tensor = buildTensor({1, 2, 3, 4, 5}, 1000);
377 data_channel->reduce(
378 *int_tensor, THDReduceOp::THDReduceSUM, group_ranks[0], group);
379 ASSERT_TENSOR_VALUE(
int, *int_tensor, 1000)
// Group allReduce (SUM): every member contributes 10 and must end with
// 10 * group size; non-members invoke the op and assert their 1000-filled
// tensor is untouched.
// NOTE(review): the THDGroup parameter line (~385, referenced below as
// `group`) and the else-branch opener (~391) are elided from this extract.
383 void test_allReduce_group(
384 std::shared_ptr<thd::DataChannel> data_channel,
386 std::vector<thd::rank_type> group_ranks) {
387 if (contains(group_ranks, data_channel->getRank())) {
388 auto int_tensor = buildTensor({1, 2, 3, 4, 5, 6, 7, 100}, 10);
389 data_channel->allReduce(*int_tensor, THDReduceOp::THDReduceSUM, group);
390 ASSERT_TENSOR_VALUE(
int, *int_tensor, 10 * group_ranks.size())
392 auto int_tensor = buildTensor({1, 2, 3, 4, 5, 6, 7, 100}, 1000);
393 data_channel->allReduce(*int_tensor, THDReduceOp::THDReduceSUM, group);
394 ASSERT_TENSOR_VALUE(
int, *int_tensor, 1000)
// Group scatter: the group root builds one tensor per member (slot i filled
// with group_ranks[i]) and scatters within the group; members assert they
// received a tensor filled with their own rank. Non-members invoke the op
// and assert their 1000-filled tensor is untouched.
// NOTE(review): the gloo guard body (~403-405), the THDGroup parameter
// line (~400), and brace/else lines (~413-415, ~419) are elided from this
// extract.
398 void test_scatter_group(
399 std::shared_ptr<thd::DataChannel> data_channel,
401 std::vector<thd::rank_type> group_ranks) {
402 if (g_data_channel_type ==
"gloo") {
406 std::vector<std::shared_ptr<thpp::IntTensor>> tensors;
407 std::vector<thpp::Tensor*> raw_tensors;
408 if (contains(group_ranks, data_channel->getRank())) {
409 if (data_channel->getRank() == group_ranks[0]) {
410 for (
size_t i = 0; i < group_ranks.size(); ++i) {
411 tensors.push_back(buildTensor<int>({1, 2, 3, 4, 5}, group_ranks[i]));
412 raw_tensors.push_back(tensors.back().get());
416 auto int_tensor = buildTensor<int>({1, 2, 3, 4, 5}, -1);
417 data_channel->scatter(raw_tensors, *int_tensor, group_ranks[0], group);
418 ASSERT_TENSOR_VALUE(
int, *int_tensor, data_channel->getRank())
420 auto int_tensor = buildTensor({1, 2, 3, 4, 5}, 1000);
421 data_channel->scatter(raw_tensors, *int_tensor, group_ranks[0], group);
422 ASSERT_TENSOR_VALUE(
int, *int_tensor, 1000)
// Group gather: members contribute rank-stamped tensors; the group root
// gathers into one buffer per member and asserts buffer i holds
// group_ranks[i]. Non-root members pass an empty destination list (the
// "449" line); non-members invoke the op and assert their 1000-filled
// tensor is untouched.
// NOTE(review): the gloo guard body (~431-433), the THDGroup parameter
// line (~428), the member tensor declaration opener (~437, continued at
// "438"), and brace/else lines are elided from this extract.
426 void test_gather_group(
427 std::shared_ptr<thd::DataChannel> data_channel,
429 std::vector<thd::rank_type> group_ranks) {
430 if (g_data_channel_type ==
"gloo") {
434 std::vector<std::shared_ptr<thpp::IntTensor>> tensors;
435 std::vector<thpp::Tensor*> raw_tensors;
436 if (contains(group_ranks, data_channel->getRank())) {
438 buildTensor<int>({1, 2, 3, 4, 5}, data_channel->getRank());
439 if (data_channel->getRank() == group_ranks[0]) {
440 for (
size_t i = 0; i < group_ranks.size(); ++i) {
441 tensors.push_back(buildTensor<int>({1, 2, 3, 4, 5}, -1));
442 raw_tensors.push_back(tensors.back().get());
445 data_channel->gather(raw_tensors, *int_tensor, group_ranks[0], group);
446 for (
size_t i = 0; i < tensors.size(); ++i)
447 ASSERT_TENSOR_VALUE(
int, *(tensors[i]), group_ranks[i])
449 data_channel->gather(raw_tensors, *int_tensor, group_ranks[0], group);
452 auto int_tensor = buildTensor({1, 2, 3, 4, 5}, 1000);
453 data_channel->gather(raw_tensors, *int_tensor, group_ranks[0], group);
454 ASSERT_TENSOR_VALUE(
int, *int_tensor, 1000)
// Group allGather: every member contributes a rank-stamped tensor and
// collects one buffer per member; buffer i must hold group_ranks[i] on all
// members. Non-members invoke the op and assert their 1000-filled tensor is
// untouched.
// NOTE(review): the THDGroup parameter line (~460), the member tensor
// declaration opener (~465, continued at "466"), and brace/else lines
// (~470-471, ~475) are elided from this extract.
458 void test_allGather_group(
459 std::shared_ptr<thd::DataChannel> data_channel,
461 std::vector<thd::rank_type> group_ranks) {
462 std::vector<std::shared_ptr<thpp::IntTensor>> tensors;
463 std::vector<thpp::Tensor*> raw_tensors;
464 if (contains(group_ranks, data_channel->getRank())) {
466 buildTensor<int>({1, 2, 3, 4, 5}, data_channel->getRank());
467 for (
size_t i = 0; i < group_ranks.size(); ++i) {
468 tensors.push_back(buildTensor<int>({1, 2, 3, 4, 5}, -1));
469 raw_tensors.push_back(tensors.back().get());
472 data_channel->allGather(raw_tensors, *int_tensor, group);
473 for (
size_t i = 0; i < tensors.size(); ++i)
474 ASSERT_TENSOR_VALUE(
int, *(tensors[i]), group_ranks[i])
476 auto int_tensor = buildTensor({1, 2, 3, 4, 5}, 1000);
477 data_channel->allGather(raw_tensors, *int_tensor, group);
478 ASSERT_TENSOR_VALUE(
int, *int_tensor, 1000)
// Group barrier: same deadline scheme as test_barrier, but scoped to the
// members of group_ranks. Each member in turn broadcasts a deadline within
// the group, sleeps past it, and enters the group barrier; the other
// members assert they were released only after the deadline. Non-members
// sleep even longer and then call the group barrier, which must not block
// them (no-op for non-members).
// NOTE(review): the THDGroup parameter line (~484), the broadcast's
// remaining arguments (~498-500), the assert opener (~502), and brace/else
// lines are elided from this extract.
// NOTE(review): loop index `i` is int but group_ranks.size() is size_t —
// sign-compare warning; prefer size_t.
482 void test_barrier_group(
483 std::shared_ptr<thd::DataChannel> data_channel,
485 std::vector<thd::rank_type> group_ranks) {
486 if (contains(group_ranks, data_channel->getRank())) {
487 for (
int i = 0; i < group_ranks.size(); ++i) {
488 if (data_channel->getRank() == group_ranks[i]) {
489 int64_t time_after_barrier = nowInMilliseconds() + BARRIER_WAIT_TIME;
490 auto time_tensor = buildTensor<int64_t>({1}, time_after_barrier);
491 data_channel->broadcast(*time_tensor, group_ranks[i], group);
492 std::this_thread::sleep_for(
493 std::chrono::milliseconds(BARRIER_WAIT_TIME + 10));
494 data_channel->barrier(group);
496 auto time_tensor = buildTensor<int64_t>({1}, -1);
497 data_channel->broadcast(
501 data_channel->barrier(group);
503 nowInMilliseconds() >=
504 reinterpret_cast<int64_t*>(time_tensor->data())[0]);
508 std::this_thread::sleep_for(
509 std::chrono::milliseconds(BARRIER_WAIT_TIME + 100));
510 data_channel->barrier(group);
// Negative tests for point-to-point rank validation: sending to or
// receiving from your OWN rank must throw std::logic_error, and an
// out-of-range rank (-1) must throw std::out_of_range.
// NOTE(review): the gloo and mpi guard bodies (~521-523, ~525-527 — both
// backends appear to be excluded from this test) and the ASSERT_THROWS
// opener before "534" are elided from this extract.
518 void test_send_recv_invalid_rank(
519 std::shared_ptr<thd::DataChannel> data_channel) {
520 if (g_data_channel_type ==
"gloo") {
524 if (g_data_channel_type ==
"mpi") {
528 auto rank = data_channel->getRank();
529 auto int_tensor = buildTensor({1, 2, 3, 4, 5}, -1);
532 ASSERT_THROWS(std::logic_error, data_channel->send(*int_tensor, rank))
534 std::logic_error, data_channel->receive(*int_tensor, rank))}
537 ASSERT_THROWS(std::out_of_range, data_channel->send(*int_tensor, -1))
538 ASSERT_THROWS(std::out_of_range, data_channel->receive(*int_tensor, -1))
// newGroup({}) with an empty rank list must throw std::logic_error on the
// TCP and Gloo backends.
// NOTE(review): lines ~544 and ~547-550 (closing braces and any
// other-backend handling) are elided from this extract.
543 void test_empty_group(std::shared_ptr<thd::DataChannel> data_channel) {
545 if (g_data_channel_type ==
"tcp" || g_data_channel_type ==
"gloo") {
546 ASSERT_THROWS(std::logic_error, data_channel->newGroup({}))
// Single-member group {1}: when the member (rank 1) invokes collectives
// whose root (rank 0) is OUTSIDE the group, the calls must throw —
// broadcast visibly asserts std::logic_error; reduce/scatter/gather are
// exercised too, though their ASSERT_THROWS openers are elided from this
// extract so the expected exception type there is not visible.
// NOTE(review): lines ~553, ~558, ~560, ~562, ~564-568, ~570-572,
// ~574-576 and closing braces are elided.
551 void test_process_not_in_group(std::shared_ptr<thd::DataChannel> data_channel) {
552 auto int_tensor = buildTensor({1, 2, 3, 4, 5}, -1);
554 THDGroup group = data_channel->newGroup({1});
555 std::vector<std::shared_ptr<thpp::IntTensor>> tensors = {
556 buildTensor<int>({1, 2, 3, 4, 5}, -1)};
557 std::vector<thpp::Tensor*> raw_tensors = {tensors.back().get()};
559 if (data_channel->getRank() == 1) {
561 std::logic_error, data_channel->broadcast(*int_tensor, 0, group))
563 if (g_data_channel_type ==
"gloo") {
569 data_channel->reduce(*int_tensor, THDReduceOp::THDReduceSUM, 0, group))
573 data_channel->scatter(raw_tensors, *int_tensor, 0, group))
577 data_channel->gather(raw_tensors, *int_tensor, 0, group))
// Group {1, 2} with only ONE destination tensor: allGather (for both
// members) and, outside the gloo backend, scatter/gather rooted at rank 1
// are expected to throw because the tensor list does not match the group
// size. The ASSERT_THROWS openers are elided from this extract, so the
// exact exception types asserted are not visible here.
// NOTE(review): lines ~589, ~591-592, ~594, ~596-598, ~600-601, ~603-605
// and closing braces are elided.
582 void test_tensors_do_not_match_group_size(
583 std::shared_ptr<thd::DataChannel> data_channel) {
584 auto int_tensor = buildTensor({1, 2, 3, 4, 5}, -1);
585 THDGroup group = data_channel->newGroup({1, 2});
586 std::vector<std::shared_ptr<thpp::IntTensor>> tensors = {
587 buildTensor<int>({1, 2, 3, 4, 5}, -1)};
588 std::vector<thpp::Tensor*> raw_tensors = {tensors.back().get()};
590 if (data_channel->getRank() == 1 || data_channel->getRank() == 2) {
593 data_channel->allGather(raw_tensors, *int_tensor, group))
595 if (g_data_channel_type ==
"gloo") {
599 if (data_channel->getRank() == 1) {
602 data_channel->scatter(raw_tensors, *int_tensor, 1, group))
606 data_channel->gather(raw_tensors, *int_tensor, 1, group))
// Group {1, 2} where the two destination tensors have DIFFERENT shapes
// ({1,2,3,4,5} vs {1,2,3,4}): allGather (both members) and, outside the
// gloo backend, scatter/gather rooted at rank 1 are expected to throw.
// The ASSERT_THROWS openers are elided from this extract, so the exact
// exception types asserted are not visible here.
// NOTE(review): lines ~620, ~622-623, ~625, ~627-629, ~631-632, ~634-636
// and closing braces are elided.
612 void test_tensors_are_not_the_same(
613 std::shared_ptr<thd::DataChannel> data_channel) {
614 auto int_tensor = buildTensor({1, 2, 3, 4, 5}, -1);
615 THDGroup group = data_channel->newGroup({1, 2});
616 std::vector<std::shared_ptr<thpp::IntTensor>> tensors = {
617 buildTensor<int>({1, 2, 3, 4, 5}, -1),
618 buildTensor<int>({1, 2, 3, 4}, -1)};
619 std::vector<thpp::Tensor*> raw_tensors = {tensors[0].get(), tensors[1].get()};
621 if (data_channel->getRank() == 1 || data_channel->getRank() == 2) {
624 data_channel->allGather(raw_tensors, *int_tensor, group))
626 if (g_data_channel_type ==
"gloo") {
630 if (data_channel->getRank() == 1) {
633 data_channel->scatter(raw_tensors, *int_tensor, 1, group))
637 data_channel->gather(raw_tensors, *int_tensor, 1, group))
// Runs the full suite on the given channel: point-to-point tests, global
// collectives, the group-scoped variants (on the fixed group {1, 2} — so
// at least two workers are required), and finally the error-path tests.
// NOTE(review): the function's opening line (~642) is elided from this
// extract; given the call sites it is presumably
// `void run_all_tests(std::shared_ptr<thd::DataChannel>, int workers)`.
643 std::shared_ptr<thd::DataChannel> data_channel,
645 test_send_recv_tensor(data_channel);
646 test_send_recv_tensor_any_source(data_channel, workers);
647 test_send_recv_scalar(data_channel);
648 test_broadcast(data_channel);
649 test_reduce(data_channel, workers);
650 test_allReduce(data_channel, workers);
651 test_scatter(data_channel);
652 test_gather(data_channel);
653 test_allGather(data_channel);
654 test_barrier(data_channel);
655 test_isend(data_channel);
656 test_irecv(data_channel);
657 test_interlaces(data_channel);
659 std::vector<thd::rank_type> group_ranks = {1, 2};
660 THDGroup group = data_channel->newGroup(group_ranks);
661 test_broadcast_group(data_channel, group, group_ranks);
662 test_reduce_group(data_channel, group, group_ranks);
663 test_allReduce_group(data_channel, group, group_ranks);
664 test_scatter_group(data_channel, group, group_ranks);
665 test_gather_group(data_channel, group, group_ranks);
666 test_allGather_group(data_channel, group, group_ranks);
667 test_barrier_group(data_channel, group, group_ranks);
669 test_send_recv_invalid_rank(data_channel);
670 test_empty_group(data_channel);
671 test_process_not_in_group(data_channel);
672 test_tensors_do_not_match_group_size(data_channel);
673 test_tensors_are_not_the_same(data_channel);
// Master side of a TCP run: exports WORLD_SIZE (workers + 1 master),
// RANK=0 and MASTER_PORT via the environment, builds a DataChannelTCP from
// the "env://" init method, runs the suite, then iterates the worker
// threads spawned by main() (loop body elided — presumably join()).
// NOTE(review): lines ~677, ~683-684 and ~687-693 are elided from this
// extract.
676 void init_tcp_master(
int workers) {
678 setenv(WORLD_SIZE_ENV, std::to_string((workers + 1)).data(), 1);
679 setenv(RANK_ENV,
"0", 1);
680 setenv(MASTER_PORT_ENV, std::to_string(MASTER_PORT).data(), 1);
681 auto masterChannel = std::make_shared<thd::DataChannelTCP>(
682 thd::getInitConfig(
"env://"));
685 assert(masterChannel->init());
686 run_all_tests(masterChannel, workers);
689 for (
auto& worker : g_all_workers) {
// Worker side of a TCP run: exports RANK=id, points the master address at
// 127.0.0.1:MASTER_PORT (the setenv call owning the address-string lines
// ~697-700 is partially elided), builds a DataChannelTCP via "env://" and
// runs the suite.
// NOTE(review): lines ~695, ~697-698, ~700, ~703-704 and ~707-709 are
// elided from this extract.
694 void init_tcp_worker(
unsigned int id,
int workers) {
696 setenv(RANK_ENV, std::to_string(
id).data(), 1);
699 std::string(
"127.0.0.1:" + std::to_string(MASTER_PORT)).data(),
701 auto worker_channel = std::make_shared<thd::DataChannelTCP>(
702 thd::getInitConfig(
"env://"));
705 assert(worker_channel->init());
706 run_all_tests(worker_channel, workers);
// Master side of a Gloo run: same env-var setup as init_tcp_master but
// constructs a DataChannelGloo. Worker-thread joining for Gloo happens in
// main(), not here.
// NOTE(review): lines ~711, ~717-718 and ~721-724 are elided from this
// extract (likely blanks, braces, and possibly a g_barrier wait).
710 void init_gloo_master(
int workers) {
712 setenv(WORLD_SIZE_ENV, std::to_string((workers + 1)).data(), 1);
713 setenv(RANK_ENV,
"0", 1);
714 setenv(MASTER_PORT_ENV, std::to_string(MASTER_PORT).data(), 1);
715 auto masterChannel = std::make_shared<thd::DataChannelGloo>(
716 thd::getInitConfig(
"env://"));
719 assert(masterChannel->init());
720 run_all_tests(masterChannel, workers);
// Worker side of a Gloo run: exports RANK=id and the 127.0.0.1:MASTER_PORT
// master address (the owning setenv call is partially elided), constructs a
// DataChannelGloo via "env://" and runs the suite.
// NOTE(review): lines ~726, ~728-729, ~731, ~734-735 and ~738-743 are
// elided from this extract (likely including a g_barrier synchronization —
// g_barrier is reset per Gloo sweep in main()).
725 void init_gloo_worker(
unsigned int id,
int workers) {
727 setenv(RANK_ENV, std::to_string(
id).data(), 1);
730 std::string(
"127.0.0.1:" + std::to_string(MASTER_PORT)).data(),
732 auto worker_channel = std::make_shared<thd::DataChannelGloo>(
733 thd::getInitConfig(
"env://"));
736 assert(worker_channel->init());
737 run_all_tests(worker_channel, workers);
// MPI process entry point: constructs a DataChannelMPI (rank and world
// size come from the MPI runtime), runs the full suite with WORKERS_NUM[0]
// workers, and prints a per-rank OK line.
// NOTE(review): lines ~748 and ~750-752 (blank line / closing brace) are
// elided from this extract.
744 void init_mpi_process() {
745 auto data_channel = std::make_shared<thd::DataChannelMPI>();
746 assert(data_channel->init());
747 run_all_tests(data_channel, WORKERS_NUM[0]);
749 std::cout <<
"MPI OK (id: " << data_channel->getRank() <<
")" << std::endl;
// Test driver. For each worker count in WORKERS_NUM it runs the TCP suite
// (one master thread + `workers` worker threads; workers are joined inside
// init_tcp_master, then the master thread is joined here), then the Gloo
// suite (fresh Barrier per count; worker-join loop here in main, body
// elided), and finally sets up an MPI run sized WORKERS_NUM[0] + 1.
// NOTE(review): this extract ends mid-function at original line ~813 —
// the MPI spawn/exec logic and the return statement are not visible here,
// and many interior lines (loop closers, join bodies) are elided.
753 int main(
int argc,
char const* argv[]) {
757 g_data_channel_type =
"tcp";
758 for (
auto workers : WORKERS_NUM) {
759 std::cout <<
"TCP (workers: " << workers <<
"):" << std::endl;
761 std::thread tcp_master_thread(init_tcp_master, workers);
764 for (
int id = 1;
id <= workers; ++id) {
765 g_all_workers.push_back(std::thread(init_tcp_worker,
id, workers));
768 tcp_master_thread.join();
769 g_all_workers.clear();
771 std::cout <<
"TCP - OK" << std::endl;
775 g_data_channel_type =
"gloo";
776 for (
auto workers : WORKERS_NUM) {
777 g_barrier.reset(
new Barrier(workers + 1));
778 std::cout <<
"Gloo (workers: " << workers <<
"):" << std::endl;
780 std::thread gloo_master_thread(init_gloo_master, workers);
783 for (
int id = 1;
id <= workers; ++id) {
784 g_all_workers.push_back(std::thread(init_gloo_worker,
id, workers));
788 for (
auto& worker : g_all_workers) {
792 gloo_master_thread.join();
793 g_all_workers.clear();
795 std::cout <<
"Gloo - OK" << std::endl;
800 std::cout <<
"--------------------------" << std::endl;
803 std::cout <<
"MPI:" << std::endl;
808 std::to_string(WORKERS_NUM[0] + 1).data(),
813 g_data_channel_type =
"mpi";