Caffe2 - C++ API
A deep learning, cross platform ML framework
db.cc
#include "caffe2/core/db.h"

#include <limits>
#include <mutex>

#include "caffe2/core/blob_serialization.h"
#include "caffe2/core/logging.h"
7 
8 namespace caffe2 {
9 
10 CAFFE_KNOWN_TYPE(db::DBReader);
11 CAFFE_KNOWN_TYPE(db::Cursor);
12 
13 namespace db {
14 
15 C10_DEFINE_REGISTRY(Caffe2DBRegistry, DB, const string&, Mode);
16 
17 // Below, we provide a bare minimum database "minidb" as a reference
18 // implementation as well as a portable choice to store data.
19 // Note that the MiniDB classes are not exposed via a header file - they should
20 // be created directly via the db interface. See MiniDB for details.
21 
22 class MiniDBCursor : public Cursor {
23  public:
24  explicit MiniDBCursor(FILE* f, std::mutex* mutex)
25  : file_(f), lock_(*mutex), valid_(true) {
26  // We call Next() to read in the first entry.
27  Next();
28  }
29  ~MiniDBCursor() override {}
30 
31  void Seek(const string& /*key*/) override {
32  LOG(FATAL) << "MiniDB does not support seeking to a specific key.";
33  }
34 
35  void SeekToFirst() override {
36  fseek(file_, 0, SEEK_SET);
37  CAFFE_ENFORCE(!feof(file_), "Hmm, empty file?");
38  // Read the first item.
39  valid_ = true;
40  Next();
41  }
42 
43  void Next() override {
44  // First, read in the key and value length.
45  if (fread(&key_len_, sizeof(int), 1, file_) == 0) {
46  // Reaching EOF.
47  VLOG(1) << "EOF reached, setting valid to false";
48  valid_ = false;
49  return;
50  }
51  CAFFE_ENFORCE_EQ(fread(&value_len_, sizeof(int), 1, file_), 1);
52  CAFFE_ENFORCE_GT(key_len_, 0);
53  CAFFE_ENFORCE_GT(value_len_, 0);
54  // Resize if the key and value len is larger than the current one.
55  if (key_len_ > (int)key_.size()) {
56  key_.resize(key_len_);
57  }
58  if (value_len_ > (int)value_.size()) {
59  value_.resize(value_len_);
60  }
61  // Actually read in the contents.
62  CAFFE_ENFORCE_EQ(
63  fread(key_.data(), sizeof(char), key_len_, file_), key_len_);
64  CAFFE_ENFORCE_EQ(
65  fread(value_.data(), sizeof(char), value_len_, file_), value_len_);
66  // Note(Yangqing): as we read the file, the cursor naturally moves to the
67  // beginning of the next entry.
68  }
69 
70  string key() override {
71  CAFFE_ENFORCE(valid_, "Cursor is at invalid location!");
72  return string(key_.data(), key_len_);
73  }
74 
75  string value() override {
76  CAFFE_ENFORCE(valid_, "Cursor is at invalid location!");
77  return string(value_.data(), value_len_);
78  }
79 
80  bool Valid() override { return valid_; }
81 
82  private:
83  FILE* file_;
84  std::lock_guard<std::mutex> lock_;
85  bool valid_;
86  int key_len_;
87  vector<char> key_;
88  int value_len_;
89  vector<char> value_;
90 };
91 
93  public:
94  explicit MiniDBTransaction(FILE* f, std::mutex* mutex)
95  : file_(f), lock_(*mutex) {}
96  ~MiniDBTransaction() override {
97  Commit();
98  }
99 
100  void Put(const string& key, const string& value) override {
101  int key_len = key.size();
102  int value_len = value.size();
103  CAFFE_ENFORCE_EQ(fwrite(&key_len, sizeof(int), 1, file_), 1);
104  CAFFE_ENFORCE_EQ(fwrite(&value_len, sizeof(int), 1, file_), 1);
105  CAFFE_ENFORCE_EQ(
106  fwrite(key.c_str(), sizeof(char), key_len, file_), key_len);
107  CAFFE_ENFORCE_EQ(
108  fwrite(value.c_str(), sizeof(char), value_len, file_), value_len);
109  }
110 
111  void Commit() override {
112  if (file_ != nullptr) {
113  CAFFE_ENFORCE_EQ(fflush(file_), 0);
114  file_ = nullptr;
115  }
116  }
117 
118  private:
119  FILE* file_;
120  std::lock_guard<std::mutex> lock_;
121 
122  C10_DISABLE_COPY_AND_ASSIGN(MiniDBTransaction);
123 };
124 
125 class MiniDB : public DB {
126  public:
127  MiniDB(const string& source, Mode mode) : DB(source, mode), file_(nullptr) {
128  switch (mode) {
129  case NEW:
130  file_ = fopen(source.c_str(), "wb");
131  break;
132  case WRITE:
133  file_ = fopen(source.c_str(), "ab");
134  fseek(file_, 0, SEEK_END);
135  break;
136  case READ:
137  file_ = fopen(source.c_str(), "rb");
138  break;
139  }
140  CAFFE_ENFORCE(file_, "Cannot open file: " + source);
141  VLOG(1) << "Opened MiniDB " << source;
142  }
143  ~MiniDB() override {
144  Close();
145  }
146 
147  void Close() override {
148  if (file_) {
149  fclose(file_);
150  }
151  file_ = nullptr;
152  }
153 
154  unique_ptr<Cursor> NewCursor() override {
155  CAFFE_ENFORCE_EQ(this->mode_, READ);
156  return make_unique<MiniDBCursor>(file_, &file_access_mutex_);
157  }
158 
159  unique_ptr<Transaction> NewTransaction() override {
160  CAFFE_ENFORCE(this->mode_ == NEW || this->mode_ == WRITE);
161  return make_unique<MiniDBTransaction>(file_, &file_access_mutex_);
162  }
163 
164  private:
165  FILE* file_;
166  // access mutex makes sure we don't have multiple cursors/transactions
167  // reading the same file.
168  std::mutex file_access_mutex_;
169 };
170 
171 REGISTER_CAFFE2_DB(MiniDB, MiniDB);
172 REGISTER_CAFFE2_DB(minidb, MiniDB);
173 
175  const void* pointer,
176  TypeMeta typeMeta,
177  const string& name,
178  BlobSerializerBase::SerializationAcceptor acceptor) {
179  CAFFE_ENFORCE(typeMeta.Match<DBReader>());
180  const auto& reader = *static_cast<const DBReader*>(pointer);
181  DBReaderProto proto;
182  proto.set_name(name);
183  proto.set_source(reader.source_);
184  proto.set_db_type(reader.db_type_);
185  if (reader.cursor() && reader.cursor()->SupportsSeek()) {
186  proto.set_key(reader.cursor()->key());
187  }
188  BlobProto blob_proto;
189  blob_proto.set_name(name);
190  blob_proto.set_type("DBReader");
191  blob_proto.set_content(SerializeAsString_EnforceCheck(proto));
192  acceptor(name, SerializeBlobProtoAsString_EnforceCheck(blob_proto));
193 }
194 
195 void DBReaderDeserializer::Deserialize(const BlobProto& proto, Blob* blob) {
196  DBReaderProto reader_proto;
197  CAFFE_ENFORCE(
198  reader_proto.ParseFromString(proto.content()),
199  "Cannot parse content into a DBReaderProto.");
200  blob->Reset(new DBReader(reader_proto));
201 }
202 
203 namespace {
// Register the blob serializer and deserializer for DBReader.
205 REGISTER_BLOB_SERIALIZER((TypeMeta::Id<DBReader>()),
207 REGISTER_BLOB_DESERIALIZER(DBReader, DBReaderDeserializer);
208 } // namespace
209 
210 } // namespace db
211 } // namespace caffe2
unique_ptr< Transaction > NewTransaction() override
Returns a transaction to write data to the database.
Definition: db.cc:159
Blob is a general container that hosts a typed pointer.
Definition: blob.h:24
string key() override
Returns the current key.
Definition: db.cc:70
An abstract class for the current database transaction while writing.
Definition: db.h:61
An abstract class for the cursor of the database while reading.
Definition: db.h:22
bool Valid() override
Returns whether the current location is valid - for example, if we have reached the end of the databa...
Definition: db.cc:80
unique_ptr< Cursor > NewCursor() override
Returns a cursor to read the database.
Definition: db.cc:154
A reader wrapper for DB that also allows us to serialize it.
Definition: db.h:144
void Next() override
Go to the next location in the database.
Definition: db.cc:43
void SeekToFirst() override
Seek to the first key in the database.
Definition: db.cc:35
void Close() override
Closes the database.
Definition: db.cc:147
void Put(const string &key, const string &value) override
Puts the key value pair to the database.
Definition: db.cc:100
string value() override
Returns the current value.
Definition: db.cc:75
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...
Definition: blob.h:13
An abstract class for accessing a database of key-value pairs.
Definition: db.h:80
T * Reset(T *allocated)
Sets the underlying object to the allocated one.
Definition: blob.h:132
void Seek(const string &) override
Seek to a specific key (or if the key does not exist, seek to the immediate next).
Definition: db.cc:31
TypeMeta is a thin class that allows us to store the type of a container such as a blob...
Definition: typeid.h:324
void Serialize(const void *pointer, TypeMeta typeMeta, const string &name, BlobSerializerBase::SerializationAcceptor acceptor) override
Serializes a DBReader.
Definition: db.cc:174
void Commit() override
Commits the current writes.
Definition: db.cc:111