Caffe2 - C++ API
A deep learning, cross-platform ML framework
executor.cpp
#include <torch/csrc/jit/fuser/executor.h>

#include <ATen/ATen.h>
#include <ATen/ExpandUtils.h>
#include <ATen/core/functional.h>
#include <ATen/core/stack.h>
#include <c10/util/Optional.h>
#include <torch/csrc/jit/fuser/compiler.h>
#include <torch/csrc/jit/fuser/interface.h>
#include <torch/csrc/jit/fuser/kernel_cache.h>
#include <torch/csrc/jit/fuser/kernel_spec.h>
#include <torch/csrc/jit/fuser/tensor_info.h>

#include <algorithm>
#include <iostream> // TODO: remove, debugging only
#include <limits> // for std::numeric_limits, used in launchFusion below
#include <map>
#include <stdexcept>
#include <tuple>
#include <vector>

namespace torch {
namespace jit {
namespace fuser {

// Returns the "map size" for this run, which is the common size for all
// intermediate tensors.
static c10::optional<std::vector<int64_t>> getMapSize(
    const KernelSpec& spec,
    at::TensorList args,
    at::IntArrayRef arg_subset) {
  // TODO: this keeps reallocating map_size at every iteration, but we know
  // exactly how much storage we need, so this could be fixed in-place at
  // every step. We're just missing a few functions for ATen, but the fix
  // should be straightforward.
  // Note: left uninitialized since the empty shape is broadcastable to any
  // shape.
  std::vector<int64_t> map_size;
  map_size.reserve(8);
  for (const auto arg_idx : arg_subset) {
    auto& arg = args.at(arg_idx);
    auto& chunk_desc = spec.inputChunks().at(arg_idx);
    if (chunk_desc.nSubTensors() == 1) {
      try {
        map_size = at::infer_size(map_size, arg.sizes());
      } catch (...) {
        return c10::nullopt;
      }
    } else {
      auto tensor_sizes = arg.sizes().vec();
      const auto num_chunks = chunk_desc.nSubTensors();
      const auto dim =
          at::maybe_wrap_dim(chunk_desc.dim(), tensor_sizes.size());
      if (tensor_sizes[dim] % num_chunks != 0) {
        return c10::nullopt;
      }
      tensor_sizes[dim] /= num_chunks;
      try {
        map_size = at::infer_size(map_size, tensor_sizes);
      } catch (...) {
        return c10::nullopt;
      }
    }
  }

  return {map_size};
}
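
// Example (illustrative, hypothetical shapes): for a broadcast group whose
// unchunked arguments have sizes {2, 3, 1} and {1, 3, 4}, repeated calls to
// at::infer_size yield
//   {} -> {2, 3, 1} -> {2, 3, 4}
// so the map size is {2, 3, 4}. If a third argument in the group had size
// {5, 3, 4}, infer_size would throw and getMapSize would return c10::nullopt.
// For a chunked argument, its size along the chunk dimension is first divided
// by nSubTensors() and the same inference is applied to the result.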

// Tries to determine a map size for the instantiated kernel (see above)
static c10::optional<std::vector<int64_t>> canRunKernel(
    const KernelSpec& spec,
    at::TensorList args) {
  // Short-circuits on size mismatch
  AT_CHECK(
      args.size() == spec.inputChunks().size(),
      "Expected ",
      spec.inputChunks().size(),
      " arguments, but got ",
      args.size());

  c10::optional<std::vector<int64_t>> map_size;
  for (const auto& broadcast_group : spec.inputBroadcastGroups()) {
    if (!map_size) {
      map_size = getMapSize(spec, args, broadcast_group);
      if (!map_size)
        return c10::nullopt;
    } else {
      const auto group_map_size = getMapSize(spec, args, broadcast_group);
      // Note: this checks that group_map_size is defined AND equal to map_size
      if (map_size != group_map_size)
        return c10::nullopt;
    }
  }

  return map_size;
}

// Arguments are expanded to a common shape, referred to as the "map size"
// (see above).
// Note: Arguments are mutated by this call, although map_size is restored
// to its original value.
static bool expandArgs(
    const KernelSpec& spec,
    std::vector<at::Tensor>& args,
    std::vector<int64_t>& map_size,
    bool dry_run) {
  bool has_broadcast = false;
  for (size_t i = 0; i < args.size(); ++i) {
    auto& arg = args[i];
    const auto& pdesc = spec.inputChunks()[i];
    if (pdesc.nSubTensors() == 1) {
      if (arg.sizes().equals(map_size))
        continue;
      if (!dry_run) {
        arg = arg.expand(map_size);
        has_broadcast = true;
      } else {
        return true;
      }
    } else {
      map_size.at(pdesc.dim()) *= pdesc.nSubTensors();
      if (!arg.sizes().equals(map_size)) {
        if (!dry_run) {
          arg = arg.expand(map_size);
          has_broadcast = true;
        } else {
          return true;
        }
      }
      map_size.at(pdesc.dim()) /= pdesc.nSubTensors();
    }
  }
  return has_broadcast;
}

static bool shouldExpandArgs(
    const KernelSpec& spec,
    std::vector<at::Tensor>& args,
    std::vector<int64_t>& map_size) {
  return expandArgs(spec, args, map_size, /*dry_run=*/true);
}
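
// Example (illustrative, hypothetical shapes): with map_size = {4, 3}, an
// unchunked argument of size {1, 3} is expanded to {4, 3}. An argument that
// feeds a 2-way chunk along dim 1 is compared against {4, 6} instead, since
// the chunked input must be nSubTensors() times larger along the chunk
// dimension; map_size itself is restored to {4, 3} afterwards.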

// Note: assumes that inputs are 32-bit addressable
static uint32_t computeNumel(const at::ArrayRef<int64_t>& sizes) {
  uint32_t result = 1;

  for (const auto& size : sizes)
    result *= size;

  return result;
}

// Note: Assumes that after at::chunk, all inputs are the same size
static std::vector<int64_t> computeMapSize(
    const at::Tensor& tensor,
    const PartitionDesc& chunkDesc) {
  std::vector<int64_t> sizes(tensor.sizes().begin(), tensor.sizes().end());
  AT_ASSERT(sizes[chunkDesc.dim()] % chunkDesc.nSubTensors() == 0);
  sizes[chunkDesc.dim()] /= chunkDesc.nSubTensors();
  return sizes;
}

// Tries to compress sizes and strides according to cont. Emits the result to
// c_sizes, c_strides and throws an error on failure (if it can't compress).
static void compressContiguous(
    const at::IntArrayRef& sizes,
    const at::IntArrayRef& strides,
    const std::vector<bool>& cont,
    uint32_t* c_sizes,
    uint32_t* c_strides) {
  size_t compressed_dims = 0;
  size_t cur = 0;
  size_t ndim = sizes.size();
  while (cur < ndim) {
    size_t total_size = sizes[cur];
    cur++;
    while (cont[cur - 1] && cur < ndim) {
      AT_ASSERT(strides[cur - 1] == sizes[cur] * strides[cur]);
      total_size *= sizes[cur];
      cur++;
    }
    c_sizes[compressed_dims] = total_size;
    c_strides[compressed_dims] = strides[cur - 1];
    compressed_dims++;
  }

  if (ndim > 0)
    AT_ASSERT(!cont.back() || strides.back() == 1);
}
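
// Example (illustrative sketch; the values below are hypothetical and not
// exercised anywhere in this file): a fully contiguous 4x3x2 tensor collapses
// into a single compressed dimension.
//
//   std::vector<int64_t> sizes = {4, 3, 2}, strides = {6, 2, 1};
//   std::vector<bool> cont = {true, true, true};
//   uint32_t c_sizes[3], c_strides[3];
//   compressContiguous(sizes, strides, cont, c_sizes, c_strides);
//   // One compressed dim: c_sizes[0] == 24, c_strides[0] == 1.
//
// With strides = {12, 4, 1} and cont = {true, false, true} (dim 1 is not
// contiguous with dim 2), the result is two compressed dims instead:
// c_sizes = {12, 2}, c_strides = {4, 1}.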

// Launches the requested fusion on the given device with the given inputs.
// Output pointers are stored in outputs (to be put on the stack later).
void launchFusion(
    const FusedKernel& fusion,
    const at::Device device,
    const at::ArrayRef<at::Tensor>& inputs,
    std::vector<at::Tensor>& outputs) {
  // Fails if fusion and given inputs disagree
  AT_ASSERT(inputs.size() == fusion.inputDesc().size());

  // Computes number of flattened inputs and outputs
  size_t flat_inputs_size = 0;
  size_t flat_outputs_size = 0;
  for (const auto& c : fusion.chunkDesc())
    flat_inputs_size += c.nSubTensors();
  for (const auto& c : fusion.concatDesc())
    flat_outputs_size += c.nSubTensors();

  // Fails if the element count of the first (and hence any) tensor is not
  // expressible as a 32-bit integer.
  // Note: this code assumes that inputs are 32-bit addressable
  // Note: this code assumes that all inputs are of the same size
  AT_ASSERT(inputs[0].numel() <= std::numeric_limits<uint32_t>::max());

  // Computes map_size, numel from the first input
  at::IntArrayRef map_size;
  uint32_t numel;
  std::vector<int64_t> keep_alive_size;
  if (fusion.chunkDesc()[0].isNoop()) {
    map_size = inputs[0].sizes();
    numel = inputs[0].numel();
  } else {
    keep_alive_size = computeMapSize(inputs[0], fusion.chunkDesc()[0]);
    map_size = keep_alive_size;
    numel = computeNumel(map_size);
  }

  // Computes the storage needed to store TensorInfo structs for inputs and
  // outputs.
  size_t uncompressedDim = fusion.inputDesc().at(0).contiguity.size();
  size_t maxPossibleTensorInfoSize =
      sizeof(TensorInfo) + 2 * sizeof(uint32_t) * uncompressedDim;
  size_t maxPossibleBufferSize =
      maxPossibleTensorInfoSize * (flat_inputs_size + flat_outputs_size);
  std::vector<char> buffer(maxPossibleBufferSize);
  char* buffer_next = buffer.data();

  // A vector of arguments to the kernel (numel, *input_desc_s, *output_desc_s)
  std::vector<void*> arguments;
  arguments.reserve(3 + flat_inputs_size + flat_outputs_size);
  arguments.push_back(&numel);

  auto addTensorInfoRaw = [&](const TensorDesc& desc,
                              void* data_ptr,
                              at::IntArrayRef sizes,
                              at::IntArrayRef strides) {
    const auto nDim = desc.nDim(); // NOTE: this is the compressed dim
    AT_ASSERT(nDim <= uncompressedDim); // We'd overflow the space otherwise
    auto ti = reinterpret_cast<TensorInfo*>(buffer_next);
    ti->data = data_ptr;
    compressContiguous(
        sizes, strides, desc.contiguity, ti->sizes(nDim), ti->strides(nDim));
    buffer_next += maxPossibleTensorInfoSize;
    arguments.push_back(ti);
  };

  // Asserts that t's dims can be compressed in the same way as in desc
  // (that's what the kernel assumes), and appends it to the arguments vector.
  auto addTensorInfo = [&](const TensorDesc& desc, const at::Tensor& t) {
    addTensorInfoRaw(desc, t.data_ptr(), t.sizes(), t.strides());
  };

  // Adds (flattened) input arguments
  for (size_t i = 0; i < fusion.inputDesc().size(); ++i) {
    const auto& chunk = fusion.chunkDesc()[i];
    const at::Tensor& tensor = inputs[i];
    if (chunk.isNoop()) {
      addTensorInfo(fusion.inputDesc()[i], tensor);
    } else {
      size_t chunk_offset = map_size[chunk.dim()] * tensor.stride(chunk.dim()) *
          elementSize(tensor.scalar_type());
      char* data_ptr = reinterpret_cast<char*>(tensor.data_ptr());
      for (size_t chunks = 0; chunks < chunk.nSubTensors(); ++chunks) {
        addTensorInfoRaw(
            *chunk.subTensorDesc(), data_ptr, map_size, tensor.strides());
        data_ptr += chunk_offset;
      }
    }
  }

  // Adds (flattened) output arguments
  outputs.reserve(fusion.outputDesc().size());
  const auto& ref_options = inputs[0].options();
  for (size_t i = 0; i < fusion.outputDesc().size(); ++i) {
    const auto& c = fusion.concatDesc()[i];
    if (c.isNoop()) {
      outputs.push_back(at::empty(
          map_size, ref_options.dtype(fusion.outputDesc()[i].scalar_type)));
      addTensorInfo(fusion.outputDesc()[i], outputs[i]);
    } else {
      size_t small_size = map_size[c.dim()];
      std::vector<int64_t> concat_size(map_size.begin(), map_size.end());
      concat_size[c.dim()] = small_size * c.nSubTensors();
      outputs.push_back(at::empty(concat_size, ref_options));
      const auto& o = outputs[i];
      size_t offset = 0;
      for (size_t j = 0; j < c.nSubTensors(); ++j) {
        // Because the concatenated output stays live, the underlying data
        // in this view remains live through the end of this function,
        // so there is no need to hold onto this tensor.
        const auto view = o.narrow(c.dim(), offset, small_size);
        addTensorInfo(*c.subTensorDesc(), view);
        offset += small_size;
      }
    }
  }

  fusion.launch_raw(numel, arguments);
}
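
// Example (illustrative, hypothetical fusion): for a fusion with two inputs,
// the first of which is split into two chunks, and a single un-concatenated
// output, the arguments vector passed to launch_raw is laid out as
//   { &numel, ti(input0, chunk 0), ti(input0, chunk 1), ti(input1), ti(output0) }
// where each ti(...) is a TensorInfo written into `buffer` by
// addTensorInfoRaw above.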

bool runFusion(const int64_t key, Stack& stack) {
  // Short-circuits if fusion isn't enabled
  if (!canFuseOnCPU() && !canFuseOnGPU())
    return false;

  // Acquires the FusionSpec
  auto maybe_spec = retrieve(key);
  AT_ASSERT(maybe_spec);
  auto& spec = *(*maybe_spec);

  // Acquires inputs from stack
  auto all_inputs = last(stack, spec.nInputs());
  std::vector<at::Tensor> inputs;
  inputs.reserve(spec.nTensorInputs());
  // we know that tensor inputs are first
  for (int64_t i = 0; i < spec.nTensorInputs(); i++) {
    inputs.emplace_back(all_inputs[i].toTensor());
  }

  // Determines device to dispatch to. If there's a device mismatch in the
  // inputs, we use the fallback (which should give a nice error message).
  at::Device device = inputs.at(0).device();
  for (const auto& t : at::TensorList(inputs).slice(1)) {
    if (t.device() != device) {
      return false;
    }
  }

  // Attempts to run fallback if device fusion is disabled
  if (device.is_cuda() && !canFuseOnGPU())
    return false;
  if (device.is_cpu() && !canFuseOnCPU())
    return false;

  // Validates sizes and expands inputs as needed
  auto maybe_map_size = canRunKernel(spec, inputs);

  // Tries to run fallback if map size can't be computed
  if (!maybe_map_size)
    return false;
  if (spec.hasRandom()) {
    bool hasBroadcast = shouldExpandArgs(spec, inputs, *maybe_map_size);
    if (hasBroadcast)
      return false;
  }
  expandArgs(spec, inputs, *maybe_map_size, /*dry_run=*/false);

  // Retrieves the kernel, compiling (and caching) if necessary
  ArgSpec arg_spec{inputs, device.index()};
  auto maybe_kernel = spec.findKernel(arg_spec);
  if (!maybe_kernel) {
    const auto kernel = compileKernel(spec, arg_spec, *maybe_map_size, device);
    spec.cacheKernel(arg_spec, kernel);
  }
  maybe_kernel = spec.findKernel(arg_spec);
  AT_ASSERT(maybe_kernel);

  // Launches fusion
  std::vector<at::Tensor> raw_outputs;
  launchFusion(*(*maybe_kernel), device, inputs, raw_outputs);

  auto outputs = fmap(
      spec.outputMapAndSizes(), [&](const OutputMapAndSize& omap) {
        if (omap.needsSumToSize()) {
          return at::sum_to(
              raw_outputs[omap.offset()],
              all_inputs[omap.sizeInput()].toIntList()->elements());
        } else {
          return raw_outputs[omap.offset()];
        }
      });

  // Updates stack
  drop(stack, spec.nInputs());
  stack.insert(
      stack.end(),
      std::make_move_iterator(outputs.begin()),
      std::make_move_iterator(outputs.end()));

  return true;
}
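
// Usage sketch (assumptions: `key` was obtained when the fusion group was
// registered through the interface in interface.h, and `a` and `b` are the
// fusion's two tensor inputs -- the names here are hypothetical):
//
//   Stack stack;
//   stack.emplace_back(a);
//   stack.emplace_back(b);
//   if (!runFusion(key, stack)) {
//     // Fall back to the unfused code path: runFusion returns false when
//     // fusion is disabled, devices mismatch, or a map size can't be computed.
//   }
//   // On success, the inputs have been popped and the fused outputs pushed.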

} // namespace fuser
} // namespace jit
} // namespace torch