Caffe2 - C++ API
A deep learning, cross platform ML framework
blobs_queue.cc
1 
17 #include "caffe2/queue/blobs_queue.h"
18 
19 #include <atomic>
20 #include <condition_variable>
21 #include <memory>
22 #include <mutex>
23 #include <queue>
24 
25 #include "caffe2/core/blob_stats.h"
26 #include "caffe2/core/logging.h"
27 #include "caffe2/core/stats.h"
28 #include "caffe2/core/tensor.h"
29 #include "caffe2/core/workspace.h"
30 
31 namespace caffe2 {
32 
33 // Constants for user tracepoints
34 static constexpr int SDT_NONBLOCKING_OP = 0;
35 static constexpr int SDT_BLOCKING_OP = 1;
36 static constexpr uint64_t SDT_TIMEOUT = (uint64_t)-1;
37 static constexpr uint64_t SDT_ABORT = (uint64_t)-2;
38 static constexpr uint64_t SDT_CANCEL = (uint64_t)-3;
39 
40 BlobsQueue::BlobsQueue(
41  Workspace* ws,
42  const std::string& queueName,
43  size_t capacity,
44  size_t numBlobs,
45  bool enforceUniqueName,
46  const std::vector<std::string>& fieldNames)
47  : numBlobs_(numBlobs), name_(queueName), stats_(queueName) {
48  if (!fieldNames.empty()) {
49  CAFFE_ENFORCE_EQ(
50  fieldNames.size(), numBlobs, "Wrong number of fieldNames provided.");
51  stats_.queue_dequeued_bytes.setDetails(fieldNames);
52  }
53  queue_.reserve(capacity);
54  for (auto i = 0; i < capacity; ++i) {
55  std::vector<Blob*> blobs;
56  blobs.reserve(numBlobs);
57  for (auto j = 0; j < numBlobs; ++j) {
58  const auto blobName = queueName + "_" + to_string(i) + "_" + to_string(j);
59  if (enforceUniqueName) {
60  CAFFE_ENFORCE(
61  !ws->GetBlob(blobName),
62  "Queue internal blob already exists: ",
63  blobName);
64  }
65  blobs.push_back(ws->CreateBlob(blobName));
66  }
67  queue_.push_back(blobs);
68  }
69  DCHECK_EQ(queue_.size(), capacity);
70 }
71 
72 bool BlobsQueue::blockingRead(
73  const std::vector<Blob*>& inputs,
74  float timeout_secs) {
75  auto keeper = this->shared_from_this();
76  const auto& name = name_.c_str();
77  CAFFE_SDT(queue_read_start, name, (void*)this, SDT_BLOCKING_OP);
78  std::unique_lock<std::mutex> g(mutex_);
79  auto canRead = [this]() {
80  CAFFE_ENFORCE_LE(reader_, writer_);
81  return reader_ != writer_;
82  };
83  CAFFE_EVENT(stats_, queue_balance, -1);
84  if (timeout_secs > 0) {
85  std::chrono::milliseconds timeout_ms(int(timeout_secs * 1000));
86  cv_.wait_for(
87  g, timeout_ms, [this, canRead]() { return closing_ || canRead(); });
88  } else {
89  cv_.wait(g, [this, canRead]() { return closing_ || canRead(); });
90  }
91  if (!canRead()) {
92  if (timeout_secs > 0 && !closing_) {
93  LOG(ERROR) << "DequeueBlobs timed out in " << timeout_secs << " secs";
94  CAFFE_SDT(queue_read_end, name, (void*)this, SDT_TIMEOUT);
95  } else {
96  CAFFE_SDT(queue_read_end, name, (void*)this, SDT_CANCEL);
97  }
98  return false;
99  }
100  DCHECK(canRead());
101  auto& result = queue_[reader_ % queue_.size()];
102  CAFFE_ENFORCE(inputs.size() >= result.size());
103  for (auto i = 0; i < result.size(); ++i) {
104  auto bytes = BlobStat::sizeBytes(*result[i]);
105  CAFFE_EVENT(stats_, queue_dequeued_bytes, bytes, i);
106  using std::swap;
107  swap(*(inputs[i]), *(result[i]));
108  }
109  CAFFE_SDT(queue_read_end, name, (void*)this, writer_ - reader_);
110  CAFFE_EVENT(stats_, queue_dequeued_records);
111  ++reader_;
112  cv_.notify_all();
113  return true;
114 }
115 
116 bool BlobsQueue::tryWrite(const std::vector<Blob*>& inputs) {
117  auto keeper = this->shared_from_this();
118  const auto& name = name_.c_str();
119  CAFFE_SDT(queue_write_start, name, (void*)this, SDT_NONBLOCKING_OP);
120  std::unique_lock<std::mutex> g(mutex_);
121  if (!canWrite()) {
122  CAFFE_SDT(queue_write_end, name, (void*)this, SDT_ABORT);
123  return false;
124  }
125  CAFFE_EVENT(stats_, queue_balance, 1);
126  DCHECK(canWrite());
127  doWrite(inputs);
128  return true;
129 }
130 
131 bool BlobsQueue::blockingWrite(const std::vector<Blob*>& inputs) {
132  auto keeper = this->shared_from_this();
133  const auto& name = name_.c_str();
134  CAFFE_SDT(queue_write_start, name, (void*)this, SDT_BLOCKING_OP);
135  std::unique_lock<std::mutex> g(mutex_);
136  CAFFE_EVENT(stats_, queue_balance, 1);
137  cv_.wait(g, [this]() { return closing_ || canWrite(); });
138  if (!canWrite()) {
139  CAFFE_SDT(queue_write_end, name, (void*)this, SDT_ABORT);
140  return false;
141  }
142  DCHECK(canWrite());
143  doWrite(inputs);
144  return true;
145 }
146 
147 void BlobsQueue::close() {
148  closing_ = true;
149 
150  std::lock_guard<std::mutex> g(mutex_);
151  cv_.notify_all();
152 }
153 
154 bool BlobsQueue::canWrite() {
155  // writer is always within [reader, reader + size)
156  // we can write if reader is within [reader, reader + size)
157  CAFFE_ENFORCE_LE(reader_, writer_);
158  CAFFE_ENFORCE_LE(writer_, reader_ + queue_.size());
159  return writer_ != reader_ + queue_.size();
160 }
161 
162 void BlobsQueue::doWrite(const std::vector<Blob*>& inputs) {
163  auto& result = queue_[writer_ % queue_.size()];
164  CAFFE_ENFORCE(inputs.size() >= result.size());
165  const auto& name = name_.c_str();
166  for (auto i = 0; i < result.size(); ++i) {
167  using std::swap;
168  swap(*(inputs[i]), *(result[i]));
169  }
170  CAFFE_SDT(
171  queue_write_end, name, (void*)this, reader_ + queue_.size() - writer_);
172  ++writer_;
173  cv_.notify_all();
174 }
175 
176 } // namespace caffe2
Copyright (c) 2016-present, Facebook, Inc.