Caffe2 - C++ API
A deep learning, cross-platform ML framework
recurrent_network_op.h
1 /**
2  * Copyright (c) 2016-present, Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef CAFFE2_OPERATORS_RECURRENT_NETWORK_OP_H_
18 #define CAFFE2_OPERATORS_RECURRENT_NETWORK_OP_H_
19 
20 #include "caffe2/core/context.h"
21 #include "caffe2/core/logging.h"
22 #include "caffe2/core/operator.h"
23 #include "caffe2/core/tensor.h"
24 #include "caffe2/operators/recurrent_network_executor.h"
25 #include "caffe2/utils/conversions.h"
26 #include "caffe2/utils/math.h"
27 
28 CAFFE2_DECLARE_bool(caffe2_rnn_executor);
29 
30 namespace caffe2 {
31 namespace detail {
32 
33 struct Param {
34  std::string param;
35  std::string grad;
36  std::string cellGradient;
37 };
38 
39 struct RecurrentInput {
40  std::string state;
41  std::string input;
42 };
43 
44 struct RecurrentGradient {
45  std::string param;
46  std::string grad;
47  std::string externalGrad;
48  std::string lastExternalGrad;
49  int32_t offset;
50 };
51 
52 struct OffsetAlias {
53  std::string src;
54  std::string dst;
55  int32_t offset{0};
56 };
57 
58 struct Link {
59  std::string internal;
60  std::string external;
61  int32_t offset{0};
62  int32_t window{1};
63 };
64 
65 struct ScratchWorkspaces {
66  std::vector<std::shared_ptr<Workspace>> stepWorkspaces;
67  std::shared_ptr<Workspace> sharedBlobsWs = nullptr;
68 };
69 
70 inline void UpdateTimestepBlob(Workspace* ws, std::string blob_name, int t) {
71  ws->CreateBlob(blob_name)->GetMutable<TensorCPU>()->Resize(1);
72  auto timestepBlob = ws->GetBlob(blob_name);
73  CAFFE_ENFORCE(timestepBlob);
74  timestepBlob->GetMutable<TensorCPU>()->mutable_data<int32_t>()[0] = t;
75 }
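A minimal sketch of how this helper is used (the workspace and loop here are hypothetical; "timestep" is the operator's default blob name): each step workspace carries a one-element int32 tensor with the current step index, which the step net reads.

Workspace step_ws;
for (int t = 0; t < 3; ++t) {
  detail::UpdateTimestepBlob(&step_ws, "timestep", t);
  // The blob now holds a single int32 equal to t.
  const auto& ts = step_ws.GetBlob("timestep")->Get<TensorCPU>();
  CAFFE_ENFORCE_EQ(ts.data<int32_t>()[0], t);
}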
76 
77 std::map<string, string> GetRecurrentMapping(
78  const std::vector<detail::Link>& links, bool backward);
79 
80 template <typename T, typename Context>
81 void applyOffsetAlias(
82  const OffsetAlias& oc,
83  Workspace* ws,
84  Context* /*context*/) {
85  VLOG(1) << "Aliasing: " << oc.src << " to: " << oc.dst
86  << " at offset: " << oc.offset;
87  auto srcBlob = ws->GetBlob(oc.src);
88  CAFFE_ENFORCE(srcBlob);
89  auto* src = srcBlob->template GetMutable<Tensor<Context>>();
90  auto* dst = ws->GetBlob(oc.dst)->template GetMutable<Tensor<Context>>();
91  auto timestep = src->size() / src->dim(0);
92  auto dims = src->dims();
93  const int32_t startDstTimestep =
94  oc.offset >= 0 ? oc.offset : src->dim(0) + oc.offset;
95  const int32_t numDstTimesteps = src->dim(0) - startDstTimestep;
96  CAFFE_ENFORCE(
97  numDstTimesteps >= 1, "Invalid number of timesteps: ", numDstTimesteps);
98  dims[0] = numDstTimesteps;
99  dst->Resize(dims);
100  CAFFE_ENFORCE(timestep == dst->size() / numDstTimesteps, "Invalid offset");
101  dst->ShareExternalPointer(
102  src->template mutable_data<T>() + startDstTimestep * timestep,
103  dst->size());
104 }
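The aliasing above is zero-copy: dst becomes a view over the trailing timesteps of src. A worked example of the index arithmetic, with made-up sizes:

const int T = 4, B = 2, D = 3;  // src is a state tensor of shape (T + 1) x B x D
const int offset = -1;          // alias_offset argument
const int timestep = B * D;     // elements per timestep
const int startDstTimestep = offset >= 0 ? offset : (T + 1) + offset;  // = 4
const int numDstTimesteps = (T + 1) - startDstTimestep;                // = 1
// dst is resized to 1 x B x D and shares memory starting at
// src_data + startDstTimestep * timestep; no data is copied.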
105 
106 template <typename T, class Context>
107 void repeatCopy(
108  size_t repeat_n,
109  size_t n,
110  const T* src,
111  T* dst,
112  Context* context) {
113  for (int i = 0; i < repeat_n; ++i) {
114  context->template Copy<T, Context, Context>(n, src, dst + i * n);
115  }
116 }
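A small sketch of repeatCopy on CPU (the values are made up): it tiles an n-element buffer repeat_n times, which is how a 1-D initial state gets broadcast across the batch below.

CPUContext ctx;
std::vector<float> src = {1.f, 2.f, 3.f};  // stateSize = 3
std::vector<float> dst(2 * 3);             // batchSize = 2
detail::repeatCopy<float, CPUContext>(2, 3, src.data(), dst.data(), &ctx);
// dst is now {1, 2, 3, 1, 2, 3}.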
117 
122 template <typename T, typename Context>
123 void initializeRecurrentInput(
124  const RecurrentInput& rc,
125  int32_t seqLen,
126  int32_t batchSize,
127  Workspace* ws,
128  Context* context) {
129  auto stateBlob = ws->GetBlob(rc.state);
130  CAFFE_ENFORCE(stateBlob);
131  auto* state = stateBlob->template GetMutable<Tensor<Context>>();
132 
133  auto inputBlob = ws->GetBlob(rc.input);
134  CAFFE_ENFORCE(inputBlob);
135  const auto& input = inputBlob->template Get<Tensor<Context>>();
136  CAFFE_ENFORCE_GE(input.ndim(), 1, rc.input);
137  CAFFE_ENFORCE_LE(input.ndim(), 3, rc.input);
138 
139  const auto stateSize = input.dim(input.ndim() - 1);
140  // Sometimes we want to provide more than one initial timestep, for
141  // example when the step net runs a convolution op and needs sufficient
142  // left padding around the input.
143  // This can be used together with links where window != 1.
144  auto initialStateLength = 1;
145  if (input.ndim() == 3) {
146  initialStateLength = input.dim(0);
147  }
148  // States at [0, ..., (T + initialStateLength - 1)] (inclusive)
149  state->Resize(seqLen + initialStateLength, batchSize, stateSize);
150 
151  if (input.ndim() >= 2) {
152  CAFFE_ENFORCE_EQ(input.dim(input.ndim() - 2), batchSize, rc.input);
153  context->template Copy<T, Context, Context>(
154  batchSize * stateSize * initialStateLength,
155  input.template data<T>(),
156  state->template mutable_data<T>());
157  } else {
158  // Usually, the initial state is the same for all inputs in the batch.
159  // So the op conveniently accepts 1-D input and copies it batchSize times.
160  repeatCopy<T, Context>(
161  batchSize,
162  stateSize,
163  input.template data<T>(),
164  state->template mutable_data<T>(),
165  context);
166  }
167 }
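The resulting state layout, stated as a sketch of the shapes involved:

// With a 2-D input of shape (B, D) and sequence length T, the state blob
// becomes (T + 1) x B x D: row 0 holds the initial state and rows 1..T are
// written by the step net. A 3-D input of shape (k, B, D) gives
// (T + k) x B x D with k initial rows, for use with links where window > 1.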
168 
169 void PrependOps(std::vector<OperatorDef> ops, NetDef* netdef);
170 
171 void AddApplyLinkOps(
172  const vector<Link>& links,
173  std::string timestep,
174  const DeviceOption& device_option,
175  NetDef* netdef);
176 
177 void extractLinks(
178  OperatorBase* op,
179  const std::string& internalArg,
180  const std::string& externalArg,
181  const std::string& offsetArg,
182  const std::string& windowArg,
183  std::vector<detail::Link>* links);
184 
185 NetDef extractNetDef(const OperatorDef& op, const std::string& argName);
186 } // namespace detail
187 
188 template <class Context>
189 class RecurrentNetworkOp final : public Operator<Context> {
190  public:
191  USE_OPERATOR_CONTEXT_FUNCTIONS;
192  RecurrentNetworkOp(const OperatorDef& operator_def, Workspace* ws)
193  : Operator<Context>(operator_def, ws),
194  sharedWs_(ws),
195  enable_rnn_executor_(OperatorBase::template GetSingleArgument<bool>(
196  "enable_rnn_executor",
197  false)),
198  timestep_(OperatorBase::template GetSingleArgument<std::string>(
199  "timestep",
200  "timestep")) {
201  CAFFE_ENFORCE(ws);
202 
203  stepNetDef_ = detail::extractNetDef(operator_def, "step_net");
204 
205  recurrentInputs_ = constructRecurrentInputs(operator_def, sharedWs_);
206  links_ = constructLinks();
207  aliases_ = constructAliases();
208 
209  stepNetDef_.add_external_input(timestep_);
210  detail::AddApplyLinkOps(
211  links_, timestep_, operator_def.device_option(), &stepNetDef_);
212 
213  if (FLAGS_caffe2_rnn_executor && enable_rnn_executor_) {
214  VLOG(1) << "Use RecurrentNetworkExecutor";
215  auto recurrent_map = detail::GetRecurrentMapping(links_, false /* backward */);
216  rnnExecutor_ =
217  createRNNExecutor<Context>(
218  stepNetDef_,
219  recurrent_map,
220  timestep_,
221  ArgumentHelper(operator_def));
222  } else {
223  // Fix for legacy models that pass "rnn" type net
224  if (stepNetDef_.type() == "rnn") {
225  stepNetDef_.set_type("async_simple");
226  }
227  CAFFE_ENFORCE(stepNetDef_.type() != "async_dag");
228  }
229  }
230 
231  size_t NumObservers() override {
232  size_t num = this->observers_list_.size();
233  if (rnnExecutor_) {
234  num += rnnExecutor_->NumObserversStepNet();
235  }
236  return num;
237  }
238 
239  std::vector<detail::RecurrentInput> constructRecurrentInputs(
240  const OperatorDef& operator_def,
241  Workspace* sharedWs) {
242  const auto states =
243  OperatorBase::GetRepeatedArgument<std::string>("recurrent_states");
244  const auto inputs =
245  OperatorBase::GetRepeatedArgument<int>("initial_recurrent_state_ids");
246  CAFFE_ENFORCE_EQ(states.size(), inputs.size(), "states/inputs mismatch");
247  std::vector<detail::RecurrentInput> ris;
248  for (auto i = 0; i < states.size(); ++i) {
249  // States need to be "global" (since they are shared between
250  // forward and backward).
251  sharedWs->CreateBlob(states[i]);
252 
253  detail::RecurrentInput ri;
254  ri.state = states[i];
255  ri.input = operator_def.input(inputs[i]);
256  ris.push_back(ri);
257  }
258  return ris;
259  }
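A hedged example of how the two argument lists line up (blob names and indices are made up):

// recurrent_states            = {"hidden_t", "cell_t"}
// initial_recurrent_state_ids = {3, 4}
// pairs each state blob with the operator input that holds its initial value,
// e.g. ri.state = "hidden_t" and ri.input = operator_def.input(3).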
260 
261  std::vector<detail::OffsetAlias> constructAliases() {
262  const auto& src =
263  OperatorBase::GetRepeatedArgument<std::string>("alias_src");
264  const auto& dst =
265  OperatorBase::GetRepeatedArgument<std::string>("alias_dst");
266  const auto& offset =
267  OperatorBase::GetRepeatedArgument<int32_t>("alias_offset");
268  CAFFE_ENFORCE(
269  src.size() == offset.size(), "alias_src/alias_offset mismatch");
270  CAFFE_ENFORCE(
271  dst.size() == offset.size(), "alias_dst/alias_offset mismatch");
272  std::vector<detail::OffsetAlias> aliases;
273  for (auto i = 0; i < src.size(); ++i) {
274  detail::OffsetAlias oc;
275  oc.src = src[i];
276  oc.dst = dst[i];
277  oc.offset = offset[i];
278  aliases.push_back(oc);
279  }
280  return aliases;
281  }
282 
283  /**
284  * Some blobs can be marked as to be recomputed on backward pass.
285  * Those blobs are then not allocated in every step workspace; instead
286  * they live in the shared workspace, so all steps can reuse the same
287  * buffer on the forward pass.
288  */
289  void initializeBlobsToRecomputeOnBackward(Workspace* sharedBlobsWs) {
290  std::vector<std::string> v;
291  const auto& blobs = OperatorBase::GetRepeatedArgument<std::string>(
292  "recompute_blobs_on_backward", v);
293  for (const auto& b : blobs) {
294  // Note: if the blob already was created, this is a no-op.
295  sharedBlobsWs->CreateBlob(b);
296  }
297  }
298 
299  std::vector<detail::Link> constructLinks() {
300  std::vector<detail::Link> links;
301  detail::extractLinks(
302  this,
303  "link_internal",
304  "link_external",
305  "link_offset",
306  "link_window",
307  &links);
308  return links;
309  }
310 
311  template<typename T>
312  bool DoRunWithType() {
313  const auto seqLen = Input(0).dim32(0);
314  const auto batchSize = Input(0).dim32(1);
315  for (const auto& ri : recurrentInputs_) {
316  detail::initializeRecurrentInput<T, Context>(
317  ri, seqLen, batchSize, sharedWs_, &context_);
318  }
319 
320  // If we don't have a backward step net, this operator is forward_only
321  // and we can avoid creating multiple workspaces.
322  bool has_backward_pass =
323  OperatorBase::HasSingleArgumentOfType<NetDef>("backward_step_net") ||
324  (OperatorBase::HasSingleArgumentOfType<string>("backward_step_net") &&
325  OperatorBase::GetSingleArgument<string>("backward_step_net", "") !=
326  "");
327 
328  // With a backward pass we need to create a workspace for each timestep
329  detail::ScratchWorkspaces* scratch =
330  OperatorBase::Output<detail::ScratchWorkspaces>(OutputSize() - 1);
331  std::vector<std::shared_ptr<Workspace>>& stepWorkspaces =
332  scratch->stepWorkspaces;
333  std::shared_ptr<Workspace>& sharedBlobsWs = scratch->sharedBlobsWs;
334  if (!sharedBlobsWs) {
335  sharedBlobsWs = std::make_shared<Workspace>(sharedWs_);
336  }
337 
338  // Caller can decide that some of the forward activations
339  // are recomputed on backward pass. Then those activations do not
340  // have to be stored in step workspaces but can be shared.
341  initializeBlobsToRecomputeOnBackward(sharedBlobsWs.get());
342 
343  if (has_backward_pass && seqLen > stepWorkspaces.size()) {
344  stepWorkspaces.resize(seqLen);
345  }
346 
347  // In forward-only mode, we cycle over workspaces. This limits the amount
348  // of parallelism over timesteps that the RNNExecutor provides. So with the
349  // RNN executor we use more workspaces to get better performance.
350  int num_workspaces_on_fwd_only = rnnExecutor_ ? 4 : 2;
351 
352  if (!has_backward_pass && stepWorkspaces.size() < num_workspaces_on_fwd_only) {
353  // Use alternating stepWorkspaces when forward_only=True.
354  // Note that the step workspaces can be shared by other ops, thus
355  // we cannot shrink it to 2 if there are more than 2 step workspaces.
356  stepWorkspaces.resize(num_workspaces_on_fwd_only);
357  }
358 
359  for (auto t = 0; t < seqLen; ++t) {
360  auto& currentStepWorkspace =
361  (has_backward_pass ? stepWorkspaces[t] :
362  stepWorkspaces[t % num_workspaces_on_fwd_only]);
363  if (!currentStepWorkspace) {
364  currentStepWorkspace = std::make_shared<Workspace>(sharedBlobsWs.get());
365  }
366 
367  if (rnnExecutor_) {
368  if (!has_backward_pass) {
369  // Need to limit timestep parallelism because we cycle over workspaces
370  rnnExecutor_->SetMaxParallelTimesteps(num_workspaces_on_fwd_only);
371  }
372  rnnExecutor_->EnsureTimestepInitialized(
373  t, currentStepWorkspace.get(), this->observers_list_);
374  } else {
375  // Use plain Caffe2 nets
376  detail::UpdateTimestepBlob(currentStepWorkspace.get(), timestep_, t);
377  auto* stepNet = currentStepWorkspace->GetNet(stepNetDef_.name());
378  if (stepNet == nullptr) {
379  stepNet = currentStepWorkspace->CreateNet(stepNetDef_);
380  }
381  CAFFE_ENFORCE(stepNet, "Step Net construction failure");
382  // Since we have a SimpleNet, there are no races here.
383  stepNet->RunAsync();
384  }
385  }
386 
387  if (rnnExecutor_) {
388  rnnExecutor_->Run(seqLen);
389  }
390 
391  for (const auto& alias : aliases_) {
392  detail::applyOffsetAlias<T, Context>(alias, sharedWs_, &context_);
393  }
394 
395  return true;
396  }
397 
398  bool RunOnDevice() override {
399  return DoRunWithType<float>();
400  }
401 
402  protected:
403  NetDef stepNetDef_;
404  Workspace* sharedWs_;
405  bool enable_rnn_executor_;
406  std::unique_ptr<RecurrentNetworkExecutorBase> rnnExecutor_;
407 
408  std::vector<detail::Link> links_;
409  std::vector<detail::OffsetAlias> aliases_;
410  std::vector<detail::RecurrentInput> recurrentInputs_;
411  std::string timestep_;
412 };
413 
414 template <class Context>
415 class RecurrentNetworkGradientOp final : public Operator<Context> {
416  public:
417  USE_OPERATOR_CONTEXT_FUNCTIONS;
418  RecurrentNetworkGradientOp(const OperatorDef& operator_def, Workspace* ws)
419  : Operator<Context>(operator_def, ws),
420  sharedWs_(ws),
421  enable_rnn_executor_(OperatorBase::template GetSingleArgument<bool>(
422  "enable_rnn_executor",
423  false)),
424  timestep_(OperatorBase::template GetSingleArgument<std::string>(
425  "timestep",
426  "timestep")),
427  gradInputs_(OperatorBase::template GetRepeatedArgument<int32_t>(
428  "outputs_with_grads")) {
429  CAFFE_ENFORCE(ws);
430 
431  stepNetDef_ = detail::extractNetDef(operator_def, "backward_step_net");
432 
433  links_ = constructLinks();
434  params_ = constructParams(operator_def);
435  recurrentGradients_ = constructRecurrentGradients(operator_def);
436  recurrentInputIds_ = OperatorBase::template GetRepeatedArgument<int32_t>(
437  "initial_recurrent_state_ids");
438 
439  /* Add operators to the backward step net to handle accumulation of
440  gradients over timesteps
441  */
442  stepNetDef_.add_external_input(timestep_);
443 
444  AddGradientInputAccumulationOps(operator_def);
445  detail::AddApplyLinkOps(
446  links_, timestep_, operator_def.device_option(), &stepNetDef_);
447  AddParamGradientAccumulationOps(operator_def);
448 
449  if (FLAGS_caffe2_rnn_executor && enable_rnn_executor_) {
450  InitializeExecutor(operator_def);
451  }
452  }
453 
454  // Renaming maps (generated by memonger.py)
455  std::string remappedName(std::string blob_name) {
456  return OperatorBase::template GetSingleArgument<std::string>(
457  blob_name + ".rename", blob_name);
458  }
459 
460  detail::Link remappedLink(const detail::Link& link) {
461  detail::Link renamed_link = link;
462  renamed_link.internal = remappedName(link.internal);
463  renamed_link.external = remappedName(link.external);
464  return renamed_link;
465  }
466 
467  void renameOpInputOutput(std::string from_name, std::string to_name) {
468  for (int j = 0; j < stepNetDef_.op_size(); j++) {
469  auto* op = stepNetDef_.mutable_op(j);
470  for (int i = 0; i < op->input_size(); i++) {
471  if (op->input(i) == from_name) {
472  op->set_input(i, to_name);
473  }
474  }
475  for (int i = 0; i < op->output_size(); i++) {
476  if (op->output(i) == from_name) {
477  op->set_output(i, to_name);
478  }
479  }
480  }
481  }
482 
483  std::vector<detail::Param> constructParams(const OperatorDef& operator_def) {
484  std::vector<detail::Param> params;
485  const auto& param = OperatorBase::GetRepeatedArgument<int32_t>("param");
486  const auto& param_grads =
487  OperatorBase::GetRepeatedArgument<string>("param_grads");
488  CAFFE_ENFORCE(
489  param_grads.empty() || param_grads.size() == param.size(),
490  param.size(),
491  " != ",
492  param_grads.size());
493  for (int i = 0; i < param.size(); ++i) {
494  detail::Param p;
495  // Forward inputs come after [outputs_with_grads] gradient inputs
496  p.param = operator_def.input(param[i] + gradInputs_.size());
497  // See GetRecurrentNetworkGradient to understand the offsetting here
498  p.grad = operator_def.output(i + numSequences_);
499 
500  std::string grad_blob =
501  param_grads.empty() ? p.grad : remappedName(param_grads[i]);
502  p.cellGradient = grad_blob + "_tmpstep";
503  params.push_back(p);
504 
505  renameOpInputOutput(grad_blob, p.cellGradient);
506  }
507  return params;
508  }
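A sketch of the index layout these offsets assume, inferred from the offsets used in this file (see also GetRecurrentNetworkGradient):

// inputs : [GO(0) .. GO(G-1), I(0) .. I(N-1), ..., ScratchWorkspaces]
//          with G = gradInputs_.size(), so forward input k appears at
//          index k + gradInputs_.size().
// outputs: [d(sequence input(s)), d(param_0) .., d(recurrent_state_0) ..]
//          so p.grad = output(i + numSequences_) skips the numSequences_
//          sequence gradients that come first.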
509 
510  std::vector<detail::RecurrentGradient> constructRecurrentGradients(
511  const OperatorDef& operator_def) {
512  std::vector<detail::RecurrentGradient> rgs;
513  const auto& recurrent =
514  OperatorBase::GetRepeatedArgument<std::string>("recurrent_states");
515  const auto& alias_src =
516  OperatorBase::GetRepeatedArgument<std::string>("alias_src");
517  const auto& offset =
518  OperatorBase::GetRepeatedArgument<int32_t>("alias_offset");
519 
520  for (auto i = 0; i < recurrent.size(); ++i) {
521  detail::RecurrentGradient rg;
522  rg.param = recurrent[i];
523  rg.grad = remappedName(recurrent[i] + "_grad");
524 
525  for (int j = 0; j < alias_src.size(); ++j) {
526  if (alias_src[j] != recurrent[i]) {
527  continue;
528  }
529  int idx = -1;
530  for (int k = 0; k < gradInputs_.size(); ++k) {
531  if (gradInputs_[k] == j) {
532  idx = k;
533  }
534  }
535  if (idx == -1) {
536  continue;
537  }
538 
539  CAFFE_ENFORCE(offset[j] == 1 || offset[j] == -1);
540  if (offset[j] == 1) {
541  rg.externalGrad = operator_def.input(idx);
542  } else if (offset[j] == -1) {
543  rg.lastExternalGrad = operator_def.input(idx);
544  }
545  }
546  rg.offset = 1;
547  rgs.push_back(rg);
548  }
549  return rgs;
550  }
551 
552  std::vector<detail::Link> constructLinks() {
553  std::vector<detail::Link> links;
554  detail::extractLinks(
555  this,
556  "link_internal",
557  "link_external",
558  "link_offset",
559  "link_window",
560  &links);
561  detail::extractLinks(
562  this,
563  "backward_link_internal",
564  "backward_link_external",
565  "backward_link_offset",
566  "",
567  &links);
568  for (int i = 0; i < links.size(); i++) {
569  links[i] = remappedLink(links[i]);
570  }
571  return links;
572  }
573 
574  void InitializeExecutor(const OperatorDef& operator_def) {
575  VLOG(1) << "Use RecurrentNetworkExecutor for backward";
576  auto recurrent_map = detail::GetRecurrentMapping(links_, true /* backward */);
577  rnnExecutor_ = createRNNExecutor<Context>(
578  stepNetDef_, recurrent_map, timestep_, ArgumentHelper(operator_def));
579  }
580 
581  void AddGradientInputAccumulationOps(const OperatorDef& operator_def) {
585  std::vector<OperatorDef> ops;
586  for (const auto& rg : recurrentGradients_) {
587  if (rg.externalGrad.empty()) {
588  continue;
589  }
590  VLOG(1) << "Accumulating into: " << rg.grad << " from " << rg.externalGrad
591  << ", offset: " << rg.offset;
592 
593  OperatorDef opdef;
594  opdef.set_type("rnn_internal_accumulate_gradient_input");
595  opdef.add_input(timestep_);
596  opdef.add_input(rg.externalGrad);
597  opdef.add_input(rg.grad);
598  opdef.add_output(rg.grad);
599 
600  // Add also the linked blobs to outputs, to ensure correct
601  // chaining.
602  for (auto& l : links_) {
603  if (rg.grad == l.external) {
604  Argument* dep_arg = opdef.add_arg();
605  dep_arg->set_name("rnn_dependency." + l.internal);
606  dep_arg->set_s(l.internal);
607  }
608  }
609 
610  opdef.mutable_device_option()->CopyFrom(operator_def.device_option());
611 
612  Argument* offset_arg = opdef.add_arg();
613  offset_arg->set_name("offset");
614  offset_arg->set_i(rg.offset);
615  ops.push_back(opdef);
616 
617  stepNetDef_.add_external_input(rg.externalGrad);
618  stepNetDef_.add_external_input(rg.grad);
619  }
620  detail::PrependOps(ops, &stepNetDef_);
621  }
622 
623  void AddParamGradientAccumulationOps(const OperatorDef& operator_def) {
624  // If the user passes in a param_grads mapping, we can copy directly
625  // from the blob that the backward cell net wrote its data to.
626  // This comes in handy when the gradient from the cell net
627  // is an internal blob of the backward cell. This happens, for example,
628  // when SumOp is the first op of the cell.
629  for (const auto& param : params_) {
630  OperatorDef opdef;
631  opdef.set_type("Sum");
632  opdef.add_input(param.grad);
633  opdef.add_input(param.cellGradient);
634  opdef.add_output(param.grad);
635  opdef.mutable_device_option()->CopyFrom(operator_def.device_option());
636  stepNetDef_.add_op()->CopyFrom(opdef);
637  stepNetDef_.add_external_input(param.grad);
638  }
639  }
640 
641  void CreateSharedBlobs(
642  const std::shared_ptr<Workspace>& step0Ws,
643  Workspace* sharedBlobsWs) {
644  /**
645  * Create, in the shared workspace, all output blobs of the backward step
646  * net that are not in the step-0 workspace, so they can be shared.
647  */
648  for (auto& op : stepNetDef_.op()) {
649  for (const string& outp : op.output()) {
650  if (!step0Ws->HasBlob(outp)) {
651  sharedBlobsWs->CreateBlob(outp);
652  }
653  }
654  }
655  }
656 
657  template<typename T>
658  bool DoRunWithType() {
659  const auto seqLen = Input(gradInputs_.size()).dim32(0);
660  VLOG(1) << "seqLen: " << seqLen;
661 
662  const detail::ScratchWorkspaces& scratch =
663  OperatorBase::Input<detail::ScratchWorkspaces>(InputSize() - 1);
664  const std::vector<std::shared_ptr<Workspace>>& stepWorkspaces =
665  scratch.stepWorkspaces;
666  CAFFE_ENFORCE_GE(stepWorkspaces.size(), seqLen);
667  Workspace& sharedBlobsWs = *scratch.sharedBlobsWs.get();
668 
669  const auto batchSize = Input(0).dim32(1);
670  for (auto& param : params_) {
671  auto pBlob = sharedWs_->GetBlob(param.param);
672  CAFFE_ENFORCE(pBlob);
673  const auto& p = pBlob->template Get<Tensor<Context>>();
674 
675  auto gBlob = sharedWs_->GetBlob(param.grad);
676  CAFFE_ENFORCE(gBlob);
677  auto* g = gBlob->template GetMutable<Tensor<Context>>();
678  g->ResizeLike(p);
679  math::Set<T, Context>(
680  g->size(),
681  convert::To<float,T>(0.0),
682  g->template mutable_data<T>(),
683  &context_);
684  }
685 
686  for (auto& rg : recurrentGradients_) {
687  auto pBlob = sharedWs_->GetBlob(rg.param);
688  CAFFE_ENFORCE(pBlob);
689  const auto& p = pBlob->template Get<Tensor<Context>>();
690 
691  auto gBlob = sharedWs_->CreateBlob(rg.grad);
692  CAFFE_ENFORCE(gBlob);
693  auto* g = gBlob->template GetMutable<Tensor<Context>>();
694  g->ResizeLike(p);
695  CAFFE_ENFORCE_EQ(g->ndim(), 3);
696  const auto timestep = g->size() / g->dim(0);
697  // Fill the last timestep with zeros for the gradient
698  math::Set<T, Context>(
699  timestep,
700  convert::To<float,T>(0.0),
701  g->template mutable_data<T>() + (g->dim(0) - 1) * timestep,
702  &context_);
703  }
704 
705  // This code is written as if there could be several input
706  // sequences. That is not actually supported by the rest of the code,
707  // and numSequences_ is a constant equal to 1.
708  for (int i = 0; i < numSequences_; ++i) {
709  // Offsetting, as the first gradInputs_.size() inputs of the op
710  // are from GO; then come all I(0..N).
711  const int gradientInputIndex = i + gradInputs_.size();
712  const auto& inputName = this->debug_def().input(gradientInputIndex);
713  auto gradientName = remappedName(inputName + "_grad");
714  VLOG(1) << "Initializing gradient for input " << gradientInputIndex
715  << " (" << inputName << ") "
716  << " as blob " << gradientName
717  << ". Size: " << Input(gradientInputIndex).size();
718  auto pGradientBlob = sharedWs_->GetBlob(gradientName);
719  CAFFE_ENFORCE(pGradientBlob);
720  auto* g = pGradientBlob->template GetMutable<Tensor<Context>>();
721  g->ResizeLike(Input(gradientInputIndex));
722  g->template mutable_data<T>();
723  }
724 
725  auto accumulateFinalInputGradients = [&]() {
726  for (const auto& rg : recurrentGradients_) {
727  if (rg.lastExternalGrad.empty()) {
728  continue;
729  }
730  VLOG(1) << "Accumulating into: " << rg.grad << " from "
731  << rg.lastExternalGrad << " for final time step (sep. blob)";
732  auto gBlob = sharedWs_->GetBlob(rg.grad);
733  CAFFE_ENFORCE(gBlob);
734  auto* g = gBlob->template GetMutable<Tensor<Context>>();
735 
736  auto oglastBlob = sharedWs_->GetBlob(rg.lastExternalGrad);
737  CAFFE_ENFORCE(oglastBlob);
738  const auto& oglast = oglastBlob->template Get<Tensor<Context>>();
739  CAFFE_ENFORCE_EQ(g->dim(1), oglast.dim(1));
740  CAFFE_ENFORCE_EQ(g->dim(2), oglast.dim(2));
741 
742  const auto t = g->dim(0) - 1;
743  const auto timestep_size = g->size() / g->dim(0);
744  CAFFE_ENFORCE_EQ(timestep_size, oglast.size());
745  T* g_data_with_offset =
746  g->template mutable_data<T>() + t * timestep_size;
747  math::Add<T, Context>(
748  timestep_size,
749  oglast.template data<T>(),
750  g_data_with_offset,
751  g_data_with_offset,
752  &context_);
753  }
754  };
755 
756  accumulateFinalInputGradients();
757 
758  // Create shared blobs for blobs that can be shared between
759  // all timesteps.
760  if (stepWorkspaces.size() > 0) {
761  CreateSharedBlobs(stepWorkspaces[0], &sharedBlobsWs);
762  }
763  for (int32_t t = seqLen - 1; t >= 0; --t) {
764  if (rnnExecutor_) {
765  rnnExecutor_->EnsureTimestepInitialized(
766  t, stepWorkspaces[t].get(), this->observers_list_);
767  } else {
768  auto* stepNet = stepWorkspaces[t].get()->GetNet(stepNetDef_.name());
769  if (stepNet == nullptr) {
770  stepNet = stepWorkspaces[t].get()->CreateNet(stepNetDef_);
771  }
772  CAFFE_ENFORCE(stepNet);
773  stepNet->RunAsync();
774  }
775  }
776 
777  if (rnnExecutor_) {
778  rnnExecutor_->RunBackwards(seqLen);
779  }
780 
781  CAFFE_ENFORCE_EQ(recurrentInputIds_.size(), recurrentGradients_.size());
782  for (int i = 0; i < recurrentInputIds_.size(); ++i) {
783  // See GetRecurrentNetworkGradient to understand the offsetting here.
784  // Outputs of the gradient op correspond to inputs of the forward pass,
785  // so we need to offset by all inputs that come before the recurrent
786  // initial ones.
787  auto outputIdx = i + params_.size() + numSequences_;
788  // because first gradInputs_.size() inputs are from GO
789  int inputId = recurrentInputIds_[i] + gradInputs_.size();
790  VLOG(1) << "Resetting output " << this->debug_def().output(outputIdx)
791  << " like input " << this->debug_def().input(inputId);
792  Output(outputIdx)->ResizeLike(Input(inputId));
793  T* output_data = Output(outputIdx)->template mutable_data<T>();
794  auto pBlob = sharedWs_->GetBlob(recurrentGradients_[i].grad);
795  CAFFE_ENFORCE(pBlob);
796  auto* p = pBlob->template GetMutable<Tensor<Context>>();
797 
798  if (Input(inputId).ndim() >= 2) {
799  // The gradient states blob must stay alive, and if the backward pass
800  // changes it, the output should reflect that change as well. Thus it
801  // should be okay to share data here.
802  Output(outputIdx)->template ShareExternalPointer<T>(
803  p->template mutable_data<T>());
804  } else {
805  // We need to do a bunch of Adds anyway, so let's not worry about
806  // copy vs. share here. One way to speed this up could be a kernel
807  // that sums up several tensors together instead of going one by one.
808  const auto recurrentStateSize = Input(inputId).dim32(0);
809 
810  math::Set<T, Context>(
811  recurrentStateSize,
812  convert::To<float,T>(0.0),
813  output_data,
814  &context_);
815 
816  math::AddStripedBatch<T, Context>(
817  recurrentStateSize,
818  p->template data<T>(),
819  output_data,
820  recurrentStateSize,
821  batchSize,
822  &context_);
823  }
824  }
825 
826  return true;
827  }
828 
829  bool RunOnDevice() override {
830  return DoRunWithType<float>();
831  }
832 
833  protected:
834  NetDef stepNetDef_;
835  Workspace* sharedWs_;
836  bool enable_rnn_executor_;
837  std::unique_ptr<RecurrentNetworkExecutorBase> rnnExecutor_;
838  std::vector<detail::Link> links_;
839  std::vector<detail::Param> params_;
840  std::vector<detail::RecurrentGradient> recurrentGradients_;
841  std::string timestep_;
842  // For now we support only one input sequence
843  const int numSequences_{1};
844  std::vector<int32_t> recurrentInputIds_;
845  std::vector<int32_t> gradInputs_;
846 };
847 
848 template <class Context>
849 class AccumulateInputGradientOp : public Operator<Context> {
850  public:
851  AccumulateInputGradientOp(const OperatorDef& def, Workspace* ws)
852  : Operator<Context>(def, ws),
853  offset_(OperatorBase::GetSingleArgument<int>("offset", -1)) {
854  CAFFE_ENFORCE(offset_ >= 0, "Offset not set");
855  }
856  USE_OPERATOR_CONTEXT_FUNCTIONS;
857 
858  template<typename T>
859  bool DoRunWithType() {
860  const auto& t0 = OperatorBase::Input<Tensor<CPUContext>>(0);
861  const auto t = t0.template data<int32_t>()[0];
862  auto& og = Input(1);
863  auto* g = Output(0);
864 
865  T* g_data = g->template mutable_data<T>();
866  const auto timestep_size = g->size() / g->dim(0);
867 
868  CAFFE_ENFORCE(
869  (t + offset_) * timestep_size + timestep_size <= g->size(),
870  "Accumulation destination address over bounds");
871  CAFFE_ENFORCE(
872  t * timestep_size + timestep_size <= og.size(),
873  "Accumulation source address out of bounds");
874 
875  math::Add<T, Context>(
876  timestep_size,
877  og.template data<T>() + t * timestep_size,
878  g_data + (t + offset_) * timestep_size,
879  g_data + (t + offset_) * timestep_size,
880  &context_);
881  return true;
882  }
883 
884  bool RunOnDevice() override {
885  return DispatchHelper<TensorTypes<float>>::call(this, Input(1));
886  }
887 
888  private:
889  int offset_;
890 };
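In effect, one run of this op adds a single timestep slice of the incoming gradient into the accumulated gradient at an offset (a sketch; tensors are laid out as (time, batch, dim)):

// t = scalar timestep read from Input(0)
// g[t + offset_] += og[t]
// where og = Input(1) and g = Output(0).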
891 
892 template <class Context>
893 class RNNApplyLinkOp : public Operator<Context> {
894  public:
895  RNNApplyLinkOp(const OperatorDef& def, Workspace* ws)
896  : Operator<Context>(def, ws),
897  offset_(OperatorBase::GetSingleArgument<int>("offset", -1)),
898  window_(OperatorBase::GetSingleArgument<int>("window", -1)) {
899  CAFFE_ENFORCE(offset_ >= 0, "offset not set");
900  CAFFE_ENFORCE(window_ >= 0, "window not set");
901  }
902 
903  USE_OPERATOR_CONTEXT_FUNCTIONS;
904 
905  template <typename T>
906  bool DoRunWithType() {
907  // Both internal and external appear as both input and output to enforce
908  // correct dependency computation.
909  const auto& t0 = OperatorBase::Input<Tensor<CPUContext>>(0);
910  const auto t = t0.template data<int32_t>()[0];
911  auto& external = Input(1);
912 
913  auto* internal_out = Output(0);
914  auto* external_out = Output(1);
915 
916  CAFFE_ENFORCE_GT(external.size(), 0);
917  const TIndex externalTimestepSize = external.size() / external.dim(0);
918  auto* externalData = external_out->template mutable_data<T>() +
919  (t + offset_) * externalTimestepSize;
920  auto internalDims = external_out->dims();
921  internalDims[0] = window_;
922 
923  internal_out->Resize(internalDims);
924  internal_out->ShareExternalPointer(
925  externalData, externalTimestepSize * window_);
926  return true;
927  }
928 
929  bool RunOnDevice() override {
930  return DoRunWithType<float>();
931  }
932 
933  private:
934  int offset_;
935  int window_;
936 };
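In effect, this op exposes a zero-copy window of the external sequence blob to the step net (a sketch):

// internal_out = view of external_out[t + offset_ : t + offset_ + window_]
// i.e. internal_out shares external_out's memory for window_ timesteps
// starting at timestep t + offset_; nothing is copied.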
937 
938 } // namespace caffe2
939 
940 #endif // CAFFE2_OPERATORS_RECURRENT_NETWORK_OP_H_