Caffe2 - C++ API
A deep learning, cross-platform ML framework
partition_ops.h
// Copyright (c) 2016-present, Facebook, Inc.
#ifndef CAFFE2_OPERATORS_PARTITION_OPS_H_
#define CAFFE2_OPERATORS_PARTITION_OPS_H_

#include "caffe2/core/context.h"
#include "caffe2/core/operator.h"

namespace caffe2 {

template <typename Index>
static inline int moduloPartition(Index key, int numPartitions) {
  int shard = key % numPartitions;
  // equivalent to `if (shard < 0) shard += numPartitions;`
  shard += numPartitions & (shard >> (sizeof(int) * 8 - 1));
  return shard;
}
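
// Illustrative values (not part of the original header), assuming numPartitions = 4:
//   moduloPartition(10, 4) == 2
//   moduloPartition(-3, 4) == 1   // plain `%` yields -3; the sign-mask trick adds 4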

class GatherByKeyOp : public Operator<CPUContext> {
 public:
  USE_DISPATCH_HELPER;
  USE_OPERATOR_FUNCTIONS(CPUContext);
  GatherByKeyOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator<CPUContext>(operator_def, ws) {}

 private:
  bool RunOnDevice() override {
    return DispatchHelper<TensorTypes<int32_t, int64_t>>::call(this, Input(0));
  }

 private:
  template <typename Index>
  bool DoRunWithType() {
    const auto numPartitions = InputSize() - 1;
    CAFFE_ENFORCE_GE(numPartitions, 1);
    const auto& keysTensor = Input(0);
    const auto* keysData = keysTensor.template data<Index>();
    const auto& keysShape = Input(0).dims();
    CAFFE_ENFORCE_EQ(
        keysShape.size(), 1, "Only 1D keys tensor supported currently.");

    // 1. Shape and type consistency checks
    const auto& in0Shape = Input(1).dims();
    CAFFE_ENFORCE_GE(in0Shape.size(), 1);

    vector<TIndex> outShape(keysShape);
    outShape.insert(outShape.end(), in0Shape.begin() + 1, in0Shape.end());

    CAFFE_ENFORCE_GE(outShape.size(), 1);
    auto totalSize = in0Shape[0];
    auto meta = Input(1).meta();
    for (int i = 2; i < InputSize(); ++i) {
      const auto& input = Input(i);
      CAFFE_ENFORCE(meta == input.meta());
      CAFFE_ENFORCE_GE(input.ndim(), 1);
      CAFFE_ENFORCE(std::equal(
          outShape.begin() + keysShape.size(),
          outShape.end(),
          input.dims().begin() + 1));
      totalSize += input.dim(0);
    }
    CAFFE_ENFORCE_EQ(keysTensor.size(), totalSize);

    auto* outTensor = Output(0);
    outTensor->Resize(outShape);
    auto* outData = static_cast<char*>(outTensor->raw_mutable_data(meta));
    const auto blockSize = outTensor->size_from_dim(1);

    inputDatas_.resize(numPartitions);
    for (int i = 0; i < numPartitions; ++i) {
      inputDatas_[i] = static_cast<const char*>(Input(i + 1).raw_data());
    }
    inStartOffsets_.assign(numPartitions, 0);
    Index outStartOffset = 0;
    int currentShard = -1;

    // 2. copy from inputs into output based on shard for each input key
    const auto numEntries = keysTensor.size();
    for (int64_t i = 0; i <= numEntries; ++i) {
      auto newShard =
          i < numEntries ? moduloPartition(keysData[i], numPartitions) : -1;
      if (newShard != currentShard) {
        if (currentShard != -1) {
          auto inStartOffset = inStartOffsets_[currentShard];
          auto numItems = i - outStartOffset;
          context_.template CopyItems<CPUContext, CPUContext>(
              meta,
              numItems * blockSize,
              inputDatas_[currentShard] +
                  inStartOffset * blockSize * meta.itemsize(),
              outData + outStartOffset * blockSize * meta.itemsize());
          inStartOffsets_[currentShard] += numItems;
        }
        currentShard = newShard;
        outStartOffset = i;
      }
    }

    return true;
  }

  std::vector<const char*> inputDatas_;
  std::vector<int64_t> inStartOffsets_;
};
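
// Illustrative semantics (an informal sketch, not part of the original header):
// GatherByKey inverts a modulo Partition. With keys = [2, 4, 5, 3, 1] and two
// partitioned value inputs [20, 40] (shard 0) and [50, 30, 10] (shard 1), the
// output is [20, 40, 50, 30, 10], i.e. the values re-gathered in key order.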

class PartitionOpBase : public Operator<CPUContext> {
 public:
  USE_OPERATOR_FUNCTIONS(CPUContext);

  PartitionOpBase(const OperatorDef& operator_def, Workspace* ws)
      : Operator<CPUContext>(operator_def, ws),
        OP_SINGLE_ARG(int, "pack_first_input", pack_first_input_, 0) {}

 protected:
  template <typename Index>
  void ApplyPartition(bool skipFirstArgument) {
    CAFFE_ENFORCE_EQ(
        OutputSize() % InputSize(),
        0,
        "Output number must be a multiple of input number");
    int partitions = OutputSize() / InputSize();
    int inputSize = InputSize();
    int mainInputIndex = skipFirstArgument;
    CAFFE_ENFORCE_GT(partitions, 0, "Invalid number of partitions");

    auto& main_input = Input(mainInputIndex);
    TIndex size = main_input.size();
    const Index* data = main_input.template data<Index>();
    counts_.assign(partitions, 0);
    for (TIndex p = 0; p < size; p++) {
      int shard = moduloPartition(data[p], partitions);
      ++counts_[shard];
    }

    raw_datas_.resize(inputSize);
    block_sizes_.resize(inputSize);
    metas_.resize(inputSize);
    out_datas_.resize(OutputSize());
    for (int i = mainInputIndex; i < inputSize; ++i) {
      auto& input = Input(i);
      if (i > mainInputIndex) {
        CAFFE_ENFORCE_GE(
            input.ndim(),
            main_input.ndim(),
            "Prefix of extra input's shape must match main input's shape, ",
            "input: ",
            i);
        for (int j = 0; j < main_input.ndim(); ++j) {
          CAFFE_ENFORCE_GE(
              input.dim(j),
              main_input.dim(j),
              "Prefix of extra input's shape must match main input's shape, ",
              "input: ",
              i,
              ", dim ",
              j);
        }
      }
      raw_datas_[i] = input.raw_data();
      block_sizes_[i] = input.size_from_dim(main_input.ndim());
      metas_[i] = input.meta();
      // shape = partition_size + suffix of input dims
      vector<TIndex> shape(
          input.dims().begin() + main_input.ndim() - 1, input.dims().end());
      for (int j = 0; j < partitions; ++j) {
        int out_idx = i + j * inputSize;
        auto output = Output(out_idx);
        shape[0] = counts_[j];
        output->Resize(shape);
        out_datas_[out_idx] = output->raw_mutable_data(input.meta());
      }
    }

    counts_.assign(partitions, 0);
    for (TIndex p = 0; p < size; p++) {
      int shard = moduloPartition(data[p], partitions);
      TIndex idx = counts_[shard]++;

      // special case first input
      static_cast<Index*>(out_datas_[shard * inputSize + mainInputIndex])[idx] =
          pack_first_input_ ? ((data[p] - shard) / partitions) : data[p];

      int baseIndex = shard * inputSize;
      for (int i = mainInputIndex + 1; i < inputSize; ++i) {
        auto bs = block_sizes_[i];
        auto meta = metas_[i];
        // special case for small bs?
        context_.template CopyItems<CPUContext, CPUContext>(
            meta,
            bs,
            static_cast<const char*>(raw_datas_[i]) + p * bs * meta.itemsize(),
            static_cast<char*>(out_datas_[baseIndex + i]) +
                idx * bs * meta.itemsize());
      }
    }
  }

  bool pack_first_input_;

  // use member fields to reuse memory
  vector<TIndex> counts_;
  vector<TIndex> block_sizes_;
  vector<TypeMeta> metas_;
  vector<const void*> raw_datas_;
  vector<void*> out_datas_;
};
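
// Informal note (not part of the original header): ApplyPartition makes two
// passes over the main input. The first pass only counts keys per shard so the
// outputs can be sized, e.g. keys [2, 4, 5, 3, 1] with 2 partitions gives
// counts_ = [2, 3]; the second pass scatters each row (and the rebased key,
// when pack_first_input is set) into its shard's output.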

class PartitionOp : public PartitionOpBase {
 public:
  USE_DISPATCH_HELPER;

  PartitionOp(const OperatorDef& operator_def, Workspace* ws)
      : PartitionOpBase(operator_def, ws) {}

  bool RunOnDevice() override {
    return DispatchHelper<TensorTypes<int32_t, int64_t>>::call(this, Input(0));
  }

 private:
  template <typename Index>
  bool DoRunWithType() {
    ApplyPartition<Index>(false /* skipFirstArgument */);
    return true;
  }

  DISABLE_COPY_AND_ASSIGN(PartitionOp);
};
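
// Illustrative example (informal, not from the original header): partitioning
// X = [2, 4, 5, 3, 1] into 2 shards with modulo sharding yields
//   shard 0: [2, 4]        shard 1: [5, 3, 1]
// With pack_first_input=1 the keys are rebased per shard, (key - shard) / 2:
//   shard 0: [1, 2]        shard 1: [2, 1, 0]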

class LengthsPartitionOp : public PartitionOpBase {
 public:
  USE_DISPATCH_HELPER;

  LengthsPartitionOp(const OperatorDef& operator_def, Workspace* ws)
      : PartitionOpBase(operator_def, ws) {}

  bool RunOnDevice() override {
    return DispatchHelper<TensorTypes<int32_t, int64_t>>::call(this, Input(1));
  }

 private:
  template <typename Index>
  bool DoRunWithType() {
    CAFFE_ENFORCE(
        OutputSize() % InputSize() == 0,
        "Output number must be a multiple of input number");
    int partitions = OutputSize() / InputSize();
    CAFFE_ENFORCE_GT(partitions, 0, "Invalid number of partitions");
    CAFFE_ENFORCE_EQ(
        Input(1).ndim(),
        1,
        "Only 1-D tensors supported as a partitioning tensor for sharding");

    // Apply sharding to all parameters except lengths
    ApplyPartition<Index>(true /* skipFirstArgument */);

    // Compute lengths after sharding
    auto& main_input = Input(1);
    TIndex size = main_input.size();
    const Index* data = main_input.template data<Index>();

    auto& length_input = Input(0);
    TIndex elements = length_input.size();
    const int32_t* lengths_data = length_input.template data<int32_t>();
    out_length_.resize(partitions);
    for (int i = 0; i < partitions; ++i) {
      auto& output = *Output(i * InputSize());
      output.Resize(elements);
      out_length_[i] = output.template mutable_data<int32_t>();
    }

    int total_length = 0;
    for (int i = 0; i < elements; ++i) {
      total_length += lengths_data[i];
    }
    CAFFE_ENFORCE(
        total_length == size,
        "Total length is not matching to the number of elements");

    int index = 0;
    for (int i = 0; i < elements; ++i) {
      for (int j = 0; j < partitions; ++j) {
        out_length_[j][i] = 0;
      }
      for (int j = 0; j < lengths_data[i]; ++j, ++index) {
        int shard = moduloPartition(data[index], partitions);
        ++out_length_[shard][i];
      }
    }
    return true;
  }

  DISABLE_COPY_AND_ASSIGN(LengthsPartitionOp);

  vector<int32_t*> out_length_;
};
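
// Illustrative example (informal, not from the original header): with
// lengths = [2, 3], keys = [2, 4, 5, 3, 1] and 2 partitions, segment 0 covers
// keys [2, 4] (both shard 0) and segment 1 covers [5, 3, 1] (all shard 1), so
// the per-shard length outputs are [2, 0] for shard 0 and [0, 3] for shard 1.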

} // namespace caffe2

#endif // CAFFE2_OPERATORS_PARTITION_OPS_H_
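
The operators above mostly shuffle memory; the sharding rule itself is plain modulo arithmetic. The standalone sketch below (written against std::vector rather than the Caffe2 tensor API; the file name and the shardOf helper are hypothetical) mimics what PartitionOp and LengthsPartitionOp compute for a small example, which can help when checking shard assignments by hand.

// partition_sketch.cc (hypothetical, for illustration only)
#include <cstdint>
#include <iostream>
#include <vector>

// Same sign-safe modulo as moduloPartition above, written without the bit trick.
static int shardOf(int64_t key, int numPartitions) {
  int shard = static_cast<int>(key % numPartitions);
  if (shard < 0) {
    shard += numPartitions;
  }
  return shard;
}

int main() {
  const int partitions = 2;
  const std::vector<int64_t> keys = {2, 4, 5, 3, 1};
  const std::vector<int32_t> lengths = {2, 3}; // two segments of sizes 2 and 3

  // Scatter keys into shards, as PartitionOp does for its main input.
  std::vector<std::vector<int64_t>> shards(partitions);
  for (int64_t key : keys) {
    shards[shardOf(key, partitions)].push_back(key);
  }

  // Recompute per-shard segment lengths, as LengthsPartitionOp does.
  std::vector<std::vector<int32_t>> outLengths(
      partitions, std::vector<int32_t>(lengths.size(), 0));
  size_t index = 0;
  for (size_t i = 0; i < lengths.size(); ++i) {
    for (int32_t j = 0; j < lengths[i]; ++j, ++index) {
      ++outLengths[shardOf(keys[index], partitions)][i];
    }
  }

  // Prints: shard 0 keys: 2 4 | lengths: 2 0
  //         shard 1 keys: 5 3 1 | lengths: 0 3
  for (int p = 0; p < partitions; ++p) {
    std::cout << "shard " << p << " keys:";
    for (int64_t k : shards[p]) {
      std::cout << ' ' << k;
    }
    std::cout << " | lengths:";
    for (int32_t l : outLengths[p]) {
      std::cout << ' ' << l;
    }
    std::cout << '\n';
  }
  return 0;
}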