1 #include "redis_store_handler.h" 3 #include <caffe2/core/logging.h> 11 RedisStoreHandler::RedisStoreHandler(
15 : host_(host), port_(port), prefix_(prefix) {
21 redis_ = redisConnectWithTimeout(host.c_str(), port, tv);
22 CAFFE_ENFORCE_NE(redis_, (redisContext*)
nullptr);
23 CAFFE_ENFORCE_EQ(redis_->err, 0, redis_->errstr);
26 RedisStoreHandler::~RedisStoreHandler() {
30 std::string RedisStoreHandler::compoundKey(
const std::string& name) {
31 return prefix_ + name;
34 void RedisStoreHandler::set(
const std::string& name,
const std::string& data) {
35 auto key = compoundKey(name);
36 void* ptr = redisCommand(
43 CAFFE_ENFORCE_NE(ptr, (
void*)
nullptr, redis_->errstr);
44 redisReply* reply =
static_cast<redisReply*
>(ptr);
45 CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
52 " (perhaps you reused a run ID you have used before?)");
55 std::string RedisStoreHandler::get(
56 const std::string& name,
57 const std::chrono::milliseconds& timeout) {
59 wait({name}, timeout);
61 auto key = compoundKey(name);
62 void* ptr = redisCommand(redis_,
"GET %b", key.c_str(), (size_t)key.size());
63 CAFFE_ENFORCE_NE(ptr, (
void*)
nullptr, redis_->errstr);
64 redisReply* reply =
static_cast<redisReply*
>(ptr);
65 CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_STRING);
66 return std::string(reply->str, reply->len);
69 int64_t RedisStoreHandler::add(
const std::string& name, int64_t value) {
70 auto key = compoundKey(name);
71 void* ptr = redisCommand(
72 redis_,
"INCRBY %b %ld", key.c_str(), (size_t)key.size(), value);
73 CAFFE_ENFORCE_NE(ptr, (
void*)
nullptr, redis_->errstr);
74 redisReply* reply =
static_cast<redisReply*
>(ptr);
75 CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
76 return reply->integer;
79 bool RedisStoreHandler::check(
const std::vector<std::string>& names) {
80 std::vector<std::string> args;
81 args.push_back(
"EXISTS");
82 for (
const auto& name : names) {
83 args.push_back(compoundKey(name));
86 std::vector<const char*> argv;
87 std::vector<size_t> argvlen;
88 for (
const auto& arg : args) {
89 argv.push_back(arg.c_str());
90 argvlen.push_back(arg.length());
93 auto argc = argv.size();
94 void* ptr = redisCommandArgv(redis_, argc, argv.data(), argvlen.data());
95 CAFFE_ENFORCE_NE(ptr, (
void*)
nullptr, redis_->errstr);
96 redisReply* reply =
static_cast<redisReply*
>(ptr);
97 CAFFE_ENFORCE_EQ(reply->type, REDIS_REPLY_INTEGER);
98 return reply->integer == names.size();
101 void RedisStoreHandler::wait(
102 const std::vector<std::string>& names,
103 const std::chrono::milliseconds& timeout) {
108 const auto start = std::chrono::steady_clock::now();
109 while (!check(names)) {
110 const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
111 std::chrono::steady_clock::now() - start);
112 if (timeout != kNoTimeout && elapsed > timeout) {
113 STORE_HANDLER_TIMEOUT(
114 "Wait timeout for name(s): ", c10::Join(
" ", names));
117 std::this_thread::sleep_for(std::chrono::milliseconds(10));
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...