2 #include <condition_variable> 6 #include "caffe2/core/db.h" 7 #include "caffe2/utils/zmq_helper.h" 8 #include "caffe2/core/logging.h" 16 : source_(source), socket_(ZMQ_PULL),
17 prefetched_(
false), finalize_(
false) {
18 socket_.Connect(source_);
20 prefetch_thread_.reset(
21 new std::thread([
this] { this->Prefetch(); }));
29 producer_.notify_one();
31 prefetch_thread_->join();
32 socket_.Disconnect(source_);
35 void Seek(
const string& )
override {
41 std::unique_lock<std::mutex> lock(prefetch_access_mutex_);
42 while (!prefetched_) consumer_.wait(lock);
44 value_ = prefetch_value_;
46 producer_.notify_one();
49 string key()
override {
return key_; }
50 string value()
override {
return value_; }
51 bool Valid()
override {
return true; }
57 std::unique_lock<std::mutex> lock(prefetch_access_mutex_);
58 while (prefetched_) producer_.wait(lock);
63 socket_.RecvTillSuccess(&msg);
64 prefetch_key_.assign(static_cast<char*>(msg.data()), msg.size());
65 socket_.RecvTillSuccess(&msg);
66 prefetch_value_.assign(static_cast<char*>(msg.data()), msg.size());
68 consumer_.notify_one();
77 string prefetch_value_;
79 unique_ptr<std::thread> prefetch_thread_;
80 std::mutex prefetch_access_mutex_;
81 std::condition_variable producer_, consumer_;
82 std::atomic<bool> prefetched_;
84 std::atomic<bool> finalize_;
89 ZmqDB(
const string& source, Mode mode)
90 :
DB(source, mode), source_(source) {
91 CAFFE_ENFORCE(mode == READ,
"ZeroMQ DB only supports read mode.");
99 return make_unique<ZmqDBCursor>(source_);
103 CAFFE_THROW(
"ZeroMQ DB does not support writing with a transaction.");
113 REGISTER_CAFFE2_DB(zmqdb,
ZmqDB);
string value() override
Returns the current value.
An abstract class for the cursor of the database while reading.
void Seek(const string &) override
Seek to a specific key (or if the key does not exist, seek to the immediate next).
string key() override
Returns the current key.
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...
unique_ptr< Cursor > NewCursor() override
Returns a cursor to read the database.
void SeekToFirst() override
Seek to the first key in the database.
void Close() override
Closes the database.
An abstract class for accessing a database of key-value pairs.
bool Valid() override
Returns whether the current location is valid - for example, if we have reached the end of the databa...
void Next() override
Go to the next location in the database.
unique_ptr< Transaction > NewTransaction() override
Returns a transaction to write data to the database.