1 #include "caffe2/core/db.h" 5 #include "caffe2/core/blob_serialization.h" 6 #include "caffe2/core/logging.h" 10 CAFFE_KNOWN_TYPE(db::DBReader);
11 CAFFE_KNOWN_TYPE(db::Cursor);
// Defines the registry named Caffe2DBRegistry for objects of type DB.
// NOTE(review): the trailing `const string&, Mode` look like the argument
// types a registered creator is invoked with (they match the
// MiniDB(const string& source, Mode mode) constructor further down) —
// confirm against the C10_DEFINE_REGISTRY macro definition.
15 C10_DEFINE_REGISTRY(Caffe2DBRegistry, DB,
const string&, Mode);
25 : file_(f), lock_(*mutex), valid_(
true) {
// Seeking to an arbitrary key is not implemented here: the only visible
// statement logs a FATAL-severity message saying so. The key parameter is
// left unnamed and is never read.
31 void Seek(
const string& )
override {
32 LOG(FATAL) <<
"MiniDB does not support seeking to a specific key.";
36 fseek(file_, 0, SEEK_SET);
37 CAFFE_ENFORCE(!feof(file_),
"Hmm, empty file?");
45 if (fread(&key_len_,
sizeof(
int), 1, file_) == 0) {
47 VLOG(1) <<
"EOF reached, setting valid to false";
51 CAFFE_ENFORCE_EQ(fread(&value_len_,
sizeof(
int), 1, file_), 1);
52 CAFFE_ENFORCE_GT(key_len_, 0);
53 CAFFE_ENFORCE_GT(value_len_, 0);
55 if (key_len_ > (
int)key_.size()) {
56 key_.resize(key_len_);
58 if (value_len_ > (
int)value_.size()) {
59 value_.resize(value_len_);
63 fread(key_.data(),
sizeof(char), key_len_, file_), key_len_);
65 fread(value_.data(),
sizeof(char), value_len_, file_), value_len_);
// Returns a copy of the current key.
// Enforces valid_ first, so calling this while the cursor is invalid fails
// the CAFFE_ENFORCE check with "Cursor is at invalid location!".
// The string is built from key_len_ bytes rather than the whole key_ buffer:
// the visible read path only grows key_ (it resizes when key_len_ exceeds
// key_.size()), so the buffer may be longer than the current key.
70 string key()
override {
71 CAFFE_ENFORCE(valid_,
"Cursor is at invalid location!");
72 return string(key_.data(), key_len_);
76 CAFFE_ENFORCE(valid_,
"Cursor is at invalid location!");
77 return string(value_.data(), value_len_);
// Returns the cursor's valid_ flag as-is; performs no I/O and no checks.
80 bool Valid()
override {
return valid_; }
84 std::lock_guard<std::mutex> lock_;
95 : file_(f), lock_(*mutex) {}
101 int key_len = key.size();
102 int value_len = value.size();
103 CAFFE_ENFORCE_EQ(fwrite(&key_len,
sizeof(
int), 1, file_), 1);
104 CAFFE_ENFORCE_EQ(fwrite(&value_len,
sizeof(
int), 1, file_), 1);
106 fwrite(key.c_str(),
sizeof(char), key_len, file_), key_len);
108 fwrite(value.c_str(),
sizeof(char), value_len, file_), value_len);
112 if (file_ !=
nullptr) {
113 CAFFE_ENFORCE_EQ(fflush(file_), 0);
120 std::lock_guard<std::mutex> lock_;
// Constructs a MiniDB over the file at `source`, forwarding (source, mode) to
// the DB base class and starting with file_ set to nullptr.
// Three fopen calls are visible, using modes "wb", "ab" (followed by a seek to
// the end of the file) and "rb". The statements that choose between them are
// not part of this listing — presumably a dispatch on `mode`; confirm.
// After opening, enforces that file_ is non-null (failing with
// "Cannot open file: " + source) and logs the open at VLOG level 1.
127 MiniDB(
const string& source, Mode mode) :
DB(source, mode), file_(
nullptr) {
130 file_ = fopen(source.c_str(),
"wb");
133 file_ = fopen(source.c_str(),
"ab");
// NOTE(review): this fseek runs before the CAFFE_ENFORCE(file_) check below,
// so a failed fopen on this path would pass a null FILE* to fseek — confirm
// against the full control flow and consider checking file_ first.
134 fseek(file_, 0, SEEK_END);
137 file_ = fopen(source.c_str(),
"rb");
140 CAFFE_ENFORCE(file_,
"Cannot open file: " + source);
141 VLOG(1) <<
"Opened MiniDB " << source;
155 CAFFE_ENFORCE_EQ(this->mode_, READ);
156 return make_unique<MiniDBCursor>(file_, &file_access_mutex_);
160 CAFFE_ENFORCE(this->mode_ == NEW || this->mode_ == WRITE);
161 return make_unique<MiniDBTransaction>(file_, &file_access_mutex_);
168 std::mutex file_access_mutex_;
// NOTE(review): presumably registers the MiniDB class under the db type name
// `minidb` — REGISTER_CAFFE2_DB is defined outside this listing; confirm there.
172 REGISTER_CAFFE2_DB(minidb,
MiniDB);
178 BlobSerializerBase::SerializationAcceptor acceptor) {
179 CAFFE_ENFORCE(typeMeta.Match<
DBReader>());
180 const auto& reader = *
static_cast<const DBReader*
>(pointer);
182 proto.set_name(name);
183 proto.set_source(reader.source_);
184 proto.set_db_type(reader.db_type_);
185 if (reader.cursor() && reader.cursor()->SupportsSeek()) {
186 proto.set_key(reader.cursor()->key());
188 BlobProto blob_proto;
189 blob_proto.set_name(name);
190 blob_proto.set_type(
"DBReader");
191 blob_proto.set_content(SerializeAsString_EnforceCheck(proto));
192 acceptor(name, SerializeBlobProtoAsString_EnforceCheck(blob_proto));
195 void DBReaderDeserializer::Deserialize(
const BlobProto& proto,
Blob* blob) {
196 DBReaderProto reader_proto;
198 reader_proto.ParseFromString(proto.content()),
199 "Cannot parse content into a DBReaderProto.");
205 REGISTER_BLOB_SERIALIZER((TypeMeta::Id<DBReader>()),
unique_ptr< Transaction > NewTransaction() override
Returns a transaction to write data to the database.
Blob is a general container that hosts a typed pointer.
string key() override
Returns the current key.
An abstract class for the current database transaction while writing.
An abstract class for the cursor of the database while reading.
bool Valid() override
Returns whether the current location is valid - for example, if we have reached the end of the database, return false.
unique_ptr< Cursor > NewCursor() override
Returns a cursor to read the database.
A reader wrapper for DB that also allows us to serialize it.
void Next() override
Go to the next location in the database.
void SeekToFirst() override
Seek to the first key in the database.
void Close() override
Closes the database.
void Put(const string &key, const string &value) override
Puts the key value pair to the database.
string value() override
Returns the current value.
A global dictionary that holds information about what Caffe2 modules have been loaded in the current runtime.
An abstract class for accessing a database of key-value pairs.
T * Reset(T *allocated)
Sets the underlying object to the allocated one.
void Seek(const string &) override
Seek to a specific key (or, if the key does not exist, seek to the key immediately after it).
void Serialize(const void *pointer, TypeMeta typeMeta, const string &name, BlobSerializerBase::SerializationAcceptor acceptor) override
Serializes a DBReader.
void Commit() override
Commits the current writes.