Caffe2 - C++ API
A deep learning, cross platform ML framework
recurrent_network_executor.cc
1 #include "caffe2/operators/rnn/recurrent_network_executor.h"
2 
3 #include "caffe2/core/timer.h"
4 
5 namespace caffe2 {
6 
12 template <>
13 std::unique_ptr<RecurrentNetworkExecutorBase> createRNNExecutor<CPUContext>(
14  const NetDef& step_net_def,
15  std::map<string, string>& recurrent_input_map,
16  std::string timestep_blob,
17  ArgumentHelper rnn_args) {
18  auto* exec = new ThreadedRecurrentNetworkExecutor(
19  step_net_def, recurrent_input_map, timestep_blob);
20  int num_threads =
21  rnn_args.GetSingleArgument<int>("rnn_executor.num_threads", 0);
22  if (num_threads > 0) {
23  exec->setNumThreads(num_threads);
24  LOG(INFO) << "Set num threads: " << num_threads;
25  }
26  exec->debug_ = rnn_args.GetSingleArgument<int>("rnn_executor_debug", 0);
27  return std::unique_ptr<RecurrentNetworkExecutorBase>(exec);
28 }
29 
34  CAFFE_ENFORCE_GE(T, 0, "Negative number of steps");
35  if (T == 0) {
36  return true;
37  }
38 
39  CAFFE_ENFORCE(timestep_ops_.size() >= T);
40  countdown_ = T * timestep_ops_[0].size();
41  finished_timesteps_ = 0;
42 
43  CHECK(task_queue_.size() == 0);
44 
45  for (auto& rnn_op : timestep_ops_[0]) {
46  // Launch "frontier"-ops first.
47  if (rnn_op.frontier) {
48  task_queue_.Push(OpTask(0, rnn_op.order, T, 1));
49  }
50  }
51 
52  _Exec();
53  return true;
54 }
55 
60  CAFFE_ENFORCE_GE(T, 0, "Negative number of steps");
61  if (T == 0) {
62  return true;
63  }
64 
65  CAFFE_ENFORCE(timestep_ops_.size() >= T);
66  countdown_ = T * timestep_ops_[0].size();
67  finished_timesteps_ = 0;
68 
69  // Frontier
70  CHECK(task_queue_.size() == 0);
71 
72  for (auto& rnn_op : timestep_ops_[T - 1]) {
73  if (rnn_op.frontier) {
74  task_queue_.Push(OpTask(T - 1, rnn_op.order, T, -1));
75  }
76  }
77 
78  _Exec();
79  return true;
80 }
81 
86 void ThreadedRecurrentNetworkExecutor::RunOp(OpTask job, int /*thread_id*/) {
87  bool first_timestep =
88  ((job.forward() && job.timestep == 0) ||
89  (job.backward() && job.timestep == job.T - 1));
90  bool last_timestep =
91  ((job.backward() && job.timestep == 0) ||
92  (job.forward() && job.timestep == job.T - 1));
93  auto& rnn_op = timestep_ops_[job.timestep][job.op_idx];
94  if (rnn_op.num_dynamic_inputs > 0 && !rnn_op.frontier) {
95  CAFFE_ENFORCE_EQ(
96  rnn_op.proc_inputs,
97  rnn_op.num_dynamic_inputs -
98  first_timestep * rnn_op.num_recurrent_inputs,
99  "Error at operator ",
100  job.op_idx,
101  " on timestep ",
102  job.timestep,
103  " T=",
104  job.T,
105  " first =",
106  first_timestep);
107  }
108 
109  // Reset input dependency counter
110  rnn_op.proc_inputs = 0;
111 
112  // Run the operator
113  rnn_op.op->Run();
114 
115  // Knock down dependencies and start next ops, if this
116  // was last dependency fulfilled.
117  for (int depidx : rnn_op.dependencies) {
118  int t = job.timestep;
119  bool for_next_timestep = depidx <= rnn_op.order;
120  if (!last_timestep && for_next_timestep) {
121  t += job.direction;
122  } else if (for_next_timestep) {
123  continue;
124  }
125 
126  auto& dep_op = timestep_ops_[t][depidx];
127  int proc_inputs = dep_op.proc_inputs.fetch_add(1) + 1;
128 
129  // Schedule next op, if this was the last dependency. Note that on
130  // first timestep we don't have recurrent inputs.
131  int num_req_inputs = dep_op.num_dynamic_inputs;
132  if (first_timestep && !for_next_timestep) {
133  num_req_inputs -= dep_op.num_recurrent_inputs;
134  }
135 
136  if (proc_inputs == num_req_inputs || num_req_inputs == 0) {
137  task_queue_.Push(OpTask(t, depidx, job.T, job.direction));
138  }
139  }
140 
141  // Decrement countdown: when at zero, we have run all ops and can
142  // notify the caller thread.
143  if (countdown_.fetch_sub(1) == 1) {
144  CAFFE_ENFORCE_EQ(0, task_queue_.size());
145  std::unique_lock<std::mutex> lk(countdown_mtx_);
146  cv_.notify_one();
147  }
148 }
149 
154 void ThreadedRecurrentNetworkExecutor::WorkerFunction() {
155  size_t num_jobs = 0;
156  static std::atomic<int> seq(0);
157  int id = seq.fetch_add(1);
158 
159  while (!failed_) {
160  OpTask job;
161  if (!task_queue_.Pop(&job)) {
162  break;
163  }
164 
165  // Check for limited timestep parallelism, and if too many timesteps would
166  // be started concurrently, return the task to task queue.
167  if (max_parallel_timesteps_ > 0) {
168  int t = (job.direction == 1 ? job.timestep : job.T - job.timestep + 1);
169  if (t - finished_timesteps_ >= max_parallel_timesteps_) {
170  // Return to queue
171  task_queue_.Push(job);
172  continue;
173  }
174  }
175 
176  try {
177  RunOp(job, id);
178  if (job.op_idx == timestep_ops_template_.size() - 1) {
179  finished_timesteps_.fetch_add(1);
180  }
181  num_jobs++;
182  } catch (::caffe2::EnforceNotMet& enf) {
183  std::unique_lock<std::mutex> lk(countdown_mtx_);
184  LOG(ERROR) << "Crash at thread " << id << " timestep " << job.timestep
185  << " op:" << ProtoDebugString(step_net_def_.op(job.op_idx))
186  << enf.what();
187  task_queue_.NoMoreJobs();
188  failed_ = true;
189  cv_.notify_one();
190  return;
191  }
192  }
193  VLOG(1) << "Worker exiting, did run: " << num_jobs << " jobs";
194 }
195 
200 void ThreadedRecurrentNetworkExecutor::_Exec() {
201  CAFFE_ENFORCE_EQ(
202  false, failed_, "Tried to execute a previously failed RNN executor");
203 
204  // Start threads if not started
205  std::unique_lock<std::mutex> lk(countdown_mtx_);
206  while (workers_.size() < num_threads_) {
207  VLOG(1) << "Start RNN worker " << workers_.size() << " / " << num_threads_;
208  workers_.push_back(
209  std::thread(&ThreadedRecurrentNetworkExecutor::WorkerFunction, this));
210  }
211 
212  // Wait until threads finish.
213  Timer t;
214  while (!failed_ && countdown_ > 0) {
215  cv_.wait_for(lk, std::chrono::seconds(30), [&] {
216  // Log if we are still running, so that we catch deadlocks.. there
217  // should not be any deadlocks, but...
218  if (t.Seconds() > 10) {
219  LOG(INFO) << "RNN Executor still running, remaining ops: "
220  << countdown_;
221  }
222  return failed_ || countdown_ == 0;
223  });
224  }
225 
226  CAFFE_ENFORCE_EQ(
227  false,
228  failed_,
229  "RNN executor encountered failure. See prior error logs for details.");
230 }
231 
232 } // namespace caffe2
Data structure for a scheduled task in the task queue.
std::unique_ptr< RecurrentNetworkExecutorBase > createRNNExecutor< CPUContext >(const NetDef &step_net_def, std::map< string, string > &recurrent_input_map, std::string timestep_blob, ArgumentHelper rnn_args)
Implementation of RecurrentNetworkExecutor that uses thread pool for multithreaded execution of RNNs...
A helper class to index into arguments.
Definition: proto_utils.h:200
The primary ATen error class.
Definition: Exception.h:27
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...
Definition: blob.h:13
const char * what() const noexcept override
Returns the complete error message, including the source location.
Definition: Exception.h:70
float Seconds()
Returns the elapsed time in seconds.
Definition: timer.h:40
bool RunBackwards(int T) override
Run backward pass with T timesteps.
A simple timer object for measuring time.
Definition: timer.h:16
bool Run(int T) override
Run forward pass with T timesteps.