Caffe2 - C++ API
A deep learning, cross platform ML framework
blobs_queue.h
1 #pragma once
2 
3 #include <atomic>
4 #include <condition_variable>
5 #include <memory>
6 #include <mutex>
7 #include <queue>
8 
9 #include "caffe2/core/blob_stats.h"
10 #include "caffe2/core/logging.h"
11 #include "caffe2/core/stats.h"
12 #include "caffe2/core/tensor.h"
13 #include "caffe2/core/workspace.h"
14 
15 namespace caffe2 {
16 
17 // A thread-safe, bounded, blocking queue.
18 // Modelled as a circular buffer.
19 
20 // Containing blobs are owned by the workspace.
21 // On read, we swap out the underlying data for the blob passed in for blobs
22 
23 class CAFFE2_API BlobsQueue : public std::enable_shared_from_this<BlobsQueue> {
24  public:
25  BlobsQueue(
26  Workspace* ws,
27  const std::string& queueName,
28  size_t capacity,
29  size_t numBlobs,
30  bool enforceUniqueName,
31  const std::vector<std::string>& fieldNames = {});
32 
33  ~BlobsQueue() {
34  close();
35  }
36 
37  bool blockingRead(
38  const std::vector<Blob*>& inputs,
39  float timeout_secs = 0.0f);
40  bool tryWrite(const std::vector<Blob*>& inputs);
41  bool blockingWrite(const std::vector<Blob*>& inputs);
42  void close();
43  size_t getNumBlobs() const {
44  return numBlobs_;
45  }
46 
47  private:
48  bool canWrite();
49  void doWrite(const std::vector<Blob*>& inputs);
50 
51  std::atomic<bool> closing_{false};
52 
53  size_t numBlobs_;
54  std::mutex mutex_; // protects all variables in the class.
55  std::condition_variable cv_;
56  int64_t reader_{0};
57  int64_t writer_{0};
58  std::vector<std::vector<Blob*>> queue_;
59  const std::string name_;
60 
61  struct QueueStats {
62  CAFFE_STAT_CTOR(QueueStats);
63  CAFFE_EXPORTED_STAT(queue_balance);
64  CAFFE_EXPORTED_STAT(queue_dequeued_records);
65  CAFFE_DETAILED_EXPORTED_STAT(queue_dequeued_bytes);
66  CAFFE_AVG_EXPORTED_STAT(read_time_ns);
67  CAFFE_AVG_EXPORTED_STAT(write_time_ns);
68  } stats_;
69 };
70 } // namespace caffe2
Workspace is a class that holds all the related objects created during runtime: (1) all blobs...
Definition: workspace.h:47
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...
Definition: blob.h:13