Caffe2 - C++ API
A deep learning, cross platform ML framework
plan_executor.cc
1 
#include "caffe2/core/plan_executor.h"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

#include "caffe2/core/timer.h"
#include "caffe2/core/workspace.h"
#include "caffe2/proto/caffe2.pb.h"
29 
// Controls what happens when a worker thread running concurrent substeps
// catches an exception. When the flag is false (the default) the exception is
// rethrown out of the worker thread; when it is true the first exception
// message is recorded and reported from the thread that joins the workers.
CAFFE2_DEFINE_bool(
    caffe2_handle_executor_threads_exceptions,
    false,
    "If used we will handle exceptions in executor threads. "
    "This avoids SIGABRT but may cause process to deadlock");
35 
36 namespace caffe2 {
37 
38 namespace {
39 
// Bookkeeping for one net definition referenced by a plan.
struct NetDefInfo {
  // Definition of the net. Not owned by this struct.
  const NetDef* netDef;
  // In order to keep the "override existing nets" behavior on the top-level
  // workflow, we need to mark the nets that already exist so that we can
  // override them exactly once.
  bool needsOverride;
};
47 
// Maps a net name to its definition and override state.
using NetDefMap = std::unordered_map<std::string, NetDefInfo>;
49 
// Runs periodic callbacks on background threads until it is destroyed.
//
// Every call to start() spawns one thread that invokes the given callback
// each time `intervalMillis` milliseconds elapse. Destroying the Reporter
// raises the shared `done` flag, wakes every thread, and joins it; a woken
// thread runs its callback one more time before exiting.
struct Reporter {
  // One background thread together with the primitives used to wake it.
  struct ReporterInstance {
    std::mutex report_mutex;
    std::condition_variable report_cv;
    std::thread report_thread;

    // `done` must outlive this instance; it is only read here.
    // `f` is invoked on the worker thread while `report_mutex` is held.
    ReporterInstance(
        int64_t intervalMillis,
        const std::atomic<bool>* done,
        std::function<void()> f) {
      const auto interval = std::chrono::milliseconds(intervalMillis);
      auto reportWorker = [this, interval, done, f]() {
        std::unique_lock<std::mutex> lk(report_mutex);
        do {
          // Sleeps for `interval` unless shutdown is requested earlier.
          report_cv.wait_for(lk, interval, [done]() { return done->load(); });
          f();
        } while (!done->load());
      };
      report_thread = std::thread(reportWorker);
    }
  };

  // Starts calling `f` every `intervalMillis` milliseconds on a new thread.
  void start(int64_t intervalMillis, std::function<void()> f) {
    // Wrap the raw pointer before it reaches the vector so that a failed
    // reallocation cannot leak the instance.
    instances_.push_back(std::unique_ptr<ReporterInstance>(
        new ReporterInstance(intervalMillis, &done, std::move(f))));
  }

  ~Reporter() {
    done = true;
    for (auto& instance : instances_) {
      if (!instance->report_thread.joinable()) {
        continue;
      }
      // Taking and releasing the mutex guarantees the worker is either
      // already blocked in wait_for (and will receive the notification) or
      // has not evaluated the predicate yet (and will observe `done`).
      // Without it the notification could slip in between the predicate
      // check and the wait, delaying shutdown by a whole interval.
      { std::lock_guard<std::mutex> lk(instance->report_mutex); }
      instance->report_cv.notify_all();
      instance->report_thread.join();
    }
  }

 private:
  std::vector<std::unique_ptr<ReporterInstance>> instances_;
  // Shared shutdown flag; written by the destructor, read by every worker.
  std::atomic<bool> done{false};
};
87 
88 // Returns a function that returns `true` if we should continue
89 // iterating, given the current iteration count.
90 std::function<bool(int64_t)> getContinuationTest(
91  Workspace* /*ws*/,
92  const ExecutionStep& step) {
93  if (step.has_should_stop_blob()) {
94  CAFFE_ENFORCE(
95  !step.has_num_iter(),
96  "Must not specify num_iter if should_stop_blob is set");
97  }
98 
99  if (!step.has_should_stop_blob()) { // control by iteration
100  CAFFE_ENFORCE(!step.has_only_once(), "not supported");
101  int64_t iterations = step.has_num_iter() ? step.num_iter() : 1;
102  VLOG(1) << "Will execute step " << step.name() << " for " << iterations
103  << " iterations.";
104  return [=](int64_t i) { return i < iterations; };
105  } else { // control by signal blob
106  bool onlyOnce = step.has_only_once() && step.only_once();
107  VLOG(1) << "Will execute step" << step.name() << (onlyOnce ? " once " : "")
108  << " until stopped by blob " << step.should_stop_blob();
109  if (onlyOnce) {
110  return [](int64_t i) { return i == 0; };
111  } else {
112  return [](int64_t /*i*/) { return true; };
113  }
114  }
115 };
116 
117 // if the blob doesn't exist or is not initiaized, return false
118 inline bool getShouldStop(const Blob* b) {
119  if (!b || !b->meta().id()) { // not exist or uninitialized
120  return false;
121  }
122 
123  const auto& t = b->Get<TensorCPU>();
124  CAFFE_ENFORCE(t.IsType<bool>() && t.size() == 1, "expects a scalar boolean");
125  return *(t.template data<bool>());
126 }
127 
136 struct WorkspaceIdInjector {
137  static const string NODE_ID;
138  static const string GLOBAL_WORKSPACE_ID;
139 
140  void InjectWorkspaceId(Workspace* workspace) {
141  if (workspace->HasBlob(NODE_ID)) {
142  Blob* node_id_blob = workspace->GetBlob(NODE_ID);
143  TensorCPU node_id_tensor = node_id_blob->template Get<TensorCPU>();
144  int node_id = node_id_tensor.template data<int32_t>()[0];
145  CAFFE_ENFORCE(
146  seq_ < (1 << 16),
147  "Integer overflow while calculating GLOBAL_WORKSPACE_ID blob");
148  int32_t global_ws_id = (seq_++) + (static_cast<int32_t>(node_id) << 16);
149  Blob* global_ws_id_blob = workspace->CreateLocalBlob(GLOBAL_WORKSPACE_ID);
150  TensorCPU* global_ws_id_tensor =
151  global_ws_id_blob->template GetMutable<TensorCPU>();
152  global_ws_id_tensor->Resize();
153  global_ws_id_tensor->template mutable_data<int32_t>()[0] = global_ws_id;
154  VLOG(1) << "Adding " << GLOBAL_WORKSPACE_ID << " = " << global_ws_id;
155  }
156  }
157 
158  private:
159  std::atomic<int> seq_{0};
160 };
161 
162 const string WorkspaceIdInjector::NODE_ID = "NODE_ID";
163 const string WorkspaceIdInjector::GLOBAL_WORKSPACE_ID = "GLOBAL_WORKSPACE_ID";
164 
165 struct CompiledExecutionStep;
166 
186 struct ExecutionStepWrapper {
187  ExecutionStepWrapper(
188  const ExecutionStep* step,
189  Workspace* externalWorkspace,
190  ShouldContinue externalShouldContinue,
191  NetDefMap* netDefs,
192  WorkspaceIdInjector* ws_id_injector)
193  : step_(step),
194  externalWorkspace_(externalWorkspace),
195  externalShouldContinue_(externalShouldContinue),
196  netDefs_(netDefs),
197  ws_id_injector_(ws_id_injector) {
198  // If this execution step does not create a child workspace,
199  // then just eagerly-compile it. This will trigger CreateNet on the
200  // nets used by this execution step.
201  if (!step_->create_workspace()) {
202  compiledStep_ = doCompile();
203  }
204  }
205 
206  class CompiledGuard {
207  void reset(std::unique_ptr<CompiledExecutionStep>&& compiled) {
208  compiled_ = std::move(compiled);
209  compiledRef_ = compiled_.get();
210  }
211  void reset(CompiledExecutionStep* compiledRef) {
212  compiled_.reset();
213  compiledRef_ = compiledRef;
214  }
215 
216  public:
217  CompiledExecutionStep* operator->() {
218  return compiledRef_;
219  }
220 
221  private:
222  CompiledGuard() {}
223  std::unique_ptr<CompiledExecutionStep> compiled_;
224  CompiledExecutionStep* compiledRef_;
225  friend struct ExecutionStepWrapper;
226  };
227 
228  const ExecutionStep& step() {
229  return *step_;
230  }
231 
232  CompiledGuard compiled() {
233  CompiledGuard guard;
234  if (compiledStep_) {
235  guard.reset(compiledStep_.get());
236  } else {
237  guard.reset(doCompile());
238  }
239  return guard;
240  }
241 
242  private:
243  std::unique_ptr<CompiledExecutionStep> doCompile();
244 
245  const ExecutionStep* step_;
246  Workspace* externalWorkspace_;
247  ShouldContinue externalShouldContinue_;
248  NetDefMap* netDefs_;
249  std::unique_ptr<CompiledExecutionStep> compiledStep_;
250  WorkspaceIdInjector* ws_id_injector_;
251 };
252 
253 struct CompiledExecutionStep {
254  typedef std::function<bool(int)> ShouldContinue;
255 
256  CompiledExecutionStep(
257  const ExecutionStep* mainStep,
258  Workspace* externalWorkspace,
259  ShouldContinue externalShouldContinue,
260  NetDefMap* netDefs,
261  WorkspaceIdInjector* ws_id_injector)
262  : step(mainStep) {
263  if (mainStep->create_workspace()) {
264  localWorkspace_.reset(new Workspace(externalWorkspace));
265  workspace = localWorkspace_.get();
266  ws_id_injector->InjectWorkspaceId(workspace);
267  } else {
268  workspace = externalWorkspace;
269  }
270 
271  CAFFE_ENFORCE(
272  (step->substep_size() == 0 || step->network_size() == 0),
273  "An ExecutionStep should either have substep or networks"
274  "but not both.");
275 
276  auto createAndGetNet = [&](const std::string& network_name) {
277  auto it = netDefs->find(network_name);
278  CAFFE_ENFORCE(
279  it != netDefs->end(),
280  "ExecutionStep " + mainStep->name() + " uses undefined net " +
281  network_name);
282  // needsOverride does not need synchronization because it is only
283  // relevant for non-dynamic executions steps. This is due to the fact
284  // that concurrent nets run on child workspaces, that do not needOverride.
285  if (it->second.needsOverride || !workspace->GetNet(network_name)) {
286  workspace->CreateNet(*it->second.netDef, true);
287  it->second.needsOverride = false;
288  }
289  auto* net = workspace->GetNet(network_name);
290  CAFFE_ENFORCE(net != nullptr, "Network ", network_name, " not found.");
291  return net;
292  };
293 
294  if (step->substep_size()) {
295  ShouldContinue substepShouldContinue;
296  if (!step->concurrent_substeps() || step->substep().size() <= 1) {
297  substepShouldContinue = externalShouldContinue;
298  } else {
299  substepShouldContinue = [this, externalShouldContinue](int64_t it) {
300  return !gotFailure && externalShouldContinue(it);
301  };
302  }
303 
304  for (const auto& ss : step->substep()) {
305  auto compiledSubstep = std::make_shared<ExecutionStepWrapper>(
306  &ss, workspace, substepShouldContinue, netDefs, ws_id_injector);
307  if (ss.has_run_every_ms()) {
308  reportSubsteps.push_back(compiledSubstep);
309  } else {
310  recurringSubsteps.push_back(compiledSubstep);
311  }
312  }
313  } else {
314  for (const string& network_name : step->network()) {
315  networks.push_back(createAndGetNet(network_name));
316  }
317  }
318 
319  if (step->has_should_stop_blob()) {
320  shouldStop = workspace->GetBlob(step->should_stop_blob());
321  CAFFE_ENFORCE(
322  shouldStop, "blob ", step->should_stop_blob(), " does not exist");
323  }
324 
325  if (step->has_report_net()) {
326  CAFFE_ENFORCE(
327  step->has_report_interval(),
328  "A report_interval must be provided if report_net is set.");
329  reportNet = createAndGetNet(step->report_net());
330  } else {
331  reportNet = nullptr;
332  }
333 
334  netShouldContinue = getContinuationTest(workspace, *step);
335  shouldContinue = [this, externalShouldContinue](int64_t iter) {
336  return externalShouldContinue(iter) && this->netShouldContinue(iter);
337  };
338  }
339 
340  const ExecutionStep* step;
341  Workspace* workspace;
342  vector<std::shared_ptr<ExecutionStepWrapper>> reportSubsteps;
343  vector<std::shared_ptr<ExecutionStepWrapper>> recurringSubsteps;
344 
345  vector<NetBase*> networks;
346  NetBase* reportNet;
347  Blob* shouldStop{nullptr};
348  ShouldContinue netShouldContinue;
349  ShouldContinue shouldContinue;
350  std::atomic<bool> gotFailure{false};
351 
352  private:
353  std::unique_ptr<Workspace> localWorkspace_;
354 };
355 
356 std::unique_ptr<CompiledExecutionStep> ExecutionStepWrapper::doCompile() {
357  return std::unique_ptr<CompiledExecutionStep>(new CompiledExecutionStep(
358  step_,
359  externalWorkspace_,
360  externalShouldContinue_,
361  netDefs_,
362  ws_id_injector_));
363 }
364 
// Makes the enclosing function return `true` when the stop blob
// `shouldStop` reads as set, logging which step was stopped.
// Wrapped in do { } while (false) so that it behaves as a single statement
// (no dangling-else surprises); every use must end with a semicolon.
#define CHECK_SHOULD_STOP(step, shouldStop)                           \
  do {                                                                \
    if (getShouldStop(shouldStop)) {                                  \
      VLOG(1) << "Execution step " << (step).name() << " stopped by " \
              << (step).should_stop_blob();                           \
      return true;                                                    \
    }                                                                 \
  } while (false)
371 
372 bool ExecuteStepRecursive(ExecutionStepWrapper& stepWrapper) {
373  const auto& step = stepWrapper.step();
374  auto compiledStep = stepWrapper.compiled();
375 
376  VLOG(1) << "Running execution step " << step.name();
377 
378  std::unique_ptr<Reporter> reporter;
379  if (step.has_report_net() || compiledStep->reportSubsteps.size() > 0) {
380  reporter = caffe2::make_unique<Reporter>();
381  auto* reportNet = compiledStep->reportNet;
382  if (reportNet) {
383  VLOG(1) << "Starting reporter net";
384  reporter->start(step.report_interval() * 1000, [reportNet]() {
385  if (!reportNet->Run()) {
386  LOG(WARNING) << "Error running report_net.";
387  }
388  });
389  }
390  for (auto& substepWrapper : compiledStep->reportSubsteps) {
391  reporter->start(
392  substepWrapper->step().run_every_ms(), [substepWrapper]() {
393  if (!ExecuteStepRecursive(*substepWrapper)) {
394  LOG(WARNING) << "Error running report step.";
395  }
396  });
397  }
398  }
399 
400  const Blob* shouldStop = compiledStep->shouldStop;
401 
402  if (step.substep_size()) {
403  bool sequential =
404  (!step.concurrent_substeps() || step.substep().size() <= 1) &&
405  (!step.has_num_concurrent_instances() ||
406  step.num_concurrent_instances() <= 1);
407  for (int64_t iter = 0; compiledStep->shouldContinue(iter); ++iter) {
408  if (sequential) {
409  VLOG(1) << "Executing step " << step.name() << " iteration " << iter;
410  for (auto& substepWrapper : compiledStep->recurringSubsteps) {
411  if (!ExecuteStepRecursive(*substepWrapper)) {
412  return false;
413  }
414  CHECK_SHOULD_STOP(step, shouldStop);
415  }
416  } else {
417  VLOG(1) << "Executing step " << step.name() << " iteration " << iter
418  << " with " << step.substep().size() << " concurrent substeps";
419 
420  std::atomic<int> next_substep{0};
421  std::mutex exception_mutex;
422  string first_exception;
423  auto worker = [&]() {
424  auto num_substeps = compiledStep->recurringSubsteps.size();
425  int substep_id = next_substep++ % num_substeps;
426  if (compiledStep->gotFailure) {
427  return;
428  }
429  try {
430  if (!ExecuteStepRecursive(
431  *compiledStep->recurringSubsteps.at(substep_id))) {
432  compiledStep->gotFailure = true;
433  }
434  } catch (const std::exception& ex) {
435  std::lock_guard<std::mutex> guard(exception_mutex);
436  if (!first_exception.size()) {
437  first_exception = GetExceptionString(ex);
438  LOG(ERROR) << "Parallel worker exception:\n" << first_exception;
439  }
440  compiledStep->gotFailure = true;
441  if (!FLAGS_caffe2_handle_executor_threads_exceptions) {
442  // In complex plans other threads might get stuck if another
443  // one fails. So we let exception to go out of thread which
444  // causes SIGABRT. In local setup one might use this flag
445  // in order to use Python debugger after a failure
446  throw;
447  }
448  }
449  };
450 
451  std::vector<std::thread> threads;
452  auto numThreads = compiledStep->recurringSubsteps.size();
453  if (step.has_num_concurrent_instances()) {
454  numThreads *= step.num_concurrent_instances();
455  }
456  for (int64_t i = 0; i < numThreads; ++i) {
457  threads.emplace_back(worker);
458  }
459  for (auto& thread : threads) {
460  thread.join();
461  }
462  if (compiledStep->gotFailure) {
463  LOG(ERROR) << "One of the workers failed.";
464  if (first_exception.size()) {
465  CAFFE_THROW(
466  "One of the workers died with an unhandled exception ",
467  first_exception);
468  }
469  return false;
470  }
471  // concurrent substeps should be careful about setting should_stop_blob
472  CHECK_SHOULD_STOP(step, shouldStop);
473  }
474  }
475  return true;
476  } else {
477  // If this ExecutionStep just contains nets, we can directly run it.
478  for (int64_t iter = 0; compiledStep->shouldContinue(iter); ++iter) {
479  VLOG(1) << "Executing networks " << step.name() << " iteration " << iter;
480  for (NetBase* network : compiledStep->networks) {
481  if (!network->Run()) {
482  return false;
483  }
484  CHECK_SHOULD_STOP(step, shouldStop);
485  }
486  }
487  }
488  return true;
489 }
490 
491 #undef CHECK_SHOULD_STOP
492 }
493 
494 bool RunPlanOnWorkspace(
495  Workspace* ws,
496  const PlanDef& plan,
497  ShouldContinue shouldContinue) {
498  LOG(INFO) << "Started executing plan.";
499  if (plan.execution_step_size() == 0) {
500  LOG(WARNING) << "Nothing to run - did you define a correct plan?";
501  // We will do nothing, but the plan is still legal so we will return true.
502  return true;
503  }
504  LOG(INFO) << "Initializing networks.";
505 
506  NetDefMap net_defs;
507  for (const NetDef& net_def : plan.network()) {
508  CAFFE_ENFORCE(
509  net_defs.count(net_def.name()) == 0,
510  "Your plan contains networks of the same name \"",
511  net_def.name(),
512  "\", which should not happen. Check your plan to see "
513  "if you made a programming error in creating the plan.");
514  auto netAlreadyExists = ws->GetNet(net_def.name()) != nullptr;
515  net_defs[net_def.name()] = NetDefInfo{&net_def, netAlreadyExists};
516  }
517  WorkspaceIdInjector ws_id_injector;
518  Timer plan_timer;
519  for (const ExecutionStep& step : plan.execution_step()) {
520  Timer step_timer;
521  ExecutionStepWrapper stepWrapper(
522  &step, ws, shouldContinue, &net_defs, &ws_id_injector);
523  if (!ExecuteStepRecursive(stepWrapper)) {
524  LOG(ERROR) << "Failed initializing step " << step.name();
525  return false;
526  }
527  LOG(INFO) << "Step " << step.name() << " took " << step_timer.Seconds()
528  << " seconds.";
529  }
530  float exec_time = plan_timer.Seconds();
531 
532 #ifndef CAFFE2_MOBILE
533  PlanExecutionTime plan_stat(plan.name());
534  CAFFE_EVENT(
535  plan_stat, plan_execution_time_ns, (long)(exec_time * 1000000000));
536 #endif // CAFFE2_MOBILE
537 
538  LOG(INFO) << "Total plan took " << exec_time << " seconds.";
539  LOG(INFO) << "Plan executed successfully.";
540  return true;
541 }
542 }
void Resize(Ts...dim_source)
Resizes a tensor.
Definition: tensor.h:304
Copyright (c) 2016-present, Facebook, Inc.