1 #include "caffe2/core/net_async_task.h" 3 #include "caffe2/core/net_async_task_graph.h" 7 AsyncTask::AsyncTask(
const std::vector<OperatorBase*>& ops) : ops_(ops) {
// Builds a task from the given operators; the list is stored in ops_.
// A task must contain at least one operator.
8 CAFFE_ENFORCE(!ops_.empty());
// The first operator's device option becomes the device option of the task.
9 device_option_ = ops_.front()->device_option();
// Every operator in the task must be on the same device as the first one.
10 for (
auto& op : ops_) {
11 CAFFE_ENFORCE(IsSameDevice(device_option_, op->device_option()));
// Reports a failure for this task: builds an error message, logs it at ERROR
// level, marks the last operator's event as finished with that message, and
// completes the task future with the same message.
// NOTE(review): part of the parameter list and the branch conditions of this
// function are not visible in this excerpt.
16 void AsyncTask::handleChainError(
19 bool save_exception) {
20 std::string err_msg = err_str;
// Append the operator's type() when it has a debug def, otherwise " unknown".
22 err_msg +=
", op " + (op->has_debug_def() ? op->type() :
" unknown");
24 LOG(ERROR) << err_msg;
// The error is recorded on the event of the last operator in the task.
27 auto last_op = ops_.back();
// Presumably save_exception selects SetFinishedWithException over SetFinished;
// the condition is not visible here - TODO confirm.
29 last_op->event().SetFinishedWithException(err_msg.c_str());
31 last_op->event().SetFinished(err_msg.c_str());
// Complete the task future with the error message.
36 future_.SetCompleted(err_msg.c_str());
// Runs the operators of this task through RunAsync and completes future_,
// either without an error or with an error message.
// NOTE(review): several lines (the try, the assignment of op, the else
// branches and the return statements) are not visible in this excerpt.
39 bool AsyncTask::Run(
const ExecutionOptions& options) {
// Declared before the loop because the exception handler below also passes op
// to handleChainError.
41 OperatorBase* op =
nullptr;
// NOTE(review): op_idx is deduced as int from the literal 0 and is compared
// against ops_.size(); if size() returns an unsigned type this is a
// signed/unsigned comparison - confirm and consider size_t.
43 for (
auto op_idx = 0; op_idx < ops_.size(); ++op_idx) {
// A false return from RunAsync is reported as a chain error.
46 if (!op->RunAsync(stream_id)) {
47 handleChainError(op,
"Failed to execute an op");
52 if (options.finish_chain_) {
// For a CPU device type where the last operator has an async part, completion
// of the future is driven by a callback on that operator's event.
59 if (IsCPUDeviceType(device_option_.device_type()) &&
60 ops_.back()->HasAsyncPart()) {
61 auto&
event = ops_.back()->event();
// NOTE(review): the lambda captures this and a reference to the event, so both
// must still be alive when the callback runs.
62 event.SetCallback([
this, &event]() {
// The callback requires the event to be finished when it is invoked.
63 CAFFE_ENFORCE(event.IsFinished());
// On EVENT_SUCCESS the future completes without an error message.
64 if (event.Query() == EventStatus::EVENT_SUCCESS) {
65 future_.SetCompleted();
// Presumably the non-success branch: the event's error message is forwarded
// to the future.
68 future_.SetCompleted(event.ErrorMessage().c_str());
// Presumably the branch where no callback is installed: complete the future
// directly - TODO confirm against the full source.
72 future_.SetCompleted();
74 }
// A std::exception is reported through handleChainError with its what() text,
// passing true as the last argument (save_exception).
catch (
const std::exception& e) {
75 handleChainError(op, e.what(),
true);
// Message used for an error without exception details; presumably inside a
// catch-all handler that is not visible here.
80 "Failed to execute task: unknown error",
// Iterates over every operator in ops_.
// NOTE(review): the per-operator loop body is not visible in this excerpt.
88 void AsyncTask::Reset() {
89 for (
auto& op : ops_) {
// Returns a copy of the task's device option (device_option_), which the
// constructor takes from the first operator.
95 DeviceOption AsyncTask::GetDeviceOption()
const {
96 return device_option_;
// Non-const accessor returning a mutable reference to an AsyncTaskFuture.
// NOTE(review): the body is not visible here; presumably it returns future_.
99 AsyncTaskFuture& AsyncTask::GetFuture() {
// Const overload returning a read-only reference to an AsyncTaskFuture.
// NOTE(review): the body is not visible here; presumably it returns future_.
103 const AsyncTaskFuture& AsyncTask::GetFuture()
const {
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...