Caffe2 - C++ API
A deep learning, cross platform ML framework
thread_pool.cpp
1 #include <c10/core/thread_pool.h>
2 
3 namespace c10 {
4 
5 ThreadPool::ThreadPool(std::size_t pool_size, int numa_node_id)
6  : threads_(pool_size),
7  running_(true),
8  complete_(true),
9  available_(threads_.size()),
10  total_(threads_.size()),
11  numa_node_id_(numa_node_id) {
12  for (std::size_t i = 0; i < threads_.size(); ++i) {
13  threads_[i] = std::thread(std::bind(&ThreadPool::main_loop, this, i));
14  }
15 }
16 
17 ThreadPool::~ThreadPool() {
18  // Set running flag to false then notify all threads.
19  {
20  std::unique_lock<std::mutex> lock(mutex_);
21  running_ = false;
22  condition_.notify_all();
23  }
24 
25  for (auto& t : threads_) {
26  try {
27  t.join();
28  } catch (const std::exception&) {
29  }
30  }
31 }
32 
33 size_t ThreadPool::size() const {
34  return threads_.size();
35 }
36 
37 size_t ThreadPool::numAvailable() const {
38  return available_;
39 }
40 
42  for (auto& thread : threads_) {
43  if (thread.get_id() == std::this_thread::get_id()) {
44  return true;
45  }
46  }
47  return false;
48 }
49 
50 void ThreadPool::run(const std::function<void()>& func) {
51  std::unique_lock<std::mutex> lock(mutex_);
52 
53  // Set task and signal condition variable so that a worker thread will
54  // wake up and use the task.
55  tasks_.push(task_element_t(func));
56  complete_ = false;
57  condition_.notify_one();
58 }
59 
61  std::unique_lock<std::mutex> lock(mutex_);
62  while (!complete_) {
63  completed_.wait(lock);
64  }
65 }
66 
67 void ThreadPool::main_loop(std::size_t index) {
68  init_thread();
69 
70  std::unique_lock<std::mutex> lock(mutex_);
71  while (running_) {
72  // Wait on condition variable while the task is empty and
73  // the pool is still running.
74  while (tasks_.empty() && running_) {
75  condition_.wait(lock);
76  }
77  // If pool is no longer running, break out of loop.
78  if (!running_) {
79  break;
80  }
81 
82  // Copy task locally and remove from the queue. This is
83  // done within its own scope so that the task object is
84  // destructed immediately after running the task. This is
85  // useful in the event that the function contains
86  // shared_ptr arguments bound via bind.
87  {
88  auto tasks = tasks_.front();
89  tasks_.pop();
90  // Decrement count, indicating thread is no longer available.
91  --available_;
92 
93  lock.unlock();
94 
95  // Run the task.
96  try {
97  if (tasks.run_with_id) {
98  tasks.with_id(index);
99  } else {
100  tasks.no_id();
101  }
102  } catch (const std::exception&) {
103  }
104 
105  // Update status of empty, maybe
106  // Need to recover the lock first
107  lock.lock();
108 
109  // Increment count, indicating thread is available.
110  ++available_;
111  if (tasks_.empty() && available_ == total_) {
112  complete_ = true;
113  completed_.notify_one();
114  }
115 
116  // Deliberately hold the lock on the backedge, so this thread has an
117  // opportunity to acquire a new task before another thread acquires
118  // the lock.
119  }
120  } // while running_
121 }
122 
// Number of threads the global pool will be created with. The value -1 is a
// sentinel meaning "the global pool has already been created"
// (global_work_queue() swaps it in when it reads the value).
// constexpr initialization guaranteed to be before any static initialization
std::atomic<int> num_threads{1};

// Sets the number of threads to use for the global pool.
//
// Throws std::runtime_error if the pool has already been started. In that
// case the stored sentinel is left untouched, so every later call fails the
// same way instead of silently succeeding.
void setNumThreads(size_t v) {
  const int requested = static_cast<int>(v);
  int current = num_threads.load();
  do {
    if (current == -1) {
      throw std::runtime_error("Error: cannot set num threads after pool has started");
    }
    // On failure compare_exchange_weak reloads `current`, so the sentinel
    // check above is repeated against the fresh value.
  } while (!num_threads.compare_exchange_weak(current, requested));
}
130 
131 TaskThreadPoolBase& global_work_queue() {
132  static std::shared_ptr<TaskThreadPoolBase> pool =
133  ThreadPoolRegistry()->Create("C10", 0, num_threads.exchange(-1), false);
134  return *pool;
135 }
136 
137 C10_DEFINE_SHARED_REGISTRY(
138  ThreadPoolRegistry,
140  int,
141  int,
142  bool);
143 
144 namespace {
145 
146 std::shared_ptr<TaskThreadPoolBase> createC10ThreadPool(
147  int device_id,
148  int pool_size,
149  bool create_new) {
150  static std::shared_ptr<TaskThreadPoolBase> pool =
151  std::make_shared<ThreadPool>(pool_size);
152  return pool;
153 }
154 
155 } // namespace
156 
157 C10_REGISTER_CREATOR(ThreadPoolRegistry, C10, createC10ThreadPool);
158 
159 } // namespace c10
size_t numAvailable() const override
The number of available (i.e., idle) threads in this thread pool.
Definition: thread_pool.cpp:37
void waitWorkComplete()
Wait for queue to be empty.
Definition: thread_pool.cpp:60
bool inThreadPool() const override
Check if the current thread is from the thread pool.
Definition: thread_pool.cpp:41
To register your own kernel for an operator, do in one (!) cpp file: C10_REGISTER_KERNEL(OperatorHand...
Definition: alias_info.h:7