Caffe2 - C++ API
A deep learning, cross-platform ML framework
redis_store_handler.cc
#include "redis_store_handler.h"

#include <caffe2/core/logging.h>

#include <chrono>
#include <memory>
#include <thread>
#include <vector>
8 
9 namespace caffe2 {
10 
11 RedisStoreHandler::RedisStoreHandler(
12  std::string& host,
13  int port,
14  std::string& prefix)
15  : host_(host), port_(port), prefix_(prefix) {
16  struct timeval tv = {
17  .tv_sec = 5,
18  .tv_usec = 0,
19  };
20 
21  redis_ = redisConnectWithTimeout(host.c_str(), port, tv);
22  CAFFE_ENFORCE_NE(redis_, (redisContext*)nullptr);
23  CAFFE_ENFORCE_EQ(redis_->err, 0, redis_->errstr);
24 }
25 
26 RedisStoreHandler::~RedisStoreHandler() {
27  redisFree(redis_);
28 }
29 
30 std::string RedisStoreHandler::compoundKey(const std::string& name) {
31  return prefix_ + name;
32 }
33 
34 void RedisStoreHandler::set(const std::string& name, const std::string& data) {
35  auto key = compoundKey(name);
36  void* ptr = redisCommand(
37  redis_,
38  "SETNX %b %b",
39  key.c_str(),
40  (size_t)key.size(),
41  data.c_str(),
42  (size_t)data.size());
43  CAFFE_ENFORCE_NE(ptr, (void*)nullptr, redis_->errstr);
44  redisReply* reply = static_cast<redisReply*>(ptr);
45  CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
46  CAFFE_ENFORCE_EQ(
47  reply->integer,
48  1,
49  "Value at ",
50  name,
51  " was already set",
52  " (perhaps you reused a run ID you have used before?)");
53 }
54 
55 std::string RedisStoreHandler::get(
56  const std::string& name,
57  const std::chrono::milliseconds& timeout) {
58  // Block until key is set
59  wait({name}, timeout);
60 
61  auto key = compoundKey(name);
62  void* ptr = redisCommand(redis_, "GET %b", key.c_str(), (size_t)key.size());
63  CAFFE_ENFORCE_NE(ptr, (void*)nullptr, redis_->errstr);
64  redisReply* reply = static_cast<redisReply*>(ptr);
65  CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_STRING);
66  return std::string(reply->str, reply->len);
67 }
68 
69 int64_t RedisStoreHandler::add(const std::string& name, int64_t value) {
70  auto key = compoundKey(name);
71  void* ptr = redisCommand(
72  redis_, "INCRBY %b %ld", key.c_str(), (size_t)key.size(), value);
73  CAFFE_ENFORCE_NE(ptr, (void*)nullptr, redis_->errstr);
74  redisReply* reply = static_cast<redisReply*>(ptr);
75  CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
76  return reply->integer;
77 }
78 
79 bool RedisStoreHandler::check(const std::vector<std::string>& names) {
80  std::vector<std::string> args;
81  args.push_back("EXISTS");
82  for (const auto& name : names) {
83  args.push_back(compoundKey(name));
84  }
85 
86  std::vector<const char*> argv;
87  std::vector<size_t> argvlen;
88  for (const auto& arg : args) {
89  argv.push_back(arg.c_str());
90  argvlen.push_back(arg.length());
91  }
92 
93  auto argc = argv.size();
94  void* ptr = redisCommandArgv(redis_, argc, argv.data(), argvlen.data());
95  CAFFE_ENFORCE_NE(ptr, (void*)nullptr, redis_->errstr);
96  redisReply* reply = static_cast<redisReply*>(ptr);
97  CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
98  return reply->integer == names.size();
99 }
100 
101 void RedisStoreHandler::wait(
102  const std::vector<std::string>& names,
103  const std::chrono::milliseconds& timeout) {
104  // Simple approach: poll...
105  // Complex approach: use pub/sub.
106  // Polling is fine for the typical rendezvous use case, as it is
107  // only done at initialization time and not at run time.
108  const auto start = std::chrono::steady_clock::now();
109  while (!check(names)) {
110  const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
111  std::chrono::steady_clock::now() - start);
112  if (timeout != kNoTimeout && elapsed > timeout) {
113  STORE_HANDLER_TIMEOUT(
114  "Wait timeout for name(s): ", c10::Join(" ", names));
115  }
116  /* sleep override */
117  std::this_thread::sleep_for(std::chrono::milliseconds(10));
118  }
119 }
120 } // namespace caffe2
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...
Definition: blob.h:13