// test/cpp/api/parallel.cpp
// Tests for the PyTorch (Caffe2) C++ API data-parallel primitives:
// Scatter/Gather autograd functions, parallel::replicate,
// parallel::parallel_apply, and parallel::data_parallel.
1 #include <gtest/gtest.h>
2 
3 #include <torch/csrc/autograd/functions/comm.h>
4 #include <torch/nn/module.h>
5 #include <torch/nn/modules/linear.h>
6 #include <torch/nn/parallel/data_parallel.h>
7 #include <torch/nn/pimpl.h>
8 #include <torch/types.h>
9 
10 #include <test/cpp/api/support.h>
11 
12 #include <iostream>
13 #include <memory>
14 #include <utility>
15 #include <vector>
16 
17 using namespace torch::autograd;
18 using namespace torch::nn;
19 
21 
22 TEST_F(ParallelTest, DifferentiableScatter_MultiCUDA) {
23  Scatter scatter(
24  {torch::Device(torch::kCUDA, 0), torch::Device(torch::kCUDA, 1)});
25 
26  auto input = torch::ones(10, torch::requires_grad(true));
27  auto output = scatter.apply({input});
28 
29  ASSERT_EQ(output.size(), 2);
30  ASSERT_EQ(output[0].size(0), 5);
31  ASSERT_EQ(output[1].size(0), 5);
32 
33  ASSERT_TRUE(torch::cat({output[0].to(torch::kCPU), output[1].to(torch::kCPU)})
34  .allclose(input));
35 
36  torch::Tensor sum = output[0].to({torch::kCUDA, 1}) + output[1];
37  sum.backward();
38 
39  ASSERT_TRUE(input.grad().defined());
40  ASSERT_TRUE(input.grad().device().is_cpu());
41  ASSERT_EQ(input.grad().sum().item<int32_t>(), 10);
42 }
43 
44 TEST_F(ParallelTest, DifferentiableGather_MultiCUDA) {
45  Gather gather(torch::Device(torch::kCUDA, 1));
46 
47  auto a = torch::ones(5, torch::requires_grad(true).device(torch::kCUDA, 0));
48  auto b = torch::ones(5, torch::requires_grad(true).device(torch::kCUDA, 1));
49 
50  auto outputs = gather.apply({a, b});
51  ASSERT_EQ(outputs.size(), 1);
52  torch::Tensor output = outputs.front();
53 
54  ASSERT_EQ(output.size(0), 10);
55  ASSERT_EQ(output.device(), torch::Device(torch::kCUDA, 1));
56 
57  auto chunks = output.chunk(2);
58  ASSERT_TRUE(chunks[0].to({torch::kCUDA, 0}).allclose(a));
59  ASSERT_TRUE(chunks[1].allclose(b));
60 
61  output.backward();
62 
63  ASSERT_TRUE(a.grad().defined());
64  ASSERT_EQ(a.grad().device(), torch::Device(torch::kCUDA, 0));
65  ASSERT_EQ(a.grad().sum().item<int32_t>(), 5);
66 
67  ASSERT_TRUE(b.grad().defined());
68  ASSERT_EQ(b.grad().device(), torch::Device(torch::kCUDA, 1));
69  ASSERT_EQ(b.grad().sum().item<int32_t>(), 5);
70 }
71 
72 TEST_F(ParallelTest, Replicate_MultiCUDA) {
73  Linear linear(3, 4);
74  auto replicas = parallel::replicate(
75  linear, {torch::Device(torch::kCUDA, 0), torch::Device(torch::kCUDA, 1)});
76  ASSERT_EQ(replicas.size(), 2);
77 
78  auto original_parameters = linear->parameters();
79 
80  auto replica1_parameters = replicas[0]->parameters();
81  for (auto& parameter : replica1_parameters) {
82  ASSERT_EQ(parameter.device(), torch::Device(torch::kCUDA, 0));
83  }
84  replicas[0]->to(torch::kCPU);
85  ASSERT_EQ(replica1_parameters.size(), original_parameters.size());
86  for (size_t i = 0; i < original_parameters.size(); ++i) {
87  ASSERT_TRUE(replica1_parameters[i].allclose(original_parameters[i]));
88  ASSERT_TRUE(
89  replica1_parameters[i].data<float>() !=
90  original_parameters[i].data<float>());
91  }
92 
93  auto replica2_parameters = replicas[1]->parameters();
94  for (auto& parameter : replica2_parameters) {
95  ASSERT_EQ(parameter.device(), torch::Device(torch::kCUDA, 1));
96  }
97  replicas[1]->to(torch::kCPU);
98  ASSERT_EQ(replica2_parameters.size(), original_parameters.size());
99  for (size_t i = 0; i < original_parameters.size(); ++i) {
100  ASSERT_TRUE(replica2_parameters[i].allclose(original_parameters[i]));
101  ASSERT_TRUE(
102  replica2_parameters[i].data<float>() !=
103  original_parameters[i].data<float>());
104  }
105 }
106 
107 TEST_F(ParallelTest, ParallelApply_MultiCUDA) {
108  Linear a(3, 4);
109 
110  Linear b(std::dynamic_pointer_cast<LinearImpl>(a->clone()));
111  b->to({torch::kCUDA, 0});
112 
113  Linear c(std::dynamic_pointer_cast<LinearImpl>(a->clone()));
114  c->to({torch::kCUDA, 1});
115 
116  std::vector<Linear> modules = {a, b, c};
117  std::vector<torch::Tensor> inputs = {
118  torch::ones({2, 3}),
119  torch::ones({2, 3}, torch::device({torch::kCUDA, 0})),
120  torch::ones({2, 3}, torch::device({torch::kCUDA, 1}))};
121 
122  auto outputs = parallel::parallel_apply(modules, inputs);
123 
124  ASSERT_EQ(outputs.size(), 3);
125  ASSERT_TRUE(outputs[0].device().is_cpu());
126 
127  ASSERT_EQ(outputs[1].device(), torch::Device(torch::kCUDA, 0));
128  ASSERT_TRUE(outputs[1].to(torch::kCPU).allclose(outputs[0]));
129 
130  ASSERT_EQ(outputs[2].device(), torch::Device(torch::kCUDA, 1));
131  ASSERT_TRUE(outputs[2].to(torch::kCPU).allclose(outputs[0]));
132 }
133 
134 TEST_F(ParallelTest, ParallelApplyWithDifferentOutputDevice_MultiCUDA) {
135  struct M : torch::nn::Module {
136  torch::Tensor forward(torch::Tensor input) {
137  return torch::ones(5, torch::kInt32);
138  }
139  };
140 
141  std::vector<std::shared_ptr<M>> modules = {
142  std::make_shared<M>(), std::make_shared<M>(), std::make_shared<M>()};
143  std::vector<torch::Tensor> inputs = {
144  torch::empty({}), torch::empty({}), torch::empty({})};
145  std::vector<torch::Device> devices = {
146  {torch::kCUDA, 1}, {torch::kCUDA, 0}, {torch::kCPU}};
147 
148  auto outputs = parallel::parallel_apply(modules, inputs, devices);
149 
150  ASSERT_EQ(outputs.size(), 3);
151  ASSERT_TRUE(outputs[0].device().is_cuda());
152  ASSERT_EQ(outputs[0].device(), torch::Device(torch::kCUDA, 1));
153 
154  ASSERT_TRUE(outputs[1].device().is_cuda());
155  ASSERT_EQ(outputs[1].device(), torch::Device(torch::kCUDA, 0));
156 
157  ASSERT_TRUE(outputs[2].device().is_cpu());
158 }
159 
160 TEST_F(ParallelTest, ParallelApplyRethrowsException_MultiCUDA) {
161  struct M : torch::nn::Cloneable<M> {
162  void reset() override {}
163  torch::Tensor forward(torch::Tensor input) {
164  throw std::runtime_error("Badness!");
165  }
166  };
167 
168  auto m = std::make_shared<M>();
169  auto input = torch::ones({10, 3});
170  ASSERT_THROWS_WITH(parallel::data_parallel(m, input), "Badness!");
171 }
172 
173 TEST_F(
174  ParallelTest,
175  DataParallelPlacesTheOutputOnTheRequestedDevice_MultiCUDA) {
176  struct M : torch::nn::Cloneable<M> {
177  void reset() override {}
178  torch::Tensor forward(torch::Tensor input) {
179  // The returned tensor should be on the output device.
180  return torch::ones(3);
181  }
182  };
183  auto m = std::make_shared<M>();
184  auto input = torch::ones({10, 3});
185  {
186  auto output = parallel::data_parallel(
187  m,
188  input,
189  /*devices=*/torch::nullopt,
190  /*output_device=*/torch::Device(torch::kCUDA, 1));
191  ASSERT_TRUE(output.defined());
192  ASSERT_TRUE(output.device().is_cuda());
193  ASSERT_EQ(output.device().index(), 1);
194  }
195  {
196  // Verify for the single-device case (where we don't scatter/gather).
197  auto output = parallel::data_parallel(
198  m,
199  input,
200  /*devices=*/std::vector<torch::Device>{torch::Device(torch::kCUDA, 0)},
201  /*output_device=*/torch::Device(torch::kCUDA, 1));
202  ASSERT_TRUE(output.defined());
203  ASSERT_TRUE(output.device().is_cuda());
204  ASSERT_EQ(output.device().index(), 1);
205  }
206 }
207 
208 TEST_F(ParallelTest, DataParallelUsesAllAvailableCUDADevices_CUDA) {
209  struct M : torch::nn::Cloneable<M> {
210  void reset() override {}
211  torch::Tensor forward(torch::Tensor input) {
212  return torch::tensor(input.device().index());
213  }
214  };
215 
216  auto m = std::make_shared<M>();
217  auto input = torch::ones({10, 3});
218  auto output = parallel::data_parallel(m, input);
219 
220  const auto device_count = torch::cuda::device_count();
221  ASSERT_EQ(output.numel(), device_count);
222  for (size_t i = 0; i < device_count; ++i) {
223  ASSERT_EQ(output[i].item<int32_t>(), i);
224  }
225 }
// NOTE(review): the lines below were Doxygen cross-reference residue from a
// generated source listing (pointers into Device.h, module.h, cloneable.h),
// not part of the test file. Summarized here as a comment so the file stays
// valid C++:
//   - torch::Device (Device.h): represents the compute device a tensor
//     lives on; Device::index() returns the optional device index.
//   - torch::nn::Module (module.h): base class for all C++ API modules.
//   - torch::nn::Cloneable (cloneable.h): CRTP helper giving clone() the
//     concrete runtime type.