Caffe2 - C++ API
A deep learning, cross platform ML framework
net_async_dag_gpu.cc
1 
17 #include "caffe2/core/net_async_dag_gpu.h"
18 
19 #include <set>
20 #include <stack>
21 #include <unordered_map>
22 #include <unordered_set>
23 
24 #include "caffe2/core/operator.h"
25 #include "caffe2/core/static_tracepoint.h"
26 #include "caffe2/core/timer.h"
27 #include "caffe2/proto/caffe2.pb.h"
28 #include "caffe2/utils/proto_utils.h"
29 
30 #include "caffe2/core/context_gpu.h"
31 
32 #ifdef CAFFE2_USE_NVTX
33 #include <nvToolsExt.h>
34 #endif
35 
// When set (and the build defines CAFFE2_USE_NVTX), ProfiledRange brackets
// parent waits and operator launches in RunAt with NVTX ranges.
CAFFE2_DEFINE_bool(caffe2_use_nvtx, false, "Use NVTX ranges for profiling");

// When set, RunAt asks AsyncDAGNet::stream() for a stream id per chain;
// otherwise every chain uses stream 0.
CAFFE2_DEFINE_bool(
    caffe2_async_dag_use_multiple_streams,
    false,
    "Use multiple streams per thread");

// The flags below are defined in other translation units and only read here.

// Enables the CAFFE_EVENT wait/schedule timing stats emitted by RunAt.
CAFFE2_DECLARE_bool(caffe2_dag_net_collect_stats);

// When set, RunAt calls Finish() on the sink operator's event after a chain
// whose RunAsync calls all succeeded.
CAFFE2_DECLARE_bool(caffe2_net_async_finish_chain);

// Modulus for the per-gpu round-robin stream counter in AsyncDAGNet::stream().
CAFFE2_DECLARE_int(caffe2_streams_per_gpu);

