Caffe2 - C++ API
A deep learning, cross platform ML framework
recurrent_network_executor_gpu.cc
1 #include "caffe2/operators/rnn/recurrent_network_executor_gpu.h"
2 
3 #include "caffe2/core/context_gpu.h"
4 
5 namespace caffe2 {
6 
7 template <>
8 std::unique_ptr<RecurrentNetworkExecutorBase> createRNNExecutor<CUDAContext>(
9  const NetDef& step_net_def,
10  std::map<string, string>& recurrent_input_map,
11  std::string timestep_blob,
12  ArgumentHelper arg_helper) {
13  auto* exec = new CUDARecurrentNetworkExecutor(
14  step_net_def, recurrent_input_map, timestep_blob);
15  int max_streams = arg_helper.GetSingleArgument<int>("rnn_executor.max_cuda_streams", 0);
16  if (max_streams > 0) {
17  exec->setMaxStreams(max_streams);
18  LOG(INFO) << "Set max streams:" << max_streams;
19  }
20  std::unique_ptr<RecurrentNetworkExecutorBase> ptr(exec);
21  return ptr;
22 }
23 
24 CUDARecurrentNetworkExecutor::~CUDARecurrentNetworkExecutor() {
25  for (cudaEvent_t ev : events_) {
26  if (ev != nullptr) {
27  CUDA_CHECK(cudaEventDestroy(ev));
28  }
29  }
30 }
31 
/**
 * Executes timesteps [from, to) of the recurrent step net, stepping forward
 * when to > from and backward otherwise. When timestep parallelism is
 * enabled, consecutive timesteps are placed on different CUDA streams
 * (round-robin over at most max_streams), and cross-timestep dependencies
 * are enforced with CUDA events. Blocks at the end until every stream that
 * was used has drained.
 */
void CUDARecurrentNetworkExecutor::_ExecRange(int from, int to) {
  // +1 for forward execution, -1 for backward.
  int direction = to > from ? 1 : -1;

  // Number of streams to rotate over, bounded by the configured
  // per-executor limit when max_parallel_timesteps_ is set (> 0).
  int max_streams = max_parallel_timesteps_ > 0 ?
      std::min(max_parallel_timesteps_, max_cuda_streams_)
      : max_cuda_streams_;
  int stream_seq = 0;
  int num_ops = timestep_ops_[0].size();

  // One event slot per (timestep, op) pair; events themselves are created
  // lazily below, only for ops with recurrent dependencies.
  events_.resize(num_ops * timestep_ops_.size(), nullptr);

  // GPU all CUDA ops must live on; discovered from the first CUDA op seen.
  // NOTE(review): if every op were a link/CPU op this would stay -1 when
  // passed to cuda_stream() below — presumably the net always contains at
  // least one CUDA op; confirm with callers.
  int gpu_id = -1;

  // Loop over timesteps
  for (int t = from; t != to; t += direction) {
    bool first_timestep = t == from;
    bool last_timestep =
        (direction == -1 && t == 0) || (direction == 1 && t == to - 1);
    auto& ops = timestep_ops_[t];
    // Stream for this timestep; stream_seq only advances when timestep
    // parallelism is enabled, so otherwise everything runs on stream 0.
    int stream_id = stream_seq % max_streams;

    for (int i = 0; i < ops.size(); i++) {
      auto& rnn_op = ops[i];

      // Special handling for link ops -- we just run them directly
      // they do not execute any kernels.
      if (rnn_op.link_op) {
        rnn_op.op->RunAsync(stream_id);
        CAFFE_ENFORCE(
            rnn_op.dependencies.empty(),
            "GPU executor ignores link dependencies");
        continue;
      }

      // Lock onto the first CUDA device seen; every subsequent op must
      // either be a CPU op (device_type == 0) or live on the same GPU.
      if (gpu_id == -1 &&
          rnn_op.op->device_option().device_type() ==
              DeviceTypeProto::PROTO_CUDA) {
        gpu_id = rnn_op.op->device_option().device_id();
      } else {
        CAFFE_ENFORCE(
            rnn_op.op->device_option().device_type() == 0 ||
                rnn_op.op->device_option().device_id() == gpu_id,
            "RNN Executor only supports ops on one GPU");
      }

      // If have recurrent parents, add for event waits so that those
      // parents complete their work.
      if (has_timestep_parallelism_ && !first_timestep) {
        for (int parent : rnn_op.parents) {
          // parent > i is treated as a parent in the previous timestep
          // (its event was recorded at (t - direction)); parents with a
          // smaller index ran earlier on this same stream and need no wait.
          if (parent > i) {
            int parent_ev_idx = (t - direction) * num_ops + parent;
            CHECK(events_.size() > parent_ev_idx);
            CAFFE_ENFORCE(events_[parent_ev_idx] != nullptr);
            CUDA_CHECK(cudaStreamWaitEvent(
                CUDAContext::cuda_stream(gpu_id, stream_id),
                events_[parent_ev_idx],
                0));
          }
        }
      }

      // Run the op in the given stream
      rnn_op.op->RunAsync(stream_id);

      // Create and record event for this op, if it has at least one
      // recurrent dependency.
      if (has_timestep_parallelism_ && !last_timestep) {
        for (int dep : rnn_op.dependencies) {
          // dep < i marks a dependent in the next timestep; one recorded
          // event per op suffices for all such dependents, hence the break.
          if (dep < i) {
            int event_idx = t * num_ops + i;
            // Create event for recurrent connections
            if (events_[event_idx] == nullptr) {
              CUDA_CHECK(cudaEventCreate(&events_[event_idx]));
            }
            CUDA_CHECK(cudaEventRecord(
                events_[event_idx],
                CUDAContext::cuda_stream(gpu_id, stream_id)));
            break;
          }
        }
      }
    } // for over ops

    // Next timestep will run on different stream
    if (has_timestep_parallelism_) {
      stream_seq++;
    }
  } // for over timesteps

  // Drain every stream that was actually used so the caller observes
  // completed results on return.
  for (int stream_id = 0; stream_id <= std::min(stream_seq, max_streams - 1);
       stream_id++) {
    VLOG(1) << "Wait for stream:" << stream_id;
    CUDA_CHECK(
        cudaStreamSynchronize(CUDAContext::cuda_stream(gpu_id, stream_id)));
  }
}
137 
138 bool CUDARecurrentNetworkExecutor::Run(int T) {
139  CAFFE_ENFORCE_GE(T, 0, "Negative number of steps");
140  if (T == 0) {
141  return true;
142  }
143  _ExecRange(0, T);
144  return true;
145 }
146 
147 bool CUDARecurrentNetworkExecutor::RunBackwards(int T) {
148  CAFFE_ENFORCE_GE(T, 0, "Negative number of steps");
149  if (T == 0) {
150  return true;
151  }
152  _ExecRange(T - 1, -1);
153  return true;
154 }
155 }
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...
Definition: blob.h:13