#include <torch/csrc/cuda/comm.h>

#include <torch/csrc/cuda/device_set.h>
#include <torch/csrc/utils/tensor_flatten.h>

#ifdef USE_NCCL
#include <torch/csrc/cuda/nccl.h>
#endif

#include <ATen/ATen.h>
#include <ATen/cuda/CUDAContext.h>
#include <c10/cuda/CUDAGuard.h>
#include <c10/util/Optional.h>
#include <torch/csrc/autograd/variable.h>

#include <algorithm>
#include <cstddef>
#include <numeric>
#include <vector>

// Optimizations are disabled around this helper as an MSVC-specific workaround.
#ifdef _MSC_VER
#pragma optimize("", off)
#endif
static inline int warp_size() {
  return at::cuda::warp_size();
}
#ifdef _MSC_VER
#pragma optimize("", on)
#endif

namespace torch {
namespace cuda {

using namespace at;
using namespace torch::autograd;
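// Small helper that records whether every tensor type it has been shown is
// the same one; some of the code below can take a faster path when only a
// single type is involved.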
struct unique_type_checker {
  void show(const at::Type& t) {
    if (!unique) return;
    if (type == nullptr) type = &t;
    unique = (type == &t);
  }

  const at::Type* type = nullptr;
  bool unique = true;
};
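// Broadcasts `tensor` to all devices in `devices`. When NCCL can handle the
// tensor, destination buffers are preallocated on each device and filled by a
// single NCCL broadcast; otherwise the tensor is copied to each device in
// turn. If the input is a CUDA tensor it must already live on devices[0].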
std::vector<Tensor> broadcast(const Tensor& tensor, IntArrayRef devices) {
  auto& type = tensor.type();
  if (type.is_cuda() && tensor.get_device() != devices[0])
    throw std::runtime_error(
        "device of broadcasted tensor must appear as the "
        "first on devices list");
  std::vector<Tensor> tensors;
  tensors.reserve(devices.size());
  at::cuda::OptionalCUDAGuard _device_guard;
#ifdef USE_NCCL
  if (nccl::is_available({tensor})) {
    tensors.push_back(tensor);
    for (auto device : devices.slice(1)) {
      _device_guard.set_index(device);
      tensors.push_back(at::empty(tensor.sizes(), type.options()));
    }
    nccl::broadcast(tensors);
  } else {
#else
  {
#endif
    auto& gpu_type = type.toBackend(
        type.is_sparse() ? at::Backend::SparseCUDA : at::Backend::CUDA);
    if (type.is_cuda()) {
      tensors.push_back(tensor);
    }
    IntArrayRef loop_devices = type.is_cuda() ? devices.slice(1) : devices;
    for (auto device : loop_devices) {
      _device_guard.set_index(device);
      tensors.push_back(gpu_type.copy(tensor, /*non_blocking=*/true));
    }
  }
  return tensors;
}
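// Broadcasts a list of tensors to every device in `devices`. The tensors are
// coalesced into larger flat buffers (bounded by `buffer_size`) so each group
// can be broadcast in a single operation and then unflattened on every
// destination device. outputs[0] holds the original input tensors; outputs[i]
// holds the copies on devices[i].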
tensor_list2d broadcast_coalesced(
    TensorList tensors,
    IntArrayRef devices,
    size_t buffer_size) {
  if (!std::all_of(tensors.begin(), tensors.end(), [&](const at::Tensor& t) {
        return t.get_device() == devices[0];
      })) {
    throw std::runtime_error("all tensors must be on devices[0]");
  }

#ifdef USE_NCCL
  buffer_size = std::min(torch::cuda::nccl::get_max_count(), buffer_size);
#endif

  tensor_list2d outputs(devices.size());
  outputs[0] = tensors.vec();
  for (auto& o : outputs)
    o.reserve(tensors.size());

  unique_type_checker type_checker;
  at::cuda::CUDAGuard device_guard(devices[0]);
  for (auto& chunk : utils::take_tensors(tensors, buffer_size)) {
    auto& type = chunk.type();
    type_checker.show(type);
    std::vector<at::Tensor> results;
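    // Sparse groups are coalesced as two flat buffers, one for indices and
    // one for values; each is broadcast separately and then unflattened back
    // into per-device sparse tensors.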
    if (chunk.type().is_sparse()) {
      auto flat_tuple = utils::flatten_sparse_tensors(chunk.tensors);
      std::vector<at::Tensor> broadcast_indices =
          broadcast(flat_tuple.first, devices);
      std::vector<at::Tensor> broadcast_values =
          broadcast(flat_tuple.second, devices);
      results.reserve(devices.size());
      for (size_t i = 1, num_devices = devices.size(); i < num_devices; ++i) {
        device_guard.set_index(devices[i]);
        auto& device_outputs = outputs[i];
        auto& inds = broadcast_indices[i];
        auto& vals = broadcast_values[i];
        for (auto& t :
             utils::unflatten_sparse_tensors(inds, vals, chunk.tensors)) {
          Variable var = t;
          device_outputs.push_back(
              make_variable(var.data(), /*requires_grad=*/false));
        }
      }
    } else {
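      // Dense groups are flattened into a single contiguous buffer, broadcast
      // once, and unflattened again on every destination device.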
      std::vector<Tensor> results =
          broadcast(utils::flatten_dense_tensors(chunk.tensors), devices);
      for (size_t i = 1, num_devices = devices.size(); i < num_devices; ++i) {
        device_guard.set_index(devices[i]);
        auto& device_outputs = outputs[i];
        for (auto& t :
             utils::unflatten_dense_tensors(results[i], chunk.tensors)) {
          Variable var = t;
          device_outputs.push_back(
              make_variable(var.data(), /*requires_grad=*/false));
        }
      }
    }
  }
  // take_tensors() groups the inputs by type, which permutes their order; if
  // more than one type was seen, restore the original ordering on each device.
  if (!type_checker.unique) {
    for (auto& o : outputs)
      utils::reorder_tensors_like(o, tensors);
  }
  return outputs;
}
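// Splits `tensor` along dimension `dim` and copies one chunk to each entry of
// `devices`. Chunk sizes may be supplied explicitly via `chunk_sizes` (they
// must sum to tensor.size(dim)); otherwise the tensor is split into
// devices.size() roughly equal chunks. Optional per-chunk CUDA streams are
// used for the copies when provided.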
std::vector<at::Tensor> scatter(
    const at::Tensor& tensor,
    at::IntArrayRef devices,
    const c10::optional<std::vector<int64_t>>& chunk_sizes,
    int64_t dim,
    const c10::optional<std::vector<c10::optional<at::cuda::CUDAStream>>>&
        streams) {
  std::vector<at::Tensor> chunks;
  if (chunk_sizes) {
    const int64_t chunk_size_sum =
        std::accumulate(chunk_sizes->begin(), chunk_sizes->end(), int64_t{0});
    AT_CHECK(
        chunk_size_sum == tensor.size(dim),
        "given chunk sizes don't sum up to the tensor's size ",
        "(sum(chunk_sizes) == ", chunk_size_sum,
        ", but expected ", tensor.size(dim), ")");
    chunks.reserve(chunk_sizes->size());
    int64_t chunk_start = 0;
    for (size_t chunk = 0; chunk < chunk_sizes->size(); ++chunk) {
      const int64_t chunk_size = (*chunk_sizes)[chunk];
      AT_CHECK(chunk_size > 0, "Chunk size must be positive");
      chunks.push_back(tensor.narrow(dim, chunk_start, chunk_size));
      chunk_start += chunk_size;
    }
    AT_ASSERT(chunks.size() == chunk_sizes->size());
  } else {
    chunks = tensor.chunk(/*chunks=*/devices.size(), /*dim=*/dim);
  }

  at::cuda::OptionalCUDAStreamGuard cuda_guard;
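  // Copy each chunk to its destination device. When the caller supplied a
  // stream for this position, the copy is enqueued on that stream (after
  // checking that it belongs to the expected device).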
  for (size_t chunk = 0; chunk < chunks.size(); ++chunk) {
    const auto device_index = static_cast<int16_t>(devices[chunk]);
    if (streams && (*streams)[chunk]) {
      AT_CHECK(
          (*streams)[chunk]->device_index() == device_index,
          "Expected the device associated with the stream at index ",
          chunk, " (was ", (*streams)[chunk]->device_index(), ") ",
          "to match the device supplied at that index ",
          "(expected ", device_index, ")");
      cuda_guard.reset_stream(*(*streams)[chunk]);
    }
    chunks[chunk] = chunks[chunk].contiguous().to(
        {at::DeviceType::CUDA, device_index}, /*non_blocking=*/true);
  }
  return chunks;
}
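// Concatenates `tensors` along dimension `dim`. Every input must be a CUDA
// tensor, and all sizes other than `dim` must match. The result is allocated
// on the CUDA device given by `destination_index` (or the current device when
// no index is supplied), or on the CPU when the index is -1.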
at::Tensor gather(
    at::TensorList tensors,
    int64_t dim,
    c10::optional<int32_t> destination_index) {
  AT_CHECK(!tensors.empty(), "Expected at least one tensor to gather from");
  at::Tensor result;
  int64_t total_size = 0;
  auto& first = tensors.front();
  const auto first_size = first.sizes();
  std::vector<int64_t> expected_size(first_size.begin(), first_size.end());
  for (const auto& tensor : tensors) {
    AT_CHECK(
        tensor.is_cuda(), "Gather expects all inputs to have CUDA type");
    AT_ASSERT(
        tensor.ndimension() == static_cast<int64_t>(expected_size.size()));
    expected_size[dim] = tensor.size(dim);
    for (size_t dimension = 0; dimension < expected_size.size(); ++dimension) {
      AT_CHECK(
          expected_size[dimension] == tensor.size(dimension),
          "Gather got an input of invalid size: got ",
          tensor.sizes(), ", but expected ", at::IntList(expected_size));
    }
    total_size += tensor.size(dim);
  }
  expected_size[dim] = total_size;
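  // Choose where the gathered result lives: the requested CUDA device, the
  // current CUDA device when no index was given, or the CPU for index -1.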
  at::Device device(at::DeviceType::CPU);
  if (!destination_index || *destination_index != -1) {
    device = at::Device(
        at::DeviceType::CUDA, destination_index ? *destination_index : -1);
  }
  result = at::empty(expected_size, first.options().device(device));

  int64_t chunk_start = 0;
  for (const auto& tensor : tensors) {
    result.narrow(dim, chunk_start, tensor.size(dim))
        .copy_(tensor, /*non_blocking=*/true);
    chunk_start += tensor.size(dim);
  }
  return result;
}

} // namespace cuda
} // namespace torch