Caffe2 - C++ API
A deep learning, cross-platform ML framework
ddp.cpp
#include <torch/csrc/distributed/c10d/ddp.h>

#include <torch/csrc/cuda/comm.h>
#include <torch/csrc/utils/tensor_flatten.h>

#include <torch/csrc/cuda/nccl.h>

#include <c10d/ProcessGroup.hpp>

#include <ATen/ATen.h>
#include <ATen/cuda/CUDAEvent.h>
#include <c10/cuda/CUDAGuard.h>
#include <ATen/cuda/CUDAMultiStreamGuard.h>

#include <cstddef>
#include <memory>
#include <tuple>
#include <utility>
#include <vector>

namespace c10d {
namespace {
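// Copies the result of an intra-node broadcast back into every non-root
// replica's tensors. Replica 0 is the broadcast source and is left untouched;
// the other replicas are re-pointed at the broadcast output via `set_`.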
void copyBroadcastTensorsToReplicas(
    const std::vector<std::vector<at::Tensor>>& broadcastTensors,
    std::vector<std::vector<at::Tensor>>& replicaData) {
  AT_ASSERT(replicaData.size() == broadcastTensors.size());
  // replica = 1 means we skip the root (replica 0).
  for (size_t replica = 1; replica < replicaData.size(); ++replica) {
    AT_ASSERT(replicaData[replica].size() == broadcastTensors[replica].size());
    for (size_t tensor = 0; tensor < replicaData[replica].size(); ++tensor) {
      replicaData[replica][tensor].set_(broadcastTensors[replica][tensor]);
    }
  }
}
} // namespace
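// Splits `tensors` into buckets of roughly `bucketSize` bytes (tensors are
// grouped by type; `fineGrained` controls how groups are cut), so that each
// bucket can later be flattened into a single contiguous tensor for
// communication.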
std::vector<std::vector<at::Tensor>> bucketTensors(
    std::vector<at::Tensor>& tensors,
    int64_t bucketSize,
    bool fineGrained) {
  std::vector<std::vector<at::Tensor>> bucketedTensors;
  auto tensorGroups =
      torch::utils::take_tensors(tensors, bucketSize, fineGrained);

  bucketedTensors.reserve(tensorGroups.size());
  for (auto& tensorGroup : tensorGroups) {
    bucketedTensors.push_back(std::move(tensorGroup.tensors));
  }
  return bucketedTensors;
}
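// Broadcasts `tensors` from rank 0 to all other processes in `processGroup`.
// The tensors are bucketed into chunks of roughly `bufferSize` bytes, each
// bucket is flattened and broadcast as a single tensor, and the result is
// copied back into the original (unflattened) tensors.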
void distBroadcastCoalesced(
    ProcessGroup& processGroup,
    std::vector<at::Tensor>& tensors,
    int64_t bufferSize,
    bool fineGrained) {
  std::vector<std::vector<at::Tensor>> bucketedTensors =
      bucketTensors(tensors, bufferSize, fineGrained);
  // We store single-element vectors in `flatTensors` because
  // `ProcessGroup::broadcast` takes a reference to a vector, which must be
  // alive until the `wait()` call on the returned `Work` completes.
  std::vector<std::vector<at::Tensor>> flatTensors;
  std::vector<std::shared_ptr<ProcessGroup::Work>> work;
  flatTensors.reserve(bucketedTensors.size());
  work.reserve(bucketedTensors.size());
  for (const auto& tensorBucket : bucketedTensors) {
    // Flatten each bucket of tensors (roughly `bufferSize` bytes in total)
    // into a single tensor.
    flatTensors.push_back({torch::utils::flatten_dense_tensors(tensorBucket)});
    BroadcastOptions broadcastOptions;
    broadcastOptions.rootRank = 0;
    broadcastOptions.rootTensor = 0;
    // Enqueue a work item and collect the `Work` (essentially a "future") so
    // we can `wait()` for its completion after we have collected all `Work`
    // items.
    work.push_back(
        processGroup.broadcast(flatTensors.back(), broadcastOptions));
  }
  // Now loop through each bucket, wait for the broadcast to complete, and
  // un-flatten the broadcast tensor back into device-local individual tensors.
  for (size_t bucket = 0; bucket < bucketedTensors.size(); ++bucket) {
    auto& tensors = bucketedTensors[bucket];
    work[bucket]->wait();
    const auto synced =
        torch::utils::unflatten_dense_tensors(flatTensors[bucket][0], tensors);
    AT_ASSERT(synced.size() == tensors.size());
    for (size_t i = 0; i < synced.size(); ++i) {
      // Copy into the per-process tensors.
      tensors[i].copy_(synced[i], /*non_blocking=*/true);
    }
  }
}
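// Synchronizes module state across replicas: parameters are copied intra-node
// from the root device to every other local device, and buffers (when
// `broadcastBuffers` is set) are first broadcast inter-node from rank 0 and
// then copied intra-node as well.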
void syncParams(
    ProcessGroup& processGroup,
    std::vector<std::vector<at::Tensor>>& parameterData,
    std::vector<std::vector<at::Tensor>>& bufferData,
    const std::vector<int64_t>& devices,
    int64_t broadcastBucketSize,
    bool broadcastBuffers) {
  AT_ASSERT(!parameterData.empty());
  AT_ASSERT(!bufferData.empty());
  AT_ASSERT(!devices.empty());

  // Do an intra-node sync if we have more than one device.
  if (devices.size() > 1) {
    // Broadcast the parameters, get back a vector<vector<Tensor>>, one
    // vector<Tensor> per device. Each such vector then needs to be copied
    // into the `parameterData` of every replica.
    auto result = torch::cuda::broadcast_coalesced(
        parameterData[0], devices, broadcastBucketSize);
    copyBroadcastTensorsToReplicas(result, parameterData);
  }

  if (broadcastBuffers && !bufferData[0].empty()) {
    // Do an inter-node sync first.
    distBroadcastCoalesced(processGroup, bufferData[0], broadcastBucketSize);
    // Then an intra-node sync if we have more than one device.
    if (devices.size() > 1) {
      auto result = torch::cuda::broadcast_coalesced(
          bufferData[0], devices, broadcastBucketSize);
      copyBroadcastTensorsToReplicas(result, bufferData);
    }
  }
}
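// Asynchronously reduces one batch of gradients: flattens each replica's
// gradients on a per-device worker stream, reduces them intra-node onto
// device 0 via NCCL, pre-divides by the world size, and enqueues an
// inter-node allreduce. Returns the pending `Work` handle together with the
// coalesced gradient tensor that lives on device 0.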
std::tuple<std::shared_ptr<ProcessGroup::Work>, at::Tensor> queueReduction(
    ProcessGroup& processGroup,
    std::vector<std::vector<at::Tensor>>& gradsBatch,
    const std::vector<int64_t>& devices) {
  AT_ASSERT(!gradsBatch.empty());
  AT_ASSERT(!devices.empty());

  // Events used to record the current state of the default stream on each GPU.
  std::vector<at::cuda::CUDAEvent> events;
  events.resize(devices.size());

  // Create a separate worker CUDA stream per device so that the memory copy
  // and the intra-node reduce can run off the default stream, which improves
  // performance.
  std::vector<at::cuda::CUDAStream> workerStreams;
  for (size_t devIdx = 0; devIdx < devices.size(); ++devIdx) {
    at::cuda::CUDAGuard guard(devices[devIdx]);
    events[devIdx].record();
    workerStreams.push_back(
        at::cuda::getStreamFromPool(false, devices[devIdx]));
    // Let the worker stream wait on the default stream.
    events[devIdx].block(workerStreams.back());
  }

  // Stream guard: within this scope the current stream on each device is its
  // worker stream.
  at::cuda::CUDAMultiStreamGuard cudaGuard(workerStreams);

  std::vector<at::Tensor> gradsBatchCoalesced;
  for (size_t devIdx = 0; devIdx < devices.size(); ++devIdx) {
    at::cuda::CUDAGuard guard(devices[devIdx]);
    gradsBatchCoalesced.push_back(
        torch::utils::flatten_dense_tensors(gradsBatch[devIdx]));
  }

  if (devices.size() > 1) {
    torch::cuda::nccl::reduce(gradsBatchCoalesced, 0);
  }

  gradsBatchCoalesced[0] /= processGroup.getSize();

  std::vector<at::Tensor> allreduceInput = {gradsBatchCoalesced[0]};
  auto reductionWork = processGroup.allreduce(allreduceInput);

  return std::make_tuple(reductionWork, gradsBatchCoalesced[0]);
}
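// Completes a reduction started by `queueReduction`: waits for the allreduce,
// unflattens the coalesced result back into the individual gradient tensors
// on a worker stream, and makes the original (backward) stream wait on that
// work before it proceeds.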
void syncReduction(
    std::shared_ptr<ProcessGroup::Work>& reductionWork,
    std::vector<at::Tensor>& gradsBatch,
    at::Tensor& gradsBatchCoalesced) {
  // Create a separate worker CUDA stream so that the copies back into the
  // individual gradient tensors can run off the backward stream, which
  // improves performance.
  at::cuda::CUDAStream workerStream = at::cuda::getStreamFromPool();
  at::cuda::CUDAStreamGuard cudaGuard(workerStream);

  // Let the worker stream wait on the reduction stream.
  reductionWork->wait();
  // Now do the copy on the worker stream.
  std::vector<at::Tensor> gradsReduced =
      torch::utils::unflatten_dense_tensors(gradsBatchCoalesced, gradsBatch);

  AT_ASSERT(gradsReduced.size() == gradsBatch.size());

  for (size_t i = 0; i < gradsReduced.size(); ++i) {
    gradsBatch[i].copy_(gradsReduced[i]);
  }

  // Record the state of the worker stream.
  at::cuda::CUDAEvent event;
  event.record(workerStream);

  // Now let the backward (BW) stream wait on the worker stream.
  // (NB: original_stream is the current stream PRIOR to the guard. It might
  // live on a completely different device than our current device here!)
  event.block(cudaGuard.original_stream());
}

} // namespace c10d
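The sketch below is not part of the original file; it is a minimal illustration of how the gradient-reduction pair above might be driven by a caller. It assumes `pg` is an already-initialized c10d::ProcessGroup, that `gradsBatch` holds one vector of CUDA gradient tensors per local device (in the same order as `devices`), and the helper name `reduceGradientsSketch` is hypothetical.

#include <torch/csrc/distributed/c10d/ddp.h>

#include <tuple>
#include <vector>

// Hypothetical helper, for illustration only.
void reduceGradientsSketch(
    c10d::ProcessGroup& pg,
    std::vector<std::vector<at::Tensor>>& gradsBatch,
    const std::vector<int64_t>& devices) {
  // Kick off the intra-node reduce plus the asynchronous inter-node allreduce.
  std::shared_ptr<c10d::ProcessGroup::Work> work;
  at::Tensor coalesced;
  std::tie(work, coalesced) = c10d::queueReduction(pg, gradsBatch, devices);

  // Other work (e.g. the backward pass for later buckets) may proceed here.

  // Wait for the allreduce and scatter the averaged gradients back into the
  // root replica's individual gradient tensors.
  c10d::syncReduction(work, gradsBatch[0], coalesced);
}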