Caffe2 - C++ API
A deep learning, cross platform ML framework
dataset_ops.cc
#include "caffe2/operators/dataset_ops.h"

#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include "caffe2/core/blob_serialization.h"
#include "caffe2/core/operator.h"
#include "caffe2/core/tensor.h"
#include "caffe2/utils/string_utils.h"

namespace caffe2 {

CAFFE_KNOWN_TYPE(std::unique_ptr<dataset_ops::TreeCursor>);
CAFFE_KNOWN_TYPE(dataset_ops::TensorVectorPtr<CPUContext>);
CAFFE_KNOWN_TYPE(dataset_ops::SharedTensorVectorPtr);

namespace dataset_ops {
namespace {

const char kDatasetFieldSeparator = ':';
const char* kDatasetLengthField = "lengths";
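// Field names are nested using the separator, and a name whose last component
// is "lengths" defines a list domain: e.g. "b:values:lengths" splits into
// {"b", "values", "lengths"} and stores the lengths of the inner lists of
// "b:values:values" (see the CreateTreeCursor schema below).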

// percentage by which to grow the dataset capacity when it needs to expand
const int kDatasetGrowthPct = 40;
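// E.g. when an Append outgrows a column's capacity, the reserved capacity
// grows by roughly this percentage; as the Append schema below notes, the
// exponential growth gives amortized constant-time appends.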

} // namespace

TreeIterator::TreeIterator(const std::vector<std::string>& fields) {
  // populate field vector and split field names
  fields_.resize(fields.size());
  std::vector<std::vector<std::string>> nameParts(fields_.size());
  for (int i = 0; i < fields.size(); ++i) {
    auto& field = fields_.at(i);
    field.name = fields[i];
    field.id = i;
    field.lengthFieldId = -1;
    nameParts.at(i) = split(kDatasetFieldSeparator, field.name);
  }

  // populate lengthFields
  for (const auto& field : fields_) {
    const auto& parts = nameParts.at(field.id);
    if (!parts.empty() && parts.back() == kDatasetLengthField) {
      lengthFieldIds_.push_back(field.id);
    }
  }

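  // Example (illustrating the loop below): with fields {"b:lengths",
  // "b:values:lengths", "b:values:values"}, both length fields are prefixes
  // of "b:values:values", but "b:values:lengths" matches more components, so
  // it becomes that field's lengthFieldId.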
  // find length-field with maximum prefix matching for each field
  for (auto& field : fields_) {
    // by default, we are matching against the root domain
    int maxMatchLevel = 1;
    int maxMatchLengthFieldId = -1;
    for (int j = 0; j < numLengthFields(); ++j) {
      const auto& lenField = lengthField(j);
      // a length field can't have itself as its length field
      if (field.id == lenField.id) {
        continue;
      }
      auto lf = nameParts.at(lenField.id);
      auto lfEnd = lf.end() - 1;
      // check whether this lengthField is a prefix for this field name
      if (std::mismatch(lf.begin(), lfEnd, nameParts.at(field.id).begin())
              .first != lfEnd) {
        continue;
      }
      if (lf.size() > maxMatchLevel) {
        maxMatchLevel = lf.size();
        maxMatchLengthFieldId = j;
      }
    }
    field.lengthFieldId = maxMatchLengthFieldId;
  }

  // check that fields are topologically sorted
  // (no length field depends on a length defined afterwards)
  for (const auto& field : fields_) {
    const auto* lengthField = lengthFieldFor(field);
    CAFFE_ENFORCE(
        (lengthField == nullptr) || (lengthField->id < field.id),
        "Error: Field ",
        field.id,
        " (",
        field.name,
        ") ",
        "depends on a field defined afterwards: ",
        lengthField->id,
        " (",
        lengthField->name,
        ").");
  }
}

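// Advances the cursor by at most `num` top-level records. A sketch of the
// bookkeeping below, for each offset field j:
//   sizes[0]      = min(limits[0] - offsets[0], num)
//   sizes[j]      = sum of lengths[j - 1] over the parent's advanced range
//   newOffsets[j] = offsets[j] + sizes[j]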
void TreeIterator::advance(
    const std::vector<const TLength*>& lengths,
    std::vector<TOffset>& offsets,
    std::vector<TOffset>& sizes,
    std::vector<TOffset>& limits,
    TOffset num) {
  std::vector<TOffset> newOffsets;
  CAFFE_ENFORCE_EQ(lengths.size(), numLengthFields());
  CAFFE_ENFORCE_EQ(offsets.size(), numOffsetFields());
  sizes.resize(offsets.size());
  newOffsets.resize(offsets.size());
  // first index, top level
  {
    auto limit = limits[0];
    auto offset = offsets[0];
    CAFFE_ENFORCE(limit >= offset, "Tried to advance past end of cursor.");
    TOffset total = std::min(limit - offset, num);
    sizes[0] = total;
    newOffsets[0] = offset + total;
  }
  // child indices
  for (int j = 1; j < numOffsetFields(); ++j) {
    TOffset total = 0;
    int parentOffsetId = offsetFieldIdFor(lengthField(j - 1));
    const TLength* length = lengths[j - 1] + offsets[parentOffsetId];
    for (int k = 0; k < sizes[parentOffsetId]; ++k) {
      total += *(length++);
    }
    auto offset = offsets[j];
    CAFFE_ENFORCE(
        offset + total <= limits[j],
        "Inconsistent field length: ",
        "tried to advance past the end of field ",
        j);
    sizes[j] = total;
    newOffsets[j] = offset + total;
  }
  offsets = newOffsets;
}

TreeWalker::TreeWalker(const vector<const Blob*>& inputs, TreeCursor& cursor)
    : inputs_(inputs), cursor_(cursor), sizes_(cursor.it.numOffsetFields()) {
  CAFFE_ENFORCE_EQ(inputs.size(), cursor.it.fields().size());
  if (cursor.offsets.empty()) {
    cursor.offsets.assign(cursor.it.numOffsetFields(), 0);
  }

  for (int fieldId = 0; fieldId < cursor_.it.fields().size(); ++fieldId) {
    fields_.emplace_back(*this, fieldId);
  }

  gatherLengthData();

  gatherSizeLimits();

  // The invariant we hold is that we are always one step ahead
  advance();
}

void TreeWalker::advance() {
  prevOffsets_ = cursor_.offsets;
  cursor_.it.advance(lengths_, cursor_.offsets, sizes_, limits_, 1);
}

std::vector<TIndex> TreeWalker::fieldDim(int fieldId) const {
  auto tensorDim = input(fieldId).dims();
  tensorDim[0] = sizes_[lengthIdx(fieldId)];
  return tensorDim;
}

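// Returns a pointer to the start of the field's data at the current cursor
// position: the row offset times the size of one row, i.e. size_from_dim(1)
// elements of itemsize() bytes each.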
void* TreeWalker::fieldPtr(int fieldId) const {
  auto& in = input(fieldId);
  return (char*)in.raw_data() +
      offset(fieldId) * in.size_from_dim(1) * in.meta().itemsize();
}

void TreeWalker::gatherLengthData() {
  static const TLength lenZero = 0;
  lengths_.resize(cursor_.it.numLengthFields());
  for (int i = 0; i < lengths_.size(); ++i) {
    auto& in = input(cursor_.it.lengthField(i).id);
    if (in.size() > 0) {
      lengths_[i] = in.data<int>();
    } else {
      lengths_[i] = &lenZero;
    }
  }
}

void TreeWalker::gatherSizeLimits() {
  limits_.assign(sizes_.size(), std::numeric_limits<TOffset>::max());
  for (auto fieldId = 0; fieldId < cursor_.it.fields().size(); ++fieldId) {
    auto lengthFieldIdx = lengthIdx(fieldId);
    limits_[lengthFieldIdx] =
        std::min(limits_[lengthFieldIdx], (TOffset)input(fieldId).dims()[0]);
  }
}

namespace {

class CreateTreeCursorOp : public Operator<CPUContext> {
 public:
  CreateTreeCursorOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws),
        fields_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {}

  bool RunOnDevice() override {
    *OperatorBase::Output<std::unique_ptr<TreeCursor>>(0) =
        std::unique_ptr<TreeCursor>(new TreeCursor(TreeIterator(fields_)));
    return true;
  }

 private:
  std::vector<std::string> fields_;
};

class ResetCursorOp : public Operator<CPUContext> {
 public:
  ResetCursorOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws) {}

  bool RunOnDevice() override {
    auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
    std::lock_guard<std::mutex> lock(cursor->mutex_);
    cursor->offsets.clear();
    return true;
  }
};

class CheckDatasetConsistencyOp : public Operator<CPUContext> {
 public:
  CheckDatasetConsistencyOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws),
        iterator_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {}

  bool RunOnDevice() override {
    std::vector<const TLength*> lengths;
    std::vector<TOffset> limits;
    std::vector<TOffset> sizes;
    std::vector<TOffset> offsets;
    CAFFE_ENFORCE(
        InputSize() == iterator_.fields().size(),
        "Invalid number of fields. Expected ",
        iterator_.fields().size(),
        ", got ",
        InputSize());
    sizes.resize(iterator_.numOffsetFields());
    // gather length data
    lengths.resize(iterator_.numLengthFields());
    for (int i = 0; i < lengths.size(); ++i) {
      lengths[i] = Input(iterator_.lengthField(i).id).data<TLength>();
    }
    // gather size limits
    limits.assign(sizes.size(), std::numeric_limits<TOffset>::max());
    for (int i = 0; i < iterator_.fields().size(); ++i) {
      int lengthIdx = iterator_.fields()[i].lengthFieldId + 1;
      CAFFE_ENFORCE_GT(Input(i).ndim(), 0);
      TOffset size = (TOffset)Input(i).dims()[0];
      if (limits[lengthIdx] == std::numeric_limits<TOffset>::max()) {
        limits[lengthIdx] = size;
      } else {
        CAFFE_ENFORCE(
            limits[lengthIdx] == size,
            "Inconsistent sizes for fields belonging to the same domain.",
            " Field: ",
            i,
            " (",
            iterator_.fields()[i].name,
            "); Length field index: ",
            lengthIdx,
            "; Previous size: ",
            limits[lengthIdx],
            "; New size: ",
            size);
      }
    }
    // advance to the end
    offsets.assign(sizes.size(), 0);
    iterator_.advance(lengths, offsets, sizes, limits, limits[0]);
    for (int i = 0; i < limits.size(); ++i) {
      CAFFE_ENFORCE(limits[i] == offsets[i]);
    }
    return true;
  }

 private:
  TreeIterator iterator_;
};

class PackRecordsOp : public Operator<CPUContext> {
 public:
  PackRecordsOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws),
        fields_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {}

  bool RunOnDevice() override {
    // There should be one input per field
    CAFFE_ENFORCE_EQ(InputSize(), fields_.size());
    CAFFE_ENFORCE_EQ(OutputSize(), 1);

    TreeCursor cursor((TreeIterator(fields_)));

    TreeWalker walker(Inputs(), cursor);

    Output(0)->Resize(walker.size());

    // Output(0)->raw_mutable_data(TypeMeta::Make<SharedTensorVectorPtr>()));
    auto* dst = Output(0)->mutable_data<SharedTensorVectorPtr>();

    for (int batchId = 0; batchId < walker.size(); ++batchId) {
      dst[batchId] = std::make_shared<std::vector<TensorCPU>>();
      dst[batchId]->reserve(walker.fields().size());

      for (const auto& field : walker.fields()) {
        dst[batchId]->emplace_back(field.dim());
        auto& tensor = dst[batchId]->back();
        context_.template CopyItems<CPUContext, CPUContext>(
            field.meta(),
            tensor.size(),
            field.ptr() /* src */,
            tensor.raw_mutable_data(field.meta()) /* dst */);
      }

      walker.advance();
    }

    return true;
  }

 private:
  std::vector<std::string> fields_;
};

class UnPackRecordsOp : public Operator<CPUContext> {
 public:
  UnPackRecordsOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws),
        fields_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {}

  bool RunOnDevice() override {
    const auto* inputs = Input(0).template data<SharedTensorVectorPtr>();
    const auto numRows = Input(0).size();

    CAFFE_ENFORCE_GE(numRows, 0);

    auto numTensors = OutputSize();

    // Precompute the output sizes to avoid resizing
    std::vector<std::vector<TIndex>> outputDims(numTensors);
    std::vector<const TypeMeta*> metas(numTensors);

    CAFFE_ENFORCE(
        numRows > 0 || InputSize() > 1,
        "Unpacking an empty record without a shape will leave the output "
        "blobs in an undefined state.");

    if (InputSize() == 1) {
      getShapeAndMetaFromInput(outputDims, metas);
    } else {
      getShapeAndMetaFromPrototypeBlobs(outputDims, metas);
    }

    for (int i = 0; i < numRows; ++i) {
      CAFFE_ENFORCE(inputs[i]);
      for (int j = 0; j < inputs[i]->size(); ++j) {
        const auto& input = inputs[i]->at(j);

        // Checks to ensure that dimensions/sizes match
        CAFFE_ENFORCE_EQ(outputDims[j].size(), input.ndim());
        CAFFE_ENFORCE(*metas[j] == input.meta());
        // We check from the second dimension, because we concat on the first.
        for (int k = 1; k < input.ndim(); ++k) {
          CAFFE_ENFORCE_EQ(input.dims()[k], outputDims[j][k]);
        }

        outputDims[j][0] += input.dim(0);
      }
    }

    // Resize to the final output size
    std::vector<void*> destinations(numTensors);
    for (int i = 0; i < numTensors; ++i) {
      Output(i)->Resize(outputDims[i]);
      destinations[i] = Output(i)->raw_mutable_data(*metas[i]);
    }

    for (int i = 0; i < numRows; ++i) {
      for (int j = 0; j < numTensors; ++j) {
        const auto& input = inputs[i]->at(j);

        context_.CopyItems<CPUContext, CPUContext>(
            *metas[j],
            input.size(),
            input.raw_data() /* src */,
            destinations[j] /* dst */);

        destinations[j] =
            (char*)destinations[j] + input.size() * input.itemsize();
      }
    }

    return true;
  }

 private:
  void getShapeAndMetaFromInput(
      std::vector<std::vector<TIndex>>& outputDims,
      std::vector<const TypeMeta*>& metas) {
    const auto* inputs = Input(0).template data<SharedTensorVectorPtr>();

    const auto& inputZero = inputs[0];
    CAFFE_ENFORCE(inputZero);

    const auto numTensors = inputZero->size();

    CAFFE_ENFORCE_EQ(numTensors, fields_.size());
    CAFFE_ENFORCE_EQ(numTensors, OutputSize());

    for (int i = 0; i < numTensors; ++i) {
      outputDims[i] = inputZero->at(i).dims();
      outputDims[i][0] = 0;
      metas[i] = &inputZero->at(i).meta();
    }
  }

  void getShapeAndMetaFromPrototypeBlobs(
      std::vector<std::vector<TIndex>>& outputDims,
      std::vector<const TypeMeta*>& metas) {
    const auto numTensors = fields_.size();
    CAFFE_ENFORCE_EQ(numTensors, InputSize() - 1);
    CAFFE_ENFORCE_EQ(numTensors, OutputSize());
    for (int i = 0; i < numTensors; ++i) {
      const auto& input = Input(i + 1);
      outputDims[i] = input.dims();
      outputDims[i][0] = 0;
      metas[i] = &input.meta();
    }
  }

  std::vector<std::string> fields_;
};

class ReadNextBatchOp : public Operator<CPUContext> {
 public:
  ReadNextBatchOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws),
        batchSize_(OperatorBase::GetSingleArgument<int>("batch_size", 1)),
        enforceBatchSize_(OperatorBase::GetSingleArgument<bool>(
            "enforce_batch_size",
            false)) {}

  bool RunOnDevice() override {
    auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
    CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1);
    std::vector<const TLength*> lengths;
    std::vector<TOffset> limits;
    std::vector<TOffset> sizes;
    std::vector<TOffset> offsets;
    TLength lenZero = 0;
    sizes.resize(cursor->it.numOffsetFields());
    // gather length data
    lengths.resize(cursor->it.numLengthFields());
    for (int i = 0; i < lengths.size(); ++i) {
      auto& a = Input(cursor->it.lengthField(i).id + 1);
      if (a.size() > 0) {
        lengths[i] = a.data<int>();
      } else {
        lengths[i] = &lenZero;
      }
    }
    // gather size limits
    limits.assign(sizes.size(), std::numeric_limits<TOffset>::max());
    for (int i = 0; i < cursor->it.fields().size(); ++i) {
      int lengthFieldIdx = cursor->it.fields()[i].lengthFieldId + 1;
      limits[lengthFieldIdx] =
          std::min(limits[lengthFieldIdx], (TOffset)Input(i + 1).dims()[0]);
    }
    // advance cursor
    {
      std::lock_guard<std::mutex> lock(cursor->mutex_);
      if (cursor->offsets.empty()) {
        cursor->offsets.assign(sizes.size(), 0);
      }
      offsets = cursor->offsets;
      cursor->it.advance(lengths, cursor->offsets, sizes, limits, batchSize_);
      if (enforceBatchSize_ && sizes[0] < batchSize_) {
        // if we enforce batch_size but don't have enough rows left to
        // complete a full batch, return empty for all columns.
        // This signals end of dataset to the caller.
        sizes.assign(sizes.size(), 0);
      }
    }
    // gather data
    std::vector<TIndex> outDim;
    for (int i = 0; i < cursor->it.fields().size(); ++i) {
      auto lengthIdx = cursor->it.fields()[i].lengthFieldId + 1;
      auto size = sizes[lengthIdx];
      auto offset = offsets[lengthIdx];
      auto& in = Input(i + 1);
      auto innerSize = in.size_from_dim(1);
      outDim = in.dims();
      outDim[0] = size;
      auto* out = Output(i);
      out->Resize(outDim);
      void* src =
          (char*)in.raw_data() + offset * innerSize * in.meta().itemsize();
      void* dst = out->raw_mutable_data(in.meta()); // create the tensor
      if (out->size() == 0) {
        continue;
      }
      context_.template CopyItems<CPUContext, CPUContext>(
          in.meta(), out->size(), src, dst);
    }
    return true;
  }
  int batchSize_;
  bool enforceBatchSize_;
};

class ComputeOffsetOp : public Operator<CPUContext> {
 public:
  ComputeOffsetOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws) {}

  bool RunOnDevice() override {
    auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
    CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1);
    auto* out = Output(0);
    std::vector<const TLength*> lengths;
    std::vector<TOffset> limits;
    std::vector<TOffset> sizes;
    std::vector<TOffset> offsets;
    TLength lenZero = 0;
    sizes.resize(cursor->it.numOffsetFields());
    // gather length data
    lengths.resize(cursor->it.numLengthFields());
    for (int i = 0; i < lengths.size(); ++i) {
      auto& a = Input(cursor->it.lengthField(i).id + 1);
      if (a.size() > 0) {
        lengths[i] = a.data<int>();
      } else {
        lengths[i] = &lenZero;
      }
    }
    // gather size limits
    limits.assign(sizes.size(), std::numeric_limits<TOffset>::max());
    for (int i = 0; i < cursor->it.fields().size(); ++i) {
      int lengthFieldIdx = cursor->it.fields()[i].lengthFieldId + 1;
      limits[lengthFieldIdx] =
          std::min(limits[lengthFieldIdx], (TOffset)Input(i + 1).dims()[0]);
    }
    out->Resize(limits.at(0) + 1, sizes.size());
    auto* out_data = out->mutable_data<int64_t>();
    for (int k = 0; k <= limits.at(0); k++) {
      // advance cursor
      if (cursor->offsets.empty()) {
        cursor->offsets.assign(sizes.size(), 0);
      }
      // write output
      std::copy(cursor->offsets.begin(), cursor->offsets.end(), out_data);
      out_data += sizes.size();
      cursor->it.advance(lengths, cursor->offsets, sizes, limits, 1);
    }
    cursor->offsets.assign(sizes.size(), 0); // reset after gathering meta info
    return true;
  }
};

class SortAndShuffleOp : public Operator<CPUContext> {
 public:
  SortAndShuffleOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws),
        sort_by_field_idx_(
            OperatorBase::GetSingleArgument<int>("sort_by_field_idx", 1)),
        batch_size_(OperatorBase::GetSingleArgument<int>("batch_size", 1)),
        shuffle_size_(OperatorBase::GetSingleArgument<int>("shuffle_size", 1)) {
  }

  bool RunOnDevice() override {
    auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
    CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1);
    CAFFE_ENFORCE(-1 <= sort_by_field_idx_);
    CAFFE_ENFORCE(cursor->it.fields().size() - sort_by_field_idx_ > 0);
    int size;
    if (sort_by_field_idx_ != -1) {
      size = Input(sort_by_field_idx_ + 1).dims()[0];
    } else {
      size = Input(1).dims()[0];
    }

    CAFFE_ENFORCE(
        batch_size_ > 0 && shuffle_size_ > 0 &&
        0 < batch_size_ * shuffle_size_);
    // adjust shuffle_size_ if it is too large
    if (batch_size_ * shuffle_size_ > size) {
      shuffle_size_ = size / batch_size_;
    }

    int num_batch = size / batch_size_;
    auto* out = Output(0);
    out->Resize(size);
    auto* out_data = out->mutable_data<int64_t>();

    vector<int> shuffle_idx(size);
    iota(shuffle_idx.begin(), shuffle_idx.end(), 0);

    if (sort_by_field_idx_ != -1) {
      auto& sortblob = Input(sort_by_field_idx_ + 1);
      auto* sortdata = sortblob.data<int>();
      // must sort by a field at the root level
      CAFFE_ENFORCE(
          cursor->it.fields()[sort_by_field_idx_].lengthFieldId == -1);
      sort(shuffle_idx.begin(), shuffle_idx.end(), [&sortdata](int i1, int i2) {
        return sortdata[i1] < sortdata[i2];
      });
    }

    if (batch_size_ * shuffle_size_ > 1) {
      int offset = 0;
      while (offset + batch_size_ * shuffle_size_ < size) {
        std::shuffle(
            shuffle_idx.begin() + offset,
            shuffle_idx.begin() + offset + batch_size_ * shuffle_size_,
            std::default_random_engine());
        offset += batch_size_ * shuffle_size_;
      }
    }

    vector<int> batch_idx(num_batch);
    iota(batch_idx.begin(), batch_idx.end(), 0);
    std::shuffle(
        batch_idx.begin(), batch_idx.end(), std::default_random_engine());

    for (int i = 0; i < num_batch; i++) {
      std::copy(
          shuffle_idx.begin() + batch_idx[i] * batch_size_,
          shuffle_idx.begin() + (batch_idx[i] + 1) * batch_size_,
          out_data);
      out_data += batch_size_;
    }
    std::copy(
        shuffle_idx.begin() + num_batch * batch_size_,
        shuffle_idx.end(),
        out_data);

    return true;
  }

  int sort_by_field_idx_;
  int batch_size_;
  int shuffle_size_;
};

class ReadRandomBatchOp : public Operator<CPUContext> {
 public:
  ReadRandomBatchOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws),
        batchSize_(OperatorBase::GetSingleArgument<int>("batch_size", 1)),
        enforceBatchSize_(
            OperatorBase::GetSingleArgument<bool>("enforce_batch_size", false)),
        loopOver_(OperatorBase::GetSingleArgument<bool>("loop_over", false)) {}
  bool RunOnDevice() override {
    auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
    auto& idxblob = Input(1);
    auto& offsetsmat = Input(2);
    CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 3);
    auto idxvec = idxblob.template data<int64_t>();
    auto& offsetdim = offsetsmat.dims();
    // gather data
    std::vector<TIndex> outDim;
    int64_t idx;
    {
      std::lock_guard<std::mutex> lock(cursor->mutex_);
      cursor->offsets.resize(1);
      idx = cursor->offsets.at(0);
      // if we want to enforce batch size but we don't have a complete
      // batch, skip the last rows.
      if (enforceBatchSize_ && idx + batchSize_ > idxblob.size()) {
        idx = idxblob.size();
      }
      if (loopOver_ && idx >= idxblob.size()) {
        cursor->offsets.at(0) = 0;
        idx = 0;
      }
      cursor->offsets.at(0) += batchSize_;
    }

    for (int i = 0; i < cursor->it.fields().size(); ++i) {
      auto lengthIdx = cursor->it.fields()[i].lengthFieldId + 1;
      auto& in = Input(i + 3);
      outDim = in.dims();
      outDim.at(0) = 0;
      auto idxbegin = idx;
      for (int j = 0; j < batchSize_; ++j) {
        if (idx >= idxblob.size()) {
          break;
        }
        CAFFE_ENFORCE(
            (idxvec[idx] + 1) * offsetdim[1] + lengthIdx < offsetsmat.size(),
            "Out of bound when trying to get elem from offsetsmat");
        auto offsetptr = offsetsmat.template data<TOffset>() +
            idxvec[idx] * offsetdim[1] + lengthIdx;
        auto offset = *offsetptr;
        auto size = *(offsetptr + offsetdim[1]) - offset;
        outDim.at(0) += size; // accumulate over the batch
        idx++;
      }
      idx = idxbegin; // reset
      auto* out = Output(i);
      out->Resize(outDim);
      if (out->size() == 0) {
        continue;
      }
      auto dst = static_cast<char*>(out->raw_mutable_data(in.meta()));
      int block_size = in.size() / in.dim(0);
      auto block_bytesize = in.size_from_dim(1) * in.meta().itemsize();
      CAFFE_ENFORCE(
          block_bytesize == in.nbytes() / in.dim(0),
          "block_bytesize should be consistent with data dim");
      auto src_base = static_cast<const char*>(in.raw_data());
      int start = 0;
      for (int j = 0; j < batchSize_; ++j) {
        if (idx >= idxblob.size()) {
          break;
        }
        auto offsetptr = offsetsmat.template data<TOffset>() +
            idxvec[idx] * offsetdim[1] + lengthIdx;
        auto offset = *offsetptr;
        auto size = *(offsetptr + offsetdim[1]) - offset;
        // copy data
        auto src = src_base + offset * block_bytesize;
        context_.template CopyItems<CPUContext, CPUContext>(
            in.meta(), size * block_size, src, dst + start * block_bytesize);
        start += size;
        idx++;
      }
      idx = idxbegin; // reset
    }
    return true;
  }
  int batchSize_;
  bool enforceBatchSize_;
  bool loopOver_;
};

template <class Context>
class AppendOp final : public Operator<Context> {
 public:
  USE_OPERATOR_CONTEXT_FUNCTIONS;
  AppendOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator<Context>(operator_def, ws) {}

  bool RunOnDevice() override {
    auto& a = Input(0);
    auto& b = Input(1);
    auto* c = Output(0);
    CAFFE_ENFORCE(b.ndim() >= 1);
    if (a.size() == 0 && a.dim(0) == 0) {
      c->CopyFrom(b);
      return true;
    }
    CAFFE_ENFORCE(&a == c, "First argument must be in-place.");
    CAFFE_ENFORCE(c->ndim() == b.ndim());
    CAFFE_ENFORCE(b.ndim() == c->ndim());
    CAFFE_ENFORCE(a.meta() == b.meta());
    for (int i = 1; i < a.ndim(); ++i) {
      CAFFE_ENFORCE(a.dims()[i] == b.dims()[i]);
    }
    auto oldSize = c->size();
    c->Extend(b.dims()[0], kDatasetGrowthPct, &context_);
    auto* dst = (char*)c->raw_mutable_data() + oldSize * b.meta().itemsize();
    context_.template CopyItems<Context, Context>(
        b.meta(), b.size(), b.raw_data(), dst);
    return true;
  }
};

template <class Context>
class AtomicAppendOp final : public Operator<Context> {
 public:
  USE_OPERATOR_CONTEXT_FUNCTIONS;
  AtomicAppendOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator<Context>(operator_def, ws) {}

  bool RunOnDevice() override {
    auto& mutex = OperatorBase::Input<std::unique_ptr<std::mutex>>(0);
    const auto numFields = (InputSize() - 1) / 2;
    CAFFE_ENFORCE(OutputSize() == numFields);

    std::lock_guard<std::mutex> guard(*mutex);

    // 1: checks
    for (int i = 0; i < numFields; ++i) {
      auto& a = Input(1 + i);
      auto& b = Input(1 + i + numFields);
      auto* c = Output(i);
      CAFFE_ENFORCE(b.ndim() >= 1);
      if (a.size() == 0) {
        continue;
      }
      CAFFE_ENFORCE(
          (void*)&a == (void*)c, "Appended-to arguments must be in-place.");
      CAFFE_ENFORCE(c->ndim() == b.ndim());
      CAFFE_ENFORCE(b.ndim() == c->ndim());
      CAFFE_ENFORCE(a.meta() == b.meta());
      for (int j = 1; j < a.ndim(); ++j) {
        CAFFE_ENFORCE(a.dims()[j] == b.dims()[j]);
      }
    }

    // 2: copies
    for (int i = 0; i < numFields; ++i) {
      auto& a = Input(1 + i);
      auto& b = Input(1 + i + numFields);
      auto* c = Output(i);
      if (a.size() == 0 && a.dim(0) == 0) {
        c->CopyFrom(b);
        continue;
      }
      auto oldSize = c->size();
      c->Extend(b.dims()[0], kDatasetGrowthPct, &context_);
      auto* dst = (char*)c->raw_mutable_data() + oldSize * b.meta().itemsize();
      context_.template CopyItems<Context, Context>(
          b.meta(), b.size(), b.raw_data(), dst);
    }
    return true;
  }
};

template <class Context>
class CreateTensorVectorOp final : public Operator<Context> {
 public:
  USE_OPERATOR_CONTEXT_FUNCTIONS;
  using Operator<Context>::Operator;

  bool RunOnDevice() override {
    auto ptr = make_unique<std::vector<Tensor<Context>>>();
    *OperatorBase::Output<TensorVectorPtr<Context>>(TENSOR_VECTOR) =
        std::move(ptr);
    return true;
  }

 private:
  OUTPUT_TAGS(TENSOR_VECTOR);
};

template <class Context>
class TensorVectorSizeOp final : public Operator<Context> {
 public:
  USE_OPERATOR_CONTEXT_FUNCTIONS;
  USE_SIMPLE_CTOR_DTOR(TensorVectorSizeOp);

  bool RunOnDevice() override {
    auto& vector_ptr =
        OperatorBase::Input<TensorVectorPtr<Context>>(TENSOR_VECTOR);
    auto* size = Output(SIZE);
    size->Resize();
    // 32-bit should be enough here
    *size->template mutable_data<int32_t>() = vector_ptr->size();
    return true;
  }

 private:
  INPUT_TAGS(TENSOR_VECTOR);
  OUTPUT_TAGS(SIZE);
};

template <class Context>
class ConcatTensorVectorOp final : public Operator<Context> {
 public:
  USE_OPERATOR_CONTEXT_FUNCTIONS;
  using Operator<Context>::Operator;

  bool RunOnDevice() override {
    const TensorVectorPtr<Context>& tensorVector =
        OperatorBase::Input<TensorVectorPtr<Context>>(TENSOR_VECTOR);

    auto* tensor = Output(TENSOR);
    CAFFE_ENFORCE(!tensorVector->empty());

    vector<TIndex> outputDims(tensorVector->at(0).dims());
    CAFFE_ENFORCE(outputDims.size() > 0);
    for (int i = 1; i < tensorVector->size(); i++) {
      // the tensor shapes are the same except for the first dimension
      for (int j = 1; j < tensorVector->at(i).ndim(); j++) {
        CAFFE_ENFORCE(outputDims[j] == tensorVector->at(i).dims()[j]);
      }
      CAFFE_ENFORCE(tensorVector->at(0).meta() == tensorVector->at(i).meta());
      outputDims[0] += tensorVector->at(i).dims()[0];
    }

    tensor->Resize(outputDims);
    TIndex offset = 0;
    auto* dst = (char*)tensor->raw_mutable_data(tensorVector->at(0).meta());

    for (const auto& t : *tensorVector) {
      context_.template CopyItems<Context, Context>(
          t.meta(), t.size(), t.raw_data(), dst + offset);
      offset += t.nbytes();
    }

    return true;
  }

 private:
  INPUT_TAGS(TENSOR_VECTOR);
  OUTPUT_TAGS(TENSOR);
};

template <class Context>
class CollectTensorOp final : public Operator<Context> {
 public:
  USE_OPERATOR_CONTEXT_FUNCTIONS;
  CollectTensorOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator<Context>(operator_def, ws),
        numToCollect_(
            OperatorBase::GetSingleArgument<int>("num_to_collect", -1)),
        numVisited_(0) {
    CAFFE_ENFORCE(numToCollect_ > 0);
  }

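  // Reservoir sampling: the first numToCollect_ tensors are appended
  // directly; afterwards, the i-th visited tensor (0-based) replaces a
  // uniformly random slot with probability numToCollect_ / (i + 1), so every
  // tensor seen so far is retained with equal probability.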
  bool RunOnDevice() override {
    int pos = -1;
    if (numVisited_ < numToCollect_) {
      // append
      pos = numVisited_;
    } else {
      auto& gen = context_.RandGenerator();
      // uniform between [0, numVisited_]
      std::uniform_int_distribution<int> uniformDist(0, numVisited_);
      pos = uniformDist(gen);
      if (pos >= numToCollect_) {
        // discard
        pos = -1;
      }
    }

    for (int i = 0; i < OutputSize(); ++i) {
      // TENSOR_VECTOR_IN is enforced in-place with TENSOR_VECTOR_OUT
      TensorVectorPtr<Context>& tensorVector =
          *OperatorBase::Output<TensorVectorPtr<Context>>(i);

      if (numVisited_ >= numToCollect_) {
        CAFFE_ENFORCE(
            tensorVector->size() == numToCollect_,
            "TensorVector size = ",
            tensorVector->size(),
            " is different from numToCollect = ",
            numToCollect_);
      }

      const auto& tensor = Input(OutputSize() + i);

      if (pos < 0) {
        // discard
        CAFFE_ENFORCE(numVisited_ >= numToCollect_);
      } else if (pos >= tensorVector->size()) {
        // append
        tensorVector->push_back(Tensor<Context>());
        tensorVector->back().template CopyFrom<Context, Context>(
            tensor, &context_);
      } else {
        // replace
        tensorVector->at(pos).template CopyFrom<Context, Context>(
            tensor, &context_);
      }
    }

    numVisited_++;
    return true;
  }

 private:
  // number of tensors to collect
  int numToCollect_;
  // number of tensors visited
  int numVisited_;
};

class TrimDatasetOp : public Operator<CPUContext> {
 public:
  TrimDatasetOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws),
        iterator_(OperatorBase::GetRepeatedArgument<std::string>("fields")),
        multiple_of_(OperatorBase::GetSingleArgument<int>("multiple_of", 1)) {
    CAFFE_ENFORCE_GE(multiple_of_, 1);
  }

  bool RunOnDevice() override {
    TreeCursor cursor(iterator_);
    TreeWalker walker(Inputs(), cursor);

    int trimmedSize = (walker.size() / multiple_of_) * multiple_of_;
    if (trimmedSize == walker.size()) {
      // we already satisfy the condition
      return true;
    }
    // advance desired number of records
    for (int i = 0; i < trimmedSize; ++i) {
      walker.advance();
    }
    // trim each column to the offset
    for (int col = 0; col < walker.fields().size(); ++col) {
      auto newOuterSize = walker.fields().at(col).offset();
      Output(col)->Shrink(newOuterSize);
    }
    return true;
  }

 private:
  TreeIterator iterator_;
  int multiple_of_;
};

REGISTER_CPU_OPERATOR(CreateTreeCursor, CreateTreeCursorOp);
REGISTER_CPU_OPERATOR(ResetCursor, ResetCursorOp);
REGISTER_CPU_OPERATOR(ReadNextBatch, ReadNextBatchOp);
REGISTER_CPU_OPERATOR(ComputeOffset, ComputeOffsetOp);
REGISTER_CPU_OPERATOR(SortAndShuffle, SortAndShuffleOp);
REGISTER_CPU_OPERATOR(ReadRandomBatch, ReadRandomBatchOp);
REGISTER_CPU_OPERATOR(CheckDatasetConsistency, CheckDatasetConsistencyOp);
REGISTER_CPU_OPERATOR(Append, AppendOp<CPUContext>);
REGISTER_CPU_OPERATOR(AtomicAppend, AtomicAppendOp<CPUContext>);
REGISTER_CPU_OPERATOR(CreateTensorVector, CreateTensorVectorOp<CPUContext>);
REGISTER_CPU_OPERATOR(TensorVectorSize, TensorVectorSizeOp<CPUContext>);
REGISTER_CPU_OPERATOR(ConcatTensorVector, ConcatTensorVectorOp<CPUContext>);
REGISTER_CPU_OPERATOR(CollectTensor, CollectTensorOp<CPUContext>);
REGISTER_CPU_OPERATOR(PackRecords, PackRecordsOp);
REGISTER_CPU_OPERATOR(UnPackRecords, UnPackRecordsOp);
REGISTER_CPU_OPERATOR(TrimDataset, TrimDatasetOp);

OPERATOR_SCHEMA(CreateTreeCursor)
    .NumInputs(0)
    .NumOutputs(1)
    .SetDoc(R"DOC(
Creates a cursor to iterate through a list of tensors, where some of those
tensors contain the lengths in a nested schema. The schema is determined by
the `fields` argument.

For example, to represent the following schema:

  Struct(
      a=Int(),
      b=List(List(Int)),
      c=List(
          Struct(
              c1=String,
              c2=List(Int),
          ),
      ),
  )

the field list will be:

  [
      "a",
      "b:lengths",
      "b:values:lengths",
      "b:values:values",
      "c:lengths",
      "c:c1",
      "c:c2:lengths",
      "c:c2:values",
  ]

And for the following instance of the struct:

  Struct(
      a=3,
      b=[[4, 5], [6, 7, 8], [], [9]],
      c=[
          Struct(c1='alex', c2=[10, 11]),
          Struct(c1='bob', c2=[12]),
      ],
  )

The values of the fields will be:

  {
      "a": [3],
      "b:lengths": [4],
      "b:values:lengths": [2, 3, 0, 1],
      "b:values:values": [4, 5, 6, 7, 8, 9],
      "c:lengths": [2],
      "c:c1": ["alex", "bob"],
      "c:c2:lengths": [2, 1],
      "c:c2:values": [10, 11, 12],
  }

In general, every field name in the format "{prefix}:lengths" defines a domain
"{prefix}", and every subsequent field in the format "{prefix}:{field}" will
be in that domain, and the length of the domain is provided for each entry of
the parent domain. In the example, "b:lengths" defines a domain of length 4, so
every field under domain "b" will have 4 entries.
The "lengths" field for a given domain must appear before any reference to
that domain.

Returns a pointer to an instance of the Cursor, which keeps the current offset
on each of the domains defined by `fields`. The cursor also ensures
thread-safety such that ReadNextBatch and ResetCursor can be used safely in
parallel.

A cursor does not contain data per se, so calls to ReadNextBatch actually need
to pass a list of blobs containing the data to read for each one of the fields.
)DOC")
    .Output(0, "cursor", "A blob pointing to an instance of a new TreeCursor.")
    .Arg(
        "fields",
        "A list of strings each one representing a field of the dataset.");

OPERATOR_SCHEMA(ResetCursor)
    .NumInputs(1)
    .NumOutputs(0)
    .SetDoc(R"DOC(
Resets the offsets for the given TreeCursor. This operation is thread safe.
)DOC")
    .Input(0, "cursor", "A blob containing a pointer to the cursor.");

OPERATOR_SCHEMA(ReadNextBatch)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1, INT_MAX)
    .SetDoc(R"DOC(
Read the next batch of examples out of the given cursor and data blobs.

Input(0) is a blob pointing to a TreeCursor, and
[Input(1),... Input(num_fields)] is a list of tensors containing the data for
each field of the dataset.

ReadNextBatch is thread safe.
)DOC")
    .Input(0, "cursor", "A blob containing a pointer to the cursor.")
    .Input(1, "dataset_field_0", "First dataset field")
    .Output(0, "field_0", "Tensor containing the next batch for field 0.")
    .Arg("batch_size", "Number of top-level entries to read.");

OPERATOR_SCHEMA(ComputeOffset)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1)
    .SetDoc(R"DOC(
Compute the offsets matrix given cursor and data blobs. It needs to be run at
the beginning or after resetting the cursor.

Input(0) is a blob pointing to a TreeCursor, and
[Input(1),... Input(num_fields)] is a list of tensors containing the data for
each field of the dataset.

ComputeOffset is thread safe.
)DOC")
    .Input(0, "cursor", "A blob containing a pointer to the cursor.")
    .Input(1, "dataset_field_0", "First dataset field")
    .Output(0, "field_0", "Tensor containing offset info for this chunk.");
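// Note on the output (from ComputeOffsetOp above): the matrix has
// num_records + 1 rows and one column per offset field; row k holds the
// cursor offsets after advancing k records, so consecutive rows bracket the
// entries that record k covers.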

OPERATOR_SCHEMA(SortAndShuffle)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1)
    .SetDoc(R"DOC(
Compute the sorted indices given a field index to sort by; break the sorted
indices into chunks of shuffle_size * batch_size and shuffle each chunk;
finally, shuffle between batches. If sort_by_field_idx is -1, the sort is
skipped.

For example, say we have data sorted as
1,2,3,4,5,6,7,8,9,10,11,12

with batch_size = 2 and shuffle_size = 3. After shuffling each chunk we might
get:
[3,1,4,6,5,2] [12,10,11,8,9,7]

After this we shuffle among the different batches of size 2
[3,1],[4,6],[5,2],[12,10],[11,8],[9,7]

and may end up with something like
[9,7],[5,2],[12,10],[4,6],[3,1],[11,8]

Input(0) is a blob pointing to a TreeCursor, and
[Input(1),... Input(num_fields)] is a list of tensors containing the data for
each field of the dataset.

SortAndShuffle is thread safe.
)DOC")
    .Input(0, "cursor", "A blob containing a pointer to the cursor.")
    .Input(1, "dataset_field_0", "First dataset field")
    .Output(0, "indices", "Tensor containing sorted indices.");

OPERATOR_SCHEMA(ReadRandomBatch)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1, INT_MAX)
    .SetDoc(R"DOC(
Read the next batch of examples out of the given cursor,
idx blob, offset matrix and data blobs.

Input(0) is a blob pointing to a TreeCursor,
Input(1) is a blob pointing to the shuffled idx,
Input(2) is a blob pointing to the offset matrix, and
[Input(3),... Input(num_fields)] is a list of tensors containing the data for
each field of the dataset.

ReadRandomBatch is thread safe.
)DOC")
    .Input(0, "cursor", "A blob containing a pointer to the cursor.")
    .Input(1, "idx", "idx with a shuffled order.")
    .Input(2, "offsetsmat", "offset matrix containing length offset info.")
    .Input(3, "dataset_field_0", "First dataset field")
    .Output(0, "field_0", "Tensor containing the next batch for field 0.")
    .Arg("batch_size", "Number of top-level entries to read.")
    .Arg("loop_over", "(bool) Repeat the dataset indefinitely");

OPERATOR_SCHEMA(CheckDatasetConsistency)
    .NumInputs(1, INT_MAX)
    .NumOutputs(0)
    .SetDoc(R"DOC(
Checks that the given data fields represent a consistent dataset under
the schema specified by the `fields` argument. The operator fails if the
fields are not consistent. If the data is consistent, each field's data can be
safely appended to an existing dataset, keeping it consistent.
)DOC")
    .Input(0, "field_0", "Data for field 0.")
    .Arg(
        "fields",
        "List of strings representing the string names in the format "
        "specified in the doc for CreateTreeCursor.");

OPERATOR_SCHEMA(Append)
    .NumInputs(2)
    .NumOutputs(1)
    .EnforceInplace({{0, 0}})
    .SetDoc(R"DOC(
Append input 2 to the end of input 1.
Input 1 must be the same as the output, that is, it is required to be
in-place. Input 1 may have to be re-allocated in order to accommodate the new
size. Currently, an exponential growth ratio is used in order to ensure
amortized constant time complexity.
All except the outer-most dimension must be the same between input 1 and 2.
)DOC")
    .Input(0, "dataset", "The tensor to be appended to.")
    .Input(1, "new_data", "Tensor to append to the end of dataset.")
    .Output(0, "dataset", "Same as input 0, representing the mutated tensor.");
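// Illustration (not from the original source): appending a (3, 4) tensor to a
// (5, 4) tensor yields an (8, 4) tensor, while appending a (3, 5) tensor
// fails the inner-dimension check in AppendOp above.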

OPERATOR_SCHEMA(AtomicAppend)
    .NumInputs(3, INT_MAX)
    .NumOutputs(1, INT_MAX)
    .AllowInplace([](int in, int out) { return in == out + 1; });

OPERATOR_SCHEMA(CreateTensorVector)
    .NumInputs(0)
    .NumOutputs(1)
    .SetDoc("Create a std::unique_ptr<std::vector<Tensor> >");

OPERATOR_SCHEMA(TensorVectorSize)
    .NumInputs(1)
    .NumOutputs(1)
    .SetDoc("Get the size of the input vector")
    .Input(0, "tensor vector", "std::unique_ptr<std::vector<Tensor> >")
    .Output(0, "size", "int32_t size");

OPERATOR_SCHEMA(ConcatTensorVector)
    .NumInputs(1)
    .NumOutputs(1)
    .SetDoc(R"DOC(
Concat Tensors in the std::unique_ptr<std::vector<Tensor> >
along the first dimension.
)DOC")
    .Input(0, "vector of Tensor", "std::unique_ptr<std::vector<Tensor> >")
    .Output(0, "tensor", "tensor after concatenating");

OPERATOR_SCHEMA(CollectTensor)
    .NumInputs([](int n) { return n > 0 && n % 2 == 0; })
    .NumOutputs(1, INT_MAX)
    .NumInputsOutputs([](int in, int out) { return in == out * 2; })
    .EnforceInplace([](int in, int out) { return in == out; })
    .SetDoc(R"DOC(
Collect tensors into tensor vectors by reservoir sampling. The argument
num_to_collect indicates the max number of tensors that will be collected.
The first half of the inputs are tensor vectors, which are also the outputs.
The second half of the inputs are the tensors to be collected into each vector
(in the same order). The input tensors are collected in an all-or-none manner.
If they are collected, they will be placed at the same index in the output
vectors.
)DOC")
    .Arg("num_to_collect", "The max number of tensors to collect");

OPERATOR_SCHEMA(PackRecords)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1)
    .SetDoc(R"DOC(
Given a dataset under a schema specified by the `fields` argument, this
operator packs all the input tensors into one, where each tensor element
represents a row of data (batch of size 1). This format allows easier use with
the rest of Caffe2 operators.
)DOC")
    .Arg(
        "fields",
        "List of strings representing the string names in the format "
        "specified in the doc for CreateTreeCursor.")
    .Output(
        0,
        "tensor",
        "One dimensional tensor having a complex type of SharedTensorVectorPtr."
        " In order to reverse it back to the original input it has to be "
        "inserted into UnPackRecordsOp.");

OPERATOR_SCHEMA(TrimDataset)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1, INT_MAX)
    .SetDoc(R"DOC(
Trim the given dataset in-place, given the dataset blobs and the field specs.
Trimming happens such that the dataset will contain the largest possible number
of records that is a multiple of the 'multiple_of' argument.
)DOC")
    .EnforceInplace([](int input, int output) { return input == output; })
    .Arg(
        "fields",
        "List of strings representing the string names in the format "
        "specified in the doc for CreateTreeCursor.");

OPERATOR_SCHEMA(UnPackRecords)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1, INT_MAX)
    .SetDoc(R"DOC(
Given a packed dataset (packed by the PackRecordsOp) and the `fields` argument
describing the dataset's schema, returns the original dataset format. The
number of returned tensors is equal to the number of fields in the `fields`
argument.

The first input is the packed tensor to be unpacked. Optionally, you can
provide prototype tensors to give the expected shapes of the output tensors.
This is helpful when you expect to unpack an empty tensor, e.g., the output
of a sampling process.
)DOC")
    .Arg(
        "fields",
        "List of strings representing the string names in the format "
        "specified in the doc for CreateTreeCursor.")
    .Input(0, "packed_tensor", "The tensor to be unpacked");

SHOULD_NOT_DO_GRADIENT(CreateTreeCursor);
SHOULD_NOT_DO_GRADIENT(ResetCursor);
SHOULD_NOT_DO_GRADIENT(ReadNextBatch);
SHOULD_NOT_DO_GRADIENT(ComputeOffset);
SHOULD_NOT_DO_GRADIENT(ReadRandomBatch);
SHOULD_NOT_DO_GRADIENT(CheckDatasetConsistency);
SHOULD_NOT_DO_GRADIENT(Append);
SHOULD_NOT_DO_GRADIENT(AtomicAppend);
SHOULD_NOT_DO_GRADIENT(CreateTensorVector);
SHOULD_NOT_DO_GRADIENT(TensorVectorSize);
SHOULD_NOT_DO_GRADIENT(ConcatTensorVector);
SHOULD_NOT_DO_GRADIENT(CollectTensor);
SHOULD_NOT_DO_GRADIENT(UnPackRecords);
SHOULD_NOT_DO_GRADIENT(PackRecords);

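// Serialization format (a summary of the code below): the cursor offsets are
// saved as a TOffset tensor in the proto's tensor field, and the field names
// are saved space-separated in the proto's content string.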
class TreeCursorSerializer : public BlobSerializerBase {
 public:
  TreeCursorSerializer() {}
  ~TreeCursorSerializer() {}

  void Serialize(
      const Blob& blob,
      const string& name,
      SerializationAcceptor acceptor) override {
    auto& cursor = blob.template Get<std::unique_ptr<TreeCursor>>();
    BlobProto blob_proto;

    // serialize offsets as a tensor
    if (cursor->offsets.size() > 0) {
      Blob offsets_blob;
      auto* offsets = offsets_blob.template GetMutable<Tensor<CPUContext>>();
      offsets->Resize(cursor->offsets.size());
      std::copy(
          cursor->offsets.begin(),
          cursor->offsets.end(),
          offsets->mutable_data<TOffset>());
      TensorSerializer<CPUContext> ser;
      ser.Serialize(
          *offsets, name, blob_proto.mutable_tensor(), 0, offsets->size());
    }
    blob_proto.set_name(name);
    blob_proto.set_type("std::unique_ptr<TreeCursor>");

    // serialize field names in the content
    std::ostringstream os;
    for (const auto& field : cursor->it.fields()) {
      os << field.name << " ";
    }
    blob_proto.set_content(os.str());

    acceptor(name, blob_proto.SerializeAsString());
  }
};

class TreeCursorDeserializer : public BlobDeserializerBase {
 public:
  void Deserialize(const BlobProto& proto, Blob* blob) override {
    // deserialize the offsets
    TensorDeserializer<CPUContext> deser;
    Blob offset_blob;
    deser.Deserialize(proto, &offset_blob);
    auto& offsets = offset_blob.template Get<Tensor<CPUContext>>();
    auto* offsets_ptr = offsets.data<TOffset>();

    // deserialize the field names
    std::vector<std::string> fieldNames;
    std::istringstream is(proto.content());
    std::string field;
    while (true) {
      is >> field;
      if (is.eof()) {
        break;
      }
      fieldNames.push_back(field);
    }
    TreeIterator it(fieldNames);

    auto* base = blob->template GetMutable<std::unique_ptr<TreeCursor>>();
    (*base).reset(new TreeCursor(it));
    (*base)->offsets.assign(offsets_ptr, offsets_ptr + offsets.size());
  }
};

REGISTER_BLOB_SERIALIZER(
    (TypeMeta::Id<std::unique_ptr<TreeCursor>>()),
    TreeCursorSerializer);
REGISTER_BLOB_DESERIALIZER(std::unique_ptr<TreeCursor>, TreeCursorDeserializer);

} // namespace

void SharedTensorVectorPtrSerializer::Serialize(
    const Blob& blob,
    const string& name,
    BlobSerializerBase::SerializationAcceptor acceptor) {
  /* This is a dummy serializer that doesn't save anything. If saving the
  content is desired in a future use case, you can change this serializer.
  Note: special care needs to be taken for the parameter initialization of
  LastNWindowCollectorOp and ReservoirSamplingOp if this serializer actually
  saves the content.
  */
  CAFFE_ENFORCE(blob.IsType<std::shared_ptr<std::vector<TensorCPU>>>());
  BlobProto blob_proto;
  blob_proto.set_name(name);
  blob_proto.set_type("std::shared_ptr<std::vector<TensorCPU>>");
  blob_proto.set_content("");
  acceptor(name, blob_proto.SerializeAsString());
}

void SharedTensorVectorPtrDeserializer::Deserialize(
    const BlobProto& /* unused */,
    Blob* blob) {
  /* This is a dummy deserializer which creates a nullptr. */
  blob->GetMutable<std::shared_ptr<std::vector<TensorCPU>>>();
}

REGISTER_BLOB_SERIALIZER(
    (TypeMeta::Id<std::shared_ptr<std::vector<TensorCPU>>>()),
    SharedTensorVectorPtrSerializer);

REGISTER_BLOB_DESERIALIZER(
    std::shared_ptr<std::vector<TensorCPU>>,
    SharedTensorVectorPtrDeserializer);

} // namespace dataset_ops
} // namespace caffe2