Caffe2 - C++ API
A deep learning, cross-platform ML framework
dataset_ops.cc
#include "caffe2/operators/dataset_ops.h"

#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include "caffe2/core/blob_serialization.h"
#include "caffe2/core/operator.h"
#include "caffe2/core/tensor.h"
#include "caffe2/utils/string_utils.h"

namespace caffe2 {

CAFFE_KNOWN_TYPE(std::unique_ptr<dataset_ops::TreeCursor>);
CAFFE_KNOWN_TYPE(dataset_ops::TensorVectorPtr);
CAFFE_KNOWN_TYPE(dataset_ops::SharedTensorVectorPtr);

namespace dataset_ops {
namespace {

const char kDatasetFieldSeparator = ':';
const char* kDatasetLengthField = "lengths";

// percentage by which to grow the dataset when needed
const int kDatasetGrowthPct = 40;

} // namespace

TreeIterator::TreeIterator(const std::vector<std::string>& fields) {
  // populate field vector and split field names
  fields_.resize(fields.size());
  std::vector<std::vector<std::string>> nameParts(fields_.size());
  for (size_t i = 0; i < fields.size(); ++i) {
    auto& field = fields_.at(i);
    field.name = fields[i];
    field.id = i;
    field.lengthFieldId = -1;
    nameParts.at(i) = split(kDatasetFieldSeparator, field.name);
  }

  // populate lengthFields
  for (const auto& field : fields_) {
    const auto& parts = nameParts.at(field.id);
    if (!parts.empty() && parts.back() == kDatasetLengthField) {
      lengthFieldIds_.push_back(field.id);
    }
  }

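  // Example of the prefix matching below: given fields
  // {"b:lengths", "b:values:lengths", "b:values:values"}, "b:values:values"
  // resolves to "b:values:lengths" (longest matching prefix "b:values"),
  // "b:values:lengths" resolves to "b:lengths", and "b:lengths" itself keeps
  // lengthFieldId == -1 (the root domain).
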
  // find length-field with maximum prefix matching for each field
  for (auto& field : fields_) {
    // by default, we are matching against the root domain
    size_t maxMatchLevel = 1;
    int maxMatchLengthFieldId = -1;
    for (int j = 0; j < numLengthFields(); ++j) {
      const auto& lenField = lengthField(j);
      // a length field can't have itself as its length field
      if (field.id == lenField.id) {
        continue;
      }
      auto lf = nameParts.at(lenField.id);
      auto lfEnd = lf.end() - 1;
      // check whether this lengthField is a prefix for this field name
      if (std::mismatch(lf.begin(), lfEnd, nameParts.at(field.id).begin())
              .first != lfEnd) {
        continue;
      }
      if (lf.size() > maxMatchLevel) {
        maxMatchLevel = lf.size();
        maxMatchLengthFieldId = j;
      }
    }
    field.lengthFieldId = maxMatchLengthFieldId;
  }

  // check that fields are topologically sorted
  // (no length field depends on a length defined afterwards)
  for (const auto& field : fields_) {
    const auto* lengthField = lengthFieldFor(field);
    CAFFE_ENFORCE(
        (lengthField == nullptr) || (lengthField->id < field.id),
        "Error: Field ",
        field.id,
        " (",
        field.name,
        ") ",
        "depends on a field defined afterwards: ",
        lengthField->id,
        " (",
        lengthField->name,
        ").");
  }
}

void TreeIterator::advance(
    const std::vector<const TLength*>& lengths,
    std::vector<TOffset>& offsets,
    std::vector<TOffset>& sizes,
    std::vector<TOffset>& limits,
    TOffset num) {
  std::vector<TOffset> newOffsets;
  CAFFE_ENFORCE_EQ(lengths.size(), numLengthFields());
  CAFFE_ENFORCE_EQ(offsets.size(), numOffsetFields());
  sizes.resize(offsets.size());
  newOffsets.resize(offsets.size());
  // first index, top level
  {
    auto limit = limits[0];
    auto offset = offsets[0];
    CAFFE_ENFORCE(limit >= offset, "Tried to advance past end of cursor.");
    TOffset total = std::min(limit - offset, num);
    sizes[0] = total;
    newOffsets[0] = offset + total;
  }
  // child indices
  for (int j = 1; j < numOffsetFields(); ++j) {
    TOffset total = 0;
    int parentOffsetId = offsetFieldIdFor(lengthField(j - 1));
    const TLength* length = lengths[j - 1] + offsets[parentOffsetId];
    for (int k = 0; k < sizes[parentOffsetId]; ++k) {
      total += *(length++);
    }
    auto offset = offsets[j];
    CAFFE_ENFORCE(
        offset + total <= limits[j],
        "Inconsistent field length: ",
        "tried to advance past the end of field ",
        j);
    sizes[j] = total;
    newOffsets[j] = offset + total;
  }
  offsets = newOffsets;
}

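// Example of TreeIterator::advance: for a single list field with lengths
// {2, 3, 0, 1}, starting at offsets {0, 0}, advance(..., num = 2) consumes
// two top-level rows and lengths[0] + lengths[1] = 5 child values, leaving
// sizes = {2, 5} and offsets = {2, 5}.
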
TreeWalker::TreeWalker(const vector<const Blob*>& inputs, TreeCursor& cursor)
    : inputs_(inputs), cursor_(cursor), sizes_(cursor.it.numOffsetFields()) {
  CAFFE_ENFORCE_EQ(inputs.size(), cursor.it.fields().size());
  if (cursor.offsets.empty()) {
    cursor.offsets.assign(cursor.it.numOffsetFields(), 0);
  }

  for (int fieldId = 0; fieldId < cursor_.it.fields().size(); ++fieldId) {
    fields_.emplace_back(*this, fieldId);
  }

  gatherLengthData();

  gatherSizeLimits();

  // The invariant we hold is that we are always one step ahead
  advance();
}

void TreeWalker::advance() {
  prevOffsets_ = cursor_.offsets;
  cursor_.it.advance(lengths_, cursor_.offsets, sizes_, limits_, 1);
}

std::vector<int64_t> TreeWalker::fieldDim(int fieldId) const {
  auto tensorDim = input(fieldId).sizes().vec();
  tensorDim[0] = sizes_[lengthIdx(fieldId)];
  return tensorDim;
}

void* TreeWalker::fieldPtr(int fieldId) const {
  auto& in = input(fieldId);
  return (char*)in.raw_data() +
      offset(fieldId) * in.size_from_dim(1) * in.dtype().itemsize();
}

void TreeWalker::gatherLengthData() {
  static const TLength lenZero = 0;
  lengths_.resize(cursor_.it.numLengthFields());
  for (int i = 0; i < lengths_.size(); ++i) {
    auto& in = input(cursor_.it.lengthField(i).id);
    if (in.numel() > 0) {
      lengths_[i] = in.data<int>();
    } else {
      lengths_[i] = &lenZero;
    }
  }
}

void TreeWalker::gatherSizeLimits() {
  limits_.assign(sizes_.size(), std::numeric_limits<TOffset>::max());
  for (auto fieldId = 0; fieldId < cursor_.it.fields().size(); ++fieldId) {
    auto lengthFieldIdx = lengthIdx(fieldId);
    limits_[lengthFieldIdx] =
        std::min(limits_[lengthFieldIdx], (TOffset)input(fieldId).sizes()[0]);
  }
}

namespace {

class CreateTreeCursorOp : public Operator<CPUContext> {
 public:
  template <class... Args>
  explicit CreateTreeCursorOp(Args&&... args)
      : Operator(std::forward<Args>(args)...),
        fields_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {}

  bool RunOnDevice() override {
    *OperatorBase::Output<std::unique_ptr<TreeCursor>>(0) =
        std::unique_ptr<TreeCursor>(new TreeCursor(TreeIterator(fields_)));
    return true;
  }

 private:
  std::vector<std::string> fields_;
};

class GetCursorOffsetOp : public Operator<CPUContext> {
 public:
  template <class... Args>
  explicit GetCursorOffsetOp(Args&&... args)
      : Operator(std::forward<Args>(args)...) {}

  bool RunOnDevice() override {
    auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
    Output(0)->Resize(cursor->offsets.size());
    auto* output = Output(0)->template mutable_data<int>();
    for (size_t i = 0; i < cursor->offsets.size(); ++i) {
      output[i] = cursor->offsets[i];
    }
    return true;
  }
};

class ResetCursorOp : public Operator<CPUContext> {
 public:
  template <class... Args>
  explicit ResetCursorOp(Args&&... args)
      : Operator(std::forward<Args>(args)...) {}

  bool RunOnDevice() override {
    auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
    std::lock_guard<std::mutex> lock(cursor->mutex_);
    cursor->offsets.clear();
    return true;
  }
};

class CheckDatasetConsistencyOp : public Operator<CPUContext> {
 public:
  template <class... Args>
  explicit CheckDatasetConsistencyOp(Args&&... args)
      : Operator(std::forward<Args>(args)...),
        iterator_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {}

  bool RunOnDevice() override {
    std::vector<const TLength*> lengths;
    std::vector<TOffset> limits;
    std::vector<TOffset> sizes;
    std::vector<TOffset> offsets;
    CAFFE_ENFORCE(
        InputSize() == iterator_.fields().size(),
        "Invalid number of fields. Expected ",
        iterator_.fields().size(),
        ", got ",
        InputSize());
    sizes.resize(iterator_.numOffsetFields());
    // gather length data
    lengths.resize(iterator_.numLengthFields());
    for (size_t i = 0; i < lengths.size(); ++i) {
      lengths[i] = Input(iterator_.lengthField(i).id).data<TLength>();
    }
    // gather size limits
    limits.assign(sizes.size(), std::numeric_limits<TOffset>::max());
    for (size_t i = 0; i < iterator_.fields().size(); ++i) {
      int lengthIdx = iterator_.fields()[i].lengthFieldId + 1;
      CAFFE_ENFORCE_GT(Input(i).dim(), 0);
      TOffset size = (TOffset)Input(i).sizes()[0];
      if (limits[lengthIdx] == std::numeric_limits<TOffset>::max()) {
        limits[lengthIdx] = size;
      } else {
        CAFFE_ENFORCE(
            limits[lengthIdx] == size,
            "Inconsistent sizes for fields belonging to same domain.",
            " Field: ",
            i,
            " (",
            iterator_.fields()[i].name,
            "); Length field index: ",
            lengthIdx,
            "; Previous size: ",
            limits[lengthIdx],
            "; New size: ",
            size);
      }
    }
    // advance to the end
    offsets.assign(sizes.size(), 0);
    iterator_.advance(lengths, offsets, sizes, limits, limits[0]);
    for (size_t i = 0; i < limits.size(); ++i) {
      CAFFE_ENFORCE(limits[i] == offsets[i]);
    }
    return true;
  }

 private:
  TreeIterator iterator_;
};

class PackRecordsOp : public Operator<CPUContext> {
 public:
  template <class... Args>
  explicit PackRecordsOp(Args&&... args)
      : Operator(std::forward<Args>(args)...),
        fields_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {}

  bool RunOnDevice() override {
    // There should be one input per field
    CAFFE_ENFORCE_EQ(InputSize(), fields_.size());
    CAFFE_ENFORCE_EQ(OutputSize(), 1);

    TreeCursor cursor((TreeIterator(fields_)));

    TreeWalker walker(Inputs(), cursor);

    Output(0)->Resize(walker.size());

    // Output(0)->raw_mutable_data(TypeMeta::Make<SharedTensorVectorPtr>()));
    auto* dst = Output(0)->template mutable_data<SharedTensorVectorPtr>();

    for (int batchId = 0; batchId < walker.size(); ++batchId) {
      dst[batchId] = std::make_shared<std::vector<TensorCPU>>();
      dst[batchId]->reserve(walker.fields().size());

      for (const auto& field : walker.fields()) {
        dst[batchId]->emplace_back(field.dim(), CPU);
        auto& tensor = dst[batchId]->back();
        context_.CopyItemsSameDevice(
            field.meta(),
            tensor.numel(),
            field.ptr() /* src */,
            tensor.raw_mutable_data(field.meta()) /* dst */);
      }

      walker.advance();
    }

    return true;
  }

 private:
  std::vector<std::string> fields_;
};

class UnPackRecordsOp : public Operator<CPUContext> {
 public:
  template <class... Args>
  explicit UnPackRecordsOp(Args&&... args)
      : Operator(std::forward<Args>(args)...),
        fields_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {}

  bool RunOnDevice() override {
    const auto* inputs = Input(0).template data<SharedTensorVectorPtr>();
    const auto numRows = Input(0).numel();

    CAFFE_ENFORCE_GE(numRows, 0);

    auto numTensors = OutputSize();

    // Precompute the output sizes to avoid resizing
    std::vector<std::vector<int64_t>> outputDims(numTensors);
    std::vector<const TypeMeta*> metas(numTensors);

    CAFFE_ENFORCE(
        numRows > 0 || InputSize() > 1,
        "Unpacking empty record without shape will leave output blobs in "
        "undefined state.");

    if (InputSize() == 1) {
      getShapeAndMetaFromInput(outputDims, metas);
    } else {
      getShapeAndMetaFromPrototypeBlobs(outputDims, metas);
    }

    for (int i = 0; i < numRows; ++i) {
      CAFFE_ENFORCE(inputs[i]);
      for (int j = 0; j < inputs[i]->size(); ++j) {
        const auto& input = inputs[i]->at(j);

        // Checks to ensure that dimensions/sizes match
        CAFFE_ENFORCE_EQ(outputDims[j].size(), input.dim());
        CAFFE_ENFORCE(*metas[j] == input.dtype());
        // Skip the first dimension, because we concatenate along it.
        for (int k = 1; k < input.dim(); ++k) {
          CAFFE_ENFORCE_EQ(input.sizes()[k], outputDims[j][k]);
        }

        outputDims[j][0] += input.size(0);
      }
    }

    // Resize to the final output size
    std::vector<void*> destinations(numTensors);
    for (int i = 0; i < numTensors; ++i) {
      Output(i)->Resize(outputDims[i]);
      destinations[i] = Output(i)->raw_mutable_data(*metas[i]);
    }

    for (int i = 0; i < numRows; ++i) {
      for (int j = 0; j < numTensors; ++j) {
        const auto& input = inputs[i]->at(j);

        context_.CopyItemsSameDevice(
            *metas[j],
            input.numel(),
            input.raw_data() /* src */,
            destinations[j] /* dst */
        );

        destinations[j] =
            (char*)destinations[j] + input.numel() * input.itemsize();
      }
    }

    return true;
  }

 private:
  void getShapeAndMetaFromInput(
      std::vector<std::vector<int64_t>>& outputDims,
      std::vector<const TypeMeta*>& metas) {
    const auto* inputs = Input(0).template data<SharedTensorVectorPtr>();

    const auto& inputZero = inputs[0];
    CAFFE_ENFORCE(inputZero);

    const auto numTensors = inputZero->size();

    CAFFE_ENFORCE_EQ(numTensors, fields_.size());
    CAFFE_ENFORCE_EQ(numTensors, OutputSize());

    for (int i = 0; i < numTensors; ++i) {
      outputDims[i] = inputZero->at(i).sizes().vec();
      outputDims[i][0] = 0;
      metas[i] = &inputZero->at(i).dtype();
    }
  }

  void getShapeAndMetaFromPrototypeBlobs(
      std::vector<std::vector<int64_t>>& outputDims,
      std::vector<const TypeMeta*>& metas) {
    const auto numTensors = fields_.size();
    CAFFE_ENFORCE_EQ(numTensors, InputSize() - 1);
    CAFFE_ENFORCE_EQ(numTensors, OutputSize());
    for (int i = 0; i < numTensors; ++i) {
      const auto& input = Input(i + 1);
      outputDims[i] = input.sizes().vec();
      outputDims[i][0] = 0;
      metas[i] = &input.dtype();
    }
  }

  std::vector<std::string> fields_;
};

class ReadNextBatchOp : public Operator<CPUContext> {
 public:
  template <class... Args>
  explicit ReadNextBatchOp(Args&&... args)
      : Operator(std::forward<Args>(args)...),
        batchSize_(OperatorBase::GetSingleArgument<int>("batch_size", 1)),
        enforceBatchSize_(OperatorBase::GetSingleArgument<bool>(
            "enforce_batch_size",
            false)) {}

  bool RunOnDevice() override {
    auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
    CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1);
    std::vector<const TLength*> lengths;
    std::vector<TOffset> limits;
    std::vector<TOffset> sizes;
    std::vector<TOffset> offsets;
    TLength lenZero = 0;
    sizes.resize(cursor->it.numOffsetFields());
    // gather length data
    lengths.resize(cursor->it.numLengthFields());
    for (int i = 0; i < lengths.size(); ++i) {
      auto& a = Input(cursor->it.lengthField(i).id + 1);
      if (a.numel() > 0) {
        lengths[i] = a.data<int>();
      } else {
        lengths[i] = &lenZero;
      }
    }
    // gather size limits
    limits.assign(sizes.size(), std::numeric_limits<TOffset>::max());
    for (int i = 0; i < cursor->it.fields().size(); ++i) {
      int lengthFieldIdx = cursor->it.fields()[i].lengthFieldId + 1;
      limits[lengthFieldIdx] =
          std::min(limits[lengthFieldIdx], (TOffset)Input(i + 1).sizes()[0]);
    }
    // advance cursor
    {
      std::lock_guard<std::mutex> lock(cursor->mutex_);
      if (cursor->offsets.empty()) {
        cursor->offsets.assign(sizes.size(), 0);
      }
      offsets = cursor->offsets;
      cursor->it.advance(lengths, cursor->offsets, sizes, limits, batchSize_);
      if (enforceBatchSize_ && sizes[0] < batchSize_) {
        // if we enforce batch_size but don't have enough rows left to
        // complete a full batch, return empty for all columns.
        // This signals end of dataset to the caller.
        sizes.assign(sizes.size(), 0);
      }
    }
    // gather data
    std::vector<int64_t> outDim;
    for (int i = 0; i < cursor->it.fields().size(); ++i) {
      auto lengthIdx = cursor->it.fields()[i].lengthFieldId + 1;
      auto size = sizes[lengthIdx];
      auto offset = offsets[lengthIdx];
      auto& in = Input(i + 1);
      auto innerSize = in.size_from_dim(1);
      outDim = in.sizes().vec();
      outDim[0] = size;
      auto* out = Output(i);
      out->Resize(outDim);
      void* src =
          (char*)in.raw_data() + offset * innerSize * in.dtype().itemsize();
      void* dst = out->raw_mutable_data(in.dtype()); // create the tensor
      if (out->numel() == 0) {
        continue;
      }
      context_.CopyItemsSameDevice(in.dtype(), out->numel(), src, dst);
    }
    return true;
  }
  int batchSize_;
  bool enforceBatchSize_;
};

class ComputeOffsetOp : public Operator<CPUContext> {
 public:
  template <class... Args>
  explicit ComputeOffsetOp(Args&&... args)
      : Operator(std::forward<Args>(args)...) {}

  bool RunOnDevice() override {
    auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
    CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1);
    auto* out = Output(0);
    std::vector<const TLength*> lengths;
    std::vector<TOffset> limits;
    std::vector<TOffset> sizes;
    std::vector<TOffset> offsets;
    TLength lenZero = 0;
    sizes.resize(cursor->it.numOffsetFields());
    // gather length data
    lengths.resize(cursor->it.numLengthFields());
    for (int i = 0; i < lengths.size(); ++i) {
      auto& a = Input(cursor->it.lengthField(i).id + 1);
      if (a.numel() > 0) {
        lengths[i] = a.data<int>();
      } else {
        lengths[i] = &lenZero;
      }
    }
    // gather size limits
    limits.assign(sizes.size(), std::numeric_limits<TOffset>::max());
    for (int i = 0; i < cursor->it.fields().size(); ++i) {
      int lengthFieldIdx = cursor->it.fields()[i].lengthFieldId + 1;
      limits[lengthFieldIdx] =
          std::min(limits[lengthFieldIdx], (TOffset)Input(i + 1).sizes()[0]);
    }
    out->Resize(limits.at(0) + 1, sizes.size());
    auto* out_data = out->template mutable_data<int64_t>();
    for (int k = 0; k <= limits.at(0); k++) {
      // advance cursor
      if (cursor->offsets.empty()) {
        cursor->offsets.assign(sizes.size(), 0);
      }
      // write output
      std::copy(cursor->offsets.begin(), cursor->offsets.end(), out_data);
      out_data += sizes.size();
      cursor->it.advance(lengths, cursor->offsets, sizes, limits, 1);
    }
    cursor->offsets.assign(sizes.size(), 0); // reset after getting meta info
    return true;
  }
};

class SortAndShuffleOp : public Operator<CPUContext> {
 public:
  template <class... Args>
  explicit SortAndShuffleOp(Args&&... args)
      : Operator(std::forward<Args>(args)...),
        sort_by_field_idx_(
            OperatorBase::GetSingleArgument<int>("sort_by_field_idx", 1)),
        batch_size_(OperatorBase::GetSingleArgument<int>("batch_size", 1)),
        shuffle_size_(OperatorBase::GetSingleArgument<int>("shuffle_size", 1)) {
  }

  bool RunOnDevice() override {
    auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
    CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1);
    CAFFE_ENFORCE(-1 <= sort_by_field_idx_);
    CAFFE_ENFORCE(cursor->it.fields().size() - sort_by_field_idx_ > 0);
    int size;
    if (sort_by_field_idx_ != -1) {
      size = Input(sort_by_field_idx_ + 1).sizes()[0];
    } else {
      size = Input(1).sizes()[0];
    }

    CAFFE_ENFORCE(
        batch_size_ > 0 && shuffle_size_ > 0 &&
        0 < batch_size_ * shuffle_size_);
    // adjust shuffle_size_ if it is too large
    if (batch_size_ * shuffle_size_ > size) {
      shuffle_size_ = size / batch_size_;
    }

    int num_batch = size / batch_size_;
    auto* out = Output(0);
    out->Resize(size);
    auto* out_data = out->template mutable_data<int64_t>();

    vector<int> shuffle_idx(size);
    iota(shuffle_idx.begin(), shuffle_idx.end(), 0);

    if (sort_by_field_idx_ != -1) {
      auto& sortblob = Input(sort_by_field_idx_ + 1);
      auto* sortdata = sortblob.data<int>();
      // must sort by a field at the root level
      CAFFE_ENFORCE(
          cursor->it.fields()[sort_by_field_idx_].lengthFieldId == -1);
      sort(shuffle_idx.begin(), shuffle_idx.end(), [&sortdata](int i1, int i2) {
        return sortdata[i1] < sortdata[i2];
      });
    }

    if (batch_size_ * shuffle_size_ > 1) {
      int offset = 0;
      while (offset + batch_size_ * shuffle_size_ < size) {
        std::shuffle(
            shuffle_idx.begin() + offset,
            shuffle_idx.begin() + offset + batch_size_ * shuffle_size_,
            std::default_random_engine());
        offset += batch_size_ * shuffle_size_;
      }
    }

    vector<int> batch_idx(num_batch);
    iota(batch_idx.begin(), batch_idx.end(), 0);
    std::shuffle(
        batch_idx.begin(), batch_idx.end(), std::default_random_engine());

    for (int i = 0; i < num_batch; i++) {
      std::copy(
          shuffle_idx.begin() + batch_idx[i] * batch_size_,
          shuffle_idx.begin() + (batch_idx[i] + 1) * batch_size_,
          out_data);
      out_data += batch_size_;
    }
    std::copy(
        shuffle_idx.begin() + num_batch * batch_size_,
        shuffle_idx.end(),
        out_data);

    return true;
  }

  int sort_by_field_idx_;
  int batch_size_;
  int shuffle_size_;
};

class ReadRandomBatchOp : public Operator<CPUContext> {
 public:
  template <class... Args>
  explicit ReadRandomBatchOp(Args&&... args)
      : Operator(std::forward<Args>(args)...),
        batchSize_(OperatorBase::GetSingleArgument<int>("batch_size", 1)),
        enforceBatchSize_(
            OperatorBase::GetSingleArgument<bool>("enforce_batch_size", false)),
        loopOver_(OperatorBase::GetSingleArgument<bool>("loop_over", false)) {}
  bool RunOnDevice() override {
    auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
    auto& idxblob = Input(1);
    auto& offsetsmat = Input(2);
    CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 3);
    auto idxvec = idxblob.template data<int64_t>();
    auto offsetdim = offsetsmat.sizes();
    // gather data
    std::vector<int64_t> outDim;
    int64_t idx;
    {
      std::lock_guard<std::mutex> lock(cursor->mutex_);
      cursor->offsets.resize(1);
      idx = cursor->offsets.at(0);
      // if we want to enforce batch size but we don't have a complete
      // batch, skip the last rows.
      if (enforceBatchSize_ && idx + batchSize_ > idxblob.numel()) {
        idx = idxblob.numel();
      }
      if (loopOver_ && idx >= idxblob.numel()) {
        cursor->offsets.at(0) = 0;
        idx = 0;
      }
      cursor->offsets.at(0) += batchSize_;
    }

    for (int i = 0; i < cursor->it.fields().size(); ++i) {
      auto lengthIdx = cursor->it.fields()[i].lengthFieldId + 1;
      auto& in = Input(i + 3);
      outDim = in.sizes().vec();
      outDim.at(0) = 0;
      auto idxbegin = idx;
      for (int j = 0; j < batchSize_; ++j) {
        if (idx >= idxblob.numel()) {
          break;
        }
        CAFFE_ENFORCE(
            (idxvec[idx] + 1) * offsetdim[1] + lengthIdx < offsetsmat.numel(),
            "Out of bound when trying to get elem from offsetsmat");
        auto offsetptr = offsetsmat.template data<TOffset>() +
            idxvec[idx] * offsetdim[1] + lengthIdx;
        auto offset = *offsetptr;
        auto size = *(offsetptr + offsetdim[1]) - offset;
        outDim.at(0) += size; // accumulate over the batch
        idx++;
      }
      idx = idxbegin; // reset
      auto* out = Output(i);
      out->Resize(outDim);
      if (out->numel() == 0) {
        continue;
      }
      auto dst = static_cast<char*>(out->raw_mutable_data(in.dtype()));
      int block_size = in.numel() / in.size(0);
      auto block_bytesize = in.size_from_dim(1) * in.dtype().itemsize();
      CAFFE_ENFORCE(
          block_bytesize == in.nbytes() / in.size(0),
          "block_bytesize should be consistent with data dim");
      auto src_base = static_cast<const char*>(in.raw_data());
      int start = 0;
      for (int j = 0; j < batchSize_; ++j) {
        if (idx >= idxblob.numel()) {
          break;
        }
        auto offsetptr = offsetsmat.template data<TOffset>() +
            idxvec[idx] * offsetdim[1] + lengthIdx;
        auto offset = *offsetptr;
        auto size = *(offsetptr + offsetdim[1]) - offset;
        // copy data
        auto src = src_base + offset * block_bytesize;
        context_.CopyItemsSameDevice(
            in.dtype(), size * block_size, src, dst + start * block_bytesize);
        start += size;
        idx++;
      }
      idx = idxbegin; // reset
    }
    return true;
  }
  int batchSize_;
  bool enforceBatchSize_;
  bool loopOver_;
};

template <class Context>
class AppendOp final : public Operator<Context> {
 public:
  USE_OPERATOR_CONTEXT_FUNCTIONS;
  template <class... Args>
  explicit AppendOp(Args&&... args)
      : Operator<Context>(std::forward<Args>(args)...) {}

  bool RunOnDevice() override {
    auto& a = Input(0);
    auto& b = Input(1);
    auto* c = Output(0);
    CAFFE_ENFORCE(b.dim() >= 1);
    if (a.numel() == 0 && a.size(0) == 0) {
      c->CopyFrom(b);
      return true;
    }
    CAFFE_ENFORCE(&a == c, "First argument must be in-place.");
    CAFFE_ENFORCE(c->dim() == b.dim());
    CAFFE_ENFORCE(b.dim() == c->dim());
    CAFFE_ENFORCE(a.dtype() == b.dtype());
    for (int i = 1; i < a.dim(); ++i) {
      CAFFE_ENFORCE(a.sizes()[i] == b.sizes()[i]);
    }
    auto oldSize = c->numel();
    c->Extend(b.sizes()[0], kDatasetGrowthPct);
    auto* dst = (char*)c->raw_mutable_data() + oldSize * b.dtype().itemsize();
    context_.CopyItemsSameDevice(b.dtype(), b.numel(), b.raw_data(), dst);
    return true;
  }
};

template <class Context>
class AtomicAppendOp final : public Operator<Context> {
 public:
  USE_OPERATOR_CONTEXT_FUNCTIONS;
  template <class... Args>
  explicit AtomicAppendOp(Args&&... args)
      : Operator<Context>(std::forward<Args>(args)...) {}

  bool RunOnDevice() override {
    auto& mutex = OperatorBase::Input<std::unique_ptr<std::mutex>>(0);
    const auto numFields = (InputSize() - 1) / 2;
    CAFFE_ENFORCE(OutputSize() == numFields);

    std::lock_guard<std::mutex> guard(*mutex);

    // 1: checks
    for (int i = 0; i < numFields; ++i) {
      auto& a = Input(1 + i);
      auto& b = Input(1 + i + numFields);
      auto* c = Output(i);
      CAFFE_ENFORCE(b.dim() >= 1);
      if (a.numel() == 0) {
        continue;
      }
      CAFFE_ENFORCE(
          (void*)&a == (void*)c, "Appended-to arguments must be in-place.");
      CAFFE_ENFORCE(c->dim() == b.dim());
      CAFFE_ENFORCE(b.dim() == c->dim());
      CAFFE_ENFORCE(a.dtype() == b.dtype());
      for (int j = 1; j < a.dim(); ++j) {
        CAFFE_ENFORCE(a.sizes()[j] == b.sizes()[j]);
      }
    }

    // 2: copies
    for (int i = 0; i < numFields; ++i) {
      auto& a = Input(1 + i);
      auto& b = Input(1 + i + numFields);
      auto* c = Output(i);
      if (a.numel() == 0 && a.size(0) == 0) {
        c->CopyFrom(b);
        continue;
      }
      auto oldSize = c->numel();
      c->Extend(b.sizes()[0], kDatasetGrowthPct);
      auto* dst = (char*)c->raw_mutable_data() + oldSize * b.dtype().itemsize();
      context_.CopyItemsSameDevice(b.dtype(), b.numel(), b.raw_data(), dst);
    }
    return true;
  }
};

template <class Context>
class CreateTensorVectorOp final : public Operator<Context> {
 public:
  USE_OPERATOR_CONTEXT_FUNCTIONS;
  using Operator<Context>::Operator;

  bool RunOnDevice() override {
    auto ptr = make_unique<std::vector<Tensor>>();
    *OperatorBase::Output<TensorVectorPtr>(TENSOR_VECTOR) = std::move(ptr);
    return true;
  }

 private:
  OUTPUT_TAGS(TENSOR_VECTOR);
};

template <class Context>
class TensorVectorSizeOp final : public Operator<Context> {
 public:
  USE_OPERATOR_CONTEXT_FUNCTIONS;
  USE_SIMPLE_CTOR_DTOR(TensorVectorSizeOp);

  bool RunOnDevice() override {
    auto& vector_ptr = OperatorBase::Input<TensorVectorPtr>(TENSOR_VECTOR);
    auto* size = Output(SIZE);
    size->Resize();
    // 32-bit should be enough here
    *size->template mutable_data<int32_t>() = vector_ptr->size();
    return true;
  }

 private:
  INPUT_TAGS(TENSOR_VECTOR);
  OUTPUT_TAGS(SIZE);
};

template <class Context>
class ConcatTensorVectorOp final : public Operator<Context> {
 public:
  USE_OPERATOR_CONTEXT_FUNCTIONS;
  using Operator<Context>::Operator;

  bool RunOnDevice() override {
    const TensorVectorPtr& tensorVector =
        OperatorBase::Input<TensorVectorPtr>(TENSOR_VECTOR);

    auto* tensor = Output(TENSOR);
    CAFFE_ENFORCE(!tensorVector->empty());

    vector<int64_t> outputDims(tensorVector->at(0).sizes().vec());
    CAFFE_ENFORCE(outputDims.size() > 0);
    for (int i = 1; i < tensorVector->size(); i++) {
      // the tensor shapes are the same except for the first dimension
      for (int j = 1; j < tensorVector->at(i).dim(); j++) {
        CAFFE_ENFORCE(outputDims[j] == tensorVector->at(i).sizes()[j]);
      }
      CAFFE_ENFORCE(tensorVector->at(0).dtype() == tensorVector->at(i).dtype());
      outputDims[0] += tensorVector->at(i).sizes()[0];
    }

    tensor->Resize(outputDims);
    int64_t offset = 0;
    auto* dst = (char*)tensor->raw_mutable_data(tensorVector->at(0).dtype());

    for (const auto& t : *tensorVector) {
      context_.CopyItemsSameDevice(
          t.dtype(), t.numel(), t.raw_data(), dst + offset);
      offset += t.nbytes();
    }

    return true;
  }

 private:
  INPUT_TAGS(TENSOR_VECTOR);
  OUTPUT_TAGS(TENSOR);
};

template <class Context>
class CollectTensorOp final : public Operator<Context> {
 public:
  USE_OPERATOR_CONTEXT_FUNCTIONS;
  template <class... Args>
  explicit CollectTensorOp(Args&&... args)
      : Operator<Context>(std::forward<Args>(args)...),
        numToCollect_(
            OperatorBase::GetSingleArgument<int>("num_to_collect", -1)),
        numVisited_(0) {
    CAFFE_ENFORCE(numToCollect_ > 0);
  }

  bool RunOnDevice() override {
    int pos = -1;
    if (numVisited_ < numToCollect_) {
      // append
      pos = numVisited_;
    } else {
      auto& gen = context_.RandGenerator();
      // uniform between [0, numVisited_]
      std::uniform_int_distribution<int> uniformDist(0, numVisited_);
      pos = uniformDist(gen);
      if (pos >= numToCollect_) {
        // discard
        pos = -1;
      }
    }

    for (int i = 0; i < OutputSize(); ++i) {
      // TENSOR_VECTOR_IN is enforced inplace with TENSOR_VECTOR_OUT
      TensorVectorPtr& tensorVector = *OperatorBase::Output<TensorVectorPtr>(i);

      if (numVisited_ >= numToCollect_) {
        CAFFE_ENFORCE(
            tensorVector->size() == numToCollect_,
            "TensorVector size = ",
            tensorVector->size(),
            " is different from numToCollect = ",
            numToCollect_);
      }

      const auto& tensor = Input(OutputSize() + i);

      if (pos < 0) {
        // discard
        CAFFE_ENFORCE(numVisited_ >= numToCollect_);
      } else if (pos >= tensorVector->size()) {
        // append
        tensorVector->emplace_back();
        ReinitializeAndCopyFrom(
            &tensorVector->back(),
            Context::GetDeviceType(),
            tensor); // sync copy
      } else {
        // replace
        tensorVector->at(pos).CopyFrom(tensor); // sync copy
      }
    }

    numVisited_++;
    return true;
  }

 private:
  // number of tensors to collect
  int numToCollect_;
  // number of tensors visited
  int numVisited_;
};

class TrimDatasetOp : public Operator<CPUContext> {
 public:
  template <class... Args>
  explicit TrimDatasetOp(Args&&... args)
      : Operator(std::forward<Args>(args)...),
        iterator_(OperatorBase::GetRepeatedArgument<std::string>("fields")),
        multiple_of_(OperatorBase::GetSingleArgument<int>("multiple_of", 1)) {
    CAFFE_ENFORCE_GE(multiple_of_, 1);
  }

  bool RunOnDevice() override {
    TreeCursor cursor(iterator_);
    TreeWalker walker(Inputs(), cursor);

    int trimmedSize = (walker.size() / multiple_of_) * multiple_of_;
    if (trimmedSize == walker.size()) {
      // we already satisfy the condition
      return true;
    }
    // advance desired number of records
    for (int i = 0; i < trimmedSize; ++i) {
      walker.advance();
    }
    // trim each column to the offset
    for (int col = 0; col < walker.fields().size(); ++col) {
      auto newOuterSize = walker.fields().at(col).offset();
      Output(col)->ShrinkTo(newOuterSize);
    }
    return true;
  }

 private:
  TreeIterator iterator_;
  int multiple_of_;
};

REGISTER_CPU_OPERATOR(CreateTreeCursor, CreateTreeCursorOp);
REGISTER_CPU_OPERATOR(ResetCursor, ResetCursorOp);
REGISTER_CPU_OPERATOR(ReadNextBatch, ReadNextBatchOp);
REGISTER_CPU_OPERATOR(GetCursorOffset, GetCursorOffsetOp);
REGISTER_CPU_OPERATOR(ComputeOffset, ComputeOffsetOp);
REGISTER_CPU_OPERATOR(SortAndShuffle, SortAndShuffleOp);
REGISTER_CPU_OPERATOR(ReadRandomBatch, ReadRandomBatchOp);
REGISTER_CPU_OPERATOR(CheckDatasetConsistency, CheckDatasetConsistencyOp);
REGISTER_CPU_OPERATOR(Append, AppendOp<CPUContext>);
REGISTER_CPU_OPERATOR(AtomicAppend, AtomicAppendOp<CPUContext>);
REGISTER_CPU_OPERATOR(CreateTensorVector, CreateTensorVectorOp<CPUContext>);
REGISTER_CPU_OPERATOR(TensorVectorSize, TensorVectorSizeOp<CPUContext>);
REGISTER_CPU_OPERATOR(ConcatTensorVector, ConcatTensorVectorOp<CPUContext>);
REGISTER_CPU_OPERATOR(CollectTensor, CollectTensorOp<CPUContext>);
REGISTER_CPU_OPERATOR(PackRecords, PackRecordsOp);
REGISTER_CPU_OPERATOR(UnPackRecords, UnPackRecordsOp);
REGISTER_CPU_OPERATOR(TrimDataset, TrimDatasetOp);

OPERATOR_SCHEMA(CreateTreeCursor)
    .NumInputs(0)
    .NumOutputs(1)
    .SetDoc(R"DOC(
Creates a cursor to iterate through a list of tensors, where some of those
tensors contain the lengths in a nested schema. The schema is determined by
the `fields` argument.

For example, to represent the following schema:

   Struct(
       a=Int(),
       b=List(List(Int)),
       c=List(
           Struct(
              c1=String,
              c2=List(Int),
           ),
       ),
   )

the field list will be:

   [
       "a",
       "b:lengths",
       "b:values:lengths",
       "b:values:values",
       "c:lengths",
       "c:c1",
       "c:c2:lengths",
       "c:c2:values",
   ]

And for the following instance of the struct:

   Struct(
       a=3,
       b=[[4, 5], [6, 7, 8], [], [9]],
       c=[
           Struct(c1='alex', c2=[10, 11]),
           Struct(c1='bob', c2=[12]),
       ],
   )

The values of the fields will be:

   {
       "a": [3],
       "b:lengths": [4],
       "b:values:lengths": [2, 3, 0, 1],
       "b:values:values": [4, 5, 6, 7, 8, 9],
       "c:lengths": [2],
       "c:c1": ["alex", "bob"],
       "c:c2:lengths": [2, 1],
       "c:c2:values": [10, 11, 12],
   }

In general, every field name in the format "{prefix}:lengths" defines a domain
"{prefix}", and every subsequent field in the format "{prefix}:{field}" will
be in that domain, and the length of the domain is provided for each entry of
the parent domain. In the example, "b:lengths" defines a domain of length 4, so
every field under domain "b" will have 4 entries.
The "lengths" field for a given domain must appear before any reference to
that domain.

Returns a pointer to an instance of the Cursor, which keeps the current offset
on each of the domains defined by `fields`. The cursor also ensures thread
safety, so that ReadNextBatch and ResetCursor can be used safely in parallel.

A cursor does not contain data per se, so calls to ReadNextBatch actually need
to pass a list of blobs containing the data to read for each one of the fields.
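
A minimal usage sketch (assuming the caffe2 Python frontend; the blob and
field names here are illustrative):

```

from caffe2.python import core, workspace

op = core.CreateOperator(
    "CreateTreeCursor",
    [],           # no inputs
    ["cursor"],   # output blob holding the TreeCursor pointer
    fields=["a", "b:lengths", "b:values"],
)
workspace.RunOperatorOnce(op)

```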
)DOC")
    .Output(0, "cursor", "A blob pointing to an instance of a new TreeCursor.")
    .Arg(
        "fields",
        "A list of strings each one representing a field of the dataset.");

OPERATOR_SCHEMA(ResetCursor)
    .NumInputs(1)
    .NumOutputs(0)
    .SetDoc(R"DOC(
Resets the offsets for the given TreeCursor. This operation is thread safe.
)DOC")
    .Input(0, "cursor", "A blob containing a pointer to the cursor.");

OPERATOR_SCHEMA(ReadNextBatch)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1, INT_MAX)
    .SetDoc(R"DOC(
Read the next batch of examples out of the given cursor and data blobs.

Input(0) is a blob pointing to a TreeCursor, and
[Input(1),... Input(num_fields)] is a list of tensors containing the data for
each field of the dataset.

ReadNextBatch is thread safe.
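
A minimal sketch of a read loop (assuming the caffe2 Python frontend; blob
names are illustrative; without enforce_batch_size the last batch may be
partial, and the following read returns empty outputs):

```

from caffe2.python import core, workspace

read_op = core.CreateOperator(
    "ReadNextBatch",
    ["cursor", "field_0"],   # the cursor plus one blob per field
    ["batch_0"],             # one output per field
    batch_size=32,
)
while True:
    workspace.RunOperatorOnce(read_op)
    batch = workspace.FetchBlob("batch_0")
    if batch.shape[0] == 0:  # empty output signals end of dataset
        break

```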
)DOC")
    .Input(0, "cursor", "A blob containing a pointer to the cursor.")
    .Input(1, "dataset_field_0", "First dataset field")
    .Output(0, "field_0", "Tensor containing the next batch for field 0.")
    .Arg("batch_size", "Number of top-level entries to read.");

OPERATOR_SCHEMA(GetCursorOffset)
    .NumInputs(1)
    .NumOutputs(1)
    .SetDoc("Get the current offset in the cursor.")
    .Input(0, "cursor", "A blob containing a pointer to the cursor.")
    .Output(0, "offsets", "Tensor containing the offsets for the cursor.");

OPERATOR_SCHEMA(ComputeOffset)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1)
    .SetDoc(R"DOC(
Compute the offsets matrix given cursor and data blobs. Needs to be run at the
beginning or after resetting the cursor.

Input(0) is a blob pointing to a TreeCursor, and
[Input(1),... Input(num_fields)] is a list of tensors containing the data for
each field of the dataset.

ComputeOffset is thread safe.
)DOC")
    .Input(0, "cursor", "A blob containing a pointer to the cursor.")
    .Input(1, "dataset_field_0", "First dataset field")
    .Output(0, "field_0", "Tensor containing offset info for this chunk.");

OPERATOR_SCHEMA(SortAndShuffle)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1)
    .SetDoc(R"DOC(
Compute the sorted indices given a field index to sort by, break the sorted
indices into chunks of shuffle_size * batch_size, shuffle each chunk, and
finally shuffle between batches. If sort_by_field_idx is -1, the sort is
skipped.

For example, we have data sorted as
1,2,3,4,5,6,7,8,9,10,11,12

with batch_size = 2 and shuffle_size = 3; when we shuffle each chunk we get:
[3,1,4,6,5,2] [12,10,11,8,9,7]

After this we shuffle among the different batches of size 2
[3,1],[4,6],[5,2],[12,10],[11,8],[9,7]

We may end up with something like
[9,7],[5,2],[12,10],[4,6],[3,1],[11,8]

Input(0) is a blob pointing to a TreeCursor, and
[Input(1),... Input(num_fields)] is a list of tensors containing the data for
each field of the dataset.

SortAndShuffle is thread safe.
)DOC")
    .Input(0, "cursor", "A blob containing a pointer to the cursor.")
    .Input(1, "dataset_field_0", "First dataset field")
    .Output(0, "indices", "Tensor containing sorted indices.");

OPERATOR_SCHEMA(ReadRandomBatch)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1, INT_MAX)
    .SetDoc(R"DOC(
Read the next batch of examples out of the given cursor,
idx blob, offset matrix and data blobs.

Input(0) is a blob pointing to a TreeCursor,
Input(1) is a blob pointing to the shuffled idx,
Input(2) is a blob pointing to the offset matrix, and
[Input(3),... Input(num_fields)] is a list of tensors containing the data for
each field of the dataset.

ReadRandomBatch is thread safe.
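
A typical pipeline, as a sketch (assuming the caffe2 Python frontend; blob
names are illustrative):

```

from caffe2.python import core, workspace

# compute the offset matrix and a shuffled index once per epoch
workspace.RunOperatorOnce(core.CreateOperator(
    "ComputeOffset", ["cursor", "field_0"], ["offsets"]))
workspace.RunOperatorOnce(core.CreateOperator(
    "SortAndShuffle", ["cursor", "field_0"], ["idx"],
    sort_by_field_idx=-1, batch_size=32, shuffle_size=4))

# then read shuffled batches
read_op = core.CreateOperator(
    "ReadRandomBatch",
    ["cursor", "idx", "offsets", "field_0"],
    ["batch_0"],
    batch_size=32,
)
workspace.RunOperatorOnce(read_op)

```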
)DOC")
    .Input(0, "cursor", "A blob containing a pointer to the cursor.")
    .Input(1, "idx", "idx with a shuffled order.")
    .Input(2, "offsetsmat", "offset matrix containing length offset info.")
    .Input(3, "dataset_field_0", "First dataset field")
    .Output(0, "field_0", "Tensor containing the next batch for field 0.")
    .Arg("batch_size", "Number of top-level entries to read.")
    .Arg("loop_over", "(bool) Repeat the dataset indefinitely");

OPERATOR_SCHEMA(CheckDatasetConsistency)
    .NumInputs(1, INT_MAX)
    .NumOutputs(0)
    .SetDoc(R"DOC(
Checks that the given data fields represent a consistent dataset under
the schema specified by the `fields` argument. The operator fails if the fields
are not consistent. If the data is consistent, each field's data can be safely
appended to an existing dataset, keeping it consistent.
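
A minimal sketch (assuming the caffe2 Python frontend; blob and field names
are illustrative):

```

from caffe2.python import core, workspace

check_op = core.CreateOperator(
    "CheckDatasetConsistency",
    ["a", "b_lengths", "b_values"],  # one input blob per field
    [],
    fields=["a", "b:lengths", "b:values"],
)
workspace.RunOperatorOnce(check_op)  # raises if the fields are inconsistent

```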
)DOC")
    .Input(0, "field_0", "Data for field 0.")
    .Arg(
        "fields",
        "List of strings representing the string names in the format "
        "specified in the doc for CreateTreeCursor.");

OPERATOR_SCHEMA(Append)
    .NumInputs(2)
    .NumOutputs(1)
    .EnforceInplace({{0, 0}})
    .SetDoc(R"DOC(
Append input `B` to the end of input `A`.

- It is required that this operation run in-place, meaning that the input `A` blob must match the output blob.
- All except the outer-most dimension must be the same between `A` and `B`.
- Input `A` may have to be re-allocated in order to accommodate the new size. Currently, an exponential growth ratio is used in order to ensure amortized constant time complexity.

Github Links:
- https://github.com/pytorch/pytorch/blob/master/caffe2/operators/dataset_ops.cc

<details>

<summary> <b>Example</b> </summary>

**Code**

```

workspace.ResetWorkspace()

op = core.CreateOperator(
    "Append",
    ["A", "B"],
    ["A"],
)

workspace.FeedBlob("A", np.random.randint(10, size=(1,3,3)))
workspace.FeedBlob("B", np.random.randint(10, size=(2,3,3)))
print("A:", workspace.FetchBlob("A"))
print("B:", workspace.FetchBlob("B"))
workspace.RunOperatorOnce(op)
print("A:", workspace.FetchBlob("A"))

```

**Result**

```

A:
[[[3 8 7]
  [1 6 6]
  [5 0 6]]]
B:
[[[4 3 1]
  [7 9 6]
  [9 4 5]]

 [[7 7 4]
  [9 8 7]
  [1 6 6]]]
A:
[[[3 8 7]
  [1 6 6]
  [5 0 6]]

 [[4 3 1]
  [7 9 6]
  [9 4 5]]

 [[7 7 4]
  [9 8 7]
  [1 6 6]]]

```

</details>

)DOC")
    .Input(0, "A", "(*Tensor*): base input tensor of shape $(N, d_1, d_2, ..., d_n)$")
    .Input(1, "B", "(*Tensor*): second input tensor of shape $(M, d_1, d_2, ..., d_n)$ to be appended to the base")
    .Output(0, "A", "(*Tensor*): output tensor of shape $(N+M, d_1, d_2, ..., d_n)$");

OPERATOR_SCHEMA(AtomicAppend)
    .NumInputs(3, INT_MAX)
    .NumOutputs(1, INT_MAX)
    .AllowInplace([](int in, int out) { return in == out + 1; });

OPERATOR_SCHEMA(CreateTensorVector)
    .NumInputs(0)
    .NumOutputs(1)
    .SetDoc("Create a std::unique_ptr<std::vector<Tensor> >");

OPERATOR_SCHEMA(TensorVectorSize)
    .NumInputs(1)
    .NumOutputs(1)
    .SetDoc("Get the size of the input vector")
    .Input(0, "tensor vector", "std::unique_ptr<std::vector<Tensor> >")
    .Output(0, "size", "int32_t size");

OPERATOR_SCHEMA(ConcatTensorVector)
    .NumInputs(1)
    .NumOutputs(1)
    .SetDoc(R"DOC(
Concat Tensors in the std::unique_ptr<std::vector<Tensor> >
along the first dimension.
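
A minimal sketch of the tensor-vector workflow (assuming the caffe2 Python
frontend; blob names are illustrative):

```

from caffe2.python import core, workspace

workspace.RunOperatorOnce(core.CreateOperator(
    "CreateTensorVector", [], ["vec"]))
# ... fill "vec" with at least one tensor, e.g. via CollectTensor below ...
workspace.RunOperatorOnce(core.CreateOperator(
    "ConcatTensorVector", ["vec"], ["out"]))

```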
)DOC")
    .Input(0, "vector of Tensor", "std::unique_ptr<std::vector<Tensor> >")
    .Output(0, "tensor", "tensor after concatenating");

OPERATOR_SCHEMA(CollectTensor)
    .NumInputs([](int n) { return n > 0 && n % 2 == 0; })
    .NumOutputs(1, INT_MAX)
    .NumInputsOutputs([](int in, int out) { return in == out * 2; })
    .EnforceInplace([](int in, int out) { return in == out; })
    .SetDoc(R"DOC(
Collect tensors into a tensor vector by reservoir sampling. The argument
num_to_collect indicates the max number of tensors that will be collected.
The first half of the inputs are tensor vectors, which are also the outputs.
The second half of the inputs are the tensors to be collected into each vector
(in the same order). The input tensors are collected in an all-or-none manner.
If they are collected, they will be placed at the same index in the output
vectors.
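
A minimal sketch (assuming the caffe2 Python frontend; blob names are
illustrative):

```

from caffe2.python import core, workspace

workspace.RunOperatorOnce(core.CreateOperator(
    "CreateTensorVector", [], ["reservoir"]))
collect_op = core.CreateOperator(
    "CollectTensor",
    ["reservoir", "sample"],  # vectors first, then the tensors to collect
    ["reservoir"],            # in-place with the input vector
    num_to_collect=100,
)
# run collect_op once per incoming "sample" tensor

```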
)DOC")
    .Arg("num_to_collect", "The max number of tensors to collect");

OPERATOR_SCHEMA(PackRecords)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1)
    .SetDoc(R"DOC(
Given a dataset under a schema specified by the `fields` argument, packs all
the input tensors into one, where each tensor element represents a row of data
(batch of size 1). This format allows easier use with the rest of Caffe2
operators.
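
A minimal pack/unpack round trip (assuming the caffe2 Python frontend; blob
and field names are illustrative):

```

from caffe2.python import core, workspace

workspace.RunOperatorOnce(core.CreateOperator(
    "PackRecords", ["a"], ["packed"], fields=["a"]))
workspace.RunOperatorOnce(core.CreateOperator(
    "UnPackRecords", ["packed"], ["a_out"], fields=["a"]))

```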
)DOC")
    .Arg(
        "fields",
        "List of strings representing the string names in the format "
        "specified in the doc for CreateTreeCursor.")
    .Output(
        0,
        "tensor",
        "One dimensional tensor having a complex type of SharedTensorVectorPtr."
        " In order to reverse it back to the original input it has to be "
        "inserted into UnPackRecordsOp.");

OPERATOR_SCHEMA(TrimDataset)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1, INT_MAX)
    .SetDoc(R"DOC(
Trim the given dataset in-place, given the dataset blobs and the field specs.
Trimming happens such that the dataset will contain the largest possible number
of records that is a multiple of the 'multiple_of' argument.
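
A minimal sketch (assuming the caffe2 Python frontend; blob and field names
are illustrative; each output must be the same blob as the corresponding
input):

```

from caffe2.python import core, workspace

trim_op = core.CreateOperator(
    "TrimDataset",
    ["a", "b_lengths", "b_values"],
    ["a", "b_lengths", "b_values"],   # in-place
    fields=["a", "b:lengths", "b:values"],
    multiple_of=16,
)
workspace.RunOperatorOnce(trim_op)

```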
)DOC")
    .EnforceInplace([](int input, int output) { return input == output; })
    .Arg(
        "fields",
        "List of strings representing the string names in the format "
        "specified in the doc for CreateTreeCursor.");

OPERATOR_SCHEMA(UnPackRecords)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1, INT_MAX)
    .SetDoc(R"DOC(
Given a packed dataset (packed by the PackRecordsOp) and the `fields` argument
describing the dataset's schema, returns the original dataset format. The
number of returned tensors is equal to the number of fields in the `fields`
argument.

The first input is the packed tensor to be unpacked. Optionally, you can
provide prototype tensors to give the expected shapes of the output tensors.
This is helpful when you expect to unpack an empty tensor, e.g., the output of
a sampling process.
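
A sketch of unpacking with prototypes (assuming the caffe2 Python frontend;
one prototype blob per field follows the packed input):

```

from caffe2.python import core, workspace

workspace.RunOperatorOnce(core.CreateOperator(
    "UnPackRecords",
    ["packed", "a_proto"],  # prototypes give shape/dtype for empty unpacks
    ["a_out"],
    fields=["a"],
))

```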
)DOC")
    .Arg(
        "fields",
        "List of strings representing the string names in the format "
        "specified in the doc for CreateTreeCursor.")
    .Input(0, "packed_tensor", "The tensor to be unpacked");

SHOULD_NOT_DO_GRADIENT(CreateTreeCursor);
SHOULD_NOT_DO_GRADIENT(ResetCursor);
SHOULD_NOT_DO_GRADIENT(ReadNextBatch);
SHOULD_NOT_DO_GRADIENT(ComputeOffset);
SHOULD_NOT_DO_GRADIENT(ReadRandomBatch);
SHOULD_NOT_DO_GRADIENT(CheckDatasetConsistency);
SHOULD_NOT_DO_GRADIENT(Append);
SHOULD_NOT_DO_GRADIENT(AtomicAppend);
SHOULD_NOT_DO_GRADIENT(CreateTensorVector);
SHOULD_NOT_DO_GRADIENT(TensorVectorSize);
SHOULD_NOT_DO_GRADIENT(ConcatTensorVector);
SHOULD_NOT_DO_GRADIENT(CollectTensor);
SHOULD_NOT_DO_GRADIENT(UnPackRecords);
SHOULD_NOT_DO_GRADIENT(PackRecords);

class TreeCursorSerializer : public BlobSerializerBase {
 public:
  TreeCursorSerializer() {}
  ~TreeCursorSerializer() override {}

  void Serialize(
      const void* pointer,
      TypeMeta typeMeta,
      const string& name,
      SerializationAcceptor acceptor) override {
    CAFFE_ENFORCE(typeMeta.Match<std::unique_ptr<TreeCursor>>());
    const auto& cursor =
        *static_cast<const std::unique_ptr<TreeCursor>*>(pointer);
    BlobProto blob_proto;

    // serialize offsets as a tensor
    if (cursor->offsets.size() > 0) {
      Blob offsets_blob;
      auto* offsets = BlobGetMutableTensor(&offsets_blob, CPU);
      offsets->Resize(cursor->offsets.size());
      std::copy(
          cursor->offsets.begin(),
          cursor->offsets.end(),
          offsets->template mutable_data<TOffset>());
      TensorSerializer ser;
      ser.Serialize(
          *offsets, name, blob_proto.mutable_tensor(), 0, offsets->numel());
    }
    blob_proto.set_name(name);
    blob_proto.set_type("std::unique_ptr<TreeCursor>");

    // serialize field names in the content
    std::ostringstream os;
    for (const auto& field : cursor->it.fields()) {
      os << field.name << " ";
    }
    blob_proto.set_content(os.str());

    acceptor(name, SerializeBlobProtoAsString_EnforceCheck(blob_proto));
  }
};

class TreeCursorDeserializer : public BlobDeserializerBase {
 public:
  void Deserialize(const BlobProto& proto, Blob* blob) override {
    // Deserialize the field names
    std::vector<std::string> fieldNames;
    std::istringstream is(proto.content());
    std::string field;
    while (true) {
      is >> field;
      if (is.eof()) {
        break;
      }
      fieldNames.push_back(field);
    }
    TreeIterator it(fieldNames);

    auto* base = blob->template GetMutable<std::unique_ptr<TreeCursor>>();
    CAFFE_ENFORCE(base != nullptr, "TreeCursor doesn't exist.");
    (*base).reset(new TreeCursor(it));

    // Deserialize the offset vector when it is not empty. The proto.tensor()
    // function will return a TensorProto associated with the offset vector.
    // The offset vector contains fields of type int64_t, and we verify it is
    // not empty before calling the deserializer.
    if (proto.tensor().int64_data().size() > 0) {
      TensorDeserializer deser;
      Blob offset_blob;
      deser.Deserialize(proto, &offset_blob);
      auto& offsets = offset_blob.template Get<Tensor>();
      auto* offsets_ptr = offsets.data<TOffset>();
      (*base)->offsets.assign(offsets_ptr, offsets_ptr + offsets.numel());
    }
  }
};

REGISTER_BLOB_SERIALIZER(
    (TypeMeta::Id<std::unique_ptr<TreeCursor>>()),
    TreeCursorSerializer);
REGISTER_BLOB_DESERIALIZER(std::unique_ptr<TreeCursor>, TreeCursorDeserializer);

} // namespace

void SharedTensorVectorPtrSerializer::Serialize(
    const void* pointer,
    TypeMeta typeMeta,
    const string& name,
    BlobSerializerBase::SerializationAcceptor acceptor) {
  /* This is a dummy serializer that doesn't save anything. If saving the
     content is desired in a future use case, you can change this serializer.
     Note: special care needs to be taken for the parameter initialization of
     LastNWindowCollectorOp and ReservoirSamplingOp if this serializer actually
     saves the content.
   */
  CAFFE_ENFORCE(typeMeta.Match<std::shared_ptr<std::vector<TensorCPU>>>());
  BlobProto blob_proto;
  blob_proto.set_name(name);
  blob_proto.set_type("std::shared_ptr<std::vector<TensorCPU>>");
  blob_proto.set_content("");
  acceptor(name, SerializeBlobProtoAsString_EnforceCheck(blob_proto));
};

void SharedTensorVectorPtrDeserializer::Deserialize(
    const BlobProto& /* unused */,
    Blob* blob) {
  /* This is a dummy deserializer that only creates an empty (null)
     shared pointer. */
  blob->GetMutable<std::shared_ptr<std::vector<TensorCPU>>>();
}

REGISTER_BLOB_SERIALIZER(
    (TypeMeta::Id<std::shared_ptr<std::vector<TensorCPU>>>()),
    SharedTensorVectorPtrSerializer);

REGISTER_BLOB_DESERIALIZER(
    std::shared_ptr<std::vector<TensorCPU>>,
    SharedTensorVectorPtrDeserializer);

} // namespace dataset_ops
} // namespace caffe2