// Includes, followed by the constants this file passes as the final argument
// of CAFFE_SDT tracepoints:
//   - SDT_NONBLOCKING_OP / SDT_BLOCKING_OP are passed with the *_start probes
//     (tryWrite uses the former; blockingRead/blockingWrite use the latter).
//   - SDT_TIMEOUT / SDT_ABORT / SDT_CANCEL are passed with the *_end probes.
//     They are (uint64_t)-1, -2 and -3, i.e. the three largest uint64_t
//     values. The same argument position otherwise carries a queue-occupancy
//     figure, so these presumably serve as out-of-band status codes -- confirm
//     against the tracepoint consumers.
1 #include "caffe2/queue/blobs_queue.h"     4 #include <condition_variable>     9 #include "caffe2/core/blob_stats.h"    10 #include "caffe2/core/logging.h"    11 #include "caffe2/core/stats.h"    12 #include "caffe2/core/tensor.h"    13 #include "caffe2/core/timer.h"    14 #include "caffe2/core/workspace.h"    19 static constexpr 
int SDT_NONBLOCKING_OP = 0;
    20 static constexpr 
int SDT_BLOCKING_OP = 1;
    21 static constexpr uint64_t SDT_TIMEOUT = (uint64_t)-1;
    22 static constexpr uint64_t SDT_ABORT = (uint64_t)-2;
    23 static constexpr uint64_t SDT_CANCEL = (uint64_t)-3;
    // Constructs the queue and pre-creates every blob it will hold.
    //
    // Stores numBlobs and queueName in numBlobs_ / name_ and names stats_ after
    // the queue. queue_ ends up with `capacity` slots, each a vector of
    // `numBlobs` Blob pointers obtained from ws->CreateBlob().
    //
    // NOTE(review): `ws`, `capacity` and `numBlobs` are used below but their
    // parameter declarations are not among the lines shown here; their exact
    // types should be checked against the header.
    25 BlobsQueue::BlobsQueue(
    27     const std::string& queueName,
    30     bool enforceUniqueName,
    31     const std::vector<std::string>& fieldNames)
    32     : numBlobs_(numBlobs), name_(queueName), stats_(queueName) {
    // Optional per-field names: when supplied they are checked against numBlobs
    // (failure message below) and attached to the dequeued-bytes stat.
    33   if (!fieldNames.empty()) {
    35         fieldNames.size(), numBlobs, 
"Wrong number of fieldNames provided.");
    36     stats_.queue_dequeued_bytes.setDetails(fieldNames);
    // Build the slots: slot i holds numBlobs blobs named
    // "<queueName>_<i>_<j>".
    38   queue_.reserve(capacity);
    39   for (
size_t i = 0; i < capacity; ++i) {
    40     std::vector<Blob*> blobs;
    41     blobs.reserve(numBlobs);
    42     for (
size_t j = 0; j < numBlobs; ++j) {
    43       const auto blobName = queueName + 
"_" + to_string(i) + 
"_" + to_string(j);
    // With enforceUniqueName set, a blob of this name must not already be
    // present in the workspace.
    44       if (enforceUniqueName) {
    46             !ws->GetBlob(blobName),
    47             "Queue internal blob already exists: ",
    50       blobs.push_back(ws->CreateBlob(blobName));
    52     queue_.push_back(blobs);
    // Sanity check that exactly `capacity` slots were created.
    54   DCHECK_EQ(queue_.size(), capacity);
    // Dequeues one entry into `inputs`, waiting until an entry is available or
    // the queue is closing.
    //
    // inputs: destination blobs; must contain at least as many blobs as a queue
    //         slot (enforced below). Their contents are swapped with the
    //         slot's blobs.
    // timeout_secs: when > 0 the wait is bounded (converted to whole
    //         milliseconds); otherwise the wait is unbounded. Its declaration
    //         is not among the lines shown here.
    //
    // NOTE(review): the return statements and some branch lines are missing
    // from this listing, so the exact meaning of the bool result should be
    // confirmed against the full source.
    57 bool BlobsQueue::blockingRead(
    58     const std::vector<Blob*>& inputs,
    // Hold a shared_ptr to this object for the duration of the call
    // (presumably so the queue cannot be destroyed mid-read).
    61   auto keeper = this->shared_from_this();
    62   const auto& name = name_.c_str();
    63   CAFFE_SDT(queue_read_start, name, (
void*)
this, SDT_BLOCKING_OP);
    64   std::unique_lock<std::mutex> g(mutex_);
    // An entry is readable when the read cursor is behind the write cursor.
    65   auto canRead = [
this]() {
    66     CAFFE_ENFORCE_LE(reader_, writer_);
    67     return reader_ != writer_;
    // Record the read attempt in the balance stat before waiting.
    71   CAFFE_EVENT(stats_, queue_balance, -1);
    // Wait for "closing or readable". The call that takes timeout_ms is on a
    // line not shown here (presumably cv_.wait_for -- confirm).
    72   if (timeout_secs > 0) {
    73     std::chrono::milliseconds timeout_ms(
int(timeout_secs * 1000));
    75         g, timeout_ms, [
this, canRead]() { 
return closing_ || canRead(); });
    77     cv_.wait(g, [
this, canRead]() { 
return closing_ || canRead(); });
    // Failure reporting: a bounded wait that ended without the queue closing
    // is logged and traced as SDT_TIMEOUT; the other path is traced as
    // SDT_CANCEL. The enclosing condition is not visible in this listing.
    80     if (timeout_secs > 0 && !closing_) {
    81       LOG(ERROR) << 
"DequeueBlobs timed out in " << timeout_secs << 
" secs";
    82       CAFFE_SDT(queue_read_end, name, (
void*)
this, SDT_TIMEOUT);
    84       CAFFE_SDT(queue_read_end, name, (
void*)
this, SDT_CANCEL);
    // The slot to read is selected by the read cursor modulo the slot count.
    89   auto& result = queue_[reader_ % queue_.size()];
    90   CAFFE_ENFORCE(inputs.size() >= result.size());
    // NOTE(review): `auto i = 0` deduces int, which is then compared with the
    // unsigned result.size().
    91   for (
auto i = 0; i < result.size(); ++i) {
    92     auto bytes = BlobStat::sizeBytes(*result[i]);
    93     CAFFE_EVENT(stats_, queue_dequeued_bytes, bytes, i);
    95     swap(*(inputs[i]), *(result[i]));
    // Trace the end of the read with writer_ - reader_ as the payload, then
    // update the record-count and read-time stats.
    97   CAFFE_SDT(queue_read_end, name, (
void*)
this, writer_ - reader_);
    98   CAFFE_EVENT(stats_, queue_dequeued_records);
   101   CAFFE_EVENT(stats_, read_time_ns, readTimer.NanoSeconds());
   // Write entry point traced as a non-blocking operation
   // (SDT_NONBLOCKING_OP).
   //
   // Visible steps: take a shared_ptr to this object, emit the
   // queue_write_start probe, lock mutex_, emit queue_write_end with SDT_ABORT
   // on one path, bump the queue_balance stat by +1 and record write_time_ns.
   //
   // NOTE(review): the condition guarding the SDT_ABORT path, the actual write
   // and the return statements are not among the lines shown here; confirm the
   // meaning of the bool result against the full source.
   105 bool BlobsQueue::tryWrite(
const std::vector<Blob*>& inputs) {
   107   auto keeper = this->shared_from_this();
   108   const auto& name = name_.c_str();
   109   CAFFE_SDT(queue_write_start, name, (
void*)
this, SDT_NONBLOCKING_OP);
   110   std::unique_lock<std::mutex> g(mutex_);
   112     CAFFE_SDT(queue_write_end, name, (
void*)
this, SDT_ABORT);
   117   CAFFE_EVENT(stats_, queue_balance, 1);
   120   CAFFE_EVENT(stats_, write_time_ns, writeTimer.NanoSeconds());
   // Write entry point that waits, with mutex_ held via `g`, until the queue
   // is closing or canWrite() reports free space. Traced as SDT_BLOCKING_OP.
   //
   // The queue_balance stat is bumped by +1 before the wait begins, and
   // write_time_ns is recorded afterwards.
   //
   // NOTE(review): the condition guarding the SDT_ABORT path, the actual write
   // and the return statements are not among the lines shown here; confirm the
   // meaning of the bool result against the full source.
   124 bool BlobsQueue::blockingWrite(
const std::vector<Blob*>& inputs) {
   126   auto keeper = this->shared_from_this();
   127   const auto& name = name_.c_str();
   128   CAFFE_SDT(queue_write_start, name, (
void*)
this, SDT_BLOCKING_OP);
   129   std::unique_lock<std::mutex> g(mutex_);
   132   CAFFE_EVENT(stats_, queue_balance, 1);
   // Unbounded wait; unlike blockingRead, no timeout is involved here.
   133   cv_.wait(g, [
this]() { 
return closing_ || canWrite(); });
   135     CAFFE_SDT(queue_write_end, name, (
void*)
this, SDT_ABORT);
   140   CAFFE_EVENT(stats_, write_time_ns, writeTimer.NanoSeconds());
   // Closes the queue. Only the acquisition of mutex_ is visible in this
   // listing.
   // NOTE(review): presumably this sets closing_ (the flag the wait predicates
   // in blockingRead/blockingWrite test) and wakes waiters -- confirm against
   // the full source.
   144 void BlobsQueue::close() {
   147   std::lock_guard<std::mutex> g(mutex_);
   // Reports whether a slot is free for writing.
   //
   // Enforces reader_ <= writer_ <= reader_ + queue_.size(), then returns true
   // unless writer_ has reached reader_ + queue_.size() (every slot in use).
   // It takes no lock itself; blockingWrite calls it from a wait predicate
   // while holding mutex_.
   151 bool BlobsQueue::canWrite() {
   154   CAFFE_ENFORCE_LE(reader_, writer_);
   155   CAFFE_ENFORCE_LE(writer_, reader_ + queue_.size());
   156   return writer_ != reader_ + queue_.size();
   // Moves the caller's blobs into the slot selected by the write cursor.
   //
   // inputs: source blobs; must contain at least as many blobs as a queue slot
   //         (enforced below). Each is swapped with the slot's blob at the
   //         same index, so the caller receives the slot's previous contents.
   //
   // NOTE(review): the line that advances writer_ is not among the lines
   // shown here, so it is unclear whether the value traced at the end is taken
   // before or after the cursor moves.
   159 void BlobsQueue::doWrite(
const std::vector<Blob*>& inputs) {
   160   auto& result = queue_[writer_ % queue_.size()];
   161   CAFFE_ENFORCE(inputs.size() >= result.size());
   162   const auto& name = name_.c_str();
   // NOTE(review): `auto i = 0` deduces int, which is then compared with the
   // unsigned result.size().
   163   for (
auto i = 0; i < result.size(); ++i) {
   165     swap(*(inputs[i]), *(result[i]));
   // Trace the end of the write with reader_ + queue_.size() - writer_ as the
   // payload.
   168       queue_write_end, name, (
void*)
this, reader_ + queue_.size() - writer_);
 A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...