1 #include "caffe2/core/db.h"     5 #include "caffe2/core/blob_serialization.h"     6 #include "caffe2/core/logging.h"    10 CAFFE_KNOWN_TYPE(db::DBReader);
    11 CAFFE_KNOWN_TYPE(db::Cursor);
    15 C10_DEFINE_REGISTRY(Caffe2DBRegistry, DB, 
const string&, Mode);
    25     : file_(f), lock_(*mutex), valid_(
true) {
    31   void Seek(
const string& )
 override {
    32     LOG(FATAL) << 
"MiniDB does not support seeking to a specific key.";
    36     fseek(file_, 0, SEEK_SET);
    37     CAFFE_ENFORCE(!feof(file_), 
"Hmm, empty file?");
    45     if (fread(&key_len_, 
sizeof(
int), 1, file_) == 0) {
    47       VLOG(1) << 
"EOF reached, setting valid to false";
    51     CAFFE_ENFORCE_EQ(fread(&value_len_, 
sizeof(
int), 1, file_), 1);
    52     CAFFE_ENFORCE_GT(key_len_, 0);
    53     CAFFE_ENFORCE_GT(value_len_, 0);
    55     if (key_len_ > (
int)key_.size()) {
    56       key_.resize(key_len_);
    58     if (value_len_ > (
int)value_.size()) {
    59       value_.resize(value_len_);
    63         fread(key_.data(), 
sizeof(char), key_len_, file_), key_len_);
    65         fread(value_.data(), 
sizeof(char), value_len_, file_), value_len_);
    70   string key()
 override {
    71     CAFFE_ENFORCE(valid_, 
"Cursor is at invalid location!");
    72     return string(key_.data(), key_len_);
    76     CAFFE_ENFORCE(valid_, 
"Cursor is at invalid location!");
    77     return string(value_.data(), value_len_);
    80   bool Valid()
 override { 
return valid_; }
    84   std::lock_guard<std::mutex> lock_;
    95     : file_(f), lock_(*mutex) {}
   101     int key_len = key.size();
   102     int value_len = value.size();
   103     CAFFE_ENFORCE_EQ(fwrite(&key_len, 
sizeof(
int), 1, file_), 1);
   104     CAFFE_ENFORCE_EQ(fwrite(&value_len, 
sizeof(
int), 1, file_), 1);
   106         fwrite(key.c_str(), 
sizeof(char), key_len, file_), key_len);
   108         fwrite(value.c_str(), 
sizeof(char), value_len, file_), value_len);
   112     if (file_ != 
nullptr) {
   113       CAFFE_ENFORCE_EQ(fflush(file_), 0);
   120   std::lock_guard<std::mutex> lock_;
   127   MiniDB(
const string& source, Mode mode) : 
DB(source, mode), file_(
nullptr) {
   130         file_ = fopen(source.c_str(), 
"wb");
   133         file_ = fopen(source.c_str(), 
"ab");
   134         fseek(file_, 0, SEEK_END);
   137         file_ = fopen(source.c_str(), 
"rb");
   140     CAFFE_ENFORCE(file_, 
"Cannot open file: " + source);
   141     VLOG(1) << 
"Opened MiniDB " << source;
   155     CAFFE_ENFORCE_EQ(this->mode_, READ);
   156     return make_unique<MiniDBCursor>(file_, &file_access_mutex_);
   160     CAFFE_ENFORCE(this->mode_ == NEW || this->mode_ == WRITE);
   161     return make_unique<MiniDBTransaction>(file_, &file_access_mutex_);
   168   std::mutex file_access_mutex_;
   172 REGISTER_CAFFE2_DB(minidb, 
MiniDB);
   178     BlobSerializerBase::SerializationAcceptor acceptor) {
   179   CAFFE_ENFORCE(typeMeta.Match<
DBReader>());
   180   const auto& reader = *
static_cast<const DBReader*
>(pointer);
   182   proto.set_name(name);
   183   proto.set_source(reader.source_);
   184   proto.set_db_type(reader.db_type_);
   185   if (reader.cursor() && reader.cursor()->SupportsSeek()) {
   186     proto.set_key(reader.cursor()->key());
   188   BlobProto blob_proto;
   189   blob_proto.set_name(name);
   190   blob_proto.set_type(
"DBReader");
   191   blob_proto.set_content(SerializeAsString_EnforceCheck(proto));
   192   acceptor(name, SerializeBlobProtoAsString_EnforceCheck(blob_proto));
   195 void DBReaderDeserializer::Deserialize(
const BlobProto& proto, 
Blob* blob) {
   196   DBReaderProto reader_proto;
   198       reader_proto.ParseFromString(proto.content()),
   199       "Cannot parse content into a DBReaderProto.");
   205 REGISTER_BLOB_SERIALIZER((TypeMeta::Id<DBReader>()),
 unique_ptr< Transaction > NewTransaction() override
Returns a transaction to write data to the database. 
 
Blob is a general container that hosts a typed pointer. 
 
string key() override
Returns the current key. 
 
An abstract class for the current database transaction while writing. 
 
An abstract class for the cursor of the database while reading. 
 
bool Valid() override
Returns whether the current location is valid - for example, if we have reached the end of the databa...
 
unique_ptr< Cursor > NewCursor() override
Returns a cursor to read the database. 
 
A reader wrapper for DB that also allows us to serialize it. 
 
void Next() override
Go to the next location in the database. 
 
void SeekToFirst() override
Seek to the first key in the database. 
 
void Close() override
Closes the database. 
 
void Put(const string &key, const string &value) override
Puts the key value pair to the database. 
 
string value() override
Returns the current value. 
 
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...
 
An abstract class for accessing a database of key-value pairs. 
 
T * Reset(T *allocated)
Sets the underlying object to the allocated one. 
 
void Seek(const string &) override
Seek to a specific key (or if the key does not exist, seek to the immediate next). 
 
void Serialize(const void *pointer, TypeMeta typeMeta, const string &name, BlobSerializerBase::SerializationAcceptor acceptor) override
Serializes a DBReader. 
 
void Commit() override
Commits the current writes.