Caffe2 - C++ API
A deep learning, cross-platform ML framework
recurrent_network_executor.h
#ifndef CAFFE2_OPERATORS_RECURRENT_NETWORK_EXECUTOR_H_
#define CAFFE2_OPERATORS_RECURRENT_NETWORK_EXECUTOR_H_

#include <map>
#include <unordered_set>
#include <vector>

#include "caffe2/core/context.h"
#include "caffe2/core/logging.h"
#include "caffe2/core/operator.h"
#include "caffe2/core/timer.h"
#include "caffe2/operators/recurrent_network_executor_incl.h"

namespace caffe2 {

/**
 * RecurrentNetworkExecutor is a specialized runtime for recurrent
 * neural networks (RNNs).
 */
class RecurrentNetworkExecutorBase {
 protected:
  RecurrentNetworkExecutorBase(
      const NetDef& step_net_def,
      std::map<string, string>& recurrent_input_map,
      std::string timestep_blob)
      : step_net_def_(step_net_def),
        recurrent_input_map_(recurrent_input_map),
        timestep_blob_(timestep_blob) {
    for (int i = 0; i < step_net_def_.op_size(); i++) {
      op_deps_.push_back(op_deps(i));
    }
  }

 public:
  virtual ~RecurrentNetworkExecutorBase() {
    if (debug_) {
      if (timestep_ops_.size() > 0) {
        PrintInfo(0);
      }
    }
  }

  virtual bool Run(int T) = 0;

  virtual bool RunBackwards(int T) = 0;

  /**
   * Callers must call EnsureTimestepInitialized before starting execution
   * for each of the relevant timesteps.
   */
  void EnsureTimestepInitialized(
      int t,
      Workspace* ws,
      const std::vector<std::unique_ptr<ObserverBase<OperatorBase>>>&
          observers_list) {
    if (timestep_ops_template_.size() == 0) {
      // First invocation -- compute dependencies
      CalculateInternalDependencies();

      // Label ops based on whether they contain a reference to the timestep
      // blob. This is an optimization to avoid string comparisons later.
      for (auto& rnn_op : timestep_ops_template_) {
        rnn_op.has_timestep_blob = false;
        const OperatorDef& op = step_net_def_.op(rnn_op.order);
        for (int i = 0; i < op.input_size(); i++) {
          if (op.input(i) == timestep_blob_) {
            rnn_op.has_timestep_blob = true;
            break;
          }
        }
        CAFFE_ENFORCE(
            !HasOutput(op, timestep_blob_),
            "Timestep cannot be output of an op: ",
            timestep_blob_,
            " op=" + ProtoDebugString(op));
      }
    }

    // Initialize timestep if it is not initialized
    if (timestep_ops_.size() <= t ||
        (timestep_ops_.size() > t && timestep_ops_[t].size() == 0)) {
      // Initialize empty timestep ops vectors for each timestep preceding
      // this.
      for (int j = timestep_ops_.size(); j < t + 1; j++) {
        timestep_ops_.push_back(std::vector<RNNNetOperator>());
        timestep_ops_.back().reserve(timestep_ops_template_.size());
      }

      // Keep track of workspaces for optimization in forward-only case
      if (workspaces_.size() < t + 1) {
        workspaces_.resize(t + 1);
      }
      workspaces_[t] = ws;

      // Create a specific timestep blob for this timestep. This is to
      // avoid conflicting timestep blobs when reusing workspaces, as with
      // the forward-only mode.
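      // For example, with timestep_blob_ = "timestep" and t = 2, the
      // per-timestep blob is named "timestep_rnnexec_t2".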
      std::string this_timestep_blob =
          timestep_blob_ + "_rnnexec_t" + caffe2::to_string(t);
      ws->CreateBlob(this_timestep_blob)->GetMutable<TensorCPU>()->Resize(1);
      auto b = ws->GetBlob(this_timestep_blob);
      CAFFE_ENFORCE(b);
      b->GetMutable<TensorCPU>()->mutable_data<int32_t>()[0] = t;

      // Copy the operators from the template.
      for (auto& template_rnn_op : timestep_ops_template_) {
        auto& rnn_op = template_rnn_op;

        // For ops that have the timestep blob as an input we need to
        // create a new operator definition with the timestep-specific
        // timestep blob. This is required to avoid race conditions when
        // multiple timesteps execute in parallel.
        if (rnn_op.has_timestep_blob) {
          OperatorDef op_copy = step_net_def_.op(rnn_op.order);

          for (int i = 0; i < op_copy.input_size(); i++) {
            if (op_copy.input(i) == timestep_blob_) {
              op_copy.set_input(i, this_timestep_blob);
            }
          }

          rnn_op.op = CreateOperator(op_copy, ws);
          for (const auto& observer : observers_list) {
            std::unique_ptr<ObserverBase<OperatorBase>> observer_copy =
                observer->copy(rnn_op.op.get());
            CAFFE_ENFORCE(
                observer_copy,
                "Observers without copy() implemented cannot be attached "
                "to RNN using RNNExecutor.");
            rnn_op.op->AttachObserver(std::move(observer_copy));
          }
        } else {
          // Optimization for forward-only models when we can share workspaces
          // across timesteps: then we can just copy the op reference.
          if (t > max_parallel_timesteps_ && max_parallel_timesteps_ > 0 &&
              workspaces_[t - max_parallel_timesteps_] == ws) {
            rnn_op.op =
                timestep_ops_[t - max_parallel_timesteps_][rnn_op.order].op;
          } else {
            // Otherwise, we need to create a brand new op with the workspace
            // owned by this timestep.
            rnn_op.op = CreateOperator(step_net_def_.op(rnn_op.order), ws);
            for (const auto& observer : observers_list) {
              std::unique_ptr<ObserverBase<OperatorBase>> observer_copy =
                  observer->copy(rnn_op.op.get());
              CAFFE_ENFORCE(
                  observer_copy,
                  "Observers without copy() implemented cannot be attached "
                  "to RNN using RNNExecutor.");
              rnn_op.op->AttachObserver(std::move(observer_copy));
            }
          }
        }
        rnn_op.op->DisableEvent();

        timestep_ops_[t].emplace_back(rnn_op);
      }
    }
  }

  /**
   * Set limit for the number of timesteps that run in parallel.
   */
  void SetMaxParallelTimesteps(int p) {
    max_parallel_timesteps_ = p;
  }

  size_t NumObserversStepNet() {
    size_t num = 0;
    for (auto& ops_at_timestep_t : timestep_ops_) {
      for (auto& rnn_op : ops_at_timestep_t) {
        num += rnn_op.op->NumObservers();
      }
    }
    return num;
  }

 private:
  // Utility method to check if any of the op's inputs or control inputs
  // contain the given blob 'x'.
  bool has_input(std::string x, int opidx) {
    for (auto& inp : step_net_def_.op(opidx).input()) {
      if (inp == x) {
        return true;
      }
    }
    for (auto& inp : step_net_def_.op(opidx).control_input()) {
      if (inp == x) {
        return true;
      }
    }
    return false;
  }

  // Return all outbound dependencies of an op. Special case for
  // rnn dependencies, which are set in recurrent_network_op.
  std::vector<string> op_deps(int i) {
    std::vector<string> outs;
    auto& opdef = step_net_def_.op(i);
    for (string o : opdef.output()) {
      outs.push_back(o);
    }
    for (auto& arg : opdef.arg()) {
      if (arg.name().find("rnn_dependency") == 0) {
        outs.push_back(arg.s());
      }
    }
    return outs;
  }

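  // Scan the ops in circular order starting from op start_i and collect
  // into dep_ops the ops that consume any of the given 'outputs'. When a
  // consumer is found, its own dependencies are marked as already
  // accounted for so they are not added again.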
  void infer_dependencies(
      int start_i,
      std::unordered_set<string> outputs,
      std::vector<RNNNetOperator>& rnn_ops,
      std::unordered_set<int>* dep_ops) {
    std::unordered_set<int> already_accounted_deps;
    int num_ops = step_net_def_.op_size();
    bool ignore_links = this->ignoreLinkDependencies();
    for (int j = 0; j < num_ops - 1 && !outputs.empty(); j++) {
      int i = (start_i + j) % num_ops;
      if (ignore_links && rnn_ops[i].link_op) {
        continue;
      }
      for (auto& outp : outputs) {
        if (has_input(outp, i)) {
          if (already_accounted_deps.find(i) == already_accounted_deps.end()) {
            dep_ops->insert(i);
          }

          // Now we can take the deps of this op and not
          // add them again.
          for (int odep : rnn_ops[i].dependencies) {
            already_accounted_deps.insert(odep);
          }
          for (string& dep_out : op_deps_[i]) {
            auto oit = outputs.find(dep_out);
            if (oit != outputs.end()) {
              // This op produces an output of the original op, so the
              // dependency is passed through that op.
              outputs.erase(oit);
            }
          }
          break;
        }
      }
    }
  }

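  // Add dependencies to handle conflicts between ops that can run
  // concurrently in different timesteps: an op that outputs a blob read
  // by this op, or an earlier op that outputs a blob this op also
  // writes, is recorded as a dependent of this op.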
  void add_race_conflict_dependencies(
      int opidx,
      std::vector<RNNNetOperator>& rnn_ops,
      std::unordered_set<int>* dep_ops) {
    for (int i = 0; i < rnn_ops.size(); i++) {
      if (i == opidx) {
        continue;
      }
      if (rnn_ops[i].link_op && this->ignoreLinkDependencies()) {
        continue;
      }
      for (auto& dep_blob : op_deps_[i]) {
        for (auto& inp : step_net_def_.op(opidx).input()) {
          if (inp == dep_blob) {
            dep_ops->insert(i);
            break;
          }
        }
        if (i < opidx) {
          for (auto& outp : step_net_def_.op(opidx).output()) {
            if (outp == dep_blob) {
              dep_ops->insert(i);
              break;
            }
          }
        }
      }
    }
  }

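  // Calculate the dependency structure between the ops of the step net,
  // i.e. for each op, which other ops it unblocks once it has finished.
  // The result is stored in timestep_ops_template_ and copied for each
  // timestep.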
  void CalculateInternalDependencies() {
    for (int i = 0; i < step_net_def_.op_size(); i++) {
      timestep_ops_template_.push_back(RNNNetOperator(step_net_def_.op(i), i));
    }
    // Then see which outputs appear as inputs, and those are
    // the internal blobs.
    for (auto& rnn_op : timestep_ops_template_) {
      std::unordered_set<string> dep_outputs;
      for (auto& outp : op_deps_[rnn_op.order]) {
        dep_outputs.insert(outp);
      }

      // Add recurrent dependencies as 'outputs' for this op. Collect them
      // into a separate set first so that we do not insert into dep_outputs
      // while iterating over it.
      std::unordered_set<string> recurrent_deps;
      for (auto& outp : dep_outputs) {
        auto rit = recurrent_input_map_.find(outp);
        if (rit != recurrent_input_map_.end()) {
          recurrent_deps.insert(rit->second);
        }
      }
      dep_outputs.insert(recurrent_deps.begin(), recurrent_deps.end());

      // Compute dependencies of this op.
      if (!rnn_op.link_op || !this->ignoreLinkDependencies()) {
        std::unordered_set<int> dependent_ops;
        infer_dependencies(
            rnn_op.order + 1,
            dep_outputs,
            timestep_ops_template_,
            &dependent_ops);

        // Race conditions arise when an operator writes a blob that is
        // being read by another.
        if (!this->ignoreLinkDependencies()) {
          add_race_conflict_dependencies(
              rnn_op.order, timestep_ops_template_, &dependent_ops);
        }

        for (int i : dependent_ops) {
          rnn_op.dependencies.push_back(i);
        }

        // Sort in ascending order of dependency distance. If op
        // j > i, then the distance is j - i. But if j < i, then the distance
        // from i to j crosses the timestep boundary and is j + num_ops - i.
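        // For example, with num_ops = 5 and rnn_op.order = 2, the
        // dependency set {0, 1, 3, 4} sorts to 3, 4, 0, 1: ops later in
        // the same timestep come first, followed by the ops that wrap
        // around into the next timestep.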
        std::sort(
            rnn_op.dependencies.begin(),
            rnn_op.dependencies.end(),
            [&](const int& a, const int& b) {
              if (a < rnn_op.order && b < rnn_op.order) {
                return a < b;
              }
              if (a >= rnn_op.order && b >= rnn_op.order) {
                return a < b;
              }
              if (a >= rnn_op.order && b < rnn_op.order) {
                return true;
              }
              return false;
            });
      }
    }

    // Update dependency counts
    for (auto& rnn_op : timestep_ops_template_) {
      for (int i : rnn_op.dependencies) {
        timestep_ops_template_[i].num_dynamic_inputs++;

        if (i > rnn_op.order) {
          timestep_ops_template_[i].frontier = false;
        } else {
          timestep_ops_template_[i].num_recurrent_inputs++;
        }
      }
    }
    // Find ops that have no recurrent inputs, and bind them
    // to the last op of the timestep. If there is only one op
    // in the step net, then it will depend on itself. Note that
    // we do not increase the dynamic input counter.
    for (auto& rnn_op : timestep_ops_template_) {
      if (rnn_op.num_dynamic_inputs == 0 && rnn_op.num_recurrent_inputs == 0) {
        if (rnn_op.link_op && this->ignoreLinkDependencies()) {
          continue;
        }
        timestep_ops_template_.back().dependencies.push_back(rnn_op.order);
      }
    }

    // Compute parents.
    for (auto& rnn_op : timestep_ops_template_) {
      for (int dep : rnn_op.dependencies) {
        timestep_ops_template_[dep].parents.push_back(rnn_op.order);
      }
    }
    AnalyzeOps();
  }

 protected:
  /**
   * For debug purposes, print the dependency structure.
   */
  void PrintInfo(int t) {
    auto& rnn_ops = timestep_ops_[t];

    LOG(INFO) << "Timestep: " << t;
    for (auto& rnn_op : rnn_ops) {
      auto& op = rnn_op.op;
      LOG(INFO) << "Operator " << rnn_op.order << ": " << op->type()
                << " dep inputs:" << rnn_op.num_dynamic_inputs
                << " rec inputs:" << rnn_op.num_recurrent_inputs
                << " frontier: " << rnn_op.frontier;
      for (auto& inp : rnn_op.op->debug_def().input()) {
        LOG(INFO) << " ---- input: " << inp;
      }
      for (auto& outp : rnn_op.op->debug_def().output()) {
        LOG(INFO) << " ---- output: " << outp;
      }
      for (auto j : rnn_op.dependencies) {
        LOG(INFO) << " dep: " << j << ": " << rnn_ops[j].op->type();
      }
      for (auto j : rnn_op.parents) {
        LOG(INFO) << " parent: " << j << ": " << rnn_ops[j].op->type();
      }
    }

    LOG(INFO) << "recurrent_inputs:" << recurrent_input_map_;

    for (auto& rnn_op : rnn_ops) {
      LOG(INFO) << "Operator " << rnn_op.order;
      LOG(INFO) << ProtoDebugString(rnn_op.op->debug_def());
    }
  }

  virtual void AnalyzeOps() {}

  virtual bool ignoreLinkDependencies() = 0;

  std::vector<std::vector<RNNNetOperator>> timestep_ops_;
  std::vector<OperatorBase*> op_ptrs_;

  std::vector<RNNNetOperator> timestep_ops_template_;

  NetDef step_net_def_;
  std::vector<std::vector<string>> op_deps_;
  std::vector<Workspace*> workspaces_;
  std::map<string, string> recurrent_input_map_;
  std::string timestep_blob_;

  int max_parallel_timesteps_ = -1;

 public:
  bool debug_ = false;
};

template <class Context>
std::unique_ptr<RecurrentNetworkExecutorBase> createRNNExecutor(
    const NetDef& step_net_def,
    std::map<string, string>& recurrent_input_map,
    std::string timestep_blob,
    ArgumentHelper rnn_args);

class ThreadedRecurrentNetworkExecutor : public RecurrentNetworkExecutorBase {
 public:
  ThreadedRecurrentNetworkExecutor(
      const NetDef& step_net_def,
      std::map<string, string>& recurrent_input_map,
      std::string timestep_blob)
      : RecurrentNetworkExecutorBase(
            step_net_def,
            recurrent_input_map,
            timestep_blob),
        failed_(false) {}

  ~ThreadedRecurrentNetworkExecutor() {
    task_queue_.NoMoreJobs();
    VLOG(1) << "Joining workers.";
    for (auto& worker : workers_) {
      worker.join();
    }
  }

  bool Run(int T) override;

  bool RunBackwards(int T) override;

  bool ignoreLinkDependencies() override {
    return false;
  }

  // Set the number of worker threads used to execute the ops.
  void setNumThreads(int n) {
    num_threads_ = n;
  }

 private:
  void _ExecRange(int from, int to);

  void _Exec();

  void WorkerFunction();

  void RunOp(OpTask job, int thread_id);

  SimpleQueue<OpTask> task_queue_;
  std::atomic<int> countdown_;
  std::atomic<bool> failed_;
  std::atomic<int> finished_timesteps_;
  int num_ops_;
  std::mutex countdown_mtx_;
  std::condition_variable cv_;
  std::vector<std::thread> workers_;
  int num_threads_ = 4;
};

} // namespace caffe2

#endif // CAFFE2_OPERATORS_RECURRENT_NETWORK_EXECUTOR_H_
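
Typical usage. The executor is created via createRNNExecutor<Context> and driven one timestep at a time: callers must invoke EnsureTimestepInitialized for every relevant timestep before starting execution, then call Run (or RunBackwards for the gradient pass). The sketch below shows the shape of that sequence; step_net_def, recurrent_input_map, operator_def, and the per-timestep workspace vector step_workspaces are hypothetical names standing in for state that, in Caffe2 itself, is prepared by recurrent_network_op.

// A minimal sketch, not the canonical call sequence. Assumes step_net_def
// (the per-timestep step net), recurrent_input_map, operator_def (the
// OperatorDef carrying the RNN arguments), and a vector of per-timestep
// Workspace pointers 'step_workspaces' have already been set up.
std::unique_ptr<RecurrentNetworkExecutorBase> executor =
    createRNNExecutor<CPUContext>(
        step_net_def,
        recurrent_input_map,
        "timestep",                    // name of the timestep blob
        ArgumentHelper(operator_def));

// Every timestep must be initialized before execution begins.
std::vector<std::unique_ptr<ObserverBase<OperatorBase>>> observers;
for (int t = 0; t < T; t++) {
  executor->EnsureTimestepInitialized(t, step_workspaces[t], observers);
}

// Run all T timesteps forward; RunBackwards(T) runs the gradient pass.
CAFFE_ENFORCE(executor->Run(T));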