Caffe2 - C++ API
A deep learning, cross platform ML framework
simple_queue.h
1 
17 #ifndef CAFFE2_UTILS_SIMPLE_QUEUE_H_
18 #define CAFFE2_UTILS_SIMPLE_QUEUE_H_
19 
20 #include <condition_variable> // NOLINT
21 #include <mutex> // NOLINT
22 #include <queue>
23 
24 #include "caffe2/core/logging.h"
25 
26 namespace caffe2 {
27 
28 // This is a very simple queue that Yangqing wrote when bottlefeeding the baby,
29 // so don't take it seriously. What it does is a minimal thread-safe queue that
30 // allows me to run network as a DAG.
31 //
32 // A usual work pattern looks like this: one or multiple producers push jobs
33 // into this queue, and one or multiple workers pop jobs from this queue. If
34 // nothing is in the queue but NoMoreJobs() is not called yet, the pop calls
35 // will wait. If NoMoreJobs() has been called, pop calls will return false,
36 // which serves as a message to the workers that they should exit.
37 template <typename T>
38 class SimpleQueue {
39  public:
40  SimpleQueue() : no_more_jobs_(false) {}
41 
42  // Pops a value and writes it to the value pointer. If there is nothing in the
43  // queue, this will wait till a value is inserted to the queue. If there are
44  // no more jobs to pop, the function returns false. Otherwise, it returns
45  // true.
46  bool Pop(T* value) {
47  std::unique_lock<std::mutex> mutex_lock(mutex_);
48  while (queue_.size() == 0 && !no_more_jobs_) cv_.wait(mutex_lock);
49  if (queue_.size() == 0 && no_more_jobs_) return false;
50  *value = queue_.front();
51  queue_.pop();
52  return true;
53  }
54 
55  int size() {
56  std::unique_lock<std::mutex> mutex_lock(mutex_);
57  return queue_.size();
58  }
59 
60  // Push pushes a value to the queue.
61  void Push(const T& value) {
62  {
63  std::lock_guard<std::mutex> mutex_lock(mutex_);
64  CAFFE_ENFORCE(!no_more_jobs_, "Cannot push to a closed queue.");
65  queue_.push(value);
66  }
67  cv_.notify_one();
68  }
69 
70  // NoMoreJobs() marks the close of this queue. It also notifies all waiting
71  // Pop() calls so that they either check out remaining jobs, or return false.
72  // After NoMoreJobs() is called, this queue is considered closed - no more
73  // Push() functions are allowed, and once existing items are all checked out
74  // by the Pop() functions, any more Pop() function will immediately return
75  // false with nothing set to the value.
76  void NoMoreJobs() {
77  {
78  std::lock_guard<std::mutex> mutex_lock(mutex_);
79  no_more_jobs_ = true;
80  }
81  cv_.notify_all();
82  }
83 
84  private:
85  std::mutex mutex_;
86  std::condition_variable cv_;
87  std::queue<T> queue_;
88  bool no_more_jobs_;
89  // We do not allow copy constructors.
90  SimpleQueue(const SimpleQueue& /*src*/) {}
91 };
92 
93 } // namespace caffe2
94 
95 #endif // CAFFE2_UTILS_SIMPLE_QUEUE_H_
Copyright (c) 2016-present, Facebook, Inc.