// Caffe2 - C++ API
// A deep learning, cross platform ML framework
// partition_ops.h
1 #ifndef CAFFE2_OPERATORS_PARTITION_OPS_H_
2 #define CAFFE2_OPERATORS_PARTITION_OPS_H_
3 
4 #include "caffe2/core/context.h"
5 #include "caffe2/core/operator.h"
6 
7 namespace caffe2 {
8 
9 template <typename Index>
10 static inline int moduloPartition(Index key, int numPartitions) {
11  int shard = key % numPartitions;
12  // equivalent to `if (shard < 0) shard += partitions;`
13  shard += numPartitions & (shard >> (sizeof(int) * 8 - 1));
14  return shard;
15 }
16 
17 class GatherByKeyOp : public Operator<CPUContext> {
18  public:
19  USE_DISPATCH_HELPER;
20  USE_OPERATOR_FUNCTIONS(CPUContext);
21  template <class... Args>
22  explicit GatherByKeyOp(Args&&... args)
23  : Operator<CPUContext>(std::forward<Args>(args)...) {}
24 
25  private:
26  bool RunOnDevice() override {
28  }
29 
30  private:
31  template <typename Index>
32  bool DoRunWithType() {
33  const auto numPartitions = InputSize() - 1;
34  CAFFE_ENFORCE_GE(numPartitions, 1);
35  const auto& keysTensor = Input(0);
36  const auto* keysData = keysTensor.template data<Index>();
37  const auto& keysShape = Input(0).sizes();
38  CAFFE_ENFORCE_EQ(
39  keysShape.size(), 1, "Only 1D keys tensor supported currently.");
40 
41  // 1. Shape and type consistency checks
42  const auto& in0Shape = Input(1).sizes();
43  CAFFE_ENFORCE_GE(in0Shape.size(), 1);
44 
45  vector<int64_t> outShape(keysShape.vec());
46  outShape.insert(outShape.end(), in0Shape.begin() + 1, in0Shape.end());
47 
48  CAFFE_ENFORCE_GE(outShape.size(), 1);
49  auto totalSize = in0Shape[0];
50  auto meta = Input(1).dtype();
51  for (int i = 2; i < InputSize(); ++i) {
52  const auto& input = Input(i);
53  CAFFE_ENFORCE(meta == input.dtype());
54  CAFFE_ENFORCE_GE(input.dim(), 1);
55  CAFFE_ENFORCE(std::equal(
56  outShape.begin() + keysShape.size(),
57  outShape.end(),
58  input.sizes().begin() + 1));
59  totalSize += input.size(0);
60  }
61  CAFFE_ENFORCE_EQ(keysTensor.numel(), totalSize);
62 
63  auto* outTensor = Output(0);
64  outTensor->Resize(outShape);
65  auto* outData = static_cast<char*>(outTensor->raw_mutable_data(meta));
66  const auto blockSize = outTensor->size_from_dim(1);
67 
68  inputDatas_.resize(numPartitions);
69  for (int i = 0; i < numPartitions; ++i) {
70  inputDatas_[i] = static_cast<const char*>(Input(i + 1).raw_data());
71  }
72  inStartOffsets_.assign(numPartitions, 0);
73  Index outStartOffset = 0;
74  int currentShard = -1;
75 
76  // 2. copy from inputs into output based on shard for each input key
77  const auto numEntries = keysTensor.numel();
78  for (int64_t i = 0; i <= numEntries; ++i) {
79  auto newShard =
80  i < numEntries ? moduloPartition(keysData[i], numPartitions) : -1;
81  if (newShard != currentShard) {
82  if (currentShard != -1) {
83  auto inStartOffset = inStartOffsets_[currentShard];
84  auto numItems = i - outStartOffset;
85  context_.CopyItemsSameDevice(
86  meta,
87  numItems * blockSize,
88  inputDatas_[currentShard] +
89  inStartOffset * blockSize * meta.itemsize(),
90  outData + outStartOffset * blockSize * meta.itemsize());
91  inStartOffsets_[currentShard] += numItems;
92  }
93  currentShard = newShard;
94  outStartOffset = i;
95  }
96  }
97 
98  return true;
99  }
100 
101  std::vector<const char*> inputDatas_;
102  std::vector<int64_t> inStartOffsets_;
103 };
104 
105 class PartitionOpBase : public Operator<CPUContext> {
106  public:
107  USE_OPERATOR_FUNCTIONS(CPUContext);
108 
109  template <class... Args>
110  explicit PartitionOpBase(Args&&... args)
111  : Operator<CPUContext>(std::forward<Args>(args)...),
112  OP_SINGLE_ARG(int, "pack_first_input", pack_first_input_, 0) {}
113 
114  protected:
115  template <typename Index>
116  void ApplyPartition(bool skipFirstArgument) {
117  CAFFE_ENFORCE_EQ(
118  OutputSize() % InputSize(),
119  0,
120  "Output number must be a multiple of input number");
121  int partitions = OutputSize() / InputSize();
122  int inputSize = InputSize();
123  int mainInputIndex = skipFirstArgument;
124  CAFFE_ENFORCE_GT(partitions, 0, "Invalid number of partitions");
125 
126  auto& main_input = Input(mainInputIndex);
127  int64_t size = main_input.numel();
128  const Index* data = main_input.template data<Index>();
129  counts_.assign(partitions, 0);
130  for (int64_t p = 0; p < size; p++) {
131  int shard = moduloPartition(data[p], partitions);
132  ++counts_[shard];
133  }
134 
135  raw_datas_.resize(inputSize);
136  block_sizes_.resize(inputSize);
137  metas_.resize(inputSize);
138  out_datas_.resize(OutputSize());
139  for (int i = mainInputIndex; i < inputSize; ++i) {
140  auto& input = Input(i);
141  if (i > mainInputIndex) {
142  CAFFE_ENFORCE_GE(
143  input.dim(),
144  main_input.dim(),
145  "Prefix of extra input's shape must match main input's shape, ",
146  "input: ",
147  i);
148  for (int j = 0; j < main_input.dim(); ++j) {
149  CAFFE_ENFORCE_GE(
150  input.size(j),
151  main_input.size(j),
152  "Prefix of extra input's shape must match main input's shape, ",
153  "input: ",
154  i,
155  ", dim ",
156  j);
157  }
158  }
159  raw_datas_[i] = input.raw_data();
160  block_sizes_[i] = input.size_from_dim(main_input.dim());
161  metas_[i] = input.dtype();
162  // shape = partition_size + suffix of input dims
163  vector<int64_t> shape(
164  input.sizes().begin() + main_input.dim() - 1, input.sizes().end());
165  for (int j = 0; j < partitions; ++j) {
166  int out_idx = i + j * inputSize;
167  auto output = Output(out_idx);
168  shape[0] = counts_[j];
169  output->Resize(shape);
170  out_datas_[out_idx] = output->raw_mutable_data(input.dtype());
171  }
172  }
173 
174  counts_.assign(partitions, 0);
175  for (int64_t p = 0; p < size; p++) {
176  int shard = moduloPartition(data[p], partitions);
177  int64_t idx = counts_[shard]++;
178 
179  // special case first input
180  static_cast<Index*>(out_datas_[shard * inputSize + mainInputIndex])[idx] =
181  pack_first_input_ ? ((data[p] - shard) / partitions) : data[p];
182 
183  int baseIndex = shard * inputSize;
184  for (int i = mainInputIndex + 1; i < inputSize; ++i) {
185  auto bs = block_sizes_[i];
186  auto meta = metas_[i];
187  // special case for small bs?
188  context_.CopyItemsSameDevice(
189  meta,
190  bs,
191  static_cast<const char*>(raw_datas_[i]) + p * bs * meta.itemsize(),
192  static_cast<char*>(out_datas_[baseIndex + i]) +
193  idx * bs * meta.itemsize());
194  }
195  }
196  }
197 
198  bool pack_first_input_;
199 
200  // use member fields to reuse memory
201  vector<int64_t> counts_;
202  vector<int64_t> block_sizes_;
203  vector<TypeMeta> metas_;
204  vector<const void*> raw_datas_;
205  vector<void*> out_datas_;
206 };
207 
208 class PartitionOp : public PartitionOpBase {
209  public:
210  USE_DISPATCH_HELPER;
211 
212  template <class... Args>
213  explicit PartitionOp(Args&&... args)
214  : PartitionOpBase(std::forward<Args>(args)...) {}
215 
216  bool RunOnDevice() override {
217  return DispatchHelper<TensorTypes<int32_t, int64_t>>::call(this, Input(0));
218  }
219 
220  private:
221  template <typename Index>
222  bool DoRunWithType() {
223  ApplyPartition<Index>(false /* skipFirstArgument */);
224  return true;
225  }
226 
227  C10_DISABLE_COPY_AND_ASSIGN(PartitionOp);
228 };
229 
231  public:
232  USE_DISPATCH_HELPER;
233 
234  template <class... Args>
235  explicit LengthsPartitionOp(Args&&... args)
236  : PartitionOpBase(std::forward<Args>(args)...) {}
237 
238  bool RunOnDevice() override {
239  return DispatchHelper<TensorTypes<int32_t, int64_t>>::call(this, Input(1));
240  }
241 
242  private:
243  template <typename Index>
244  bool DoRunWithType() {
245  CAFFE_ENFORCE(
246  OutputSize() % InputSize() == 0,
247  "Output number must be a multiple of input number");
248  int partitions = OutputSize() / InputSize();
249  CAFFE_ENFORCE_GT(partitions, 0, "Invalid number of partitions");
250  CAFFE_ENFORCE_EQ(
251  Input(1).dim(),
252  1,
253  "Only 1-D tensors supported as a partitioning tensor for sharding");
254 
255  if (partitions == 1) {
256  // Specialization when partitions == 1 which just becomes a copy.
257  for (int i = 0; i < InputSize(); ++i) {
258  auto& input = Input(i);
259  auto& output = *Output(i);
260  output.ResizeLike(input);
261  context_.CopyItemsSameDevice(
262  input.dtype(),
263  input.numel(),
264  input.raw_data(),
265  output.raw_mutable_data(input.dtype()));
266  }
267  return true;
268  }
269 
270  // Apply sharding to all parameters except lengths
271  ApplyPartition<Index>(true /* skipFirstArgument */);
272 
273  // Compute lengths after sharding
274  auto& main_input = Input(1);
275  int64_t size = main_input.numel();
276  const Index* data = main_input.template data<Index>();
277 
278  auto& length_input = Input(0);
279  int64_t elements = length_input.numel();
280  const int32_t* lengths_data = length_input.template data<int32_t>();
281  out_length_.resize(partitions);
282  for (int i = 0; i < partitions; ++i) {
283  auto& output = *Output(i * InputSize());
284  output.Resize(elements);
285  out_length_[i] = output.template mutable_data<int32_t>();
286  }
287 
288  int total_length = 0;
289  for (int i = 0; i < elements; ++i) {
290  total_length += lengths_data[i];
291  }
292  CAFFE_ENFORCE(
293  total_length == size,
294  "Total length is not matching to the number of elements");
295 
296  int index = 0;
297  for (int i = 0; i < elements; ++i) {
298  for (int j = 0; j < partitions; ++j) {
299  out_length_[j][i] = 0;
300  }
301  for (int j = 0; j < lengths_data[i]; ++j, ++index) {
302  int shard = moduloPartition(data[index], partitions);
303  ++out_length_[shard][i];
304  }
305  }
306  return true;
307  }
308 
309  C10_DISABLE_COPY_AND_ASSIGN(LengthsPartitionOp);
310 
311  vector<int32_t*> out_length_;
312 };
313 
314 } // namespace caffe2
315 
316 #endif // CAFFE2_OPERATORS_PARTITION_OPS_H_
// Doxygen cross-reference residue (kept as comments so the header stays valid):
// CPUContext — the bare minimum of what a Context class in Caffe2 should
//   implement. Definition: context.h:40
// const Tensor& Input(int idx, DeviceType type = CPUContext::GetDeviceType())
//   — retrieve a non-owning reference to the input at position 'idx' for this
//   operator. Definition: operator.h:702
// Caffe2 module registry — a global dictionary that holds information about
//   what Caffe2 modules have been loaded in the current runtime.
//   Definition: blob.h:13