Caffe2 - C++ API
A deep learning, cross platform ML framework
zmqdb.cc
1 
17 #include <atomic>
18 #include <condition_variable>
19 #include <mutex>
20 #include <thread> // NOLINT
21 
22 #include "caffe2/core/db.h"
23 #include "caffe2/utils/zmq_helper.h"
24 #include "caffe2/core/logging.h"
25 
26 namespace caffe2 {
27 namespace db {
28 
29 class ZmqDBCursor : public Cursor {
30  public:
31  explicit ZmqDBCursor(const string& source)
32  : source_(source), socket_(ZMQ_PULL),
33  prefetched_(false), finalize_(false) {
34  socket_.Connect(source_);
35  // Start prefetching thread.
36  prefetch_thread_.reset(
37  new std::thread([this] { this->Prefetch(); }));
38  // obtain the first value.
39  Next();
40  }
41 
42  ~ZmqDBCursor() {
43  finalize_ = true;
44  prefetched_ = false;
45  producer_.notify_one();
46  // Wait for the prefetch thread to finish elegantly.
47  prefetch_thread_->join();
48  socket_.Disconnect(source_);
49  }
50 
51  void Seek(const string& /*key*/) override { /* do nothing */
52  }
53 
54  void SeekToFirst() override { /* do nothing */ }
55 
56  void Next() override {
57  std::unique_lock<std::mutex> lock(prefetch_access_mutex_);
58  while (!prefetched_) consumer_.wait(lock);
59  key_ = prefetch_key_;
60  value_ = prefetch_value_;
61  prefetched_ = false;
62  producer_.notify_one();
63  }
64 
65  string key() override { return key_; }
66  string value() override { return value_; }
67  bool Valid() override { return true; }
68 
69  private:
70 
71  void Prefetch() {
72  while (!finalize_) {
73  std::unique_lock<std::mutex> lock(prefetch_access_mutex_);
74  while (prefetched_) producer_.wait(lock);
75  if (finalize_) {
76  return;
77  }
78  ZmqMessage msg;
79  socket_.RecvTillSuccess(&msg);
80  prefetch_key_.assign(static_cast<char*>(msg.data()), msg.size());
81  socket_.RecvTillSuccess(&msg);
82  prefetch_value_.assign(static_cast<char*>(msg.data()), msg.size());
83  prefetched_ = true;
84  consumer_.notify_one();
85  }
86  }
87 
88  string source_;
89  ZmqSocket socket_;
90  string key_;
91  string value_;
92  string prefetch_key_;
93  string prefetch_value_;
94 
95  unique_ptr<std::thread> prefetch_thread_;
96  std::mutex prefetch_access_mutex_;
97  std::condition_variable producer_, consumer_;
98  std::atomic<bool> prefetched_;
99  // finalize_ is used to tell the prefetcher to quit.
100  std::atomic<bool> finalize_;
101 };
102 
103 class ZmqDB : public DB {
104  public:
105  ZmqDB(const string& source, Mode mode)
106  : DB(source, mode), source_(source) {
107  CAFFE_ENFORCE(mode == READ, "ZeroMQ DB only supports read mode.");
108  }
109 
110  ~ZmqDB() {}
111 
112  void Close() override {}
113 
114  unique_ptr<Cursor> NewCursor() override {
115  return make_unique<ZmqDBCursor>(source_);
116  }
117 
118  unique_ptr<Transaction> NewTransaction() override {
119  CAFFE_THROW("ZeroMQ DB does not support writing with a transaction.");
120  return nullptr; // dummy placeholder to suppress old compiler warnings.
121  }
122 
123  private:
124  string source_;
125 };
126 
127 REGISTER_CAFFE2_DB(ZmqDB, ZmqDB);
128 // For lazy-minded, one can also call with lower-case name.
129 REGISTER_CAFFE2_DB(zmqdb, ZmqDB);
130 
131 } // namespace db
132 } // namespace caffe2
string value() override
Returns the current value.
Definition: zmqdb.cc:66
An abstract class for the cursor of the database while reading.
Definition: db.h:38
void Seek(const string &) override
Seek to a specific key (or if the key does not exist, seek to the immediate next).
Definition: zmqdb.cc:51
string key() override
Returns the current key.
Definition: zmqdb.cc:65
Copyright (c) 2016-present, Facebook, Inc.
unique_ptr< Cursor > NewCursor() override
Returns a cursor to read the database.
Definition: zmqdb.cc:114
void SeekToFirst() override
Seek to the first key in the database.
Definition: zmqdb.cc:54
void Close() override
Closes the database.
Definition: zmqdb.cc:112
An abstract class for accessing a database of key-value pairs.
Definition: db.h:96
bool Valid() override
Returns whether the current location is valid - for example, if we have reached the end of the database, return false.
Definition: zmqdb.cc:67
void Next() override
Go to the next location in the database.
Definition: zmqdb.cc:56
unique_ptr< Transaction > NewTransaction() override
Returns a transaction to write data to the database.
Definition: zmqdb.cc:118