Caffe2 - C++ API
A deep learning, cross platform ML framework
event_gpu.cc
1 #include "caffe2/core/context_gpu.h"
2 #include "caffe2/core/event_cpu.h"
3 #include "caffe2/core/operator.h"
4 
5 #include <atomic>
6 
7 namespace caffe2 {
8 
10  explicit CudaEventWrapper(const DeviceOption& option)
11  : cuda_stream_(nullptr),
12  device_id_(option.device_id()),
13  status_(EventStatus::EVENT_INITIALIZED) {
14  CAFFE_ENFORCE(option.device_type(), PROTO_CUDA);
15  CUDAGuard g(device_id_);
16  CUDA_ENFORCE(cudaEventCreateWithFlags(
17  &cuda_event_, cudaEventDefault | cudaEventDisableTiming));
18  }
19  ~CudaEventWrapper() {
20  CUDAGuard g(device_id_);
21  CUDA_CHECK(cudaEventDestroy(cuda_event_));
22  }
23 
24  cudaEvent_t cuda_event_;
25  cudaStream_t cuda_stream_;
26  int device_id_;
27 
28  std::atomic<int> status_;
29  std::mutex mutex_recorded_;
30  std::condition_variable cv_recorded_;
31  std::string err_msg_;
32 };
33 
namespace {
// Message handed back by EventErrorMessageCUDA when the event has not failed.
const std::string kNoError("No error");
} // namespace
37 
38 void EventCreateCUDA(const DeviceOption& option, Event* event) {
39  event->event_ = std::make_shared<CudaEventWrapper>(option);
40 }
41 
42 void EventRecordCUDA(Event* event, const void* context, const char* err_msg) {
43  auto* wrapper = static_cast<CudaEventWrapper*>(event->event_.get());
44  {
45  std::unique_lock<std::mutex> lock(wrapper->mutex_recorded_);
46 
47  // Possible state changes:
48  // INITIALIZED -> SCHEDULED/FAILED
49  // SCHEDULED -> SUCCESS/FAILED
50  // SUCCESS/FAILED - terminal
51  //
52  // No further changes to cuda_event_ and cuda_stream_ after transitioning
53  // from INITIALIZED
54  // No further changes to err_msg_ after transitioning into FAILED
55 
56  CAFFE_ENFORCE_EQ(
57  wrapper->status_,
58  EventStatus::EVENT_INITIALIZED,
59  "Calling Record multiple times");
60 
61  if (!err_msg) {
62  // When recording, one needs to make sure that the current gpu id is
63  // correct.
64  // TODO(jiayq): move the enforce logic to the caller?
65  const auto& current_device = CaffeCudaGetDevice();
66  CAFFE_ENFORCE_EQ(
67  current_device,
68  wrapper->device_id_,
69  "When you call EventRecordCUDA, your current device should be the same "
70  "as the device specified by the event.");
71  CAFFE_ENFORCE_EQ(
72  current_device,
73  static_cast<const CUDAContext*>(context)->device_id());
74  CUDA_ENFORCE(cudaEventRecord(
75  wrapper->cuda_event_,
76  static_cast<const CUDAContext*>(context)->cuda_stream()));
77  wrapper->cuda_stream_ =
78  static_cast<const CUDAContext*>(context)->cuda_stream();
79  wrapper->status_ = EventStatus::EVENT_SCHEDULED;
80  } else {
81  wrapper->err_msg_ = err_msg;
82  wrapper->status_ = EventStatus::EVENT_FAILED;
83  }
84  }
85  wrapper->cv_recorded_.notify_all();
86 }
87 
88 void EventFinishCUDA(const Event* event) {
89  auto* wrapper = static_cast<CudaEventWrapper*>(event->event_.get());
90  {
91  std::unique_lock<std::mutex> lock(wrapper->mutex_recorded_);
92  while (wrapper->status_ == EventStatus::EVENT_INITIALIZED) {
93  wrapper->cv_recorded_.wait(lock);
94  }
95  }
96 
97  if (wrapper->status_ == EventStatus::EVENT_SCHEDULED) {
98  // ok, even if event is already completed and status was not yet updated
99  CUDAGuard g(wrapper->device_id_);
100  auto cudaResult = cudaEventSynchronize(wrapper->cuda_event_);
101  if (cudaResult == cudaSuccess) {
102  wrapper->status_ = EventStatus::EVENT_SUCCESS;
103  } else {
104  const auto& err_msg = cudaGetErrorString(cudaResult);
105 
106  std::unique_lock<std::mutex> lock(wrapper->mutex_recorded_);
107  wrapper->err_msg_ = err_msg;
108  wrapper->status_ = EventStatus::EVENT_FAILED;
109  }
110  }
111 }
112 
113 // Both waiter and event are CUDA. Non-blocking
114 void EventWaitCUDACUDA(const Event* event, void* context) {
115  auto* wrapper = static_cast<CudaEventWrapper*>(event->event_.get());
116  {
117  std::unique_lock<std::mutex> lock(wrapper->mutex_recorded_);
118  while (wrapper->status_ == EventStatus::EVENT_INITIALIZED) {
119  wrapper->cv_recorded_.wait(lock);
120  }
121  }
122 
123  if (wrapper->status_ == EventStatus::EVENT_SCHEDULED) {
124  // ok, even if event is already completed and status was not yet updated
125  auto context_stream = static_cast<CUDAContext*>(context)->cuda_stream();
126  auto event_stream = wrapper->cuda_stream_;
127  if (context_stream != event_stream) {
128  // CAFFE_ENFORCE_EQ(
129  // CaffeCudaGetDevice(),
130  // static_cast<const CUDAContext*>(context)->device_id());
131  CUDA_CHECK(cudaStreamWaitEvent(context_stream, wrapper->cuda_event_, 0));
132  }
133  }
134 }
135 
136 // Waiter is CPU, event is CUDA
137 void EventWaitCPUCUDA(const Event* event, void* context) {
138  EventFinishCUDA(event);
139 }
140 
141 // Waiter is CUDA, event is CPU
142 void EventWaitCUDACPU(const Event* event, void* context) {
143  event->Finish(); // calls EventFinishCPU
144 }
145 
146 EventStatus EventQueryCUDA(const Event* event) {
147  auto* wrapper = static_cast<CudaEventWrapper*>(event->event_.get());
148  if (wrapper->status_ == EventStatus::EVENT_SCHEDULED) {
149  auto cudaResult = cudaEventQuery(wrapper->cuda_event_);
150  if (cudaResult == cudaSuccess) {
151  wrapper->status_ = EventStatus::EVENT_SUCCESS;
152  } else if (cudaResult != cudaErrorNotReady) {
153  const auto& err_msg = cudaGetErrorString(cudaResult);
154 
155  std::unique_lock<std::mutex> lock(wrapper->mutex_recorded_);
156  wrapper->err_msg_ = err_msg;
157  wrapper->status_ = EventStatus::EVENT_FAILED;
158  }
159  }
160  return static_cast<EventStatus>(wrapper->status_.load());
161 }
162 
163 const std::string& EventErrorMessageCUDA(const Event* event) {
164  auto* wrapper = static_cast<CudaEventWrapper*>(event->event_.get());
165  // supposed to be called after EventQueryCUDA to update status first
166  if (wrapper->status_ == EventStatus::EVENT_FAILED) {
167  return wrapper->err_msg_;
168  } else {
169  return kNoError;
170  }
171 }
172 
173 void EventSetFinishedCUDA(const Event* event, const char* err_msg) {
174  auto* wrapper = static_cast<CudaEventWrapper*>(event->event_.get());
175  {
176  std::unique_lock<std::mutex> lock(wrapper->mutex_recorded_);
177 
178  CAFFE_ENFORCE_EQ(
179  wrapper->status_,
180  EventStatus::EVENT_INITIALIZED,
181  "Calling SetFinished on recorded CUDA event");
182 
183  if (!err_msg) {
184  wrapper->status_ = EventStatus::EVENT_SUCCESS;
185  } else {
186  wrapper->err_msg_ = err_msg;
187  wrapper->status_ = EventStatus::EVENT_FAILED;
188  }
189  }
190  wrapper->cv_recorded_.notify_all();
191 }
192 
193 void EventResetCUDA(Event* event) {
194  auto* wrapper = static_cast<CudaEventWrapper*>(event->event_.get());
195  std::unique_lock<std::mutex> lock(wrapper->mutex_recorded_);
196  wrapper->status_ = EventStatus::EVENT_INITIALIZED;
197  wrapper->err_msg_ = "";
198  wrapper->cuda_stream_ = nullptr;
199 }
200 
201 REGISTER_EVENT_CREATE_FUNCTION(CUDA, EventCreateCUDA);
202 REGISTER_EVENT_RECORD_FUNCTION(CUDA, EventRecordCUDA);
203 REGISTER_EVENT_WAIT_FUNCTION(CUDA, CUDA, EventWaitCUDACUDA);
204 REGISTER_EVENT_WAIT_FUNCTION(CPU, CUDA, EventWaitCPUCUDA);
205 REGISTER_EVENT_WAIT_FUNCTION(CUDA, CPU, EventWaitCUDACPU);
206 REGISTER_EVENT_FINISH_FUNCTION(CUDA, EventFinishCUDA);
207 
208 REGISTER_EVENT_QUERY_FUNCTION(CUDA, EventQueryCUDA);
209 REGISTER_EVENT_ERROR_MESSAGE_FUNCTION(CUDA, EventErrorMessageCUDA);
210 REGISTER_EVENT_SET_FINISHED_FUNCTION(CUDA, EventSetFinishedCUDA);
211 REGISTER_EVENT_RESET_FUNCTION(CUDA, EventResetCUDA);
212 
213 REGISTER_EVENT_WAIT_FUNCTION(MKLDNN, CUDA, EventWaitCPUCUDA);
214 REGISTER_EVENT_WAIT_FUNCTION(CUDA, MKLDNN, EventWaitCUDACPU);
215 
216 } // namespace caffe2
int CaffeCudaGetDevice()
Gets the current GPU id.
Definition: common_gpu.cc:96
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...
Definition: blob.h:13
A variant of DeviceGuard that is specialized for CUDA.
Definition: CUDAGuard.h:20