Caffe2 - C++ API
A deep learning, cross platform ML framework
core.cpp
1 #include <cstring>
2 #include <string>
3 #include <unordered_map>
4 
5 #include <TH/TH.h>
6 #include <libshm/err.h>
7 #include <libshm/socket.h>
8 #include <libshm/libshm.h>
9 
10 std::unordered_map<std::string, ClientSocket> managers;
11 std::string manager_executable_path;
12 
13 AllocInfo get_alloc_info(const char* filename) {
14  AllocInfo info = {0};
15  info.pid = getpid();
16  info.free = false;
17  size_t len = strlen(filename);
18  if (len >= sizeof(info.filename)) {
19  throw std::runtime_error("THMapAllocatorContext_filename too long");
20  }
21  memcpy(info.filename, filename, len + 1);
22  return info;
23 }
24 
25 void start_manager() {
26  int pipe_ends[2];
27  SYSCHECK_ERR_RETURN_NEG1(pipe(pipe_ends));
28 
29  pid_t pid;
30  SYSCHECK_ERR_RETURN_NEG1(pid = fork());
31  if (!pid) {
32  SYSCHECK_ERR_RETURN_NEG1(close(pipe_ends[0]));
33  SYSCHECK_ERR_RETURN_NEG1(dup2(pipe_ends[1], 1)); // Replace stdout
34  SYSCHECK_ERR_RETURN_NEG1(close(pipe_ends[1]));
35  execl(manager_executable_path.c_str(), "torch_shm_manager", NULL);
36  exit(1);
37  }
38  SYSCHECK_ERR_RETURN_NEG1(close(pipe_ends[1]));
39 
40  ssize_t bytes_read;
41  char buffer[1000];
42  std::string handle;
43  for (;;) {
44  SYSCHECK_ERR_RETURN_NEG1(bytes_read = read(pipe_ends[0], buffer, sizeof(buffer)));
45  handle.append(buffer, bytes_read);
46  if (bytes_read == 0 || handle[handle.length() - 1] == '\n') {
47  break;
48  }
49  }
50  SYSCHECK_ERR_RETURN_NEG1(close(pipe_ends[0]));
51  if (handle.length() == 0) {
52  std::string msg("error executing torch_shm_manager at \"");
53  msg += manager_executable_path;
54  msg += "\"";
55  throw std::runtime_error(msg);
56  }
57 
58  handle.pop_back(); // remove \n
59  if (handle == "ERROR")
60  throw std::exception();
61 
62  ClientSocket manager {handle};
63  managers.emplace(std::move(handle), std::move(manager));
64 }
65 
66 ClientSocket& get_manager_socket(const std::string& manager_handle) {
67  auto it = managers.find(manager_handle);
68  if (it == managers.end()) {
69  auto socket = ClientSocket(manager_handle);
70  auto result = managers.emplace(manager_handle, std::move(socket));
71  return result.first->second;
72  } else {
73  return it->second;
74  }
75 }
76 
77 void libshm_init(const char *manager_exec_path) {
78  manager_executable_path = std::string(manager_exec_path);
79 }
80 
81 THManagedMapAllocatorInit::THManagedMapAllocatorInit(const char* manager_handle, const char* filename)
82  : manager_handle_(manager_handle ? manager_handle : "") {
83  // TODO: unlock GIL when contacting the manager
84  try {
85  ClientSocket *socket;
86  if (!manager_handle_.empty()) {
87  socket = &get_manager_socket(manager_handle_);
88  } else {
89  if (managers.size() == 0) {
90  start_manager();
91  }
92  const auto &manager = managers.begin();
93  manager_handle_ = manager->first;
94  socket = &manager->second;
95  }
96  AllocInfo info = get_alloc_info(filename);
97  socket->register_allocation(info);
98  } catch(std::exception &e) {
99  THError(e.what());
100  }
101 }
102 
103 THManagedMapAllocator::THManagedMapAllocator(const char *manager_handle, const char *filename, int flags, ptrdiff_t size)
104  : THManagedMapAllocatorInit(manager_handle, filename), THRefcountedMapAllocator(filename, flags, size) {}
105 
106 void THManagedMapAllocator::close() {
107  if (closed_) return;
108  AllocInfo info = get_alloc_info(filename());
109  info.free = true;
110  ClientSocket &socket = get_manager_socket(manager_handle_);
111  THRefcountedMapAllocator::close();
112  socket.register_deallocation(info);
113 }
114 
115 static void deleteTHManagedMapAllocator(void* ptr) {
116  delete static_cast<THManagedMapAllocator*>(ptr);
117 }
118 
119 at::DataPtr THManagedMapAllocator::makeDataPtr(const char* manager_handle, const char* filename, int flags, ptrdiff_t size) {
120  auto* context = new THManagedMapAllocator(manager_handle, filename, flags, size);
121  return {context->data(), context, &deleteTHManagedMapAllocator, at::DeviceType::CPU};
122 }
123 
124 THManagedMapAllocator* THManagedMapAllocator::fromDataPtr(const at::DataPtr& dptr) {
125  return dptr.cast_context<THManagedMapAllocator>(&deleteTHManagedMapAllocator);
126 }