Caffe2 - C++ API
A deep learning, cross platform ML framework
event_gpu.cc
#include "caffe2/core/context_gpu.h"
#include "caffe2/core/event_cpu.h"
#include "caffe2/core/operator.h"

#include <atomic>

namespace caffe2 {

struct CudaEventWrapper {
  explicit CudaEventWrapper(const DeviceOption& option)
      : cuda_stream_(nullptr),
        cuda_gpu_id_(option.cuda_gpu_id()),
        status_(EventStatus::EVENT_INITIALIZED) {
    CAFFE_ENFORCE_EQ(option.device_type(), CUDA);
    DeviceGuard g(cuda_gpu_id_);
    CUDA_ENFORCE(cudaEventCreate(
        &cuda_event_, cudaEventDefault | cudaEventDisableTiming));
  }
  ~CudaEventWrapper() {
    DeviceGuard g(cuda_gpu_id_);
    CUDA_CHECK(cudaEventDestroy(cuda_event_));
  }

  cudaEvent_t cuda_event_;
  cudaStream_t cuda_stream_;
  int cuda_gpu_id_;

  std::atomic<int> status_;
  std::mutex mutex_recorded_;
  std::condition_variable cv_recorded_;
  std::string err_msg_;
};
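
// The wrapper above pairs the raw cudaEvent_t/cudaStream_t handles with
// CPU-side bookkeeping: status_ is the cross-thread state machine, and
// mutex_recorded_ / cv_recorded_ let CPU threads block until another thread
// has recorded the event onto a stream (or marked it failed).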

namespace {
const std::string kNoError = "No error";
}

void EventCreateCUDA(const DeviceOption& option, Event* event) {
  event->event_ = std::make_shared<CudaEventWrapper>(option);
}

void EventRecordCUDA(Event* event, const void* context, const char* err_msg) {
  auto* wrapper = static_cast<CudaEventWrapper*>(event->event_.get());
  {
    std::unique_lock<std::mutex> lock(wrapper->mutex_recorded_);

    // Possible state changes:
    //  INITIALIZED -> SCHEDULED/FAILED
    //  SCHEDULED -> SUCCESS/FAILED
    //  SUCCESS/FAILED - terminal
    //
    // No further changes to cuda_event_ and cuda_stream_ after transitioning
    // from INITIALIZED
    // No further changes to err_msg_ after transitioning into FAILED

    CAFFE_ENFORCE_EQ(
        wrapper->status_,
        EventStatus::EVENT_INITIALIZED,
        "Calling Record multiple times");

    if (!err_msg) {
      // When recording, one needs to make sure that the current gpu id is
      // correct.
      // TODO(jiayq): move the enforce logic to the caller?
      const auto& current_device = CaffeCudaGetDevice();
      CAFFE_ENFORCE_EQ(
          current_device,
          wrapper->cuda_gpu_id_,
          "When you call EventRecordCUDA, your current device should be the "
          "same as the device specified by the event.");
      CAFFE_ENFORCE_EQ(
          current_device,
          static_cast<const CUDAContext*>(context)->cuda_gpu_id());
      CUDA_ENFORCE(cudaEventRecord(
          wrapper->cuda_event_,
          static_cast<const CUDAContext*>(context)->cuda_stream()));
      wrapper->cuda_stream_ =
          static_cast<const CUDAContext*>(context)->cuda_stream();
      wrapper->status_ = EventStatus::EVENT_SCHEDULED;
    } else {
      wrapper->err_msg_ = err_msg;
      wrapper->status_ = EventStatus::EVENT_FAILED;
    }
  }
  wrapper->cv_recorded_.notify_all();
}
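
// EventRecordCUDA either enqueues the CUDA event on the recording context's
// stream and moves the state to SCHEDULED, or, when err_msg is given, stores
// the message and moves straight to FAILED. Either way cv_recorded_ is
// notified so that threads waiting for the event to leave INITIALIZED
// (EventFinishCUDA, EventWaitCUDACUDA) can proceed.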

void EventFinishCUDA(const Event* event) {
  auto* wrapper = static_cast<CudaEventWrapper*>(event->event_.get());
  {
    std::unique_lock<std::mutex> lock(wrapper->mutex_recorded_);
    while (wrapper->status_ == EventStatus::EVENT_INITIALIZED) {
      wrapper->cv_recorded_.wait(lock);
    }
  }

  if (wrapper->status_ == EventStatus::EVENT_SCHEDULED) {
    // ok, even if event is already completed and status was not yet updated
    DeviceGuard g(wrapper->cuda_gpu_id_);
    auto cudaResult = cudaEventSynchronize(wrapper->cuda_event_);
    if (cudaResult == cudaSuccess) {
      wrapper->status_ = EventStatus::EVENT_SUCCESS;
    } else {
      const auto& err_msg = cudaGetErrorString(cudaResult);

      std::unique_lock<std::mutex> lock(wrapper->mutex_recorded_);
      wrapper->err_msg_ = err_msg;
      wrapper->status_ = EventStatus::EVENT_FAILED;
    }
  }
}
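
// EventFinishCUDA is the blocking wait: it first waits for Record (or
// SetFinished) to move the event out of INITIALIZED, then calls
// cudaEventSynchronize and records the terminal SUCCESS/FAILED state.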

// Both waiter and event are CUDA. Non-blocking.
void EventWaitCUDACUDA(const Event* event, void* context) {
  auto* wrapper = static_cast<CudaEventWrapper*>(event->event_.get());
  {
    std::unique_lock<std::mutex> lock(wrapper->mutex_recorded_);
    while (wrapper->status_ == EventStatus::EVENT_INITIALIZED) {
      wrapper->cv_recorded_.wait(lock);
    }
  }

  if (wrapper->status_ == EventStatus::EVENT_SCHEDULED) {
    // ok, even if event is already completed and status was not yet updated
    auto context_stream = static_cast<CUDAContext*>(context)->cuda_stream();
    auto event_stream = wrapper->cuda_stream_;
    if (context_stream != event_stream) {
      // CAFFE_ENFORCE_EQ(
      //     CaffeCudaGetDevice(),
      //     static_cast<const CUDAContext*>(context)->cuda_gpu_id());
      CUDA_CHECK(cudaStreamWaitEvent(context_stream, wrapper->cuda_event_, 0));
    }
  }
}
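
// The CUDA->CUDA wait is enqueued on the waiter's stream via
// cudaStreamWaitEvent, so the calling CPU thread does not block; work later
// submitted to context_stream will not start until the recorded event
// completes. If both contexts share a stream, ordering is already implied
// and the wait is skipped.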

// Waiter is CPU, event is CUDA.
void EventWaitCPUCUDA(const Event* event, void* context) {
  EventFinishCUDA(event);
}

// Waiter is CUDA, event is CPU.
void EventWaitCUDACPU(const Event* event, void* context) {
  event->Finish(); // calls EventFinishCPU
}
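
// For cross-device waits there is no GPU-side handle to chain on: a CPU
// waiter simply blocks until the CUDA event finishes, and a CUDA waiter on a
// CPU event blocks the issuing CPU thread (via the CPU Finish) before any
// further GPU work is queued.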

EventStatus EventQueryCUDA(const Event* event) {
  auto* wrapper = static_cast<CudaEventWrapper*>(event->event_.get());
  if (wrapper->status_ == EventStatus::EVENT_SCHEDULED) {
    auto cudaResult = cudaEventQuery(wrapper->cuda_event_);
    if (cudaResult == cudaSuccess) {
      wrapper->status_ = EventStatus::EVENT_SUCCESS;
    } else if (cudaResult != cudaErrorNotReady) {
      const auto& err_msg = cudaGetErrorString(cudaResult);

      std::unique_lock<std::mutex> lock(wrapper->mutex_recorded_);
      wrapper->err_msg_ = err_msg;
      wrapper->status_ = EventStatus::EVENT_FAILED;
    }
  }
  return static_cast<EventStatus>(wrapper->status_.load());
}
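
// Query is non-blocking: cudaEventQuery returns cudaErrorNotReady while the
// GPU work is still in flight, in which case the status stays SCHEDULED and
// only genuine errors flip the event to FAILED.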

const std::string& EventErrorMessageCUDA(const Event* event) {
  auto* wrapper = static_cast<CudaEventWrapper*>(event->event_.get());
  // Supposed to be called after EventQueryCUDA to update the status first.
  if (wrapper->status_ == EventStatus::EVENT_FAILED) {
    return wrapper->err_msg_;
  } else {
    return kNoError;
  }
}

void EventSetFinishedCUDA(const Event* event, const char* err_msg) {
  auto* wrapper = static_cast<CudaEventWrapper*>(event->event_.get());
  {
    std::unique_lock<std::mutex> lock(wrapper->mutex_recorded_);

    CAFFE_ENFORCE_EQ(
        wrapper->status_,
        EventStatus::EVENT_INITIALIZED,
        "Calling SetFinished on recorded CUDA event");

    if (!err_msg) {
      wrapper->status_ = EventStatus::EVENT_SUCCESS;
    } else {
      wrapper->err_msg_ = err_msg;
      wrapper->status_ = EventStatus::EVENT_FAILED;
    }
  }
  wrapper->cv_recorded_.notify_all();
}

void EventResetCUDA(Event* event) {
  auto* wrapper = static_cast<CudaEventWrapper*>(event->event_.get());
  std::unique_lock<std::mutex> lock(wrapper->mutex_recorded_);
  wrapper->status_ = EventStatus::EVENT_INITIALIZED;
  wrapper->err_msg_ = "";
  wrapper->cuda_stream_ = nullptr;
}
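
// Reset returns the wrapper to INITIALIZED so the Event can be recorded
// again; the underlying cudaEvent_t is kept and reused rather than destroyed
// and recreated.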

REGISTER_EVENT_CREATE_FUNCTION(CUDA, EventCreateCUDA);
REGISTER_EVENT_RECORD_FUNCTION(CUDA, EventRecordCUDA);
REGISTER_EVENT_WAIT_FUNCTION(CUDA, CUDA, EventWaitCUDACUDA);
REGISTER_EVENT_WAIT_FUNCTION(CPU, CUDA, EventWaitCPUCUDA);
REGISTER_EVENT_WAIT_FUNCTION(CUDA, CPU, EventWaitCUDACPU);
REGISTER_EVENT_FINISH_FUNCTION(CUDA, EventFinishCUDA);

REGISTER_EVENT_QUERY_FUNCTION(CUDA, EventQueryCUDA);
REGISTER_EVENT_ERROR_MESSAGE_FUNCTION(CUDA, EventErrorMessageCUDA);
REGISTER_EVENT_SET_FINISHED_FUNCTION(CUDA, EventSetFinishedCUDA);
REGISTER_EVENT_RESET_FUNCTION(CUDA, EventResetCUDA);

REGISTER_EVENT_WAIT_FUNCTION(MKLDNN, CUDA, EventWaitCPUCUDA);
REGISTER_EVENT_WAIT_FUNCTION(CUDA, MKLDNN, EventWaitCUDACPU);

} // namespace caffe2
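
As a rough usage sketch (not part of this file), the hooks registered above are normally reached through the caffe2::Event front end rather than called directly. The Event method names and DeviceOption setters below are assumed from typical Caffe2 usage and may differ slightly between versions:

// Illustrative sketch only: record an event on one CUDA stream and make a
// second stream wait on it, exercising the CUDA hooks registered above.
#include "caffe2/core/context_gpu.h"
#include "caffe2/core/event.h"

void CrossStreamSync(caffe2::CUDAContext* producer,
                     caffe2::CUDAContext* consumer) {
  caffe2::DeviceOption option;
  option.set_device_type(caffe2::CUDA);            // assumed proto setter
  option.set_cuda_gpu_id(producer->cuda_gpu_id()); // assumed proto setter

  caffe2::Event event(option);           // dispatches to EventCreateCUDA
  event.Record(caffe2::CUDA, producer);  // dispatches to EventRecordCUDA
  event.Wait(caffe2::CUDA, consumer);    // dispatches to EventWaitCUDACUDA
  event.Finish();                        // dispatches to EventFinishCUDA
}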