#include "caffe2/operators/rnn/recurrent_network_executor_gpu.h"

#include "caffe2/core/context_gpu.h"

// Factory specialization that builds the CUDA RNN executor for a step net.
//
// step_net_def:        the NetDef executed at every timestep.
// recurrent_input_map: maps recurrent blob names between timesteps.
// timestep_blob:       name of the blob holding the current timestep index.
// arg_helper:          accessor for operator arguments; consulted for the
//                      optional "rnn_executor.max_cuda_streams" override.
//
// Returns an owning pointer to the newly created executor.
template <>
std::unique_ptr<RecurrentNetworkExecutorBase> createRNNExecutor<CUDAContext>(
    const NetDef& step_net_def,
    std::map<string, string>& recurrent_input_map,
    std::string timestep_blob,
    ArgumentHelper arg_helper) {
  auto* exec = new CUDARecurrentNetworkExecutor(
      step_net_def, recurrent_input_map, timestep_blob);
  // Optional cap on how many CUDA streams the executor may rotate through;
  // 0 (the default) leaves the executor's own limit in place.
  int max_streams =
      arg_helper.GetSingleArgument<int>("rnn_executor.max_cuda_streams", 0);
  if (max_streams > 0) {
    exec->setMaxStreams(max_streams);
    LOG(INFO) << "Set max streams:" << max_streams;
  }
  std::unique_ptr<RecurrentNetworkExecutorBase> ptr(exec);
  return ptr;
}
// Destructor: release all CUDA events created lazily by _ExecRange().
// Entries in events_ may still be nullptr (the vector is resized with a
// nullptr fill and events are created on demand), so guard each entry
// before destroying — cudaEventDestroy on an invalid handle is an error.
CUDARecurrentNetworkExecutor::~CUDARecurrentNetworkExecutor() {
  for (cudaEvent_t ev : events_) {
    if (ev != nullptr) {
      CUDA_CHECK(cudaEventDestroy(ev));
    }
  }
}
// Executes the timesteps in [from, to) — `to` is exclusive — stepping
// forward or backward depending on the sign of (to - from). When timestep
// parallelism is enabled, consecutive timesteps are rotated across CUDA
// streams and cross-timestep dependencies are enforced with CUDA events.
// All streams that were used are synchronized before returning.
void CUDARecurrentNetworkExecutor::_ExecRange(int from, int to) {
  int direction = to > from ? 1 : -1;

  // Cap the stream rotation by max_parallel_timesteps_ when it is set (> 0).
  int max_streams = max_parallel_timesteps_ > 0
      ? std::min(max_parallel_timesteps_, max_cuda_streams_)
      : max_cuda_streams_;
  int stream_seq = 0;
  int num_ops = timestep_ops_[0].size();

  // One (lazily created) event slot per (timestep, op) pair.
  events_.resize(num_ops * timestep_ops_.size(), nullptr);

  int gpu_id = -1;

  for (int t = from; t != to; t += direction) {
    bool first_timestep = t == from;
    bool last_timestep =
        (direction == -1 && t == 0) || (direction == 1 && t == to - 1);
    auto& ops = timestep_ops_[t];
    int stream_id = stream_seq % max_streams;

    for (int i = 0; i < ops.size(); i++) {
      auto& rnn_op = ops[i];

      // Link ops are run inline on the current stream and may not carry
      // dependencies of their own.
      // NOTE(review): the link_op branch is reconstructed from the visible
      // CAFFE_ENFORCE message — confirm against upstream.
      if (rnn_op.link_op) {
        rnn_op.op->RunAsync(stream_id);
        CAFFE_ENFORCE(
            rnn_op.dependencies.empty(),
            "GPU executor ignores link dependencies");
        continue;
      }

      // Pin the executor to a single GPU: remember the first CUDA device
      // seen, then enforce that every later op is CPU-typed (0) or on the
      // same device.
      if (gpu_id == -1 &&
          rnn_op.op->device_option().device_type() ==
              DeviceTypeProto::PROTO_CUDA) {
        gpu_id = rnn_op.op->device_option().device_id();
      } else {
        CAFFE_ENFORCE(
            rnn_op.op->device_option().device_type() == 0 ||
                rnn_op.op->device_option().device_id() == gpu_id,
            "RNN Executor only supports ops on one GPU");
      }

      // Wait on events recorded by recurrent parents in the previous
      // timestep so their work completes before this op launches.
      // NOTE(review): the `parent > i` filter is reconstructed — without it
      // the non-null enforce below would fire for parents that never
      // recorded an event; confirm against upstream.
      if (has_timestep_parallelism_ && !first_timestep) {
        for (int parent : rnn_op.parents) {
          if (parent > i) {
            int parent_ev_idx = (t - direction) * num_ops + parent;
            CHECK(events_.size() > parent_ev_idx);
            CAFFE_ENFORCE(events_[parent_ev_idx] != nullptr);
            CUDA_CHECK(cudaStreamWaitEvent(
                CUDAContext::cuda_stream(gpu_id, stream_id),
                events_[parent_ev_idx],
                0));
          }
        }
      }

      // Launch the op asynchronously on the chosen stream.
      rnn_op.op->RunAsync(stream_id);

      // Record an event for this op if some dependency in the next timestep
      // will need to wait on it; one recording suffices, hence the break.
      // NOTE(review): the `dep < i` filter mirrors the parent-side filter
      // above — confirm against upstream.
      if (has_timestep_parallelism_ && !last_timestep) {
        for (int dep : rnn_op.dependencies) {
          if (dep < i) {
            int event_idx = t * num_ops + i;
            if (events_[event_idx] == nullptr) {
              CUDA_CHECK(cudaEventCreate(&events_[event_idx]));
            }
            CUDA_CHECK(cudaEventRecord(
                events_[event_idx],
                CUDAContext::cuda_stream(gpu_id, stream_id)));
            break;
          }
        }
      }
    }

    // Rotate to a different stream for the next timestep.
    if (has_timestep_parallelism_) {
      stream_seq++;
    }
  }

  // Block until every stream that was actually used has drained.
  for (int stream_id = 0; stream_id <= std::min(stream_seq, max_streams - 1);
       stream_id++) {
    VLOG(1) << "Wait for stream:" << stream_id;
    CUDA_CHECK(
        cudaStreamSynchronize(CUDAContext::cuda_stream(gpu_id, stream_id)));
  }
}
// Runs the step net forward over timesteps [0, T). Returns true on success.
// T == 0 is a no-op: skipping _ExecRange avoids its final stream-sync loop,
// which would otherwise run with an unset (-1) GPU id.
bool CUDARecurrentNetworkExecutor::Run(int T) {
  CAFFE_ENFORCE_GE(T, 0, "Negative number of steps");
  if (T == 0) {
    return true;
  }
  _ExecRange(0, T);
  return true;
}
// Runs the step net backward, from timestep T-1 down to 0 inclusive.
// Returns true on success; T == 0 is a no-op (see Run for rationale).
bool CUDARecurrentNetworkExecutor::RunBackwards(int T) {
  CAFFE_ENFORCE_GE(T, 0, "Negative number of steps");
  if (T == 0) {
    return true;
  }
  // _ExecRange's `to` bound is exclusive, so -1 visits t = T-1, ..., 0.
  _ExecRange(T - 1, -1);
  return true;
}
A global dictionary that holds information about what Caffe2 modules have been loaded in the current ...