1 #ifndef CAFFE2_OPERATORS_PARTITION_OPS_H_ 2 #define CAFFE2_OPERATORS_PARTITION_OPS_H_ 4 #include "caffe2/core/context.h" 5 #include "caffe2/core/operator.h" 9 template <
typename Index>
10 static inline int moduloPartition(Index key,
int numPartitions) {
11 int shard = key % numPartitions;
13 shard += numPartitions & (shard >> (
sizeof(int) * 8 - 1));
// NOTE(review): garbled extraction — the enclosing operator class header,
// several intervening statements (enforce-macro openings, run-offset
// updates, returns), and the closing braces were dropped, and the original
// file's line numbers are fused into the text. The comments below annotate
// only what the visible code shows; this fragment is not compilable as-is.

// Variadic constructor fragment (class name not visible in this extraction).
21 template <
class... Args>
// Entry point; presumably dispatches DoRunWithType on the keys' integral
// type — the dispatch call itself was dropped. TODO confirm against the
// original file.
26 bool RunOnDevice()
override {
31 template <
typename Index>
32 bool DoRunWithType() {
// Input 0 holds the keys; inputs 1..InputSize()-1 are the per-partition
// data tensors being gathered back into key order.
33 const auto numPartitions = InputSize() - 1;
34 CAFFE_ENFORCE_GE(numPartitions, 1);
35 const auto& keysTensor =
Input(0);
36 const auto* keysData = keysTensor.template data<Index>();
37 const auto& keysShape =
Input(0).sizes();
// (Enforce-macro opening dropped by the extraction; the message shows the
// intent: keys must be 1-D.)
39 keysShape.size(), 1,
"Only 1D keys tensor supported currently.");
// Output shape = keys shape followed by the non-leading dims of Input(1).
42 const auto& in0Shape =
Input(1).sizes();
43 CAFFE_ENFORCE_GE(in0Shape.size(), 1);
45 vector<int64_t> outShape(keysShape.vec());
46 outShape.insert(outShape.end(), in0Shape.begin() + 1, in0Shape.end());
48 CAFFE_ENFORCE_GE(outShape.size(), 1);
49 auto totalSize = in0Shape[0];
50 auto meta =
Input(1).dtype();
// Every data input must share Input(1)'s dtype and trailing dims; their
// leading dims must sum to the number of keys (checked below).
51 for (
int i = 2; i < InputSize(); ++i) {
52 const auto& input =
Input(i);
53 CAFFE_ENFORCE(meta == input.dtype());
54 CAFFE_ENFORCE_GE(input.dim(), 1);
55 CAFFE_ENFORCE(std::equal(
56 outShape.begin() + keysShape.size(),
58 input.sizes().begin() + 1));
59 totalSize += input.size(0);
61 CAFFE_ENFORCE_EQ(keysTensor.numel(), totalSize);
63 auto* outTensor = Output(0);
64 outTensor->Resize(outShape);
65 auto* outData =
static_cast<char*
>(outTensor->raw_mutable_data(meta));
// Row size in elements; byte offsets below multiply by meta.itemsize().
66 const auto blockSize = outTensor->size_from_dim(1);
68 inputDatas_.resize(numPartitions);
69 for (
int i = 0; i < numPartitions; ++i) {
70 inputDatas_[i] =
static_cast<const char*
>(
Input(i + 1).raw_data());
// Per-partition read cursors, advanced as each contiguous run is copied.
72 inStartOffsets_.assign(numPartitions, 0);
73 Index outStartOffset = 0;
74 int currentShard = -1;
// Single scan over the keys; the loop runs to i == numEntries so the final
// run is flushed by the sentinel shard value -1.
77 const auto numEntries = keysTensor.numel();
78 for (int64_t i = 0; i <= numEntries; ++i) {
80 i < numEntries ? moduloPartition(keysData[i], numPartitions) : -1;
81 if (newShard != currentShard) {
82 if (currentShard != -1) {
// Copy the just-finished run of `numItems` rows from the current shard's
// input into the output. (The CopyItemsSameDevice argument list and the
// outStartOffset update were partially dropped by the extraction.)
83 auto inStartOffset = inStartOffsets_[currentShard];
84 auto numItems = i - outStartOffset;
85 context_.CopyItemsSameDevice(
88 inputDatas_[currentShard] +
89 inStartOffset * blockSize * meta.itemsize(),
90 outData + outStartOffset * blockSize * meta.itemsize());
91 inStartOffsets_[currentShard] += numItems;
93 currentShard = newShard;
// Operator members: reused scratch buffers (avoid per-run allocation).
101 std::vector<const char*> inputDatas_;
102 std::vector<int64_t> inStartOffsets_;
// NOTE(review): garbled extraction — this is the shared partition base
// class (constructor + ApplyPartition + members); its class header, several
// enforce-macro openings, and closing braces were dropped, and original
// line numbers are fused into the text. Comments annotate only what the
// visible code shows.

// Variadic constructor fragment; reads the "pack_first_input" operator
// argument (default 0) into pack_first_input_.
109 template <
class... Args>
112 OP_SINGLE_ARG(
int,
"pack_first_input", pack_first_input_, 0) {}
// Two-pass partitioning: (1) count keys per shard to size outputs,
// (2) scatter the main input and every extra input into the per-shard
// outputs. Output layout: output (i + j*InputSize()) is input i's slice
// for partition j.
115 template <
typename Index>
116 void ApplyPartition(
bool skipFirstArgument) {
// (Enforce opening dropped — the message shows the intended check.)
118 OutputSize() % InputSize(),
120 "Output number must be a multiple of input number");
121 int partitions = OutputSize() / InputSize();
122 int inputSize = InputSize();
// bool-to-int: 0 normally, 1 when the caller wants Input(0) skipped
// (used by the lengths variant — see LengthsPartitionOp below).
123 int mainInputIndex = skipFirstArgument;
124 CAFFE_ENFORCE_GT(partitions, 0,
"Invalid number of partitions");
126 auto& main_input =
Input(mainInputIndex);
127 int64_t size = main_input.numel();
128 const Index* data = main_input.template data<Index>();
// Pass 1: histogram of keys per shard (counts_ increment dropped by the
// extraction, but counts_[j] is consumed when sizing outputs below).
129 counts_.assign(partitions, 0);
130 for (int64_t p = 0; p < size; p++) {
131 int shard = moduloPartition(data[p], partitions);
135 raw_datas_.resize(inputSize);
136 block_sizes_.resize(inputSize);
137 metas_.resize(inputSize);
138 out_datas_.resize(OutputSize());
// Validate and cache per-input metadata, then size all outputs.
139 for (
int i = mainInputIndex; i < inputSize; ++i) {
140 auto& input =
Input(i);
// Extra inputs must have the main input's shape as a prefix (the enforce
// openings around these messages were dropped).
141 if (i > mainInputIndex) {
145 "Prefix of extra input's shape must match main input's shape, ",
148 for (
int j = 0; j < main_input.dim(); ++j) {
152 "Prefix of extra input's shape must match main input's shape, ",
159 raw_datas_[i] = input.raw_data();
// Row size for this input: everything past the main input's dims.
160 block_sizes_[i] = input.size_from_dim(main_input.dim());
161 metas_[i] = input.dtype();
// Per-partition output shape: counts_[j] rows of the input's inner dims.
163 vector<int64_t> shape(
164 input.sizes().begin() + main_input.dim() - 1, input.sizes().end());
165 for (
int j = 0; j < partitions; ++j) {
166 int out_idx = i + j * inputSize;
167 auto output = Output(out_idx);
168 shape[0] = counts_[j];
169 output->Resize(shape);
170 out_datas_[out_idx] = output->raw_mutable_data(input.dtype());
// Pass 2: counts_ now serve as per-shard write cursors.
174 counts_.assign(partitions, 0);
175 for (int64_t p = 0; p < size; p++) {
176 int shard = moduloPartition(data[p], partitions);
177 int64_t idx = counts_[shard]++;
// Write the key itself; with pack_first_input set, store the key divided
// down so each shard sees a dense local key space.
180 static_cast<Index*
>(out_datas_[shard * inputSize + mainInputIndex])[idx] =
181 pack_first_input_ ? ((data[p] - shard) / partitions) : data[p];
// Copy row p of every extra input into slot idx of its shard output.
183 int baseIndex = shard * inputSize;
184 for (
int i = mainInputIndex + 1; i < inputSize; ++i) {
185 auto bs = block_sizes_[i];
186 auto meta = metas_[i];
188 context_.CopyItemsSameDevice(
191 static_cast<const char*>(raw_datas_[i]) + p * bs * meta.itemsize(),
192 static_cast<char*
>(out_datas_[baseIndex + i]) +
193 idx * bs * meta.itemsize());
// Members: operator argument plus scratch buffers reused across runs.
198 bool pack_first_input_;
201 vector<int64_t> counts_;
202 vector<int64_t> block_sizes_;
203 vector<TypeMeta> metas_;
204 vector<const void*> raw_datas_;
205 vector<void*> out_datas_;
// NOTE(review): fragment of the PartitionOp class — the class header,
// dispatch macro, constructor body, and return statements were dropped by
// the extraction; original line numbers are fused into the text.

// Variadic constructor fragment (forwarding body not visible).
212 template <
class... Args>
// Entry point; presumably dispatches on the key tensor's integral type —
// the dispatch call was dropped. TODO confirm against the original file.
216 bool RunOnDevice()
override {
221 template <
typename Index>
222 bool DoRunWithType() {
// skipFirstArgument = false: Input(0) is the main (key) input and is
// partitioned along with every other input (see ApplyPartition above).
223 ApplyPartition<Index>(
false );
227 C10_DISABLE_COPY_AND_ASSIGN(PartitionOp);
// NOTE(review): fragment of the LengthsPartitionOp class — the class
// header, dispatch macro, some enforce openings, loop-index declarations,
// and return statements were dropped by the extraction; original line
// numbers are fused into the text. Comments annotate only what is visible.

// Variadic constructor fragment (forwarding body not visible).
234 template <
class... Args>
238 bool RunOnDevice()
override {
243 template <
typename Index>
244 bool DoRunWithType() {
// (Enforce opening dropped — message shows the intended check.)
246 OutputSize() % InputSize() == 0,
247 "Output number must be a multiple of input number");
248 int partitions = OutputSize() / InputSize();
249 CAFFE_ENFORCE_GT(partitions, 0,
"Invalid number of partitions");
// (Enforce opening dropped — the partitioning/lengths tensor must be 1-D.)
253 "Only 1-D tensors supported as a partitioning tensor for sharding");
// Fast path: with a single partition, every input is copied through to
// its corresponding output unchanged.
255 if (partitions == 1) {
257 for (
int i = 0; i < InputSize(); ++i) {
258 auto& input =
Input(i);
259 auto& output = *Output(i);
260 output.ResizeLike(input);
// (Source-pointer argument of the copy was dropped by the extraction.)
261 context_.CopyItemsSameDevice(
265 output.raw_mutable_data(input.dtype()));
// skipFirstArgument = true: Input(0) is the lengths tensor and is handled
// separately below; Input(1) holds the keys being partitioned.
271 ApplyPartition<Index>(
true );
274 auto& main_input =
Input(1);
275 int64_t size = main_input.numel();
276 const Index* data = main_input.template data<Index>();
// Lengths are int32 segment sizes over the keys in Input(1).
278 auto& length_input =
Input(0);
279 int64_t elements = length_input.numel();
280 const int32_t* lengths_data = length_input.template data<int32_t>();
// Each partition's first output (index i * InputSize()) receives that
// partition's per-segment lengths, one entry per input segment.
281 out_length_.resize(partitions);
282 for (
int i = 0; i < partitions; ++i) {
283 auto& output = *Output(i * InputSize());
284 output.Resize(elements);
285 out_length_[i] = output.template mutable_data<int32_t>();
// Sanity check: segment lengths must sum to the number of keys.
288 int total_length = 0;
289 for (
int i = 0; i < elements; ++i) {
290 total_length += lengths_data[i];
// (Enforce opening dropped — message shows the intended check.)
293 total_length == size,
294 "Total length is not matching to the number of elements");
// Recompute lengths per partition: for each segment, count how many of
// its keys fall into each shard. (The declaration of the running key
// index `index` was dropped by the extraction — presumably initialized
// to 0 before this loop; confirm against the original file.)
297 for (
int i = 0; i < elements; ++i) {
298 for (
int j = 0; j < partitions; ++j) {
299 out_length_[j][i] = 0;
301 for (
int j = 0; j < lengths_data[i]; ++j, ++index) {
302 int shard = moduloPartition(data[index], partitions);
303 ++out_length_[shard][i];
309 C10_DISABLE_COPY_AND_ASSIGN(LengthsPartitionOp);
// Per-partition pointers into the lengths outputs, filled above.
311 vector<int32_t*> out_length_;
316 #endif // CAFFE2_OPERATORS_PARTITION_OPS_H_
// --- Extraction residue: documentation-tooltip text scraped alongside the
// header, kept here as comments for reference ---
// CPUContext: the CPU Context, representing the bare minimum of what a
// Context class in Caffe2 should implement.
// Input(int idx, DeviceType type = CPUContext::GetDeviceType()): retrieves a
// non-owning const Tensor& reference to the input at position `idx` for this
// operator.
// Caffe2 modules: a global dictionary holds information about which Caffe2
// modules have been loaded in the current runtime.