Caffe2 - C++ API
A deep learning, cross platform ML framework
rocksdb.cc
1 
17 #include "caffe2/core/db.h"
18 #include "caffe2/core/logging.h"
19 #include "caffe2/core/module.h"
20 #include "caffe2/core/flags.h"
21 #include "rocksdb/db.h"
22 #include "rocksdb/utilities/leveldb_options.h"
23 
24 C10_DEFINE_int(
25  caffe2_rocksdb_block_size,
26  65536,
27  "The caffe2 rocksdb block size when writing a rocksdb.");
28 
29 namespace caffe2 {
30 namespace db {
31 
32 class RocksDBCursor : public Cursor {
33  public:
34  explicit RocksDBCursor(rocksdb::DB* db)
35  : iter_(db->NewIterator(rocksdb::ReadOptions())) {
36  SeekToFirst();
37  }
38  ~RocksDBCursor() {}
39  void Seek(const string& key) override { iter_->Seek(key); }
40  bool SupportsSeek() override { return true; }
41  void SeekToFirst() override { iter_->SeekToFirst(); }
42  void Next() override { iter_->Next(); }
43  string key() override { return iter_->key().ToString(); }
44  string value() override { return iter_->value().ToString(); }
45  bool Valid() override { return iter_->Valid(); }
46 
47  private:
48  std::unique_ptr<rocksdb::Iterator> iter_;
49 };
50 
52  public:
53  explicit RocksDBTransaction(rocksdb::DB* db) : db_(db) {
54  CAFFE_ENFORCE(db_);
55  batch_.reset(new rocksdb::WriteBatch());
56  }
57  ~RocksDBTransaction() { Commit(); }
58  void Put(const string& key, const string& value) override {
59  batch_->Put(key, value);
60  }
61  void Commit() override {
62  rocksdb::Status status = db_->Write(rocksdb::WriteOptions(), batch_.get());
63  batch_.reset(new rocksdb::WriteBatch());
64  CAFFE_ENFORCE(
65  status.ok(), "Failed to write batch to rocksdb: " + status.ToString());
66  }
67 
68  private:
69  rocksdb::DB* db_;
70  std::unique_ptr<rocksdb::WriteBatch> batch_;
71 
72  C10_DISABLE_COPY_AND_ASSIGN(RocksDBTransaction);
73 };
74 
75 class RocksDB : public DB {
76  public:
77  RocksDB(const string& source, Mode mode) : DB(source, mode) {
78  rocksdb::LevelDBOptions options;
79  options.block_size = FLAGS_caffe2_rocksdb_block_size;
80  options.write_buffer_size = 268435456;
81  options.max_open_files = 100;
82  options.error_if_exists = mode == NEW;
83  options.create_if_missing = mode != READ;
84  rocksdb::Options rocksdb_options = rocksdb::ConvertOptions(options);
85 
86  rocksdb::DB* db_temp;
87  rocksdb::Status status = rocksdb::DB::Open(
88  rocksdb_options, source, &db_temp);
89  CAFFE_ENFORCE(
90  status.ok(),
91  "Failed to open rocksdb ",
92  source,
93  "\n",
94  status.ToString());
95  db_.reset(db_temp);
96  VLOG(1) << "Opened rocksdb " << source;
97  }
98 
99  void Close() override { db_.reset(); }
100  unique_ptr<Cursor> NewCursor() override {
101  return make_unique<RocksDBCursor>(db_.get());
102  }
103  unique_ptr<Transaction> NewTransaction() override {
104  return make_unique<RocksDBTransaction>(db_.get());
105  }
106 
107  private:
108  std::unique_ptr<rocksdb::DB> db_;
109 };
110 
111 REGISTER_CAFFE2_DB(RocksDB, RocksDB);
112 // For lazy-minded, one can also call with lower-case name.
113 REGISTER_CAFFE2_DB(rocksdb, RocksDB);
114 
115 } // namespace db
116 
117 CAFFE2_MODULE(caffe2_rocksdb, "RocksDB implementation for caffe2::DB.");
118 } // namespace caffe2
bool Valid() override
Returns whether the current location is valid - for example, if we have reached the end of the databa...
Definition: rocksdb.cc:45
void Next() override
Go to the next location in the database.
Definition: rocksdb.cc:42
unique_ptr< Cursor > NewCursor() override
Returns a cursor to read the database.
Definition: rocksdb.cc:100
An abstract class for the current database transaction while writing.
Definition: db.h:61
An abstract class for the cursor of the database while reading.
Definition: db.h:22
unique_ptr< Transaction > NewTransaction() override
Returns a transaction to write data to the database.
Definition: rocksdb.cc:103
string value() override
Returns the current value.
Definition: rocksdb.cc:44
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...
Definition: blob.h:13
An abstract class for accessing a database of key-value pairs.
Definition: db.h:80
void SeekToFirst() override
Seek to the first key in the database.
Definition: rocksdb.cc:41
void Put(const string &key, const string &value) override
Puts the key value pair to the database.
Definition: rocksdb.cc:58
void Seek(const string &key) override
Seek to a specific key (or if the key does not exist, seek to the immediate next).
Definition: rocksdb.cc:39
void Commit() override
Commits the current writes.
Definition: rocksdb.cc:61
string key() override
Returns the current key.
Definition: rocksdb.cc:43
void Close() override
Closes the database.
Definition: rocksdb.cc:99