7 #include "caffe2/core/db.h" 8 #include "caffe2/core/logging.h" 9 #include "caffe2/core/stats.h" 10 #include "caffe2/queue/blobs_queue.h" 16 const std::string& GetStringFromBlob(Blob* blob) {
17 if (blob->template IsType<string>()) {
18 return blob->template Get<string>();
19 }
else if (blob->template IsType<Tensor>()) {
20 return *blob->template Get<Tensor>().
template data<string>();
22 CAFFE_THROW(
"Unsupported Blob type");
30 std::shared_ptr<BlobsQueue> queue,
35 key_blob_index_(key_blob_index),
36 value_blob_index_(value_blob_index),
37 timeout_secs_(timeout_secs),
40 LOG(INFO) <<
"BlobsQueueDBCursor constructed";
41 CAFFE_ENFORCE(queue_ !=
nullptr,
"queue is null");
42 CAFFE_ENFORCE(value_blob_index_ >= 0,
"value_blob_index < 0");
47 void Seek(
const string& )
override {
48 CAFFE_THROW(
"Seek is not supported.");
51 bool SupportsSeek()
override {
60 unique_ptr<Blob> blob = make_unique<Blob>();
61 vector<Blob*> blob_vector{blob.get()};
62 auto success = queue_->blockingRead(blob_vector, timeout_secs_);
64 LOG(ERROR) <<
"Timed out reading from BlobsQueue or it is closed";
69 if (key_blob_index_ >= 0) {
70 key_ = GetStringFromBlob(blob_vector[key_blob_index_]);
72 value_ = GetStringFromBlob(blob_vector[value_blob_index_]);
76 string key()
override {
97 std::shared_ptr<BlobsQueue> queue_;
99 int value_blob_index_;
110 const string& source,
112 std::shared_ptr<BlobsQueue> queue,
113 int key_blob_index = -1,
114 int value_blob_index = 0,
115 float timeout_secs = 0.0)
118 key_blob_index_(key_blob_index),
119 value_blob_index_(value_blob_index),
120 timeout_secs_(timeout_secs) {
121 LOG(INFO) <<
"BlobsQueueDB constructed";
130 return make_unique<BlobsQueueDBCursor>(
131 queue_, key_blob_index_, value_blob_index_, timeout_secs_);
135 CAFFE_THROW(
"Not implemented.");
139 std::shared_ptr<BlobsQueue> queue_;
141 int value_blob_index_;
void Seek(const string &) override
Seek to a specific key (or if the key does not exist, seek to the immediate next).
void Next() override
Go to the next location in the database.
string value() override
Returns the current value.
An abstract class for the cursor of the database while reading.
string key() override
Returns the current key.
A global dictionary that holds information about what Caffe2 modules have been loaded in the current runtime, and also utility functions to load modules.
bool Valid() override
Returns whether the current location is valid - for example, if we have reached the end of the database, return false.
void Close() override
Closes the database.
An abstract class for accessing a database of key-value pairs.
void SeekToFirst() override
Seek to the first key in the database.
unique_ptr< Transaction > NewTransaction() override
Returns a transaction to write data to the database.
unique_ptr< Cursor > NewCursor() override
Returns a cursor to read the database.