Caffe2 - C++ API
A deep learning, cross-platform ML framework
event.cc
1 #include "caffe2/core/event_cpu.h"
2 
3 namespace caffe2 {
4 
5 CAFFE2_API EventCreateFunction Event::event_creator_[MaxDeviceTypes];
6 CAFFE2_API EventRecordFunction Event::event_recorder_[MaxDeviceTypes];
7 CAFFE2_API EventWaitFunction
8  Event::event_waiter_[MaxDeviceTypes][MaxDeviceTypes];
9 CAFFE2_API EventFinishFunction Event::event_finisher_[MaxDeviceTypes];
10 
11 CAFFE2_API EventQueryFunction Event::event_querier_[MaxDeviceTypes];
12 CAFFE2_API EventErrorMessageFunction
13  Event::event_err_msg_getter_[MaxDeviceTypes];
14 CAFFE2_API EventSetFinishedFunction
15  Event::event_finished_setter_[MaxDeviceTypes];
16 CAFFE2_API EventResetFunction Event::event_resetter_[MaxDeviceTypes];
17 CAFFE2_API EventSetCallbackFunction
18  Event::event_callback_setter_[MaxDeviceTypes];
19 
namespace {
// Message handed back by EventErrorMessageCPU when the event is not in the
// failed state. Kept as a std::string object because that function returns
// a const std::string& and therefore needs storage to refer to.
const std::string kNoError{"No error"};
} // namespace
23 
24 void EventCreateCPU(const DeviceOption& option, Event* event) {
25  event->event_ = std::make_shared<CPUEventWrapper>(option);
26 }
27 
28 void EventRecordCPU(
29  Event* event,
30  const void* /* unused */,
31  const char* err_msg) {
32  auto* wrapper = static_cast<CPUEventWrapper*>(event->event_.get());
33  std::unique_lock<std::mutex> lock(wrapper->mutex_);
34 
35  // Possible state changes:
36  // INITIALIZED -> SCHEDULED or SUCCESS/FAILED
37  // SCHEDULED -> SUCCESS/FAILED
38  // SUCCESS/FAILED - terminal, no further changes to status_/err_msg_
39 
40  CAFFE_ENFORCE(
41  wrapper->status_ != EventStatus::EVENT_SCHEDULED,
42  "Calling Record multiple times");
43 
44  // Event might be in SUCCESS/FAILED state in case an op has
45  // finished async execution part first
46  if (wrapper->status_ == EventStatus::EVENT_INITIALIZED) {
47  if (!err_msg) {
48  wrapper->status_ = EventStatus::EVENT_SCHEDULED;
49  } else {
50  wrapper->err_msg_ = err_msg;
51  wrapper->status_ = EventStatus::EVENT_FAILED;
52  wrapper->cv_completed_.notify_all();
53  }
54  }
55 }
56 
57 void EventFinishCPU(const Event* event) {
58  auto* wrapper = static_cast<CPUEventWrapper*>(event->event_.get());
59  std::unique_lock<std::mutex> lock(wrapper->mutex_);
60  while (wrapper->status_ != EventStatus::EVENT_SUCCESS &&
61  wrapper->status_ != EventStatus::EVENT_FAILED) {
62  wrapper->cv_completed_.wait(lock);
63  }
64 }
65 
66 void EventWaitCPUCPU(const Event* event, void* /* context */) {
67  EventFinishCPU(event);
68 }
69 
70 EventStatus EventQueryCPU(const Event* event) {
71  auto* wrapper = static_cast<CPUEventWrapper*>(event->event_.get());
72  return static_cast<EventStatus>(wrapper->status_.load());
73 }
74 
75 const std::string& EventErrorMessageCPU(const Event* event) {
76  auto* wrapper = static_cast<CPUEventWrapper*>(event->event_.get());
77  if (wrapper->status_ == EventStatus::EVENT_FAILED) {
78  // Failed is a terminal state, not synchronizing,
79  // err_msg_ should not be changed anymore
80  return wrapper->err_msg_;
81  } else {
82  return kNoError;
83  }
84 }
85 
86 void EventSetFinishedCPU(const Event* event, const char* err_msg) {
87  auto* wrapper = static_cast<CPUEventWrapper*>(event->event_.get());
88  std::unique_lock<std::mutex> lock(wrapper->mutex_);
89 
90  CAFFE_ENFORCE(
91  wrapper->status_ == EventStatus::EVENT_INITIALIZED ||
92  wrapper->status_ == EventStatus::EVENT_SCHEDULED,
93  "Calling SetFinished on finished event");
94 
95  if (!err_msg) {
96  wrapper->status_ = EventStatus::EVENT_SUCCESS;
97  } else {
98  wrapper->err_msg_ = err_msg;
99  wrapper->status_ = EventStatus::EVENT_FAILED;
100  }
101 
102  for (auto& callback : wrapper->callbacks_) {
103  callback();
104  }
105 
106  wrapper->cv_completed_.notify_all();
107 }
108 
109 void EventSetCallbackCPU(Event* event, EventCallbackFunction callback) {
110  auto* wrapper = static_cast<CPUEventWrapper*>(event->event_.get());
111  std::unique_lock<std::mutex> lock(wrapper->mutex_);
112 
113  wrapper->callbacks_.push_back(callback);
114  if (wrapper->status_ == EventStatus::EVENT_SUCCESS ||
115  wrapper->status_ == EventStatus::EVENT_FAILED) {
116  callback();
117  }
118 }
119 
120 void EventResetCPU(Event* event) {
121  auto* wrapper = static_cast<CPUEventWrapper*>(event->event_.get());
122  std::unique_lock<std::mutex> lock(wrapper->mutex_);
123  wrapper->status_ = EventStatus::EVENT_INITIALIZED;
124  wrapper->err_msg_ = "";
125  wrapper->callbacks_.clear();
126 }
127 
128 REGISTER_EVENT_CREATE_FUNCTION(CPU, EventCreateCPU);
129 REGISTER_EVENT_RECORD_FUNCTION(CPU, EventRecordCPU);
130 REGISTER_EVENT_WAIT_FUNCTION(CPU, CPU, EventWaitCPUCPU);
131 REGISTER_EVENT_FINISH_FUNCTION(CPU, EventFinishCPU);
132 
133 REGISTER_EVENT_QUERY_FUNCTION(CPU, EventQueryCPU);
134 REGISTER_EVENT_ERROR_MESSAGE_FUNCTION(CPU, EventErrorMessageCPU);
135 REGISTER_EVENT_SET_FINISHED_FUNCTION(CPU, EventSetFinishedCPU);
136 REGISTER_EVENT_RESET_FUNCTION(CPU, EventResetCPU);
137 
138 REGISTER_EVENT_SET_CALLBACK_FUNCTION(CPU, EventSetCallbackCPU);
139 
140 } // namespace caffe2
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...
Definition: blob.h:13