Caffe2 - C++ API
A deep learning, cross platform ML framework
net_singlethread_async_gpu.cc
1 
17 #include <condition_variable>
18 #include <mutex>
19 #include <stack>
20 
21 #if !defined(_MSC_VER) && !defined(__APPLE__)
22 #include <sched.h>
23 #endif
24 
25 #include "caffe2/core/context_gpu.h"
26 #include "caffe2/core/net_simple.h"
27 #include "caffe2/core/operator.h"
28 #include "caffe2/proto/caffe2.pb.h"
29 
30 namespace caffe2 {
31 
32 namespace gpu_single_thread {
33 
// A unit of work handed to a GPUExecutor: the operator list of one net,
// to be launched on a single CUDA stream. The submitting thread blocks
// on cv_/mtx_ until the executor marks the task done_.
struct Task {
  // Operators to run on stream_id_; not owned by the task.
  std::vector<std::unique_ptr<OperatorBase>>* ops_;
  // Condition variable / mutex pair owned by the submitter; the executor
  // sets done_ under *mtx_ and notifies *cv_ when the task finishes.
  std::condition_variable* cv_;
  std::mutex* mtx_;
  // Stream index assigned by the executor's worker loop.
  int stream_id_;
  bool done_ = false;
};
41 
42 class GPUExecutor {
43  public:
44  explicit GPUExecutor(int gpu_id) : gpu_id_(gpu_id) {}
45 
46  ~GPUExecutor() {
47  queue_.NoMoreJobs();
48  thread_.join();
49  }
50 
51  void RunJob(Task* task) {
52  queue_.Push(task);
53  }
54 
55  void start() {
56  thread_ = std::thread(&GPUExecutor::WorkerFunction, this);
57  }
58 
59  static std::shared_ptr<GPUExecutor> Get(int gpu);
60  static void Release(int gpu);
61 
62  private:
63  void set_affinity();
64  void WorkerFunction();
65 
66  std::thread thread_;
67  int gpu_id_;
68  SimpleQueue<Task*> queue_;
69  static std::shared_ptr<GPUExecutor> executors_[CAFFE2_COMPILE_TIME_MAX_GPUS];
70  static std::mutex gpu_mtx_[CAFFE2_COMPILE_TIME_MAX_GPUS];
71 };
72 
// Storage for the per-GPU executor singletons and the mutexes guarding
// their lazy creation and release.
std::shared_ptr<GPUExecutor>
    GPUExecutor::executors_[CAFFE2_COMPILE_TIME_MAX_GPUS];
std::mutex GPUExecutor::gpu_mtx_[CAFFE2_COMPILE_TIME_MAX_GPUS];
76 
77 std::shared_ptr<GPUExecutor> GPUExecutor::Get(int gpu) {
78  std::lock_guard<std::mutex> grd(gpu_mtx_[gpu]);
79  if (!executors_[gpu].get()) {
80  executors_[gpu].reset(new GPUExecutor(gpu));
81  executors_[gpu].get()->start();
82  }
83  return executors_[gpu];
84 }
85 
86 void GPUExecutor::Release(int gpu) {
87  std::lock_guard<std::mutex> grd(gpu_mtx_[gpu]);
88  if (executors_[gpu].use_count() == 1) {
89  executors_[gpu].reset();
90  }
91 }
92 
93 void GPUExecutor::set_affinity() {
94 // TODO: find a Windows-compatible affinity setting approach.
95 // Currently, set_affinity has no effect in Windows. The code is still
96 // correct with possible slowdowns.
97 #if !defined(_MSC_VER) && !defined(__APPLE__)
98  /* Set CPU affinity */
99  int num_cores = std::thread::hardware_concurrency();
100  if (num_cores > 0) {
101  cpu_set_t mask;
102  CPU_ZERO(&mask);
103 
104  CPU_SET(gpu_id_ % num_cores, &mask);
105  if (sched_setaffinity(0, sizeof(cpu_set_t), &mask)) {
106  LOG(WARNING) << "Could not set CPU affinity";
107  }
108  }
109 #endif
110 }
111 
// Worker that takes list of operators from the queue
// and executes them.
void GPUExecutor::WorkerFunction() {
  int stream_id_seq = 0;
  // Pool of stream ids whose previous work has been synchronized and
  // which can therefore be reused for new tasks.
  std::stack<int> streams;
  set_affinity();

  while (true) {
    Task* task = nullptr;
    vector<Task*> task_batch;

    // Pop() returns false once NoMoreJobs() was called: shut down.
    if (!queue_.Pop(&task)) {
      return;
    }
    int num_tasks = 1 + queue_.size();

    // Grab all tasks currently in queue so we can run them in parallel
    // Since we have only one producer, we know this does not block

    // TODO: launch ops in "zig-zag" manner so that we can start multiple
    // streams as simultaneously as possible
    for (int i = num_tasks - 1; i >= 0; i--) {
      assert(task != nullptr);
      // Reuse a synchronized stream id if one is available, otherwise
      // mint a fresh one.
      if (streams.empty()) {
        task->stream_id_ = stream_id_seq++;
      } else {
        task->stream_id_ = streams.top();
        streams.pop();
      }

      // Launch the whole task asynchronously on its assigned stream.
      for (auto& op : *task->ops_) {
        op->RunAsync(task->stream_id_);
      }
      task_batch.push_back(task);

      // Get the next one
      if (i > 0) {
        if (!queue_.Pop(&task)) {
          return;
        }
      }
    }

    // Wait for the currently executing streams
    for (auto& pendtask : task_batch) {
      cudaStream_t stream =
          CUDAContext::cuda_stream(gpu_id_, pendtask->stream_id_);
      CUDA_ENFORCE(cudaStreamSynchronize(stream));
      // Stream is idle again: recycle its id, then mark the task done
      // under the submitter's mutex and wake the waiting thread.
      streams.push(pendtask->stream_id_);
      std::unique_lock<std::mutex> lk(*pendtask->mtx_);
      pendtask->done_ = true;
      pendtask->cv_->notify_one();
    }
  }
}
167 
169  public:
170  using SimpleNet::SimpleNet;
171 
173  if (executor_.get()) {
    // Explicitly reset my holding of the executor so it can be
    // killed.
176  executor_.reset();
177  GPUExecutor::Release(gpu_id_);
178  }
179  }
180 
181  bool Run() override {
182  if (!executor_.get()) {
183  initialize();
184  }
185 
186  // Dispatch jobs to the gpu-specific executor thread
187  std::unique_lock<std::mutex> lk(mutex_);
188  Task t;
189  t.ops_ = &operators_;
190  t.cv_ = &cv_;
191  t.mtx_ = &mutex_;
192  t.done_ = false;
193  executor_.get()->RunJob(&t);
194 
195  while (!t.done_) {
196  cv_.wait(lk);
197  }
198 
199  return true;
200  }
201 
202  private:
203  std::condition_variable cv_;
204  std::mutex mutex_;
205 
206  void initialize() {
207  std::lock_guard<std::mutex> grd(mutex_);
208 
209  /* Check the gpu id of this net and check that only one
210  GPU has operators on this net */
211  gpu_id_ = (-1);
212  for (auto& op : operators_) {
213  if (op->device_option().device_type() == CUDA) {
214  if (gpu_id_ < 0) {
215  gpu_id_ = op->device_option().cuda_gpu_id();
216  } else {
217  CAFFE_ENFORCE_EQ(
218  gpu_id_,
219  op->device_option().cuda_gpu_id(),
220  "One net can only have operators for one GPU");
221  }
222  }
223  }
224  executor_ = GPUExecutor::Get(gpu_id_);
225  }
226 
227  int gpu_id_;
228  std::shared_ptr<GPUExecutor> executor_;
229 };
230 
231 REGISTER_NET(singlethread_async, SingleThreadAsyncNet)
232 
233 } // namespace gpu_single_thread
234 } // namespace caffe2
Copyright (c) 2016-present, Facebook, Inc.