import copy
import math
import multiprocessing
import os
import sys
import tempfile
import time
import unittest
from datetime import timedelta
from functools import wraps
from collections import namedtuple

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed as c10d
from torch.nn.parallel import DistributedDataParallel

import common_utils as common
from common_utils import TestCase, load_tests, run_tests
from common_utils import retry_on_address_already_in_use_error

# load_tests from common_utils is used to automatically filter tests for
# sharding; this assignment also keeps flake8 from flagging the import as unused.
load_tests = load_tests

if not c10d.is_available():
    print('c10d not available, skipping tests')
    sys.exit(0)

TestSkip = namedtuple('TestSkip', 'exit_code, message')

TEST_SKIPS = {
    "multi-gpu": TestSkip(75, "Need at least 2 CUDA devices"),
    "nccl": TestSkip(76, "c10d not compiled with NCCL support"),
    "known_issues": TestSkip(77, "Test skipped due to known issues"),
}

def skip_if_not_multigpu(func):
    """Multi-GPU tests require at least 2 GPUs. Skip if this is not met."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        if torch.cuda.is_available() and torch.cuda.device_count() >= 2:
            return func(*args, **kwargs)
        sys.exit(TEST_SKIPS['multi-gpu'].exit_code)

    return wrapper

def skip_if_not_nccl(func):
    """Skips a test if NCCL is not available (for c10d)."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        if hasattr(c10d, "ProcessGroupNCCL"):
            return func(*args, **kwargs)
        sys.exit(TEST_SKIPS['nccl'].exit_code)

    return wrapper

def skip_for_known_issues(func):
    """Skips a test due to known issues (for c10d)."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.exit(TEST_SKIPS['known_issues'].exit_code)

    return wrapper

def get_timeout(test_id):
    return TIMEOUT_OVERRIDE.get(test_id.split('.')[-1], TIMEOUT_DEFAULT)

def gpus_for_rank(world_size):
    """Multi-GPU tests are designed to simulate multiple nodes, each with
    multiple GPUs. The NCCL backend requires an equal number of GPUs per
    process, so on a single node all visible GPUs are evenly divided into
    subsets and each process uses only its own subset.
    """
    visible_devices = list(range(torch.cuda.device_count()))
    gpus_per_process = torch.cuda.device_count() // world_size
    gpus_for_rank = []
    for rank in range(world_size):
        gpus_for_rank.append(visible_devices[rank * gpus_per_process: (rank + 1) * gpus_per_process])
    return gpus_for_rank
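
# Worked example (illustrative numbers only): with 4 visible CUDA devices and
# world_size=2, gpus_per_process is 2 and gpus_for_rank(2) returns
# [[0, 1], [2, 3]], i.e. rank 0 drives GPUs 0-1 and rank 1 drives GPUs 2-3.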

def simple_reduce_tests(rank, world_size):
    return [
        (c10d.ReduceOp.SUM, torch.Tensor([rank + 1.0]),
         torch.Tensor([float(world_size * (world_size + 1) / 2)])),
        (c10d.ReduceOp.PRODUCT, torch.Tensor([rank + 1.0]),
         torch.Tensor([float(math.factorial(world_size))])),
        (c10d.ReduceOp.MIN, torch.Tensor([rank + 1.0]), torch.Tensor([1.0])),
        (c10d.ReduceOp.MAX, torch.Tensor([rank + 1.0]), torch.Tensor([world_size])),
    ]
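
# Sanity check on the expected values: with per-rank input rank + 1, SUM gives
# 1 + 2 + ... + world_size = world_size * (world_size + 1) / 2, PRODUCT gives
# world_size!, MIN is always 1.0 and MAX is world_size.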

def simple_multi_input_reduce_tests(rank, world_size):
    return [
        (c10d.ReduceOp.SUM,
         [torch.Tensor([2 * rank + 0.0]), torch.Tensor([2 * rank + 1.0])],
         torch.Tensor([float(world_size * (2 * world_size - 1))])),
        (c10d.ReduceOp.PRODUCT,
         [torch.Tensor([2 * rank + 1.0]), torch.Tensor([2 * rank + 2.0])],
         torch.Tensor([float(math.factorial(2 * world_size))])),
        (c10d.ReduceOp.MIN,
         [torch.Tensor([2 * rank + 1.0]), torch.Tensor([2 * rank + 2.0])],
         torch.Tensor([1.0])),
        (c10d.ReduceOp.MAX,
         [torch.Tensor([2 * rank + 1.0]), torch.Tensor([2 * rank + 2.0])],
         torch.Tensor([2 * world_size])),
    ]
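
# For the two-input SUM case, each rank contributes (2 * rank) + (2 * rank + 1);
# summing over ranks gives 4 * (0 + 1 + ... + (world_size - 1)) + world_size
# = world_size * (2 * world_size - 1), matching the expected tensor above.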

class StoreTestBase(object):
    def _create_store(self, i):
        raise RuntimeError("not implemented")

    def _test_set_get(self, fs):
        # add() accumulates an integer counter per key; the increments below
        # bring "key" to 1 + 2 + 3 = 6 and "key3" to 1 + ... + 6 = 21.
        fs.add("key", 1)
        fs.add("key", 2)
        fs.add("key", 3)
        fs.set("key0", "value0")
        fs.add("key3", 1)
        fs.set("key1", "value1")
        fs.add("key3", 2)
        fs.set("key2", "value2")
        fs.add("key3", 3)
        fs.add("key3", 4)
        fs.add("key3", 5)
        fs.add("key3", 6)
        self.assertEqual(b"6", fs.get("key"))
        self.assertEqual(b"value0", fs.get("key0"))
        self.assertEqual(b"value1", fs.get("key1"))
        self.assertEqual(b"value2", fs.get("key2"))
        self.assertEqual(b"21", fs.get("key3"))
    def test_set_get(self):
        self._test_set_get(self._create_store())


class FileStoreTest(TestCase, StoreTestBase):
    def setUp(self):
        self.file = tempfile.NamedTemporaryFile(delete=False)

    def _create_store(self):
        store = c10d.FileStore(self.file.name, 1)
        store.set_timeout(timedelta(seconds=300))
        return store


class PrefixFileStoreTest(TestCase, StoreTestBase):
    def setUp(self):
        self.file = tempfile.NamedTemporaryFile(delete=False)
        self.filestore = c10d.FileStore(self.file.name, 1)
        self.prefix = "test_prefix"
        self.filestore.set_timeout(timedelta(seconds=300))

    def _create_store(self):
        return c10d.PrefixStore(self.prefix, self.filestore)

def create_tcp_store(addr):
    """
    Creates a TCP store. Retries if the chosen port is already in use.
    """
    ports = []
    for _ in range(10):
        try:
            port = common.find_free_port()
            ports.append(port)
            return c10d.TCPStore(addr, port, 1, True)
        except RuntimeError as error:
            if str(error) == "Address already in use":
                continue
            raise
    raise RuntimeError("Unable to find free port (tried %s)" % ", ".join(ports))

class TCPStoreTest(TestCase, StoreTestBase):
    def _create_store(self):
        store = create_tcp_store('localhost')
        store.set_timeout(timedelta(seconds=300))
        return store
    def test_address_already_in_use(self):
        with self.assertRaisesRegex(RuntimeError, "Address already in use"):
            addr = 'localhost'
            port = common.find_free_port()

            # Both stores must stay alive at the same time for the second
            # bind to fail, hence the (otherwise unused) local variables.
            store1 = c10d.TCPStore(addr, port, 1, True)
            store2 = c10d.TCPStore(addr, port, 1, True)


class PrefixTCPStoreTest(TestCase, StoreTestBase):
    def setUp(self):
        self.tcpstore = create_tcp_store('localhost')
        self.prefix = "test_prefix"
        self.tcpstore.set_timeout(timedelta(seconds=300))
    def _create_store(self):
        return c10d.PrefixStore(self.prefix, self.tcpstore)


class RendezvousTest(TestCase):
    def test_unknown_handler(self):
        with self.assertRaisesRegex(RuntimeError, "^No rendezvous handler"):
            c10d.rendezvous('invalid://')

class RendezvousEnvTest(TestCase):
    @retry_on_address_already_in_use_error
    def test_common_errors(self):
        if not hasattr(c10d, "ProcessGroupNCCL"):
            raise unittest.SkipTest("C10D is not built with NCCL process group,"
                                    " skipping test")

        vars = {
            "WORLD_SIZE": "1",
            "RANK": "0",
            "MASTER_ADDR": "127.0.0.1",
            "MASTER_PORT": common.find_free_port(),
        }

        # Context manager that exports the given env vars for its body only.
        class Env(object):
            def __init__(self, vars):
                self.vars = vars

            def __enter__(self):
                for key, value in self.vars.items():
                    os.environ[key] = str(value)

            def __exit__(self, type, value, traceback):
                for key in self.vars.keys():
                    del os.environ[key]

        def without(d, key):
            d = d.copy()
            d.pop(key)
            return d

        def withouts(d, keys):
            d = d.copy()
            for key in keys:
                d.pop(key)
            return d
        # Without WORLD_SIZE the env:// rendezvous fails, but an explicit
        # world_size keyword argument still works.
        with Env(without(vars, 'WORLD_SIZE')):
            with self.assertRaisesRegex(ValueError, 'WORLD_SIZE expected'):
                gen = c10d.rendezvous('env://')
                next(gen)
            c10d.init_process_group(backend='nccl', world_size=1)
            c10d.destroy_process_group()

        with Env(without(vars, 'RANK')):
            with self.assertRaisesRegex(ValueError, 'RANK expected'):
                gen = c10d.rendezvous('env://')
                next(gen)
            c10d.init_process_group(backend='nccl', rank=0)
            c10d.destroy_process_group()

        with Env(withouts(vars, ['RANK', 'WORLD_SIZE'])):
            c10d.init_process_group(backend='nccl', rank=0, world_size=1)
            c10d.destroy_process_group()

        with Env(vars):
            c10d.init_process_group(backend='nccl')
            c10d.destroy_process_group()

        with Env(without(vars, 'MASTER_ADDR')):
            with self.assertRaisesRegex(ValueError, 'MASTER_ADDR expected'):
                gen = c10d.rendezvous('env://')
                next(gen)

        with Env(without(vars, 'MASTER_PORT')):
            with self.assertRaisesRegex(ValueError, 'MASTER_PORT expected'):
                gen = c10d.rendezvous('env://')
                next(gen)

        # Query-string parameters take the place of the missing env vars.
        with Env(without(vars, 'WORLD_SIZE')):
            gen = c10d.rendezvous('env://?world_size={}'.format(1))
            _, _, size = next(gen)
            self.assertEqual(size, 1)

        with Env(without(vars, 'RANK')):
            gen = c10d.rendezvous('env://?rank={}'.format(0))
            _, rank, _ = next(gen)
            self.assertEqual(rank, 0)

        with Env(withouts(vars, ['RANK', 'WORLD_SIZE'])):
            gen = c10d.rendezvous('env://?rank={}&world_size={}'.format(0, 1))
            _, rank, size = next(gen)
            self.assertEqual(rank, 0)
            self.assertEqual(size, 1)
    @retry_on_address_already_in_use_error
    def test_nominal(self):
        os.environ['WORLD_SIZE'] = '1'
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = str(common.find_free_port())

        # Single rank
        os.environ['RANK'] = '0'
        gen0 = c10d.rendezvous('env://')
        store0, rank0, size0 = next(gen0)
        self.assertEqual(0, rank0)
        self.assertEqual(1, size0)

        store0.set("key0", "value0")

        # check with get
        self.assertEqual(b"value0", store0.get("key0"))

class RendezvousFileTest(TestCase):
    def test_common_errors(self):
        with self.assertRaisesRegex(ValueError, 'path missing'):
            gen = c10d.rendezvous('file://?rank=0&world_size=1')
            next(gen)
        with self.assertRaisesRegex(ValueError, 'rank parameter missing'):
            gen = c10d.rendezvous('file:///tmp/foo?world_size=1')
            next(gen)
        with self.assertRaisesRegex(ValueError, 'size parameter missing'):
            gen = c10d.rendezvous('file:///tmp/foo?rank=0')
            next(gen)

    def test_nominal(self):
        with tempfile.NamedTemporaryFile(delete=False) as file:
            url = 'file://%s?world_size=%d' % (file.name, 2)
            gen0 = c10d.rendezvous(url + "&rank=0")
            store0, rank0, size0 = next(gen0)
            self.assertEqual(0, rank0)
            self.assertEqual(2, size0)
            gen1 = c10d.rendezvous(url + "&rank=1")
            store1, rank1, size1 = next(gen1)
            self.assertEqual(1, rank1)
            self.assertEqual(2, size1)

            # Set a value on both stores
            store0.set("key0", "value0")
            store1.set("key1", "value1")

            # Cross-check with get
            self.assertEqual(b"value0", store1.get("key0"))
            self.assertEqual(b"value1", store0.get("key1"))

class RendezvousTCPTest(TestCase):
    def test_common_errors(self):
        with self.assertRaisesRegex(ValueError, 'port number missing'):
            gen = c10d.rendezvous('tcp://127.0.0.1?rank=0&world_size=1')
            next(gen)
        with self.assertRaisesRegex(ValueError, 'rank parameter missing'):
            gen = c10d.rendezvous('tcp://127.0.0.1:23456?world_size=1')
            next(gen)
        with self.assertRaisesRegex(ValueError, 'size parameter missing'):
            gen = c10d.rendezvous('tcp://127.0.0.1:23456?rank=0')
            next(gen)

    @retry_on_address_already_in_use_error
    def test_nominal(self):
        addr = 'localhost'
        port = common.find_free_port()
        url = 'tcp://%s:%d?world_size=%d' % (addr, port, 1)
        gen0 = c10d.rendezvous(url + "&rank=0")
        store0, rank0, size0 = next(gen0)
        self.assertEqual(0, rank0)
        self.assertEqual(1, size0)

        store0.set("key0", "value0")

        # check with get
        self.assertEqual(b"value0", store0.get("key0"))

class MultiProcessTestCase(TestCase):
    MAIN_PROCESS_RANK = -1

    @property
    def world_size(self):
        return 4

    @staticmethod
    def join_or_run(fn):
        @wraps(fn)
        def wrapper(self):
            if self.rank == self.MAIN_PROCESS_RANK:
                self._join_processes(fn)
            else:
                fn(self)
        return wrapper

    # The main process spawns one subprocess per rank. Every test method is
    # wrapped so that the main process joins its subprocesses while the
    # subprocesses run the underlying test function.
    @classmethod
    def setUpClass(cls):
        for attr in dir(cls):
            if attr.startswith('test'):
                fn = getattr(cls, attr)
                setattr(cls, attr, cls.join_or_run(fn))

    def setUp(self):
        self.rank = self.MAIN_PROCESS_RANK
        self.file = tempfile.NamedTemporaryFile(delete=False)
        self.processes = [self._spawn_process(rank) for rank in range(int(self.world_size))]

    def _spawn_process(self, rank):
        name = 'process ' + str(rank)
        process = multiprocessing.Process(target=self._run, name=name, args=(rank,))
        process.start()
        return process

    def _run(self, rank):
        self.rank = rank
        # self.id() is e.g. '__main__.ProcessGroupGlooTest.test_broadcast_basics';
        # look up the test method by name and execute it in this subprocess.
        getattr(self, self.id().split(".")[2])()
        sys.exit(0)

    def _join_processes(self, fn):
        timeout = get_timeout(self.id())
        start_time = time.time()
        for p in self.processes:
            p.join(timeout)
        elapsed_time = time.time() - start_time
        self._check_return_codes(elapsed_time)

    def _check_return_codes(self, elapsed_time):
        """
        Checks that the return codes of all spawned processes match, and skips
        tests if they returned a return code indicating a skipping condition.
        """
        first_process = self.processes[0]
        for i, p in enumerate(self.processes):
            if p.exitcode is None:
                raise RuntimeError(
                    'Process {} terminated or timed out after {} seconds'.format(i, elapsed_time))
            self.assertEqual(p.exitcode, first_process.exitcode)
        for skip in TEST_SKIPS.values():
            if first_process.exitcode == skip.exit_code:
                raise unittest.SkipTest(skip.message)
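
    # Subprocesses report back to the parent purely through exit codes: the
    # special codes registered in TEST_SKIPS above are translated into
    # unittest.SkipTest, so a skipping subprocess skips the whole test instead
    # of failing it.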
    @property
    def is_master(self):
        return self.rank == 0

class ProcessGroupGlooTest(MultiProcessTestCase):
    def opts(self, threads=2):
        opts = c10d.ProcessGroupGloo.Options()
        opts.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")]
        opts.threads = threads
        return opts
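
    # The Gloo process group is pinned to the loopback interface ("lo"), so
    # these multi-process tests only talk over localhost and do not depend on
    # the machine's external network configuration.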
    def test_broadcast_checks(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())

        t1 = torch.zeros([1], dtype=torch.float32)
        t2 = torch.zeros([1], dtype=torch.float64)
        t3 = torch.zeros([2], dtype=torch.float32)

        with self.assertRaisesRegex(ValueError, "invalid root rank"):
            opts = c10d.BroadcastOptions()
            opts.rootRank = -1
            pg.broadcast([t1], opts)

        with self.assertRaisesRegex(ValueError, "invalid root rank"):
            opts = c10d.BroadcastOptions()
            opts.rootRank = self.world_size
            pg.broadcast([t1], opts)

        with self.assertRaisesRegex(ValueError, "invalid root tensor"):
            opts = c10d.BroadcastOptions()
            opts.rootRank = self.rank
            opts.rootTensor = -1
            pg.broadcast([t1], opts)

        with self.assertRaisesRegex(ValueError, "invalid root tensor"):
            opts = c10d.BroadcastOptions()
            opts.rootRank = self.rank
            opts.rootTensor = 1
            pg.broadcast([t1], opts)

        with self.assertRaisesRegex(ValueError, "invalid root tensor"):
            opts = c10d.BroadcastOptions()
            opts.rootRank = self.rank
            pg.broadcast([], opts)

        with self.assertRaisesRegex(ValueError, "invalid tensor type"):
            opts = c10d.BroadcastOptions()
            opts.rootRank = self.rank
            pg.broadcast([t1, t2], opts)

        with self.assertRaisesRegex(ValueError, "invalid tensor size"):
            opts = c10d.BroadcastOptions()
            opts.rootRank = self.rank
            pg.broadcast([t1, t3], opts)
    def _test_broadcast_basics(self, fn):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())

        def broadcast(xs, rootRank, rootTensor):
            opts = c10d.BroadcastOptions()
            opts.rootRank = rootRank
            opts.rootTensor = rootTensor
            work = pg.broadcast(xs, opts)
            work.wait()

        # Every rank is root once
        for i in range(self.world_size):
            # Run with a single input tensor
            x = fn(torch.Tensor([self.rank]))
            broadcast([x], i, 0)
            self.assertEqual(torch.Tensor([i]), x)

            # Run with two input tensors
            num = 2
            for j in range(num):
                xs = [
                    fn(torch.Tensor([self.rank * num + 0.0])),
                    fn(torch.Tensor([self.rank * num + 1.0])),
                ]
                broadcast(xs, i, j)
                self.assertEqual(torch.Tensor([i * num + j]), xs[0])
                self.assertEqual(torch.Tensor([i * num + j]), xs[1])

        # Test overloaded convenience function
        x = torch.Tensor([self.rank + 1.0])
        work = pg.broadcast(x, root=0)
        work.wait()
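        # The convenience overload broadcasts from root rank 0, so after
        # wait() every process's x holds rank 0's value, torch.Tensor([1.0]).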
    def test_broadcast_basics(self):
        self._test_broadcast_basics(lambda t: t.clone())

    @skip_if_not_multigpu
    def test_broadcast_basics_cuda(self):
        self._test_broadcast_basics(lambda t: t.clone().cuda())
    def _test_broadcast_stress(self, inputs):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8))
        work_handles = [
            pg.broadcast(inputs[i], root=(i % self.world_size))
            for i in range(len(inputs))
        ]
        for i, work_handle in enumerate(work_handles):
            work_handle.wait()
            self.assertEqual(
                torch.Tensor([
                    (i * self.world_size) + (i % self.world_size)
                ]),
                inputs[i],
                "Mismatch in iteration %d" % i,
            )
    def test_broadcast_stress(self):
        inputs = [torch.Tensor([i * self.world_size + self.rank]) for i in range(1000)]
        self._test_broadcast_stress(inputs)

    @skip_if_not_multigpu
    def test_broadcast_stress_cuda(self):
        inputs = [torch.Tensor([i * self.world_size + self.rank]).cuda() for i in range(1000)]
        self._test_broadcast_stress(inputs)
    def test_allreduce_checks(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())

        t1 = torch.zeros([1], dtype=torch.float32)
        t2 = torch.zeros([1], dtype=torch.float64)
        t3 = torch.zeros([2], dtype=torch.float32)

        with self.assertRaisesRegex(ValueError, "requires non-empty tensor list"):
            opts = c10d.AllreduceOptions()
            pg.allreduce([], opts)

        with self.assertRaisesRegex(ValueError, "invalid tensor type"):
            opts = c10d.AllreduceOptions()
            pg.allreduce([t1, t2], opts)

        with self.assertRaisesRegex(ValueError, "invalid tensor size"):
            opts = c10d.AllreduceOptions()
            pg.allreduce([t1, t3], opts)
    def _test_allreduce_basics(self, fn):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())

        # Single input tests
        tests = simple_reduce_tests(self.rank, self.world_size)
        for (op, input, output) in tests:
            opts = c10d.AllreduceOptions()
            opts.reduceOp = op
            tensor = fn(input)
            work = pg.allreduce([tensor], opts)
            work.wait()
            self.assertEqual(output, tensor)

        # Multi input tests
        tests = simple_multi_input_reduce_tests(self.rank, self.world_size)
        for (op, inputs, output) in tests:
            opts = c10d.AllreduceOptions()
            opts.reduceOp = op
            tensors = [fn(input) for input in inputs]
            work = pg.allreduce(tensors, opts)
            work.wait()
            for tensor in tensors:
                self.assertEqual(output, tensor)

        # Test overloaded convenience function
        x = fn(torch.Tensor([self.rank + 1.0]))
        work = pg.allreduce(x)
        work.wait()
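        # With no options, the convenience overload defaults to ReduceOp.SUM,
        # so after wait() every rank holds 1 + 2 + ... + world_size, i.e.
        # world_size * (world_size + 1) / 2.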
    def test_allreduce_basics(self):
        self._test_allreduce_basics(lambda t: t.clone())

    @skip_if_not_multigpu
    def test_allreduce_basics_cuda(self):
        self._test_allreduce_basics(lambda t: t.clone().cuda())
    def _test_allreduce_stress(self, inputs):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8))
        work_handles = [pg.allreduce(inputs[i]) for i in range(len(inputs))]
        for i, work_handle in enumerate(work_handles):
            work_handle.wait()
            self.assertEqual(
                torch.Tensor([
                    (i * self.world_size) +
                    (self.world_size * (self.world_size - 1) / 2)
                ]),
                inputs[i],
                "Mismatch in iteration %d" % i,
            )
    def test_allreduce_stress(self):
        inputs = [torch.Tensor([i + self.rank]) for i in range(1000)]
        self._test_allreduce_stress(inputs)

    @skip_if_not_multigpu
    def test_allreduce_stress_cuda(self):
        inputs = [torch.Tensor([i + self.rank]).cuda() for i in range(1000)]
        self._test_allreduce_stress(inputs)
    def test_scatter_checks(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())

        t1 = torch.zeros([1], dtype=torch.float32)
        t2 = torch.zeros([1], dtype=torch.float64)
        t3 = torch.zeros([2], dtype=torch.float32)

        with self.assertRaisesRegex(ValueError, "invalid root rank"):
            opts = c10d.ScatterOptions()
            opts.rootRank = -1
            pg.scatter([t1], [], opts)

        with self.assertRaisesRegex(ValueError, "invalid root rank"):
            opts = c10d.ScatterOptions()
            opts.rootRank = self.world_size
            pg.scatter([t1], [], opts)

        with self.assertRaisesRegex(ValueError, "requires a single-element output tensor list"):
            opts = c10d.ScatterOptions()
            opts.rootRank = 0
            pg.scatter([], [], opts)

        with self.assertRaisesRegex(ValueError, "requires a single-element output tensor list"):
            opts = c10d.ScatterOptions()
            opts.rootRank = 0
            pg.scatter([t1, t1], [], opts)

        with self.assertRaisesRegex(ValueError, "requires a single-element input list"):
            opts = c10d.ScatterOptions()
            opts.rootRank = self.rank
            pg.scatter([t1], [], opts)

        with self.assertRaisesRegex(ValueError, "requires a single-element input list"):
            opts = c10d.ScatterOptions()
            opts.rootRank = self.rank
            pg.scatter([t1], [[t1] * self.world_size, [t1] * self.world_size], opts)

        with self.assertRaisesRegex(ValueError, "requires a single-element input list"):
            opts = c10d.ScatterOptions()
            opts.rootRank = self.rank
            pg.scatter([t1], [[t1] * (self.world_size - 1)], opts)

        with self.assertRaisesRegex(ValueError, "requires a single-element input list"):
            opts = c10d.ScatterOptions()
            opts.rootRank = self.rank
            pg.scatter([t1], [[t1] * (self.world_size + 1)], opts)

        with self.assertRaisesRegex(ValueError, "requires a single-element input list"):
            opts = c10d.ScatterOptions()
            opts.rootRank = self.rank
            pg.scatter([t1], [[t1] * (self.world_size + 1)], opts)

        with self.assertRaisesRegex(ValueError, "invalid tensor type"):
            opts = c10d.ScatterOptions()
            opts.rootRank = self.rank
            pg.scatter([t1], [[t2] * self.world_size], opts)

        with self.assertRaisesRegex(ValueError, "invalid tensor size"):
            opts = c10d.ScatterOptions()
            opts.rootRank = self.rank
            pg.scatter([t1], [[t3] * self.world_size], opts)

        with self.assertRaisesRegex(ValueError, "requires empty input"):
            opts = c10d.ScatterOptions()
            opts.rootRank = (self.rank + 1) % self.world_size
            pg.scatter([t1], [[t1] * self.world_size], opts)
    def _test_scatter_basics(self, fn):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())

        # Preallocate tensors for input/output
        input = [fn(torch.Tensor([self.rank])) for _ in range(self.world_size)]
        outputs = [fn(torch.Tensor([-1])) for _ in range(self.world_size)]

        # Take turns being the scatter root and accumulate work items
        work = []
        for i in range(self.world_size):
            opts = c10d.ScatterOptions()
            opts.rootRank = i
            if i == self.rank:
                work.append(pg.scatter([outputs[i]], [input], opts))
            else:
                work.append(pg.scatter([outputs[i]], [], opts))

        # Wait for work to complete
        for i in range(self.world_size):
            work[i].wait()
            self.assertEqual(torch.Tensor([i]), outputs[i])
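        # Scatter semantics: only the root passes inputs (one tensor per rank);
        # every other rank passes an empty input list and just receives its
        # slot, which is why the two branches above differ.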
    def test_scatter_basics(self):
        self._test_scatter_basics(lambda t: t.clone())

    @skip_if_not_multigpu
    def test_scatter_basics_cuda(self):
        self._test_scatter_basics(lambda t: t.clone().cuda())
    def _test_scatter_stress(self, inputs, fn):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8))
        outputs = [
            [fn(torch.Tensor([-1])) for _ in range(self.world_size)]
            for _ in range(len(inputs))
        ]
        work_handles = []
        for i in range(len(inputs)):
            for root in range(self.world_size):
                opts = c10d.ScatterOptions()
                opts.rootRank = root
                if root == self.rank:
                    work = pg.scatter([outputs[i][root]], [[fn(e) for e in inputs[i]]], opts)
                else:
                    work = pg.scatter([outputs[i][root]], [], opts)
                work_handles.append(work)

        for i, work_handle in enumerate(work_handles):
            work_handle.wait()
            iter = i // self.world_size
            root = i % self.world_size
            self.assertEqual(
                torch.Tensor([iter + root]),
                outputs[iter][root],
                "Mismatch in iteration %d for rank %d" % (iter, root)
            )
    def test_scatter_stress(self):
        inputs = [[torch.Tensor([i + self.rank]) for _ in range(self.world_size)] for i in range(1000)]
        self._test_scatter_stress(inputs, lambda t: t.clone())

    @unittest.skip("Test is flaky, see https://github.com/pytorch/pytorch/issues/15963")
    @skip_if_not_multigpu
    def test_scatter_stress_cuda(self):
        inputs = [[torch.Tensor([i + self.rank]) for _ in range(self.world_size)] for i in range(1000)]
        self._test_scatter_stress(inputs, lambda t: t.clone().cuda())
    def test_gather_checks(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())

        t1 = torch.zeros([1], dtype=torch.float32)
        t2 = torch.zeros([1], dtype=torch.float64)
        t3 = torch.zeros([2], dtype=torch.float32)

        with self.assertRaisesRegex(ValueError, "invalid root rank"):
            opts = c10d.GatherOptions()
            opts.rootRank = -1
            pg.gather([], [t1], opts)

        with self.assertRaisesRegex(ValueError, "invalid root rank"):
            opts = c10d.GatherOptions()
            opts.rootRank = self.world_size
            pg.gather([], [t1], opts)

        with self.assertRaisesRegex(ValueError, "requires a single-element input tensor list"):
            opts = c10d.GatherOptions()
            opts.rootRank = 0
            pg.gather([], [], opts)

        with self.assertRaisesRegex(ValueError, "requires a single-element input tensor list"):
            opts = c10d.GatherOptions()
            opts.rootRank = 0
            pg.gather([], [t1, t1], opts)

        with self.assertRaisesRegex(ValueError, "requires a single-element output list"):
            opts = c10d.GatherOptions()
            opts.rootRank = self.rank
            pg.gather([], [t1], opts)

        with self.assertRaisesRegex(ValueError, "requires a single-element output list"):
            opts = c10d.GatherOptions()
            opts.rootRank = self.rank
            pg.gather([[t1] * self.world_size, [t1] * self.world_size], [t1], opts)

        with self.assertRaisesRegex(ValueError, "requires a single-element output list"):
            opts = c10d.GatherOptions()
            opts.rootRank = self.rank
            pg.gather([[t1] * (self.world_size - 1)], [t1], opts)

        with self.assertRaisesRegex(ValueError, "requires a single-element output list"):
            opts = c10d.GatherOptions()
            opts.rootRank = self.rank
            pg.gather([[t1] * (self.world_size + 1)], [t1], opts)

        with self.assertRaisesRegex(ValueError, "invalid tensor type"):
            opts = c10d.GatherOptions()
            opts.rootRank = self.rank
            pg.gather([[t2] * self.world_size], [t1], opts)

        with self.assertRaisesRegex(ValueError, "invalid tensor size"):
            opts = c10d.GatherOptions()
            opts.rootRank = self.rank
            pg.gather([[t3] * self.world_size], [t1], opts)

        with self.assertRaisesRegex(ValueError, "requires empty output"):
            opts = c10d.GatherOptions()
            opts.rootRank = (self.rank + 1) % self.world_size
            pg.gather([[t1] * self.world_size], [t1], opts)
    def _test_gather_basics(self, fn):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())

        # Preallocate tensors for input/output
        input = [fn(torch.Tensor([self.rank]))]
        outputs = [fn(torch.Tensor([-1])) for _ in range(self.world_size)]

        # Take turns being the gather root and accumulate work items
        work = []
        for i in range(self.world_size):
            opts = c10d.GatherOptions()
            opts.rootRank = i
            if i == self.rank:
                work.append(pg.gather([outputs], input, opts))
            else:
                work.append(pg.gather([], input, opts))

        # Wait for work to complete
        expected = [torch.Tensor([rank]) for rank in range(self.world_size)]
        for i in range(self.world_size):
            work[i].wait()
            if i == self.rank:
                self.assertEqual(expected, outputs)
    def test_gather_basics(self):
        self._test_gather_basics(lambda t: t.clone())

    @skip_if_not_multigpu
    def test_gather_basics_cuda(self):
        self._test_gather_basics(lambda t: t.clone().cuda())
    def _test_gather_stress(self, inputs, fn):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8))
        work_handles = []
        outputs = [
            [
                [fn(torch.Tensor([-1])) for _ in range(self.world_size)]
            ] for _ in range(len(inputs))
        ]
        expected_outputs = [
            [
                [torch.Tensor([i + j]) for j in range(self.world_size)]
            ] for i in range(len(inputs))
        ]
        for i in range(len(inputs)):
            for root in range(self.world_size):
                opts = c10d.GatherOptions()
                opts.rootRank = root
                if root == self.rank:
                    work = pg.gather(outputs[i], [fn(inputs[i])], opts)
                else:
                    work = pg.gather([], [fn(inputs[i])], opts)
                work_handles.append(work)

        for i, work_handle in enumerate(work_handles):
            work_handle.wait()
            iter = i // self.world_size
            root = i % self.world_size
            if root == self.rank:
                self.assertEqual(
                    expected_outputs[iter],
                    outputs[iter],
                    "Mismatch in iteration %d for root %d" % (iter, root)
                )
    def test_gather_stress(self):
        inputs = [torch.Tensor([i + self.rank]) for i in range(1000)]
        self._test_gather_stress(inputs, lambda t: t.clone())

    @skip_if_not_multigpu
    def test_gather_stress_cuda(self):
        inputs = [torch.Tensor([i + self.rank]).cuda() for i in range(1000)]
        self._test_gather_stress(inputs, lambda t: t.clone().cuda())
    def test_allgather_checks(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())

        t1 = torch.zeros([1], dtype=torch.float32)
        t2 = torch.zeros([1], dtype=torch.float64)
        t3 = torch.zeros([2], dtype=torch.float32)

        with self.assertRaisesRegex(ValueError, "requires non-empty input tensor list"):
            pg.allgather([], [])

        with self.assertRaisesRegex(ValueError, "requires input/output tensor lists to have the same length"):
            pg.allgather([], [t1])

        with self.assertRaisesRegex(ValueError, "requires input/output tensor lists to have the same length"):
            pg.allgather([[t1] * self.world_size, [t1] * self.world_size], [t1])

        with self.assertRaisesRegex(ValueError, "invalid output tensor list"):
            pg.allgather([[t1] * (self.world_size - 1)], [t1])

        with self.assertRaisesRegex(ValueError, "invalid output tensor list"):
            pg.allgather([[t1] * (self.world_size + 1)], [t1])
    def _test_allgather_basics(self, fn):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())

        # Run with a few different numbers of input tensors per rank
        for n in [1, 2, 3]:
            input = [
                fn(torch.Tensor([n * self.rank + i])) for i in range(n)
            ]
            output = [
                [
                    fn(torch.Tensor([-1])) for _ in range(n * self.world_size)
                ] for _ in range(n)
            ]
            expected_output = [
                [
                    torch.Tensor([i]) for i in range(n * self.world_size)
                ] for _ in range(n)
            ]
            work = pg.allgather(output, input)
            work.wait()
            self.assertEqual(expected_output, output)
    def test_allgather_basics(self):
        self._test_allgather_basics(lambda t: t.clone())

    @skip_if_not_multigpu
    def test_allgather_basics_cuda(self):
        self._test_allgather_basics(lambda t: t.clone().cuda())
    def _test_allgather_stress(self, inputs, fn):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8))
        work_handles = []
        outputs = [
            [
                [fn(torch.Tensor([-1])) for _ in range(self.world_size)]
            ] for _ in range(len(inputs))
        ]
        expected_outputs = [
            [
                [torch.Tensor([i + j]) for j in range(self.world_size)]
            ] for i in range(len(inputs))
        ]
        for i in range(len(inputs)):
            work = pg.allgather(outputs[i], [fn(inputs[i])])
            work_handles.append(work)

        for i, work_handle in enumerate(work_handles):
            work_handle.wait()
            self.assertEqual(
                expected_outputs[i],
                outputs[i],
                "Mismatch in iteration %d" % i
            )
    def test_allgather_stress(self):
        inputs = [torch.Tensor([i + self.rank]) for i in range(1000)]
        self._test_allgather_stress(inputs, lambda t: t.clone())

    @skip_if_not_multigpu
    def test_allgather_stress_cuda(self):
        inputs = [torch.Tensor([i + self.rank]).cuda() for i in range(1000)]
        self._test_allgather_stress(inputs, lambda t: t.clone().cuda())
    def test_reduce_checks(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())

        t1 = torch.zeros([1], dtype=torch.float32)

        with self.assertRaisesRegex(ValueError, "invalid root rank"):
            opts = c10d.ReduceOptions()
            opts.rootRank = -1
            opts.rootTensor = 0
            pg.reduce([t1], opts)

        with self.assertRaisesRegex(ValueError, "invalid root rank"):
            opts = c10d.ReduceOptions()
            opts.rootRank = self.world_size
            opts.rootTensor = 0
            pg.reduce([t1], opts)

        with self.assertRaisesRegex(ValueError, "invalid root tensor"):
            opts = c10d.ReduceOptions()
            opts.rootRank = self.rank
            opts.rootTensor = 1
            pg.reduce([t1], opts)

        with self.assertRaisesRegex(ValueError, "requires a single-element tensor list"):
            opts = c10d.ReduceOptions()
            opts.rootRank = self.rank
            opts.rootTensor = 0
            pg.reduce([t1, t1], opts)
    def _test_reduce_basics(self, fn):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
        for (op, input, output) in simple_reduce_tests(self.rank, self.world_size):
            for root in range(self.world_size):
                opts = c10d.ReduceOptions()
                opts.reduceOp = op
                opts.rootRank = root
                tmp = fn(input)
                work = pg.reduce([tmp], opts)
                work.wait()
                if root == self.rank:
                    self.assertEqual(output, tmp)
    def test_reduce_basics(self):
        self._test_reduce_basics(lambda t: t.clone())

    @skip_if_not_multigpu
    def test_reduce_basics_cuda(self):
        self._test_reduce_basics(lambda t: t.clone().cuda())
    def _test_reduce_stress(self, inputs):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8))
        work_handles = []
        outputs = []
        for i in range(len(inputs)):
            for root in range(self.world_size):
                opts = c10d.ReduceOptions()
                opts.rootRank = root
                tmp = inputs[i].clone()
                outputs.append(tmp)
                work = pg.reduce([tmp], opts)
                work_handles.append(work)

        for i, work_handle in enumerate(work_handles):
            work_handle.wait()
            iter = i // self.world_size
            root = i % self.world_size
            if root == self.rank:
                self.assertEqual(
                    torch.Tensor([
                        (iter * self.world_size) +
                        (self.world_size * (self.world_size - 1) / 2)
                    ]),
                    outputs[i],
                    "Mismatch in iteration %d with root rank %d" % (iter, root),
                )
    def test_reduce_stress(self):
        inputs = [torch.Tensor([i + self.rank]) for i in range(1000)]
        self._test_reduce_stress(inputs)

    @skip_if_not_multigpu
    def test_reduce_stress_cuda(self):
        inputs = [torch.Tensor([i + self.rank]).cuda() for i in range(1000)]
        self._test_reduce_stress(inputs)
    def test_send_recv_all_to_all(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())

        # Preallocate tensors for input/output
        inputs = [torch.Tensor([self.rank]) for _ in range(self.world_size)]
        outputs = [torch.Tensor([-1]) for _ in range(self.world_size)]

        # Issue sends
        send_work = []
        for i in range(self.world_size):
            if i == self.rank:
                continue
            send_work.append(pg.send([inputs[i]], i, 0))

        # Issue recvs
        recv_work = []
        for i in range(self.world_size):
            if i == self.rank:
                continue
            recv_work.append(pg.recv([outputs[i]], i, 0))

        # Wait for sends to complete
        for work in send_work:
            work.wait()
            self.assertTrue(work.is_completed())

        # Wait for recvs to complete
        for work in recv_work:
            work.wait()
            self.assertTrue(work.is_completed())

        # Every output other than our own should contain the sender's rank
        for i in range(self.world_size):
            if i == self.rank:
                continue
            self.assertEqual(torch.Tensor([i]), outputs[i])
    def test_timeout_kwarg(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(
            store,
            self.rank,
            self.world_size,
            timeout=timedelta(seconds=0.5))
    def test_barrier_implies_wait(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())

        # Kick off allreduce operations without keeping the work handles
        size = (100, 100)
        num = 16
        tensors = [torch.full(size, float(i)) for i in range(num)]
        for tensor in tensors:
            pg.allreduce(tensor)

        # The barrier should imply that all previous work has completed
        pg.barrier().wait()

        for i, tensor in enumerate(tensors):
            self.assertEqual(torch.full(size, float(i * self.world_size)), tensor)

class ProcessGroupNCCLTest(TestCase):
    MAIN_PROCESS_RANK = 0

    def setUp(self):
        if not hasattr(c10d, "ProcessGroupNCCL"):
            raise unittest.SkipTest("C10D is not built with NCCL process group,"
                                    " skipping test")

        self.rank = self.MAIN_PROCESS_RANK
        self.world_size = 1
        self.file = tempfile.NamedTemporaryFile(delete=False)
        self.num_gpus = torch.cuda.device_count()
        if self.num_gpus < 2:
            raise unittest.SkipTest("NCCL test requires 2+ GPUs")
    def test_broadcast_ops(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)

        def broadcast(xs, rootRank, rootTensor):
            opts = c10d.BroadcastOptions()
            opts.rootRank = rootRank
            opts.rootTensor = rootTensor
            work = pg.broadcast(xs, opts)
            work.wait()

        # for every root tensor
        for rt in range(self.num_gpus):
            tensors = []
            for i in range(self.num_gpus):
                tensors.append(torch.Tensor([i]).cuda(i))

            broadcast(tensors, self.rank, rt)

            for i in range(self.num_gpus):
                self.assertEqual(tensors[i], tensors[rt])
    def test_allreduce_ops(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)

        def allreduce(tensors, op):
            opts = c10d.AllreduceOptions()
            opts.reduceOp = op
            work = pg.allreduce(tensors, opts)
            work.wait()

        # Sum
        tensors = []
        for i in range(self.num_gpus):
            tensors.append(torch.Tensor([i + 1]).cuda(i))

        allreduce(tensors, c10d.ReduceOp.SUM)

        for i in range(self.num_gpus):
            self.assertEqual(
                torch.Tensor([float(self.num_gpus * (self.num_gpus + 1) / 2)]),
                tensors[i])

        # Product
        tensors = []
        for i in range(self.num_gpus):
            tensors.append(torch.Tensor([i + 1]).cuda(i))

        allreduce(tensors, c10d.ReduceOp.PRODUCT)

        for i in range(self.num_gpus):
            self.assertEqual(
                torch.Tensor([float(math.factorial(self.num_gpus))]),
                tensors[i])

        # Min
        tensors = []
        for i in range(self.num_gpus):
            tensors.append(torch.Tensor([i + 1]).cuda(i))

        allreduce(tensors, c10d.ReduceOp.MIN)

        for i in range(self.num_gpus):
            self.assertEqual(torch.Tensor([1.0]), tensors[i])

        # Max
        tensors = []
        for i in range(self.num_gpus):
            tensors.append(torch.Tensor([i + 1]).cuda(i))

        allreduce(tensors, c10d.ReduceOp.MAX)

        for i in range(self.num_gpus):
            self.assertEqual(torch.Tensor([self.num_gpus]), tensors[i])
    def test_reduce_ops(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)

        def reduce(xs, rootRank, rootTensor):
            opts = c10d.ReduceOptions()
            opts.rootRank = rootRank
            opts.rootTensor = rootTensor
            work = pg.reduce(xs, opts)
            work.wait()

        # for every root tensor
        for rt in range(self.num_gpus):
            tensors = []
            for i in range(self.num_gpus):
                tensors.append(torch.Tensor([i + 1]).cuda(i))

            reduce(tensors, self.rank, rt)

            self.assertEqual(
                torch.Tensor([float(self.num_gpus * (self.num_gpus + 1) / 2)]),
                tensors[rt])
    def test_allgather_ops(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)

        def allgather(output_ts, input_ts):
            work = pg.allgather(output_ts, input_ts)
            work.wait()

        tensors = []
        output_ts = [[] for _ in range(self.num_gpus)]

        for idx, ls in enumerate(output_ts):
            for _ in range(self.world_size * self.num_gpus):
                ls.append(torch.Tensor([0]).cuda(idx))

        for i in range(self.num_gpus):
            tensors.append(torch.Tensor([i]).cuda(i))

        allgather(output_ts, tensors)

        # Verification
        for device_ts in output_ts:
            for s_idx, t in enumerate(device_ts):
                self.assertEqual(torch.Tensor([s_idx]), t)
    def test_barrier(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)

        def allreduce(tensors):
            opts = c10d.AllreduceOptions()
            work = pg.allreduce(tensors, opts)
            return work

        # Make collectives that operate on 2, 3, ..., self.num_gpus GPUs
        tensors_list = [[] for _ in range(2, self.num_gpus + 1)]
        for i in range(2, self.num_gpus + 1):
            for j in range(i):
                tensors_list[i - 2].append(torch.Tensor([j + 1]).cuda(j))

        works = []
        for tensors in tensors_list:
            work = allreduce(tensors)
            works.append(work)

        # The barrier should ensure that all previous work has completed
        pg.barrier().wait()

        for i in range(2, self.num_gpus + 1):
            for j in range(i):
                self.assertEqual(
                    torch.Tensor([float(i * (i + 1) / 2)]),
                    tensors_list[i - 2][j])

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 10, bias=False)
        self.fc2 = nn.Linear(10, 50, bias=False)
        self.fc3 = nn.Linear(50, 4, bias=False)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return F.softmax(x, dim=1)

class DistributedDataParallelTest(MultiProcessTestCase):
    def tearDown(self):
        # The store file is not always cleaned up by the FileStore destructor,
        # so remove it explicitly.
        try:
            os.remove(self.file.name)
        except OSError:
            pass

    @property
    def world_size(self):
        return 2

    def _test_ddp_with_process_group(self, process_group, gpus):
        model = Net()
        ddp_model = DistributedDataParallel(
            copy.deepcopy(model).cuda(gpus[0]),
            device_ids=gpus,
            process_group=process_group,
            bucket_cap_mb=0.001)

        model.cuda(gpus[0])

        local_batch_size = len(gpus)
        global_batch_size = self.world_size * local_batch_size
        input = torch.randn(global_batch_size, 2).cuda(gpus[0])
        target = torch.randn(global_batch_size, 4).cuda(gpus[0])

        def step_model(model, input, target):
            model.train()
            output = model(input)
            loss = F.mse_loss(output, target)
            loss.backward()

        def update_parameters(model):
            for param in model.parameters():
                param.data -= param.grad
                param.grad = None

        # Check model parameters over 2 iterations
        for iteration in range(2):
            # Single-process training on the full global batch
            step_model(model, input, target)

            # DDP training: each process steps on its own shard of the batch
            step_model(ddp_model,
                       input[self.rank * local_batch_size: (self.rank + 1) * local_batch_size],
                       target[self.rank * local_batch_size: (self.rank + 1) * local_batch_size])

            update_parameters(model)
            update_parameters(ddp_model)
            self.assertEqual(len(list(model.parameters())), len(list(ddp_model.parameters())))
            for i, j in zip(model.parameters(), ddp_model.parameters()):
                self.assertEqual(i, j)

            # Shuffle the input so that the DDP input differs per iteration
            torch.manual_seed(1337 + iteration)
            input = input[torch.randperm(global_batch_size)]
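        # DistributedDataParallel averages gradients across ranks during the
        # backward pass, so after every update the DDP replica should match the
        # locally stepped model; reseeding with 1337 + iteration keeps the
        # shuffled input identical on all processes for the next iteration.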
    @skip_if_not_multigpu
    def test_gloo_backend(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        options = c10d.ProcessGroupGloo.Options()
        options.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")]
        process_group = c10d.ProcessGroupGloo(store, self.rank, self.world_size, options)
        gpus = gpus_for_rank(self.world_size)[self.rank]
        self._test_ddp_with_process_group(process_group, gpus)
    @skip_if_not_multigpu
    @skip_if_not_nccl
    def test_nccl_backend(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
        gpus = gpus_for_rank(self.world_size)[self.rank]
        self._test_ddp_with_process_group(process_group, gpus)
    @skip_if_not_multigpu
    @skip_if_not_nccl
    @skip_for_known_issues
    def test_dist_broadcast_coalesced_nccl(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)

        device = torch.device('cuda')

        for fine_grained in [False, True]:
            target = torch.arange(60, dtype=torch.float16, device=device).chunk(5)
            target += torch.arange(60, dtype=torch.float32, device=device).chunk(5)
            target += torch.arange(60, dtype=torch.float16, device=device).chunk(5)
            target += torch.arange(60, dtype=torch.float64, device=device).chunk(5)
            target += torch.arange(60, dtype=torch.float16, device=device).chunk(5)
            target += torch.arange(60, dtype=torch.float32, device=device).chunk(5)

            if self.is_master:
                # The master already holds the target tensors.
                tensors = target
            else:
                # Non-master processes start from zeros and should end up with
                # the master's tensors after the broadcast.
                tensors = torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
                tensors += torch.zeros(60, dtype=torch.float32, device=device).chunk(5)
                tensors += torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
                tensors += torch.zeros(60, dtype=torch.float64, device=device).chunk(5)
                tensors += torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
                tensors += torch.zeros(60, dtype=torch.float32, device=device).chunk(5)

            c10d._dist_broadcast_coalesced(
                process_group,
                tensors,
                buffer_size=256,
                fine_grained=fine_grained)

            self.assertEqual(tensors, target)
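            # The buckets deliberately mix float16/float32/float64 chunks so
            # the coalesced broadcast is exercised across dtype boundaries,
            # with both fine_grained settings.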
    @skip_if_not_multigpu
    def test_dist_broadcast_coalesced_gloo(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        options = c10d.ProcessGroupGloo.Options()
        options.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")]
        process_group = c10d.ProcessGroupGloo(store, self.rank, self.world_size, options)

        device = torch.device('cuda')

        for fine_grained in [False, True]:
            target = torch.arange(60, dtype=torch.float16, device=device).chunk(5)
            target += torch.arange(60, dtype=torch.float32, device=device).chunk(5)
            target += torch.arange(60, dtype=torch.float16, device=device).chunk(5)
            target += torch.arange(60, dtype=torch.float64, device=device).chunk(5)
            target += torch.arange(60, dtype=torch.float16, device=device).chunk(5)
            target += torch.arange(60, dtype=torch.float32, device=device).chunk(5)

            if self.is_master:
                # The master already holds the target tensors.
                tensors = target
            else:
                # Non-master processes start from zeros and should end up with
                # the master's tensors after the broadcast.
                tensors = torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
                tensors += torch.zeros(60, dtype=torch.float32, device=device).chunk(5)
                tensors += torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
                tensors += torch.zeros(60, dtype=torch.float64, device=device).chunk(5)
                tensors += torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
                tensors += torch.zeros(60, dtype=torch.float32, device=device).chunk(5)

            c10d._dist_broadcast_coalesced(
                process_group,
                tensors,
                buffer_size=256,
                fine_grained=fine_grained)

            self.assertEqual(tensors, target)
    @skip_if_not_multigpu
    def test_sync_params_no_buffers(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        options = c10d.ProcessGroupGloo.Options()
        options.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")]
        process_group = c10d.ProcessGroupGloo(store, self.rank, self.world_size, options)

        # Use all available devices on every process (the data is small).
        devices = gpus_for_rank(self.world_size)[self.rank]
        target = torch.arange(10, dtype=torch.float64, device='cuda:{}'.format(devices[0])).chunk(5)
        parameter_data = [target]
        parameter_data += [torch.zeros(10, device=torch.device('cuda', d)).chunk(5) for d in devices[1:]]
        buffer_data = [[]] * len(parameter_data)

        c10d._sync_params(
            process_group,
            parameter_data=parameter_data,
            buffer_data=buffer_data,
            devices=devices,
            broadcast_bucket_size=10,
            broadcast_buffers=False)

        for device_data in parameter_data:
            for i, parameter in enumerate(device_data):
                self.assertEqual(parameter, target[i])
    @skip_if_not_multigpu
    def test_sync_params_with_buffers(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        options = c10d.ProcessGroupGloo.Options()
        options.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")]
        process_group = c10d.ProcessGroupGloo(store, self.rank, self.world_size, options)

        devices = gpus_for_rank(self.world_size)[self.rank]
        target = torch.arange(10, dtype=torch.float64, device='cuda:{}'.format(devices[0])).chunk(5)
        parameter_data = [target]
        parameter_data += [torch.zeros(10, device=torch.device('cuda', d)).chunk(5) for d in devices[1:]]

        # Only the master populates its buffers with the target values; the
        # other processes start from zeros and should receive them.
        if self.is_master:
            buffer_data = [target]
            buffer_data += [torch.zeros(10, device=torch.device('cuda', d)).chunk(5) for d in devices[1:]]
        else:
            buffer_data = [torch.zeros(10, device=torch.device('cuda', d)).chunk(5) for d in devices]

        c10d._sync_params(
            process_group,
            parameter_data=parameter_data,
            buffer_data=buffer_data,
            devices=devices,
            broadcast_bucket_size=10,
            broadcast_buffers=True)

        for device_data in parameter_data:
            for i, parameter in enumerate(device_data):
                self.assertEqual(parameter, target[i])

        for device_data in buffer_data:
            for i, buffer in enumerate(device_data):
                self.assertEqual(buffer, target[i])
    @skip_if_not_multigpu
    @skip_if_not_nccl
    def test_fp16(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)

        gpus = gpus_for_rank(self.world_size)[self.rank]
        model = nn.Linear(1, 1, bias=False).cuda(gpus[0]).half()
        nn.init.constant_(model.weight, 1)
        ddp_model = DistributedDataParallel(
            model,
            device_ids=[gpus[0]],
            process_group=process_group,
            bucket_cap_mb=0.001,
        )

        # Input of 2**15 is large enough that an unnormalized fp16 gradient
        # reduction would overflow.
        input = torch.Tensor([[2**15]]).cuda(gpus[0]).half()

        # Step model
        ddp_model.train()
        output = ddp_model(input)
        loss = output.sum()
        loss.backward()

        self.assertFalse(
            any(torch.isinf(p.grad).any() for p in ddp_model.parameters())
        )
    @skip_if_not_nccl
    @skip_if_not_multigpu
    def test_queue_reduction(self):
        # Set up the process group.
        store = c10d.FileStore(self.file.name, self.world_size)
        process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)

        # Get this process' split of devices.
        devices = gpus_for_rank(self.world_size)[self.rank]
        grads_batch = [(torch.ones(10, device=torch.device('cuda', d)) *
                       (self.rank + 1)).chunk(5)
                       for d in devices]

        work, local_grad_sum = c10d._queue_reduction(process_group,
                                                     grads_batch,
                                                     devices)
        # The first return value should be the allreduce work item.
        self.assertTrue(isinstance(work, c10d.Work))
        # The second return value will be the allreduced gradients.
        self.assertTrue(isinstance(local_grad_sum, torch.Tensor))

        # Wait for the allreduce to finish.
        work.wait()

        # The expected result of the allreduce is the average of the gradients.
        self.assertEqual(local_grad_sum,
                         torch.ones(10) * (self.world_size + 1) * len(devices) / 2.0)
    @skip_if_not_nccl
    @skip_if_not_multigpu
    def test_sync_reduction(self):
        # Set up the process group.
        store = c10d.FileStore(self.file.name, self.world_size)
        process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)

        # Get this process' split of devices.
        devices = gpus_for_rank(self.world_size)[self.rank]
        grads_batch = [(torch.ones(10, device=torch.device('cuda', d)) *
                       (self.rank + 1)).chunk(5)
                       for d in devices]
        work, local_grad_sum = c10d._queue_reduction(process_group,
                                                     grads_batch,
                                                     devices)
        c10d._sync_reduction(work, grads_batch[0], local_grad_sum)
        # The expected result of the allreduce is the average of the gradients.
        self.assertEqual(grads_batch[0], (torch.ones(10) * (self.world_size + 1) * len(devices) / 2.0).chunk(5))

if __name__ == '__main__':
    assert not torch.cuda._initialized, \
        "test_distributed must not have initialized CUDA context on main process"

    run_tests()