Caffe2 - C++ API
A deep learning, cross platform ML framework
data_channel_tcp_slow_master.cpp
#include <THD/base/data_channels/DataChannelTCP.hpp>
#include <THD/test/TestUtils.hpp>

#include <THPP/tensors/THTensor.hpp>

#include <cassert>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
11 
// Number of worker threads spawned by main(); ranks 1..WORKERS_NUM.
constexpr int WORKERS_NUM{2};
// TCP port exported by master() and embedded in the address used by worker().
constexpr int MASTER_PORT{45679};

// Worker threads created in main() and joined at the end of master().
std::vector<std::thread> g_all_workers;
// Serializes the "set environment variables, then read config" sections.
std::mutex g_mutex;
17 
18 void master() {
19  g_mutex.lock();
20  setenv(WORLD_SIZE_ENV, std::to_string((WORKERS_NUM + 1)).data(), 1);
21  setenv(RANK_ENV, "0", 1);
22  setenv(MASTER_PORT_ENV, std::to_string(MASTER_PORT).data(), 1);
23  auto masterChannel = std::make_shared<thd::DataChannelTCP>(
24  thd::getInitConfig("env://")); // reads all env variable
25  g_mutex.unlock();
26 
27  // wait a long time before init
28  std::this_thread::sleep_for(std::chrono::seconds(4));
29 
30  assert(masterChannel->init());
31 
32  auto float_tensor = buildTensor<float>({1, 2, 3}, 4);
33  masterChannel->broadcast(*float_tensor, 0); // send good tensor
34 
35  // wait for all workers to finish
36  for (auto& worker : g_all_workers) {
37  worker.join();
38  }
39 }
40 
41 void worker(int id) {
42  g_mutex.lock();
43  setenv(RANK_ENV, std::to_string(id).data(), 1);
44  setenv(
45  MASTER_ADDR_ENV,
46  std::string("127.0.0.1:" + std::to_string(MASTER_PORT)).data(),
47  1);
48  auto workerChannel = std::make_shared<thd::DataChannelTCP>(
49  thd::getInitConfig("env://")); // reads all env variable
50  g_mutex.unlock();
51 
52  assert(workerChannel->init());
53 
54  auto float_tensor = buildTensor<float>({1, 2, 3}, -1);
55  workerChannel->broadcast(*float_tensor, 0);
56  ASSERT_TENSOR_VALUE(float, *float_tensor, 4)
57 }
58 
59 int main() {
60  // start master
61  std::thread master_thread(master);
62 
63  // start worker
64  for (int id = 1; id <= WORKERS_NUM; ++id) {
65  g_all_workers.push_back(std::thread(worker, id));
66  }
67 
68  master_thread.join();
69  std::cout << "OK" << std::endl;
70  return 0;
71 }