Caffe2 - C++ API
A deep learning, cross platform ML framework
redis_store_handler.cc
1 
17 #include "redis_store_handler.h"
18 
19 #include <caffe2/core/logging.h>
20 
21 #include <chrono>
22 #include <thread>
23 #include <vector>
24 
25 namespace caffe2 {
26 
27 RedisStoreHandler::RedisStoreHandler(
28  std::string& host,
29  int port,
30  std::string& prefix)
31  : host_(host), port_(port), prefix_(prefix) {
32  struct timeval tv = {
33  .tv_sec = 5, .tv_usec = 0,
34  };
35 
36  redis_ = redisConnectWithTimeout(host.c_str(), port, tv);
37  CAFFE_ENFORCE_NE(redis_, (redisContext*)nullptr);
38  CAFFE_ENFORCE_EQ(redis_->err, 0, redis_->errstr);
39 }
40 
41 RedisStoreHandler::~RedisStoreHandler() {
42  redisFree(redis_);
43 }
44 
45 std::string RedisStoreHandler::compoundKey(const std::string& name) {
46  return prefix_ + name;
47 }
48 
49 void RedisStoreHandler::set(const std::string& name, const std::string& data) {
50  auto key = compoundKey(name);
51  void* ptr = redisCommand(
52  redis_,
53  "SETNX %b %b",
54  key.c_str(),
55  (size_t)key.size(),
56  data.c_str(),
57  (size_t)data.size());
58  CAFFE_ENFORCE_NE(ptr, (void*)nullptr, redis_->errstr);
59  redisReply* reply = static_cast<redisReply*>(ptr);
60  CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
61  CAFFE_ENFORCE_EQ(
62  reply->integer,
63  1,
64  "Value at ",
65  name,
66  " was already set",
67  " (perhaps you reused a run ID you have used before?)");
68 }
69 
70 std::string RedisStoreHandler::get(const std::string& name) {
71  // Block until key is set
72  wait({name});
73 
74  auto key = compoundKey(name);
75  void* ptr = redisCommand(redis_, "GET %b", key.c_str(), (size_t)key.size());
76  CAFFE_ENFORCE_NE(ptr, (void*)nullptr, redis_->errstr);
77  redisReply* reply = static_cast<redisReply*>(ptr);
78  CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_STRING);
79  return std::string(reply->str, reply->len);
80 }
81 
82 int64_t RedisStoreHandler::add(const std::string& name, int64_t value) {
83  auto key = compoundKey(name);
84  void* ptr = redisCommand(
85  redis_, "INCRBY %b %ld", key.c_str(), (size_t)key.size(), value);
86  CAFFE_ENFORCE_NE(ptr, (void*)nullptr, redis_->errstr);
87  redisReply* reply = static_cast<redisReply*>(ptr);
88  CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
89  return reply->integer;
90 }
91 
92 bool RedisStoreHandler::check(const std::vector<std::string>& names) {
93  std::vector<std::string> args;
94  args.push_back("EXISTS");
95  for (const auto& name : names) {
96  args.push_back(compoundKey(name));
97  }
98 
99  std::vector<const char*> argv;
100  std::vector<size_t> argvlen;
101  for (const auto& arg : args) {
102  argv.push_back(arg.c_str());
103  argvlen.push_back(arg.length());
104  }
105 
106  auto argc = argv.size();
107  void* ptr = redisCommandArgv(redis_, argc, argv.data(), argvlen.data());
108  CAFFE_ENFORCE_NE(ptr, (void*)nullptr, redis_->errstr);
109  redisReply* reply = static_cast<redisReply*>(ptr);
110  CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
111  return reply->integer == names.size();
112 }
113 
114 void RedisStoreHandler::wait(
115  const std::vector<std::string>& names,
116  const std::chrono::milliseconds& timeout) {
117  // Simple approach: poll...
118  // Complex approach: use pub/sub.
119  // Polling is fine for the typical rendezvous use case, as it is
120  // only done at initialization time and not at run time.
121  const auto start = std::chrono::steady_clock::now();
122  while (!check(names)) {
123  const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
124  std::chrono::steady_clock::now() - start);
125  if (timeout != kNoTimeout && elapsed > timeout) {
126  STORE_HANDLER_TIMEOUT("Wait timeout for name(s): ", Join(" ", names));
127  }
128  /* sleep override */
129  std::this_thread::sleep_for(std::chrono::milliseconds(10));
130  }
131 }
132 }
Copyright (c) 2016-present, Facebook, Inc.