// When set, AsyncDAGNet::stream() keeps advancing past streams that
// CUDAContext::IsStreamFree reports as not free.
CAFFE2_DECLARE_bool(caffe2_net_async_check_stream_status);
50 
51 namespace caffe2 {
52 
// Per-thread round-robin stream counters used by AsyncDAGNet::stream(),
// indexed by CUDA gpu id and grown on demand when a larger gpu id is seen.
thread_local std::vector<int> AsyncDAGNet::stream_counters_;
54 
55 namespace {
56 
57 using Color = int32_t;
58 constexpr Color kRunColor = 0x0000CCFF; // blue
59 constexpr Color kRecordColor = 0x00FF3300; // red
60 constexpr Color kWaitColor = 0x0066FF33; // green
61 
62 #ifdef CAFFE2_USE_NVTX
63 
64 class ProfiledRange {
65  public:
66  ProfiledRange(const OperatorDef& def, Color color) {
67  if (!FLAGS_caffe2_use_nvtx) {
68  return;
69  }
70  nvtxEventAttributes_t eventAttrib = {0};
71  eventAttrib.version = NVTX_VERSION;
72  eventAttrib.size = NVTX_EVENT_ATTRIB_STRUCT_SIZE;
73  eventAttrib.colorType = NVTX_COLOR_ARGB;
74  eventAttrib.color = color;
75  eventAttrib.messageType = NVTX_MESSAGE_TYPE_ASCII;
76  eventAttrib.message.ascii = def.type().c_str();
77  range_ = nvtxRangeStartEx(&eventAttrib);
78  CAFFE_ENFORCE(range_, "Start range is invalid.");
79  }
80 
81  ~ProfiledRange() {
82  if (!FLAGS_caffe2_use_nvtx) {
83  return;
84  }
85  nvtxRangeEnd(range_);
86  }
87 
88  private:
89  nvtxRangeId_t range_ = 0;
90  DISABLE_COPY_AND_ASSIGN(ProfiledRange);
91 };
92 
93 #else
94 
95 class ProfiledRange {
96  public:
97  ProfiledRange(const OperatorDef& def, Color color) {}
98 
99  private:
100  DISABLE_COPY_AND_ASSIGN(ProfiledRange);
101 };
102 
103 #endif // ifdef CAFFE2_USE_NVTX
104 
105 } // namespace
106 
107 AsyncDAGNet::AsyncDAGNet(
108  const std::shared_ptr<const NetDef>& net_def,
109  Workspace* ws)
110  : DAGNetBase(net_def, ws) {
111  VLOG(1) << "Constructing Async DAG Net " << net_def->name();
112  eventRecorded_.resize(net_def->op_size());
113 
114  // For all chains, their tail should consist the list of events that we are
115  // needing for synchronization in the Run() inteface, unless there are other
116  // chains depending on it.
117  events_.reserve(execution_chains_.size());
118  for (const auto& chain : execution_chains_) {
119  const int tail_op_idx = chain.second.back();
120  if (operator_nodes_[tail_op_idx].children_.empty()) {
121  events_.push_back(&operator_nodes_[tail_op_idx].operator_->event());
122  }
123  }
124  VLOG(1) << "Total " << execution_chains_.size()
125  << " chains, final waiting on " << events_.size() << " events";
126 }
127 
128 int AsyncDAGNet::stream(const DeviceOption& device_option) {
129  int stream_id = 0;
130  if (device_option.device_type() == CUDA) {
131  int gpu_id = device_option.cuda_gpu_id();
132  CAFFE_ENFORCE_GE(gpu_id, 0, "Invalid gpu id: " + caffe2::to_string(gpu_id));
133  if (gpu_id >= stream_counters_.size()) {
134  stream_counters_.resize(gpu_id + 1, 0);
135  }
136  do {
137  stream_id = stream_counters_[gpu_id]++;
138  stream_counters_[gpu_id] %= FLAGS_caffe2_streams_per_gpu;
139  } while (FLAGS_caffe2_net_async_check_stream_status &&
140  !CUDAContext::IsStreamFree(device_option, stream_id));
141  }
142  return stream_id;
143 }
144 
145 bool AsyncDAGNet::RunAt(int chain_id, const std::vector<int>& chain) {
146  CAFFE_ENFORCE(!chain.empty(), "Chain should not be empty.");
147  const auto source_idx = chain.front();
148  const auto& parents = operator_nodes_[source_idx].parents_;
149  // Help ensure that our chaining is correct by verifying at least
150  // one parent recorded an event.
151  CAFFE_ENFORCE(
152  parents.empty() ||
153  std::any_of(
154  parents.begin(),
155  parents.end(),
156  [this](int p) { return eventRecorded_[p]; }),
157  "None of the parent is recorded for an event.");
158 
159  int stream_id = 0;
160  if (FLAGS_caffe2_async_dag_use_multiple_streams) {
161  stream_id = stream(
162  operator_nodes_[source_idx].operator_->event().GetDeviceOption());
163  }
164 
165  std::vector<const Event*> parent_events;
166  parent_events.reserve(operator_nodes_[source_idx].parents_.size());
167  for (auto source_parent_idx : operator_nodes_[source_idx].parents_) {
168  parent_events.push_back(
169  &operator_nodes_[source_parent_idx].operator_->event());
170  }
171  {
172  ProfiledRange r(
173  operator_nodes_[source_idx].operator_->debug_def(), kWaitColor);
174  operator_nodes_[source_idx].operator_->WaitEvents(parent_events, stream_id);
175  }
176 
177  if (FLAGS_caffe2_dag_net_collect_stats) {
178  const auto& device_option =
179  operator_nodes_[source_idx].operator_->event().GetDeviceOption();
180  CAFFE_EVENT(
181  stats_[device_option.device_type()],
182  task_wait_time_us,
183  task_timers_[chain_id]->MicroSeconds());
184  }
185 
186  // We've waited on all our parent indices.
187  bool success = true;
188  for (auto idx : chain) {
189  ProfiledRange r(operator_nodes_[idx].operator_->debug_def(), kRunColor);
190  success &= operator_nodes_[idx].operator_->RunAsync(stream_id);
191  }
192 
193  const auto& sink_idx = chain.back();
194  if (success && FLAGS_caffe2_net_async_finish_chain) {
195  operator_nodes_[sink_idx].operator_->event().Finish();
196  }
197  CAFFE_ENFORCE(
198  !eventRecorded_[sink_idx],
199  "An event for ",
200  sink_idx,
201  " should not be recorded.");
202  eventRecorded_[sink_idx] = 1;
203 
204  if (FLAGS_caffe2_dag_net_collect_stats) {
205  const auto& device_option =
206  operator_nodes_[source_idx].operator_->event().GetDeviceOption();
207  CAFFE_EVENT(
208  stats_[device_option.device_type()],
209  task_time_to_scheduled_us,
210  task_timers_[chain_id]->MicroSeconds());
211  }
212  return success;
213 }
214 
215 bool AsyncDAGNet::DoRunAsync() {
216  // Reset the event tracking at each iteration
217  eventRecorded_.assign(eventRecorded_.size(), 0);
218 
219  const auto result = DAGNetBase::DoRunAsync();
220  return result;
221 }
222 
// Registers AsyncDAGNet with the net registry under the key `async_dag`.
// Presumably selected when a NetDef's type is "async_dag"; confirm against
// the REGISTER_NET / net factory definition.
REGISTER_NET(async_dag, AsyncDAGNet);
224 
225 } // namespace caffe2
Copyright (c) 2016-present, Facebook, Inc.