Caffe2 - C++ API
A deep learning, cross platform ML framework
blobs_queue_db.h
1 
2 #pragma once
3 
4 #include <chrono>
5 #include <string>
6 
7 #include "caffe2/core/db.h"
8 #include "caffe2/core/logging.h"
9 #include "caffe2/core/stats.h"
10 #include "caffe2/queue/blobs_queue.h"
11 
12 namespace caffe2 {
13 namespace db {
14 
15 namespace {
16 const std::string& GetStringFromBlob(Blob* blob) {
17  if (blob->template IsType<string>()) {
18  return blob->template Get<string>();
19  } else if (blob->template IsType<Tensor>()) {
20  return *blob->template Get<Tensor>().template data<string>();
21  } else {
22  CAFFE_THROW("Unsupported Blob type");
23  }
24 }
25 }
26 
27 class BlobsQueueDBCursor : public Cursor {
28  public:
29  explicit BlobsQueueDBCursor(
30  std::shared_ptr<BlobsQueue> queue,
31  int key_blob_index,
32  int value_blob_index,
33  float timeout_secs)
34  : queue_(queue),
35  key_blob_index_(key_blob_index),
36  value_blob_index_(value_blob_index),
37  timeout_secs_(timeout_secs),
38  inited_(false),
39  valid_(false) {
40  LOG(INFO) << "BlobsQueueDBCursor constructed";
41  CAFFE_ENFORCE(queue_ != nullptr, "queue is null");
42  CAFFE_ENFORCE(value_blob_index_ >= 0, "value_blob_index < 0");
43  }
44 
45  virtual ~BlobsQueueDBCursor() {}
46 
47  void Seek(const string& /* unused */) override {
48  CAFFE_THROW("Seek is not supported.");
49  }
50 
51  bool SupportsSeek() override {
52  return false;
53  }
54 
55  void SeekToFirst() override {
56  // not applicable
57  }
58 
59  void Next() override {
60  unique_ptr<Blob> blob = make_unique<Blob>();
61  vector<Blob*> blob_vector{blob.get()};
62  auto success = queue_->blockingRead(blob_vector, timeout_secs_);
63  if (!success) {
64  LOG(ERROR) << "Timed out reading from BlobsQueue or it is closed";
65  valid_ = false;
66  return;
67  }
68 
69  if (key_blob_index_ >= 0) {
70  key_ = GetStringFromBlob(blob_vector[key_blob_index_]);
71  }
72  value_ = GetStringFromBlob(blob_vector[value_blob_index_]);
73  valid_ = true;
74  }
75 
76  string key() override {
77  if (!inited_) {
78  Next();
79  inited_ = true;
80  }
81  return key_;
82  }
83 
84  string value() override {
85  if (!inited_) {
86  Next();
87  inited_ = true;
88  }
89  return value_;
90  }
91 
92  bool Valid() override {
93  return valid_;
94  }
95 
96  private:
97  std::shared_ptr<BlobsQueue> queue_;
98  int key_blob_index_;
99  int value_blob_index_;
100  float timeout_secs_;
101  bool inited_;
102  string key_;
103  string value_;
104  bool valid_;
105 };
106 
107 class BlobsQueueDB : public DB {
108  public:
109  BlobsQueueDB(
110  const string& source,
111  Mode mode,
112  std::shared_ptr<BlobsQueue> queue,
113  int key_blob_index = -1,
114  int value_blob_index = 0,
115  float timeout_secs = 0.0)
116  : DB(source, mode),
117  queue_(queue),
118  key_blob_index_(key_blob_index),
119  value_blob_index_(value_blob_index),
120  timeout_secs_(timeout_secs) {
121  LOG(INFO) << "BlobsQueueDB constructed";
122  }
123 
124  virtual ~BlobsQueueDB() {
125  Close();
126  }
127 
128  void Close() override {}
129  unique_ptr<Cursor> NewCursor() override {
130  return make_unique<BlobsQueueDBCursor>(
131  queue_, key_blob_index_, value_blob_index_, timeout_secs_);
132  }
133 
134  unique_ptr<Transaction> NewTransaction() override {
135  CAFFE_THROW("Not implemented.");
136  }
137 
138  private:
139  std::shared_ptr<BlobsQueue> queue_;
140  int key_blob_index_;
141  int value_blob_index_;
142  float timeout_secs_;
143 };
144 } // namespace db
145 } // namespace caffe2
void Seek(const string &) override
Seek to a specific key (or if the key does not exist, seek to the immediate next).
void Next() override
Go to the next location in the database.
string value() override
Returns the current value.
An abstract class for the cursor of the database while reading.
Definition: db.h:22
string key() override
Returns the current key.
A global dictionary that holds information about what Caffe2 modules have been loaded in the current runtime, and also utility functions to load modules.
Definition: blob.h:13
bool Valid() override
Returns whether the current location is valid - for example, if we have reached the end of the database, return false.
void Close() override
Closes the database.
An abstract class for accessing a database of key-value pairs.
Definition: db.h:80
void SeekToFirst() override
Seek to the first key in the database.
unique_ptr< Transaction > NewTransaction() override
Returns a transaction to write data to the database.
unique_ptr< Cursor > NewCursor() override
Returns a cursor to read the database.