# Caffe2 / PyTorch - Python API: a deep-learning, cross-platform ML framework.
# test_nccl.py -- tests for the torch.cuda.nccl collective-communication bindings.
1 import unittest
2 
3 import torch
4 import torch.cuda.nccl as nccl
5 import torch.cuda
6 
7 from common_utils import TestCase, run_tests, IS_WINDOWS, load_tests
8 from common_cuda import TEST_CUDA, TEST_MULTIGPU
9 
10 # load_tests from common_utils is used to automatically filter tests for
11 # sharding on sandcastle. This line silences flake warnings
12 load_tests = load_tests
13 
15 if not TEST_CUDA:
16  print('CUDA not available, skipping tests')
17  TestCase = object # noqa: F811
18 
19 
21 
22  @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
23  def test_unique_id(self):
24  uid = nccl.unique_id()
25  self.assertIsInstance(uid, bytes)
26  self.assertGreater(len(uid), 1)
27 
28  @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
29  @unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
30  def test_broadcast(self):
31  expected = torch.FloatTensor(128).uniform_()
32  tensors = [expected.cuda()]
33  for device in range(1, torch.cuda.device_count()):
34  with torch.cuda.device(device):
35  tensors.append(torch.cuda.FloatTensor(128))
36 
37  nccl.broadcast(tensors)
38  for i in range(torch.cuda.device_count()):
39  self.assertEqual(tensors[i], expected)
40 
41  @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
42  @unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
43  def test_reduce(self):
44  tensors = [torch.FloatTensor(128).uniform_() for i in range(nGPUs)]
45  expected = torch.FloatTensor(128).zero_()
46  for t in tensors:
47  expected.add_(t)
48 
49  tensors = [tensors[i].cuda(i) for i in range(nGPUs)]
50  nccl.reduce(tensors)
51 
52  self.assertEqual(tensors[0], expected)
53 
54  @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
55  @unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
56  def test_all_reduce(self):
57  tensors = [torch.FloatTensor(128).uniform_() for i in range(nGPUs)]
58  expected = torch.FloatTensor(128).zero_()
59  for t in tensors:
60  expected.add_(t)
61 
62  tensors = [tensors[i].cuda(i) for i in range(nGPUs)]
63  nccl.all_reduce(tensors)
64 
65  for tensor in tensors:
66  self.assertEqual(tensor, expected)
67 
68  @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
69  @unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
70  def test_all_gather(self):
71  inputs = [torch.FloatTensor(128).uniform_() for i in range(nGPUs)]
72  expected = torch.cat(inputs, 0)
73 
74  inputs = [inputs[i].cuda(i) for i in range(nGPUs)]
75  outputs = [torch.cuda.FloatTensor(128 * nGPUs, device=i)
76  for i in range(nGPUs)]
77  nccl.all_gather(inputs, outputs)
78 
79  for tensor in outputs:
80  self.assertEqual(tensor, expected)
81 
82  @unittest.skipIf(IS_WINDOWS, "NCCL doesn't support Windows")
83  @unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
84  def test_reduce_scatter(self):
85  in_size = 32 * nGPUs
86  out_size = 32
87 
88  inputs = [torch.FloatTensor(in_size).uniform_() for i in range(nGPUs)]
89  expected = torch.FloatTensor(in_size).zero_()
90  for t in inputs:
91  expected.add_(t)
92  expected = expected.view(nGPUs, 32)
93 
94  inputs = [inputs[i].cuda(i) for i in range(nGPUs)]
95  outputs = [torch.cuda.FloatTensor(out_size, device=i)
96  for i in range(nGPUs)]
97  nccl.reduce_scatter(inputs, outputs)
98 
99  for i in range(nGPUs):
100  self.assertEqual(outputs[i], expected[i])
101 
102 
# Entry point: delegate to the shared sharding-aware runner from common_utils.
if __name__ == '__main__':
    run_tests()
# Cross-references appended by the doc extraction (not executable code):
#   TestCase.assertEqual(self, x, y, prec=None, message='', allow_inf=False)
#   torch.cuda.device_count()  -- definition: __init__.py:341