// Caffe2 - C++ API
// A deep learning, cross platform ML framework
// net_dag_utils.cc
17 #include "caffe2/core/net_dag_utils.h"
18 
19 #include <set>
20 #include <stack>
21 #include <unordered_map>
22 #include <unordered_set>
23 
24 #include "caffe2/core/operator.h"
25 #include "caffe2/core/static_tracepoint.h"
26 #include "caffe2/core/timer.h"
27 #include "caffe2/proto/caffe2.pb.h"
28 #include "caffe2/utils/proto_utils.h"
29 
30 namespace caffe2 {
31 namespace dag_utils {
32 
33 namespace {
// Removes redundant transitive edges from the DAG reachable from `node_idx`
// using an iterative DFS. If node C is reachable from A both directly and
// through some intermediate node B, the direct A->C edge is pruned so that
// only the longer A->B->...->C path remains. `nodes` is modified in place;
// `visited_inputs`/`num_orig_parents` gate descent so each node's subtree is
// expanded exactly once even when reached via multiple parents.
void prune(int node_idx, std::vector<OpGraphNode>& nodes) {
 // Ancestor table for tracking the visited nodes
 std::vector<bool> ancestors(nodes.size(), false);
 // stack element is pair of <curr_node, previous_node>
 std::stack<std::pair<int, int>> nodes_stack;
 // initialize the prev_node to be -1
 nodes_stack.push(std::make_pair(node_idx, -1));

 while (!nodes_stack.empty()) {
 const auto& node_pair = nodes_stack.top();
 int curr = node_pair.first;
 int prev = node_pair.second;

 // If the node has already been visited, pop curr out of
 // stack and clean up the ancestor table
 CAFFE_ENFORCE(curr < ancestors.size(), "Out of bound access");
 if (ancestors[curr]) {
 ancestors[curr] = false;
 nodes_stack.pop();
 continue;
 }

 // Check if this has a parent that can be pruned:
 // if parent is not the previous node visited and is
 // an ancestor of the current traversal, it can be
 // pruned (there is a longer path through `prev`).
 if (prev >= 0) {
 std::vector<int> new_parents;
 for (auto parent : nodes[curr].parents_) {
 if (parent != prev && ancestors[parent]) {
 // We can prune this one: drop the matching child edge too so the
 // parent/child lists stay symmetric.
 nodes[parent].children_.erase(
 std::remove(
 nodes[parent].children_.begin(),
 nodes[parent].children_.end(),
 curr),
 nodes[parent].children_.end());
 } else {
 new_parents.push_back(parent);
 }
 }
 nodes[curr].parents_ = new_parents;
 }

 ancestors[curr] = true;

 // Descend -- but only once from each node: only when the last original
 // parent reaches this node do we push its children.
 if (nodes[curr].visited_inputs == nodes[curr].num_orig_parents) {
 const auto& children = nodes[curr].children_;
 for (auto child : children) {
 nodes[child].visited_inputs++;
 nodes_stack.push(std::make_pair(child, curr));
 }
 }
 }
}
90 
95 std::vector<OpGraphNode> pruneOpNodeGraph(
96  const std::vector<OperatorNode>& nodes) {
97  Timer t;
98  std::vector<OpGraphNode> pruned;
99 
100  // Create a separate list of pruned operatornodes used
101  // for the chaining computation. Because of the unique_ptr
102  // in the OperatorNode, we cannot do a copy but have to
103  // copy just the fields we need.
104  for (auto& node : nodes) {
105  OpGraphNode nd;
106  nd.children_ = node.children_;
107  nd.parents_ = node.parents_;
108  nd.num_orig_parents = nd.parents_.size();
109  pruned.push_back(nd);
110  }
111 
112  for (int i = 0; i < pruned.size(); ++i) {
113  if (pruned[i].parents_.size() == 0) {
114  prune(i, pruned);
115  }
116  }
117 
118  LOG(INFO) << "Operator graph pruning prior to chain compute took: "
119  << t.Seconds() << " secs";
120  return pruned;
121 }
122 
123 void updateOperatorNodes(
124  std::vector<OperatorNode>& nodes,
125  const ExecutionChains& chains) {
126  for (int i = 0; i < nodes.size(); ++i) {
127  auto& node = nodes[i];
128  if (chains.find(i) != chains.end()) {
129  node.is_chain_start_ = true;
130  } else {
131  node.is_chain_start_ = false;
132  }
133  node.runtime_parent_count_ = 0;
134  }
135 }
136 } // namespace
137 
// Computes the set of execution chains for the operator graph. A chain is a
// maximal linear run of operators that can be executed sequentially on a
// single stream without extra synchronization. Returns a map keyed by each
// chain's first op index, mapping to the ordered op indices of the chain,
// and refreshes the chain-start flags / runtime parent counts on orig_nodes.
ExecutionChains computeChains(std::vector<OperatorNode>& orig_nodes) {
 // Work on a pruned copy (transitive edges removed) so linear runs of ops
 // are easier to identify.
 const std::vector<OpGraphNode> nodes = pruneOpNodeGraph(orig_nodes);
 vector<int> initial_frontier;
 for (int idx = 0; idx < nodes.size(); ++idx) {
 if (nodes[idx].parents_.size() == 0) {
 initial_frontier.push_back(idx);
 }
 }

 // We need to construct the node_seen_count to know how many inner edges each
 // node has.
 std::unordered_map<int, int> node_seen_count;

 for (int root_index : initial_frontier) {
 const auto& root = nodes[root_index];
 std::stack<std::pair<int, std::vector<int>::const_iterator>> depth_stack;
 depth_stack.push(make_pair(root_index, root.children_.begin()));
 node_seen_count[root_index]++;
 CAFFE_ENFORCE(
 node_seen_count[root_index] == 1,
 "root node ",
 root_index,
 " visit count must be == 1");

 while (depth_stack.size() > 0) {
 auto cur = depth_stack.top();
 depth_stack.pop();
 if (cur.second != nodes[cur.first].children_.end()) {
 int node_index = *cur.second;
 node_seen_count[node_index]++;
 cur.second++;
 depth_stack.push(cur);
 if (node_seen_count[node_index] == 1) {
 // Visit each child only once.
 depth_stack.push(
 make_pair(node_index, nodes[node_index].children_.begin()));
 }
 }
 }
 }
 // Now, we compute the set of execution chains. An execution chain is
 // a linear set of nodes that can be executed on a single stream
 // (e.g. a chain of single input, single output operators)
 ExecutionChains chains;
 std::unordered_set<int> seen_nodes;
 std::vector<int> chain;
 std::pair<int, std::vector<int>::const_iterator> cur;
 std::stack<std::pair<int, std::vector<int>::const_iterator>> depth_stack;
 // True when `cur` may be appended to the chain being built: it must have
 // exactly one incoming edge (node_seen_count == 1), and — unless the chain
 // is empty — be safely schedulable right after the chain's current tail.
 auto check_current_for_chaining = [&]() -> bool {
 return (
 node_seen_count[cur.first] == 1 &&
 (chain.size() == 0 ||
 (
 // A chain of operators is executed without additional
 // synchronization by calling RunAsync sequentially on each
 // operator and passing the same stream id on each call.
 // RunAsync may schedule an async computation on device.
 // In order to be scheduled on the same chain two operators
 // (parent and dependent) need to satisfy one of:
 // 1. Parent op does not have an async part
 // 2. Parent op has async part _and_
 // both ops are on the same device _and_
 // dependent op can be executed as an async dependency

 !orig_nodes[chain.back()].operator_->HasAsyncPart() ||
 (IsSameDevice(
 orig_nodes[cur.first].operator_->device_option(),
 orig_nodes[chain.back()].operator_->device_option()) &&
 orig_nodes[cur.first].operator_->SupportsAsyncScheduling()))));
 };
 // Records the pending chain in `chains` (keyed by its first op) and resets
 // `chain` so a new one can be started. No-op when the chain is empty.
 auto commit_chain = [&]() {
 if (chain.size() > 0) {
 CAFFE_ENFORCE(
 chains.insert({chain.front(), chain}).second,
 "Chain ",
 chain.front(),
 " was already added.");
 VLOG(2) << "Added chain: " << chain.front() << "with elements";
 for (auto ch : chain) {
 VLOG(2) << ch << ", ";
 }
 chain.clear();
 }
 };
 // Advances `cur` past already-seen children, then pushes `cur` (so its
 // remaining children get revisited) followed by the next unseen child.
 auto depth_traverse = [&]() {
 while (cur.second != nodes[cur.first].children_.end() &&
 seen_nodes.find(*cur.second) != seen_nodes.end()) {
 cur.second++;
 }

 if (cur.second != nodes[cur.first].children_.end()) {
 auto next = make_pair(*cur.second, nodes[*cur.second].children_.begin());
 depth_stack.push(cur);
 depth_stack.push(next);
 }
 };
 for (int root_index : initial_frontier) {
 depth_stack.push(
 make_pair(root_index, nodes[root_index].children_.begin()));
 while (depth_stack.size() > 0) {
 cur = depth_stack.top();
 depth_stack.pop();
 if (seen_nodes.find(cur.first) == seen_nodes.end()) {
 seen_nodes.insert(cur.first);
 // Has one child, can be candidate for chain or can be added to the
 // previous chain.
 if (nodes[cur.first].children_.size() == 1) {
 if (check_current_for_chaining()) {
 // Add oneself to the current chain.
 VLOG(1) << "Adding to existing chain" << cur.first;
 chain.push_back(cur.first);
 int index = *nodes[cur.first].children_.begin();
 depth_stack.push(make_pair(index, nodes[index].children_.begin()));
 } else {
 // Can't belong to the previous chain, commit previous chain and
 // start a new one.
 commit_chain();
 chain.push_back(cur.first);
 int index = *nodes[cur.first].children_.begin();
 depth_stack.push(make_pair(index, nodes[index].children_.begin()));
 }
 } else if (
 nodes[cur.first].children_.size() == 0 &&
 check_current_for_chaining()) {
 // Add current node to the current chain and commit.
 chain.push_back(cur.first);
 commit_chain();
 } else {
 // Node has more than one child.
 commit_chain();
 // Add current node as an independent chain since it won't be a part
 // of a bigger chain.
 chain.push_back(cur.first);
 commit_chain();
 depth_traverse();
 }
 } else {
 // This node has been seen before, we will only traverse its children.
 // Commit any pending chains and continue traversing.
 commit_chain();
 depth_traverse();
 }
 } // End while

 // Check if this if is even needed.
 commit_chain();
 }
 // Every node must be reachable from some root; otherwise the graph is
 // malformed (or contains a cycle) and the chain cover is incomplete.
 CAFFE_ENFORCE(
 seen_nodes.size() == nodes.size(),
 "Haven't seen all the nodes, expected number of nodes ",
 nodes.size(),
 ", but seen only ",
 seen_nodes.size(),
 ".");

 updateOperatorNodes(orig_nodes, chains);
 return chains;
}
296 
297 ExecutionChains singleChains(std::vector<OperatorNode>& nodes) {
298  ExecutionChains chains;
299  for (auto i = 0; i < nodes.size(); ++i) {
300  chains[i] = {i};
301  }
302  updateOperatorNodes(nodes, chains);
303  return chains;
304 }
305 
// Instantiates one OperatorNode per op in `net_def` and wires up dependency
// edges implied by blob accesses: read-after-write (RaW), write-after-write
// (WaW) and write-after-read (WaR). Parent/children lists are deduplicated
// and self-edges removed before returning.
std::vector<OperatorNode> prepareOperatorNodes(
 const std::shared_ptr<const NetDef>& net_def,
 Workspace* ws) {
 std::vector<OperatorNode> operator_nodes(net_def->op_size());
 // Most recent writer (op index) of each blob name.
 std::map<string, int> blob_creator;
 // Op indices that have read each blob since its last write.
 std::map<string, std::set<int>> blob_readers;
 bool net_def_has_device_option = net_def->has_device_option();
 // Initialize the operators
 for (int idx = 0; idx < net_def->op_size(); ++idx) {
 const OperatorDef& op_def = net_def->op(idx);
 VLOG(1) << "Creating operator #" << idx << ": " << op_def.name() << ": "
 << op_def.type();
 if (!op_def.has_device_option() && net_def_has_device_option) {
 // Op has no device option of its own: inherit the net-level one.
 OperatorDef temp_def(op_def);
 temp_def.mutable_device_option()->CopyFrom(net_def->device_option());
 operator_nodes[idx].operator_ = CreateOperator(temp_def, ws, idx);
 } else {
 auto op = CreateOperator(op_def, ws, idx);
 // Alias the debug def into net_def (shared_ptr aliasing constructor)
 // so no copy of the OperatorDef is made.
 op->set_debug_def(
 std::shared_ptr<const OperatorDef>{net_def, &(net_def->op(idx))});
 operator_nodes[idx].operator_ = std::move(op);
 }
 // Check the inputs, and set up parents if necessary. This addresses the
 // read after write case.
 auto checkInputs =
 [&](const google::protobuf::RepeatedPtrField<std::string>& inputs) {
 for (const string& input : inputs) {
 if (blob_creator.count(input) == 0) {
 VLOG(1) << "Input " << input << " not produced by this net. "
 << "Assuming it is pre-existing.";
 } else {
 int parent = blob_creator[input];
 VLOG(1) << "op dependency (RaW " << input << "): " << parent
 << "->" << idx;
 operator_nodes[idx].parents_.push_back(parent);
 operator_nodes[parent].children_.push_back(idx);
 }
 // Add the current idx to the readers of this input.
 blob_readers[input].insert(idx);
 }
 };
 // Control inputs impose the same ordering constraints as data inputs.
 checkInputs(op_def.input());
 checkInputs(op_def.control_input());

 // Check the outputs.
 for (const string& output : op_def.output()) {
 if (blob_creator.count(output) != 0) {
 // This addresses the write after write case - we will assume that all
 // writes are inherently sequential.
 int waw_parent = blob_creator[output];
 VLOG(1) << "op dependency (WaW " << output << "): " << waw_parent
 << "->" << idx;
 operator_nodes[idx].parents_.push_back(waw_parent);
 operator_nodes[waw_parent].children_.push_back(idx);
 }
 // This addresses the write after read case - we will assume that writes
 // should only occur after all previous reads are finished.
 for (const int war_parent : blob_readers[output]) {
 VLOG(1) << "op dependency (WaR " << output << "): " << war_parent
 << "->" << idx;
 operator_nodes[idx].parents_.push_back(war_parent);
 operator_nodes[war_parent].children_.push_back(idx);
 }
 // Renew the creator of the output name.
 blob_creator[output] = idx;
 // The write would create an implicit barrier that all earlier readers of
 // this output is now parents of the current op, and future writes would
 // not need to depend on these earlier readers. Thus, we can clear up the
 // blob readers.
 blob_readers[output].clear();
 }
 }

 // Now, make sure that the parent list and the children list do not contain
 // duplicated items.
 for (int i = 0; i < operator_nodes.size(); ++i) {
 auto& node = operator_nodes[i];
 // Sort, remove duplicates, and delete self dependency.
 auto& p = node.parents_;
 std::sort(p.begin(), p.end());
 p.erase(std::unique(p.begin(), p.end()), p.end());
 p.erase(std::remove(p.begin(), p.end(), i), p.end());
 // Do the same for the children vector.
 auto& c = node.children_;
 std::sort(c.begin(), c.end());
 c.erase(std::unique(c.begin(), c.end()), c.end());
 c.erase(std::remove(c.begin(), c.end(), i), c.end());
 }

 return operator_nodes;
}
397 
398 std::vector<OpGraphNode> prepareChainGraphNodes(
399  const std::vector<dag_utils::OperatorNode>& operator_nodes,
400  const std::vector<std::vector<int>>& execution_chains) {
401  std::unordered_map<int, int> op_to_chain_idx;
402  for (int chain_idx = 0; chain_idx < execution_chains.size(); ++chain_idx) {
403  const auto& chain_indices = execution_chains[chain_idx];
404  for (const auto& chain_op_idx : chain_indices) {
405  CAFFE_ENFORCE(!op_to_chain_idx.count(chain_op_idx));
406  op_to_chain_idx[chain_op_idx] = chain_idx;
407  }
408  }
409 
410  std::vector<OpGraphNode> chain_nodes(execution_chains.size());
411  for (int op_idx = 0; op_idx < operator_nodes.size(); ++op_idx) {
412  CAFFE_ENFORCE(op_to_chain_idx.count(op_idx));
413  auto chain_idx = op_to_chain_idx[op_idx];
414  auto& chain = chain_nodes[chain_idx];
415  auto& op_node = operator_nodes[op_idx];
416 
417  for (const auto& child_idx : op_node.children_) {
418  CAFFE_ENFORCE(op_to_chain_idx.count(child_idx));
419  auto child_chain_idx = op_to_chain_idx[child_idx];
420  if (child_chain_idx != chain_idx) {
421  auto it = std::find(
422  chain.children_.begin(), chain.children_.end(), child_chain_idx);
423  if (it == chain.children_.end()) {
424  chain.children_.push_back(child_chain_idx);
425  }
426  }
427  }
428 
429  for (const auto& parent_idx : op_node.parents_) {
430  CAFFE_ENFORCE(op_to_chain_idx.count(parent_idx));
431  auto parent_chain_idx = op_to_chain_idx[parent_idx];
432  if (parent_chain_idx != chain_idx) {
433  auto it = std::find(
434  chain.parents_.begin(), chain.parents_.end(), parent_chain_idx);
435  if (it == chain.parents_.end()) {
436  chain.parents_.push_back(parent_chain_idx);
437  }
438  }
439  }
440  }
441 
442  return chain_nodes;
443 }
444 
445 } // namespace dag_utils
446 } // namespace caffe2
// Copyright (c) 2016-present, Facebook, Inc.