Caffe2 - C++ API
A deep learning, cross platform ML framework
blobs_queue_db.h
1 
18 #pragma once
19 
20 #include <chrono>
21 #include <string>
22 
23 #include "caffe2/core/db.h"
24 #include "caffe2/core/logging.h"
25 #include "caffe2/core/stats.h"
26 #include "caffe2/queue/blobs_queue.h"
27 
28 namespace caffe2 {
29 namespace db {
30 
31 namespace {
32 const std::string& GetStringFromBlob(Blob* blob) {
33  if (blob->template IsType<string>()) {
34  return blob->template Get<string>();
35  } else if (blob->template IsType<Tensor<CPUContext>>()) {
36  return *blob->template Get<Tensor<CPUContext>>().template data<string>();
37  } else {
38  CAFFE_THROW("Unsupported Blob type");
39  }
40 }
41 }
42 
43 class BlobsQueueDBCursor : public Cursor {
44  public:
45  explicit BlobsQueueDBCursor(
46  std::shared_ptr<BlobsQueue> queue,
47  int key_blob_index,
48  int value_blob_index,
49  float timeout_secs)
50  : queue_(queue),
51  key_blob_index_(key_blob_index),
52  value_blob_index_(value_blob_index),
53  timeout_secs_(timeout_secs),
54  inited_(false),
55  valid_(false) {
56  LOG(INFO) << "BlobsQueueDBCursor constructed";
57  CAFFE_ENFORCE(queue_ != nullptr, "queue is null");
58  CAFFE_ENFORCE(value_blob_index_ >= 0, "value_blob_index < 0");
59  }
60 
61  virtual ~BlobsQueueDBCursor() {}
62 
63  void Seek(const string& /* unused */) override {
64  CAFFE_THROW("Seek is not supported.");
65  }
66 
67  bool SupportsSeek() override {
68  return false;
69  }
70 
71  void SeekToFirst() override {
72  // not applicable
73  }
74 
75  void Next() override {
76  unique_ptr<Blob> blob = make_unique<Blob>();
77  vector<Blob*> blob_vector{blob.get()};
78  auto success = queue_->blockingRead(blob_vector, timeout_secs_);
79  if (!success) {
80  LOG(ERROR) << "Timed out reading from BlobsQueue or it is closed";
81  valid_ = false;
82  return;
83  }
84 
85  if (key_blob_index_ >= 0) {
86  key_ = GetStringFromBlob(blob_vector[key_blob_index_]);
87  }
88  value_ = GetStringFromBlob(blob_vector[value_blob_index_]);
89  valid_ = true;
90  }
91 
92  string key() override {
93  if (!inited_) {
94  Next();
95  inited_ = true;
96  }
97  return key_;
98  }
99 
100  string value() override {
101  if (!inited_) {
102  Next();
103  inited_ = true;
104  }
105  return value_;
106  }
107 
108  bool Valid() override {
109  return valid_;
110  }
111 
112  private:
113  std::shared_ptr<BlobsQueue> queue_;
114  int key_blob_index_;
115  int value_blob_index_;
116  float timeout_secs_;
117  bool inited_;
118  string key_;
119  string value_;
120  bool valid_;
121 };
122 
123 class BlobsQueueDB : public DB {
124  public:
125  BlobsQueueDB(
126  const string& source,
127  Mode mode,
128  std::shared_ptr<BlobsQueue> queue,
129  int key_blob_index = -1,
130  int value_blob_index = 0,
131  float timeout_secs = 0.0)
132  : DB(source, mode),
133  queue_(queue),
134  key_blob_index_(key_blob_index),
135  value_blob_index_(value_blob_index),
136  timeout_secs_(timeout_secs) {
137  LOG(INFO) << "BlobsQueueDB constructed";
138  }
139 
140  virtual ~BlobsQueueDB() {
141  Close();
142  }
143 
144  void Close() override {}
145  unique_ptr<Cursor> NewCursor() override {
146  return make_unique<BlobsQueueDBCursor>(
147  queue_, key_blob_index_, value_blob_index_, timeout_secs_);
148  }
149 
150  unique_ptr<Transaction> NewTransaction() override {
151  CAFFE_THROW("Not implemented.");
152  }
153 
154  private:
155  std::shared_ptr<BlobsQueue> queue_;
156  int key_blob_index_;
157  int value_blob_index_;
158  float timeout_secs_;
159 };
160 } // namespace db
161 } // namespace caffe2
void Seek(const string &) override
Seek to a specific key (or if the key does not exist, seek to the immediate next).
void Next() override
Go to the next location in the database.
string value() override
Returns the current value.
An abstract class for the cursor of the database while reading.
Definition: db.h:38
string key() override
Returns the current key.
Copyright (c) 2016-present, Facebook, Inc.
bool Valid() override
Returns whether the current location is valid - for example, if we have reached the end of the database, return false.
void Close() override
Closes the database.
Copyright (c) 2016-present, Facebook, Inc.
An abstract class for accessing a database of key-value pairs.
Definition: db.h:96
void SeekToFirst() override
Seek to the first key in the database.
unique_ptr< Transaction > NewTransaction() override
Returns a transaction to write data to the database.
unique_ptr< Cursor > NewCursor() override
Returns a cursor to read the database.