#include <THD/base/data_channels/DataChannelGloo.hpp>
#include <THD/test/TestUtils.hpp>

#include <THPP/tensors/THTensor.hpp>

#include <array>
#include <cassert>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

constexpr std::array<int, 1> WORKERS_NUM = {10};
// Port handed to the master through MASTER_PORT_ENV and embedded in the
// workers' master address string ("127.0.0.1:<port>").
constexpr int MASTER_PORT{45678};

// Threads running init_gloo_worker, one per worker rank. main() pushes them
// here for each configuration and clears the vector afterwards.
std::vector<std::thread> g_all_workers{};
21 void test(std::shared_ptr<thd::DataChannel> data_channel) {
22 for (
size_t dest = 0; dest < data_channel->getNumProcesses(); ++dest) {
23 if (data_channel->getRank() == dest) {
24 auto float_tensor = buildTensor<float>({1, 2, 3, 4, 5}, 10.123);
25 data_channel->broadcast(*float_tensor, dest);
27 auto float_tensor = buildTensor<float>({1, 2, 3, 4, 5}, -1.0);
28 data_channel->broadcast(*float_tensor, dest);
29 ASSERT_TENSOR_VALUE(
float, *float_tensor, 10.123)
// NOTE(review): fragment of the helper that the call sites invoke as
// run_all_tests(channel, workers). Its opening line, its `workers` parameter
// and the body of the loop below are not visible in this excerpt.
35 std::shared_ptr<thd::DataChannel> data_channel,
// Repeats 1000 times -- presumably re-running test() so that a hang or
// mismatch in any round surfaces; the loop body is not visible, confirm.
40 for (
size_t i = 0; i < 1000; ++i) {
45 void init_gloo_master(
int workers) {
47 setenv(WORLD_SIZE_ENV, std::to_string((workers + 1)).data(), 1);
48 setenv(RANK_ENV,
"0", 1);
49 setenv(MASTER_PORT_ENV, std::to_string(MASTER_PORT).data(), 1);
50 auto masterChannel = std::make_shared<thd::DataChannelGloo>(
51 thd::getInitConfig(
"env://"));
54 assert(masterChannel->init());
55 run_all_tests(masterChannel, workers);
58 void init_gloo_worker(
unsigned int id,
int workers) {
60 setenv(RANK_ENV, std::to_string(
id).data(), 1);
63 std::string(
"127.0.0.1:" + std::to_string(MASTER_PORT)).data(),
65 auto worker_channel = std::make_shared<thd::DataChannelGloo>(
66 thd::getInitConfig(
"env://"));
69 assert(worker_channel->init());
70 run_all_tests(worker_channel, workers);
// NOTE(review): interior of main(). Its signature, closing braces and the
// body of the worker loop below are not visible in this excerpt.
// For each configured worker count: start one master thread (rank 0) and
// `workers` worker threads (ranks 1..workers), wait for them, report success.
74 for (
auto workers : WORKERS_NUM) {
75 std::cout <<
"Gloo (workers: " << workers <<
"):" << std::endl;
// Rank 0: init_gloo_master sets RANK to "0" and publishes WORLD_SIZE and
// MASTER_PORT before running the tests.
77 std::thread gloo_master_thread(init_gloo_master, workers);
// Ranks 1..workers, one thread each; init_gloo_worker sets RANK to `id`.
80 for (
int id = 1;
id <= workers; ++id) {
81 g_all_workers.push_back(std::thread(init_gloo_worker,
id, workers));
// Presumably joins each worker thread (loop body not visible -- confirm).
85 for (
auto& worker : g_all_workers) {
// Then wait for the master and reset the worker list for the next
// configuration.
89 gloo_master_thread.join();
90 g_all_workers.clear();
92 std::cout <<
"Gloo - OK" << std::endl;