// Headers used by the queue implementation, followed by file-local constants
// that are passed as the last argument of the CAFFE_SDT tracepoint calls below.
// SDT_NONBLOCKING_OP / SDT_BLOCKING_OP tag the *_start tracepoints: tryWrite
// passes the former, blockingRead and blockingWrite pass the latter.
1 #include "caffe2/queue/blobs_queue.h" 4 #include <condition_variable> 9 #include "caffe2/core/blob_stats.h" 10 #include "caffe2/core/logging.h" 11 #include "caffe2/core/stats.h" 12 #include "caffe2/core/tensor.h" 13 #include "caffe2/core/timer.h" 14 #include "caffe2/core/workspace.h" 19 static constexpr
int SDT_NONBLOCKING_OP = 0;
20 static constexpr
int SDT_BLOCKING_OP = 1;
// Status values reported through the *_end tracepoints in place of a slot
// count. Casting a small negative number to uint64_t yields values at the very
// top of the unsigned range (-1 becomes the maximum uint64_t, and so on).
// SDT_TIMEOUT accompanies the "DequeueBlobs timed out" log in blockingRead,
// SDT_CANCEL is the other non-success read status, and SDT_ABORT is emitted by
// tryWrite and blockingWrite.
21 static constexpr uint64_t SDT_TIMEOUT = (uint64_t)-1;
22 static constexpr uint64_t SDT_ABORT = (uint64_t)-2;
23 static constexpr uint64_t SDT_CANCEL = (uint64_t)-3;
// Constructs the queue: stores the blob count and queue name, initialises the
// stats object with the queue name, and fills queue_ with `capacity` slots of
// `numBlobs` blobs each, created through `ws->CreateBlob`.
// Internal blobs are named "<queueName>_<slot index>_<blob index>".
// NOTE(review): the embedded listing numbers skip (25 -> 27 -> 30), so some
// parameter declarations (presumably ws, capacity and numBlobs, which the body
// uses) are not visible in this view, nor are the closing braces.
25 BlobsQueue::BlobsQueue(
27 const std::string& queueName,
30 bool enforceUniqueName,
31 const std::vector<std::string>& fieldNames)
32 : numBlobs_(numBlobs), name_(queueName), stats_(queueName) {
// Field names are optional. When supplied they are attached to the
// queue_dequeued_bytes stat as details.
33 if (!fieldNames.empty()) {
// Arguments of a check whose macro name sits on an omitted listing line;
// presumably it requires fieldNames.size() to equal numBlobs -- TODO confirm.
35 fieldNames.size(), numBlobs,
"Wrong number of fieldNames provided.");
36 stats_.queue_dequeued_bytes.setDetails(fieldNames);
// Reserve storage for all slots up front, then build them one by one.
38 queue_.reserve(capacity);
39 for (
size_t i = 0; i < capacity; ++i) {
40 std::vector<Blob*> blobs;
41 blobs.reserve(numBlobs);
42 for (
size_t j = 0; j < numBlobs; ++j) {
43 const auto blobName = queueName +
"_" + to_string(i) +
"_" + to_string(j);
// With enforceUniqueName set, a check involving !ws->GetBlob(blobName) is
// made before creating the blob (the check macro itself is on an omitted
// line); presumably it rejects a name that already exists in the workspace.
44 if (enforceUniqueName) {
46 !ws->GetBlob(blobName),
47 "Queue internal blob already exists: ",
50 blobs.push_back(ws->CreateBlob(blobName));
52 queue_.push_back(blobs);
// Sanity check: exactly `capacity` slots were created.
54 DCHECK_EQ(queue_.size(), capacity);
// Blocking dequeue: waits under mutex_ until the queue is readable or
// closing_ is set, then swaps the blobs of the slot at the read position with
// the caller's `inputs`. A positive timeout_secs bounds the wait; otherwise
// the wait is unbounded.
// NOTE(review): several listing lines are omitted from this view, including
// the remaining parameter declaration(s), the declaration of readTimer, the
// return statements and the update of reader_. Comments below describe only
// what is visible.
57 bool BlobsQueue::blockingRead(
58 const std::vector<Blob*>& inputs,
// shared_from_this() yields a shared_ptr to this queue; presumably held so the
// object stays alive for the duration of the call.
61 auto keeper = this->shared_from_this();
62 const auto& name = name_.c_str();
63 CAFFE_SDT(queue_read_start, name, (
void*)
this, SDT_BLOCKING_OP);
64 std::unique_lock<std::mutex> g(mutex_);
// Readable when the read and write counters differ; reader_ must never be
// ahead of writer_.
65 auto canRead = [
this]() {
66 CAFFE_ENFORCE_LE(reader_, writer_);
67 return reader_ != writer_;
// Record -1 on the queue_balance stat before waiting (the write paths
// record +1).
71 CAFFE_EVENT(stats_, queue_balance, -1);
72 if (timeout_secs > 0) {
// Seconds -> milliseconds; the int() conversion truncates any fractional
// millisecond.
73 std::chrono::milliseconds timeout_ms(
int(timeout_secs * 1000));
// Arguments of the timed wait (the call name is on an omitted line,
// presumably cv_.wait_for): lock, duration, and the wake-up predicate.
75 g, timeout_ms, [
this, canRead]() {
return closing_ || canRead(); });
// Untimed wait with the same predicate.
77 cv_.wait(g, [
this, canRead]() {
return closing_ || canRead(); });
// Non-success reporting: a timed wait that ended without closing_ being set
// is logged and traced as SDT_TIMEOUT; the other branch traces SDT_CANCEL.
// The enclosing condition is on an omitted line.
80 if (timeout_secs > 0 && !closing_) {
81 LOG(ERROR) <<
"DequeueBlobs timed out in " << timeout_secs <<
" secs";
82 CAFFE_SDT(queue_read_end, name, (
void*)
this, SDT_TIMEOUT);
84 CAFFE_SDT(queue_read_end, name, (
void*)
this, SDT_CANCEL);
// Slot to read: the counters grow without wrapping, so the slot index is the
// read counter modulo the number of slots.
89 auto& result = queue_[reader_ % queue_.size()];
// The caller must supply at least as many blobs as the slot holds.
90 CAFFE_ENFORCE(inputs.size() >= result.size());
// NOTE(review): `auto i = 0` deduces int, so `i < result.size()` compares
// signed with unsigned.
91 for (
auto i = 0; i < result.size(); ++i) {
// Account the size of the blob being handed out, indexed by field position.
92 auto bytes = BlobStat::sizeBytes(*result[i]);
93 CAFFE_EVENT(stats_, queue_dequeued_bytes, bytes, i);
// Exchange contents: the caller receives the queued blob and the slot
// receives whatever the caller's blob held.
95 swap(*(inputs[i]), *(result[i]));
// On the success path the end tracepoint carries writer_ - reader_ instead
// of a status code.
97 CAFFE_SDT(queue_read_end, name, (
void*)
this, writer_ - reader_);
98 CAFFE_EVENT(stats_, queue_dequeued_records);
// Report elapsed time from readTimer (declared on an omitted line).
101 CAFFE_EVENT(stats_, read_time_ns, readTimer.NanoSeconds());
// Non-blocking enqueue attempt: the start tracepoint is tagged
// SDT_NONBLOCKING_OP and, unlike blockingWrite, no condition-variable wait is
// visible here.
// NOTE(review): the condition guarding the SDT_ABORT path, the actual write
// call, the declaration of writeTimer and the return statements are on omitted
// listing lines.
105 bool BlobsQueue::tryWrite(
const std::vector<Blob*>& inputs) {
// shared_from_this() yields a shared_ptr to this queue; presumably held so the
// object stays alive for the duration of the call.
107 auto keeper = this->shared_from_this();
108 const auto& name = name_.c_str();
109 CAFFE_SDT(queue_write_start, name, (
void*)
this, SDT_NONBLOCKING_OP);
110 std::unique_lock<std::mutex> g(mutex_);
// Abort status for the end tracepoint; presumably emitted when the queue
// cannot accept the write -- TODO confirm against the full source.
112 CAFFE_SDT(queue_write_end, name, (
void*)
this, SDT_ABORT);
// Record +1 on the queue_balance stat (the read path records -1).
117 CAFFE_EVENT(stats_, queue_balance, 1);
// Report elapsed time from writeTimer (declared on an omitted line).
120 CAFFE_EVENT(stats_, write_time_ns, writeTimer.NanoSeconds());
// Blocking enqueue: waits under mutex_ until closing_ is set or canWrite()
// reports a free slot. The start tracepoint is tagged SDT_BLOCKING_OP.
// NOTE(review): the condition guarding the SDT_ABORT path, the actual write
// call, the declaration of writeTimer and the return statements are on omitted
// listing lines.
124 bool BlobsQueue::blockingWrite(
const std::vector<Blob*>& inputs) {
// shared_from_this() yields a shared_ptr to this queue; presumably held so the
// object stays alive for the duration of the call.
126 auto keeper = this->shared_from_this();
127 const auto& name = name_.c_str();
128 CAFFE_SDT(queue_write_start, name, (
void*)
this, SDT_BLOCKING_OP);
129 std::unique_lock<std::mutex> g(mutex_);
// Record +1 on the queue_balance stat before waiting (the read path
// records -1).
132 CAFFE_EVENT(stats_, queue_balance, 1);
// No timeout on the write side: sleep until the queue is closing or writable.
133 cv_.wait(g, [
this]() {
return closing_ || canWrite(); });
// Abort status for the end tracepoint; presumably emitted when the wait ended
// without the queue becoming writable -- TODO confirm against the full source.
135 CAFFE_SDT(queue_write_end, name, (
void*)
this, SDT_ABORT);
// Report elapsed time from writeTimer (declared on an omitted line).
140 CAFFE_EVENT(stats_, write_time_ns, writeTimer.NanoSeconds());
// Closes the queue. Only the acquisition of mutex_ is visible in this view;
// the statements around it are on omitted listing lines. Presumably this is
// where closing_ (tested by the wait predicates in blockingRead and
// blockingWrite) is set and waiters are woken -- TODO confirm.
144 void BlobsQueue::close() {
147 std::lock_guard<std::mutex> g(mutex_);
// Returns true when at least one slot is free. It first enforces that
// reader_ <= writer_ <= reader_ + queue_.size(); given that, the queue is full
// exactly when writer_ == reader_ + queue_.size().
// NOTE(review): listing lines 152-153 are omitted from this view.
151 bool BlobsQueue::canWrite() {
154 CAFFE_ENFORCE_LE(reader_, writer_);
155 CAFFE_ENFORCE_LE(writer_, reader_ + queue_.size());
156 return writer_ != reader_ + queue_.size();
// Performs the write itself: swaps each blob in `inputs` with the matching
// blob of the slot at the write position.
// NOTE(review): several listing lines are omitted, including any update of
// writer_, any notification of waiters and the name of the tracepoint macro
// whose arguments appear at the end.
159 void BlobsQueue::doWrite(
const std::vector<Blob*>& inputs) {
// Slot to fill: the write counter modulo the number of slots.
160 auto& result = queue_[writer_ % queue_.size()];
// The caller must supply at least as many blobs as the slot holds.
161 CAFFE_ENFORCE(inputs.size() >= result.size());
162 const auto& name = name_.c_str();
// NOTE(review): `auto i = 0` deduces int, so `i < result.size()` compares
// signed with unsigned.
163 for (
auto i = 0; i < result.size(); ++i) {
// Exchange contents: the slot receives the caller's blob and the caller gets
// back whatever the slot held before.
165 swap(*(inputs[i]), *(result[i]));
// Arguments of the write-end tracepoint (macro name on an omitted line,
// presumably CAFFE_SDT): the last one is reader_ + queue_.size() - writer_.
168 queue_write_end, name, (
void*)
this, reader_ + queue_.size() - writer_);
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...