from __future__ import absolute_import, division, print_function, unicode_literals

import copy
import fcntl
import multiprocessing
import os
import sys
import tempfile
import time
import unittest
from contextlib import contextmanager
from datetime import timedelta
from functools import reduce, wraps

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F

from common_utils import TestCase, run_tests

import common_utils as common

BACKEND = os.environ["BACKEND"]
TEMP_DIR = os.environ["TEMP_DIR"]
INIT_METHOD = os.getenv("INIT_METHOD", "env://")

CUSTOMIZED_TIMEOUT = {"test_DistributedDataParallel": 500}

if INIT_METHOD.startswith("file://"):
    FOLDER = INIT_METHOD[7:]
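# This module is driven entirely by environment variables: BACKEND, TEMP_DIR
# and INIT_METHOD are read above, and WORLD_SIZE is read further below. An
# illustrative invocation (the launcher and values are assumptions, not part of
# this file) might look like:
#   BACKEND=gloo WORLD_SIZE=3 TEMP_DIR=/tmp/dist_test python test_distributed.py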
        super(_FC2, self).__init__()
        self.fc = nn.Linear(10, 50, bias=True)
        self.fc.bias.requires_grad = False

        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 10, bias=False)
        self.fc3 = nn.Linear(50, 4, bias=False)
        self.no_grad_param = nn.Parameter(torch.Tensor([2, 2]).long(),
                                          requires_grad=False)

        return F.softmax(x, dim=1)
        super(BatchNormNet, self).__init__()
        self.fc1 = nn.Linear(2, 40, bias=False)
        self.bn = nn.BatchNorm1d(4)
        self.fc2 = nn.Linear(40, 4, bias=False)

        x = torch.reshape(self.fc1(x), (-1, 4, 10))
        x = torch.reshape(x, (-1, 40))
        return F.softmax(x, dim=1)
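        # Note on the forward pass above: the (-1, 4, 10) reshape views the
        # 40-dimensional fc1 output as 4 channels of length 10, presumably so
        # that the BatchNorm1d(4) layer defined above normalizes over those 4
        # channels before the tensor is flattened back to (-1, 40).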
def get_timeout(test_id):
    test_name = test_id.split(".")[-1]
    if test_name in CUSTOMIZED_TIMEOUT:
        return CUSTOMIZED_TIMEOUT[test_name]
    return DEFAULT_TIMEOUT
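# Illustrative lookup (the test-class name here is hypothetical):
#   get_timeout("__main__.TestDistBackend.test_DistributedDataParallel") -> 500
#   get_timeout("__main__.TestDistBackend.test_get_rank") -> DEFAULT_TIMEOUT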
if not dist.is_available():
    print("Distributed not available, skipping tests")

SKIP_IF_NO_CUDA_EXIT_CODE = 75
SKIP_IF_NO_GPU_EXIT_CODE = 76
SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE = 77
SKIP_IF_BACKEND_UNAVAILABLE = 78
def skip_if_no_cuda_distributed(func):
    func.skip_if_no_cuda_distributed = True

    @wraps(func)
    def wrapper(*args, **kwargs):
        if not torch.cuda.is_available():
            sys.exit(SKIP_IF_NO_CUDA_EXIT_CODE)

        return func(*args, **kwargs)

    return wrapper


def skip_if_no_gpu(func):
    """Nccl multigpu tests require at least 2 GPUs. Skip if this is not met."""
    func.skip_if_no_gpu = True

    @wraps(func)
    def wrapper(*args, **kwargs):
        if not torch.cuda.is_available():
            sys.exit(SKIP_IF_NO_CUDA_EXIT_CODE)
        if torch.cuda.device_count() < int(os.environ["WORLD_SIZE"]):
            sys.exit(SKIP_IF_NO_GPU_EXIT_CODE)

        return func(*args, **kwargs)

    return wrapper


def skip_if_small_worldsize(func):
    func.skip_if_small_worldsize = True

    @wraps(func)
    def wrapper(*args, **kwargs):
        if (os.environ["BACKEND"] != "mpi") and int(os.environ["WORLD_SIZE"]) <= 2:
            sys.exit(SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE)

        return func(*args, **kwargs)

    return wrapper
def apply_hack_for_nccl():
    os.environ["NCCL_MAX_NRINGS"] = "1"


@contextmanager
def _lock():  # file-lock helper (name assumed): serializes access via TEMP_DIR/lockfile
    lockfile = os.path.join(TEMP_DIR, "lockfile")
    with open(lockfile, "w") as lf:
        try:
            fcntl.flock(lf.fileno(), fcntl.LOCK_EX)
            yield
        finally:
            fcntl.flock(lf.fileno(), fcntl.LOCK_UN)
def _build_tensor(size, value=None):
    if value is None:
        value = size
    return torch.FloatTensor(size, size, size).fill_(value)
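# Illustrative usage: _build_tensor(3) is a 3x3x3 FloatTensor filled with 3.0,
# while _build_tensor(3, -1) is a 3x3x3 FloatTensor filled with -1.0.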
        barrier_dir = os.path.join(TEMP_DIR, "barrier")
        for f_name in os.listdir(barrier_dir):
            os.unlink(os.path.join(barrier_dir, f_name))
    def sync(cls, wait_for=None, timeout=5):
        if wait_for is None:
            wait_for = dist.get_world_size()
        barrier_dir = os.path.join(TEMP_DIR, "barrier")
        pid = str(os.getpid())
        barrier_file = os.path.join(barrier_dir, pid)
        with open(barrier_file, "w") as f:
            ...

        start_time = time.time()
        while True:
            for f_name in os.listdir(barrier_dir):
                with open(os.path.join(barrier_dir, f_name), "r") as f:
                    ...
            if arrived == wait_for:
                break
            if time.time() - start_time > timeout:
                raise RuntimeError("barrier timeout")

    def _barrier(self, *args, **kwargs):
        Barrier.sync(*args, **kwargs)
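    # Barrier.sync above implements a simple file-based rendezvous: each
    # process drops a file named after its pid into TEMP_DIR/barrier, then
    # polls the directory until `wait_for` processes have checked in, raising
    # RuntimeError("barrier timeout") if `timeout` seconds elapse first.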
    def _init_group_test(self, **kwargs):
        group = [1, 2]
        group_id = dist.new_group(group, **kwargs)
        rank = dist.get_rank()
        if rank not in group:
            return ([], None, rank)

        return (group, group_id, rank)

    def _init_full_group_test(self, **kwargs):
        group = [i for i in range(0, dist.get_world_size())]
        group_id = dist.new_group(**kwargs)
        rank = dist.get_rank()
        return (group, group_id, rank)

    def _init_global_test(self):
        group = [i for i in range(0, dist.get_world_size())]
        group_id = dist.group.WORLD
        rank = dist.get_rank()
        return (group, group_id, rank)
    def _init_multigpu_helper(self):
        """Multigpu tests are designed to simulate the multi nodes with multi
        GPUs on each node. Nccl backend requires equal #GPUs in each process.
        On a single node, all visible GPUs are evenly
        divided to subsets, each process only uses a subset.
        """
        nGPUs = torch.cuda.device_count()
        world_size = dist.get_world_size()
        visible_devices = range(nGPUs)

        if BACKEND == "nccl":
            apply_hack_for_nccl()

        nGPUs_per_process = nGPUs // world_size
        rank_to_GPU = {
            i: list(
                visible_devices[i * nGPUs_per_process: (i + 1) * nGPUs_per_process]
            )
            for i in range(world_size)
        }
        return rank_to_GPU
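    # Illustrative mapping (assuming 4 visible GPUs and a world size of 2):
    #   rank_to_GPU == {0: [0, 1], 1: [2, 3]}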
    def test_get_rank(self):
        test_dir = os.path.join(TEMP_DIR, "test_dir")
        pid = str(os.getpid())
        num_processes = dist.get_world_size()
        with open(os.path.join(test_dir, pid), "w") as f:
            f.write(str(dist.get_rank()))

        all_ranks = set()
        for f_name in os.listdir(test_dir):
            with open(os.path.join(test_dir, f_name), "r") as f:
                all_ranks.add(int(f.read()))
        self.assertEqual(len(all_ranks), num_processes)

        if dist.get_rank() == 0:
            for f_name in os.listdir(test_dir):
                os.unlink(os.path.join(test_dir, f_name))
    def test_get_backend(self):
        if dist.get_world_size() > 2:
            group = [1, 2]
        else:
            group = [0, 1]
        group_id = dist.new_group(group)
        backend_str = BACKEND.lower()
        self.assertEqual(dist.get_backend(), backend_str)
        if dist.get_rank() in group:
            self.assertEqual(dist.get_backend(group_id), backend_str)
        else:
            with self.assertRaisesRegex(RuntimeError, "Invalid process group specified"):
                dist.get_backend(group_id)
    def test_Backend_enum_class(self):
        backend = BACKEND.lower()
        self.assertEqual(dist.Backend(BACKEND.upper()), backend)
        self.assertEqual(dist.Backend(BACKEND), backend)
        with self.assertRaisesRegex(ValueError, "Invalid backend: 'undefined'"):
            dist.Backend("undefined")
        with self.assertRaisesRegex(ValueError, "Invalid backend: 'xYz'"):
            dist.Backend("xYz")
        with self.assertRaises(ValueError):
            ...
        with self.assertRaises(ValueError):
            ...
        with self.assertRaises(ValueError):
            dist.Backend(["gloo"])
    def test_destroy_group(self):
        if dist.get_world_size() > 2:
            group = [1, 2]
        else:
            group = [0, 1]
        group_id = dist.new_group(group)
        dist.destroy_process_group(group_id)

    def test_get_rank_size_group(self):
        if dist.get_world_size() > 2:
            group = [1, 2]
        else:
            group = [0, 1]
        group_id = dist.new_group(group)
        if dist.get_rank() in group:
            self.assertEqual(dist.get_world_size(group_id), 2)
            self.assertTrue(dist.get_rank(group_id) in list(range(2)))
        else:
            self.assertEqual(dist.get_world_size(group_id), -1)
            self.assertEqual(dist.get_rank(group_id), -1)

    def test_destroy_full_group(self):
        _, group_id, _ = self._init_full_group_test()
        dist.destroy_process_group(group_id)

    def test_get_rank_size_full_group(self):
        _, group_id, _ = self._init_full_group_test()
        self.assertEqual(dist.get_world_size(group_id), dist.get_world_size())
        self.assertEqual(dist.get_rank(group_id), dist.get_rank())
    def _test_barrier_timeout(self, group_id, timeout):
        local_rank = dist.get_rank(group_id)

        if local_rank == 0:
            expected_time = time.time() + timeout.total_seconds()
            with self.assertRaisesRegex(RuntimeError, " (Timed out|closed) "):
                dist.barrier(group_id)
            self.assertGreaterEqual(time.time(), expected_time)
        else:
            time.sleep(timeout.total_seconds())

    @unittest.skipIf(BACKEND != "gloo", "Only gloo backend supports timeouts")
    @unittest.skipIf(
        not INIT_METHOD.startswith("file://"),
        "Requires file:// initialization method. " +
        "Both tcp:// and env:// rely on the TCP store for which "
        "reinitialization has proven racy."
    )
    def test_barrier_timeout_global(self):
        dist.destroy_process_group()

        self._barrier(wait_for=int(WORLD_SIZE))

        timeout = timedelta(seconds=0.2)
        dist.init_process_group(
            init_method=INIT_METHOD,
            backend=BACKEND,
            world_size=int(WORLD_SIZE),
            timeout=timeout,
        )

    @skip_if_small_worldsize
    @unittest.skipIf(BACKEND != "gloo", "Only gloo backend supports timeouts")
    def test_barrier_timeout_group(self):
        timeout = timedelta(seconds=0.2)
        _, group_id, _ = self._init_group_test(timeout=timeout)
        if group_id is not None:
            self._test_barrier_timeout(group_id, timeout)

    @unittest.skipIf(BACKEND != "gloo", "Only gloo backend supports timeouts")
    def test_barrier_timeout_full_group(self):
        timeout = timedelta(seconds=0.2)
        _, group_id, _ = self._init_full_group_test(timeout=timeout)
        if group_id is not None:
            self._test_barrier_timeout(group_id, timeout)
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv")
    def test_send_recv(self):
        rank = dist.get_rank()
        tensor = _build_tensor(rank + 1)

        for src in range(0, dist.get_world_size()):
            if src == rank:
                # Send mode
                for dst in range(0, dist.get_world_size()):
                    if dst == rank:
                        continue
                    dist.send(tensor, dst)
            else:
                # Recv mode
                expected_tensor = _build_tensor(src + 1)
                output_tensor = _build_tensor(src + 1, value=-1)
                dist.recv(output_tensor, src)
                self.assertEqual(output_tensor, expected_tensor)
    @unittest.skipIf(
        BACKEND == "nccl", "Nccl does not support send/recv from any source"
    )
    def test_send_recv_any_source(self):
        rank = dist.get_rank()
        tensor = _build_tensor(10, value=rank)
        recv_ranks = set()

        for dst in range(0, dist.get_world_size()):
            if dst == rank:
                # Recv mode
                for dst in range(0, dist.get_world_size()):
                    if dst == rank:
                        continue
                    output_tensor = _build_tensor(10, value=-1)
                    sender = dist.recv(output_tensor)

                    # The returned sender rank should equal every value in the
                    # received tensor, since each sender fills its tensor with
                    # its own rank.
                    self.assertTrue(output_tensor.eq(sender).all())
                    recv_ranks.add(sender)
            else:
                # Send mode
                dist.send(tensor, dst)

        self.assertEqual(len(recv_ranks), dist.get_world_size() - 1)
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv")
    def test_send_recv_with_tag(self):
        rank = dist.get_rank()
        world_size = dist.get_world_size()
        tensor = _build_tensor(10, value=rank)

        for dst in range(0, world_size):
            if dst == rank:
                # Recv mode
                for src in range(0, world_size):
                    if src == rank:
                        continue
                    output_tensor = _build_tensor(10, value=-1)
                    dist.recv(output_tensor, src, tag=src)
                    self.assertTrue(output_tensor.eq(src).all())
            else:
                # Send mode
                dist.send(tensor, dst, tag=rank)
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support isend")
    def test_isend(self):
        rank = dist.get_rank()
        world_size = dist.get_world_size()

        if rank == 0:
            requests = [
                dist.isend(_build_tensor(dest, 10), dest)
                for dest in range(1, world_size)
            ]
            for request in requests:
                request.wait()
                self.assertTrue(request.is_completed())
        else:
            tensor = _build_tensor(rank, -1)
            dist.recv(tensor, 0)
            self.assertEqual(tensor, _build_tensor(rank, 10))
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support irecv")
    def test_irecv(self):
        rank = dist.get_rank()
        world_size = dist.get_world_size()

        if rank == 0:
            expected_tensors = [_build_tensor(src, -1) for src in range(1, world_size)]
            requests = [
                dist.irecv(expected_tensors[src - 1], src)
                for src in range(1, world_size)
            ]

            for src in range(1, world_size):
                requests[src - 1].wait()
                self.assertTrue(requests[src - 1].is_completed())
                self.assertEqual(expected_tensors[src - 1], _build_tensor(src, 10))
        else:
            tensor = _build_tensor(rank, 10)
            dist.send(tensor, 0)
    def _test_broadcast_helper(
        self, group, group_id, rank, cuda=False, rank_to_GPU=None
    ):
        for ttype, value, requires_cuda in [
            ("torch.FloatTensor", -1e-10, False),
            ("torch.DoubleTensor", -1e-100, False),
            ("torch.HalfTensor", -0.1, True),
            ("torch.CharTensor", -2, False),
            ("torch.ByteTensor", 129, False),
            ("torch.IntTensor", -1e5, False),
            ("torch.LongTensor", -1e15, False),
        ]:
            if requires_cuda and not cuda:
                continue
            for src in group:
                expected_tensor = _build_tensor(src + 1, value).type(ttype)
                if cuda:
                    expected_tensor = expected_tensor.cuda(rank_to_GPU[rank][0])
                if rank == src:
                    dist.broadcast(expected_tensor, src, group_id)
                else:
                    tensor = _build_tensor(src + 1, -1).type(ttype)
                    if cuda:
                        tensor = tensor.cuda(rank_to_GPU[rank][0])
                    dist.broadcast(tensor, src, group_id)
                    self.assertEqual(tensor.size(), expected_tensor.size())
                    self.assertEqual(tensor.ne(expected_tensor).max(), 0)
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_broadcast(self):

    @unittest.skipIf(
        BACKEND != "gloo" and BACKEND != "nccl",
        "Only Gloo and Nccl backend supports CUDA allReduce",
    )
    @skip_if_no_cuda_distributed
    def test_broadcast_cuda(self):

    @skip_if_small_worldsize
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_broadcast_group(self):

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_broadcast_full_group(self):
    def _test_reduce_helper(
        self, group, group_id, rank, op, master_value, worker_value,
        expected_value, cuda=False, rank_to_GPU=None
    ):
        for src in group:
            if rank == src:
                tensor = _build_tensor(src + 1).fill_(master_value)
                if cuda:
                    tensor = tensor.cuda(rank_to_GPU[rank][0])
                dist.reduce(tensor, src, op, group_id)
                self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
            else:
                tensor = _build_tensor(src + 1).fill_(worker_value)
                if cuda:
                    tensor = tensor.cuda(rank_to_GPU[rank][0])
                dist.reduce(tensor, src, op, group_id)
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_reduce_sum(self):
            2 + (10 * (len(group) - 1)),

    @unittest.skipIf(BACKEND != "nccl", "Only Nccl supports CUDA reduce")
    @skip_if_no_cuda_distributed
    def test_reduce_sum_cuda(self):
            2 + 10 * (len(group) - 1),

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_reduce_product(self):
            dist.ReduceOp.PRODUCT,
            reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
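    # The expected values passed to _test_reduce_helper above appear to encode
    # a setup where the source rank contributes 2 and every other rank
    # contributes 10: SUM expects 2 + 10 * (len(group) - 1), and PRODUCT
    # expects 2 * 10 ** (len(group) - 1), written via functools.reduce over
    # [10] * (len(group) - 1) with initializer 2.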
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_reduce_min(self):

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_reduce_max(self):

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    @skip_if_small_worldsize
    def test_reduce_group_sum(self):
            2 + (10 * (len(group) - 1)),

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    @skip_if_small_worldsize
    def test_reduce_group_product(self):
            dist.ReduceOp.PRODUCT,
            reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    @skip_if_small_worldsize
    def test_reduce_group_min(self):

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    @skip_if_small_worldsize
    def test_reduce_group_max(self):

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_reduce_full_group_sum(self):
            2 + (10 * (len(group) - 1)),

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_reduce_full_group_product(self):
            dist.ReduceOp.PRODUCT,
            reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_reduce_full_group_min(self):

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_reduce_full_group_max(self):
    def _test_all_reduce_helper(
        self, group, group_id, rank, op, master_value, worker_value,
        expected_value, cuda=False, rank_to_GPU=None
    ):
        for src in group:
            if rank == src:
                tensor = _build_tensor(src + 1).fill_(master_value)
                if cuda:
                    tensor = tensor.cuda(rank_to_GPU[rank][0])
                dist.all_reduce(tensor, op, group_id)
                self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
            else:
                tensor = _build_tensor(src + 1).fill_(worker_value)
                if cuda:
                    tensor = tensor.cuda(rank_to_GPU[rank][0])
                dist.all_reduce(tensor, op, group_id)
                self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_sum(self):
            2 + (10 * (len(group) - 1)),

    @unittest.skipIf(
        BACKEND != "gloo",
        "Only Gloo backend will have CUDA allReduce tested",
    )
    @skip_if_no_cuda_distributed
    def test_all_reduce_sum_cuda(self):
            2 + (10 * (len(group) - 1)),
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_product(self):
            dist.ReduceOp.PRODUCT,
            reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_min(self):
            group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_max(self):
            group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10

    @skip_if_small_worldsize
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_group_sum(self):
            2 + (10 * (len(group) - 1)),

    @skip_if_small_worldsize
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_group_product(self):
            dist.ReduceOp.PRODUCT,
            reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),

    @skip_if_small_worldsize
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_group_min(self):
            group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1

    @skip_if_small_worldsize
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_group_max(self):
            group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_full_group_sum(self):
            2 + (10 * (len(group) - 1)),

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_full_group_product(self):
            dist.ReduceOp.PRODUCT,
            reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_full_group_min(self):
            group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_full_group_max(self):
            group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10
    def _test_scatter_helper(self, group, group_id, rank):
        for dest in group:
            tensor = _build_tensor(dest + 1, -1)
            expected_tensor = _build_tensor(dest + 1, rank)
            tensors = (
                [_build_tensor(dest + 1, i) for i in group] if rank == dest else []
            )
            dist.scatter(tensor, src=dest, scatter_list=tensors, group=group_id)
            self.assertEqual(tensor, expected_tensor)
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
    def test_scatter(self):

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
    @skip_if_small_worldsize
    def test_scatter_group(self):

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
    def test_scatter_full_group(self):
    def _test_gather_helper(self, group, group_id, rank):
        for dest in group:
            tensor = _build_tensor(dest + 1, rank)
            tensors = (
                [_build_tensor(dest + 1, -1) for i in group] if rank == dest else []
            )
            dist.gather(tensor, dst=dest, gather_list=tensors, group=group_id)
            if rank == dest:
                expected_tensors = [_build_tensor(dest + 1, i) for i in group]
                for t1, t2 in zip(tensors, expected_tensors):
                    self.assertEqual(t1, t2)
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_gather(self):

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    @skip_if_small_worldsize
    def test_gather_group(self):

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_gather_full_group(self):
    def _test_all_gather_helper(
        self, group, group_id, rank, cuda=False, rank_to_GPU=None
    ):
        for dest in group:
            tensor = _build_tensor(dest + 1, rank)
            tensors = [_build_tensor(dest + 1, -1) for i in group]
            if cuda:
                tensor = tensor.cuda(rank_to_GPU[rank][0])
                tensors = [t.cuda(rank_to_GPU[rank][0]) for t in tensors]
            dist.all_gather(tensors, tensor, group_id)

            expected_tensors = [_build_tensor(dest + 1, i) for i in group]
            for t1, t2 in zip(tensors, expected_tensors):
                self.assertEqual(t1, t2)
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_gather(self):

    @unittest.skipIf(BACKEND != "nccl", "Only Nccl supports CUDA all gather")
    @unittest.skipIf(BACKEND == "nccl", "CUDA all gather skipped for NCCL")
    @skip_if_no_cuda_distributed
    def test_all_gather_cuda(self):

    @skip_if_small_worldsize
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_gather_group(self):

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_gather_full_group(self):
    def _test_barrier_helper(
            self, group, group_id, rank, cuda=False, rank_to_GPU=None):
        for dest in group:
            expected_time = torch.DoubleTensor(1).fill_(0.0)
            if cuda:
                expected_time = expected_time.cuda(rank_to_GPU[rank][0])
            if dest == rank:
                expected_time.fill_(time.time() + WAIT_TIME)
                dist.broadcast(expected_time, dest, group_id)
                time.sleep(WAIT_TIME + 0.1)
                dist.barrier(group_id)
            else:
                dist.broadcast(expected_time, dest, group_id)
                dist.barrier(group_id)
                self.assertGreaterEqual(
                    float(time.time()),
                    float(expected_time[0]),
                    "destination rank: %d, my rank: %d" % (dest, rank) +
                    " (if you see this failure, please report in #14554)")
    @unittest.skipIf(BACKEND == "mpi", "MPI doesn't supports GPU barrier")
    def test_barrier_cuda(self):

    @skip_if_small_worldsize
    @unittest.skipIf(BACKEND == "mpi", "MPI doesn't supports GPU barrier")
    def test_barrier_group_cuda(self):

    @skip_if_small_worldsize
    @unittest.skipIf(BACKEND == "mpi", "MPI doesn't supports GPU barrier")
    def test_barrier_full_group_cuda(self):

    @unittest.skipIf(BACKEND == "nccl", "NCCL does not support CPU barrier")
    def test_barrier(self):

    @skip_if_small_worldsize
    @unittest.skipIf(BACKEND == "nccl", "NCCL does not support CPU barrier")
    def test_barrier_group(self):

    @unittest.skipIf(BACKEND == "nccl", "NCCL does not support CPU barrier")
    def test_barrier_full_group(self):
    def _test_broadcast_multigpu_helper(self, group, group_id, rank, rank_to_GPU):
        for src in group:
            expected_tensor = _build_tensor(src + 1)
            tensors = [
                _build_tensor(src + 1, -1).cuda(device=i) for i in rank_to_GPU[rank]
            ]
            if rank == src:
                tensors[0] = expected_tensor.cuda(device=rank_to_GPU[rank][0])

            dist.broadcast_multigpu(tensors, src, group_id)
            for tensor in tensors:
                self.assertEqual(tensor, expected_tensor)

    @unittest.skipIf(BACKEND == "mpi", "MPI doesn't support broadcast multigpu")
    @unittest.skipIf(BACKEND == "nccl", "NCCL broadcast multigpu skipped")
    def test_broadcast_multigpu(self):
    def _test_all_reduce_multigpu_helper(
        self, group, group_id, rank, rank_to_GPU, op, master_value,
        worker_value, expected_value
    ):
        for src in group:
            if rank == src:
                tensors = [
                    _build_tensor(src + 1, master_value).cuda(device=i)
                    for i in rank_to_GPU[rank]
                ]
            else:
                tensors = [
                    _build_tensor(src + 1, worker_value).cuda(device=i)
                    for i in rank_to_GPU[rank]
                ]

            dist.all_reduce_multigpu(tensors, op, group_id)
            expected_tensor = _build_tensor(src + 1, expected_value)
            for tensor in tensors:
                self.assertEqual(tensor, expected_tensor)

    @unittest.skipIf(BACKEND == "mpi", "MPI doesn't support broadcast multigpu")
    @unittest.skipIf(BACKEND == "nccl", "CUDA all_reduce multigpu skipped for NCCL")
    def test_all_reduce_multigpu(self):
            (2 + 10 * (len(group) - 1)) * len(rank_to_GPU[0]),
    def _test_reduce_multigpu_helper(
        self, group, group_id, rank, rank_to_GPU, op, master_value,
        worker_value, expected_value
    ):
        for src in group:
            if rank == src:
                tensors = [
                    _build_tensor(src + 1, master_value).cuda(device=i)
                    for i in rank_to_GPU[rank]
                ]
                dist.reduce_multigpu(tensors, src, op, group_id)
                expected_tensor = _build_tensor(src + 1, expected_value)
                self.assertEqual(tensors[0], expected_tensor)
            else:
                tensors = [
                    _build_tensor(src + 1, worker_value).cuda(device=i)
                    for i in rank_to_GPU[rank]
                ]
                dist.reduce_multigpu(tensors, src, op, group_id)

    @unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports reduce multigpu")
    def test_reduce_multigpu(self):
            (2 + 10 * (len(group) - 1)) * len(rank_to_GPU[0]),
    def _test_all_gather_multigpu_helper(self, group, group_id, rank, rank_to_GPU):
        for dest in group:
            tensors = [
                _build_tensor(dest + 1).cuda(device=i) for i in rank_to_GPU[rank]
            ]

            # construct the expected output and placeholders that will receive
            # the all_gather results on every GPU owned by this rank
            output_tensors = []
            expected_output = []
            output_per_gpu = (
                [_build_tensor(dest + 1, -1)] * len(rank_to_GPU[0]) * len(group)
            )
            expected_per_gpu = (
                [_build_tensor(dest + 1)] * len(rank_to_GPU[0]) * len(group)
            )
            for gpu in rank_to_GPU[rank]:
                output_tensors.append([t.cuda(device=gpu) for t in output_per_gpu])
                expected_output.append([t.cuda(device=gpu) for t in expected_per_gpu])

            dist.all_gather_multigpu(output_tensors, tensors, group_id)
            self.assertEqual(output_tensors, expected_output)

    @unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports allgather multigpu")
    def test_all_gather_multigpu(self):
    def _model_step(self, model):
        for param in model.parameters():
            if param.grad is not None:
                param.data += param.grad
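    # _model_step applies each accumulated gradient directly to its parameter
    # (no optimizer, no learning rate). The exact update rule is irrelevant for
    # these tests; what matters is that the reference model and the DDP-wrapped
    # model take identical steps so their parameters remain comparable.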
    def _prepare_dummy_data(self, local_bs):
        global_bs = int(WORLD_SIZE) * local_bs
        input_cpu = torch.randn(global_bs, 2)
        target = torch.randn(global_bs, 4)
        return global_bs, input_cpu, target, loss

    def _test_DDP_helper(self, model, input_var, target, loss):
        output = model(input_var)
        l = loss(output, target)
        l.backward()
    def _assert_equal_param(self, param_gpu, param_DDP):
        self.assertEqual(len(param_gpu), len(param_DDP))
        for p_gpu, p_DDP in zip(param_gpu, param_DDP):
            self.assertEqual(p_gpu, p_DDP)

    def _test_DDP_5iter(
        self, model_base, model_DDP, input, target, loss, local_bs, rank,
        batch_size, test_save
    ):
        for idx in range(5):
                input[rank * local_bs: (rank + 1) * local_bs],
                target[rank * local_bs: (rank + 1) * local_bs],

                list(model_base.parameters()), list(model_DDP.module.parameters())

            input = input[torch.randperm(batch_size)]

            if test_save and idx == 2 and INIT_METHOD.startswith("file://"):
                _, filename = tempfile.mkstemp(prefix=FOLDER)
                torch.save(model_DDP, filename)
                model_DDP = torch.load(filename)

        with tempfile.TemporaryFile() as tmp_file:
            torch.save(model_DDP, tmp_file)
            saved_model = torch.load(tmp_file)
        for k in model_DDP.state_dict():
            self.assertEqual(model_DDP.state_dict()[k],
                             saved_model.state_dict()[k])
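    # Each of the 5 iterations above trains the DDP-wrapped model on this
    # rank's local_bs-sized slice of input/target, compares its parameters
    # against the reference model via _assert_equal_param (presumably after
    # stepping both with _model_step), reshuffles the input, and at idx == 2
    # optionally saves and reloads the DDP model to check that serialization
    # does not perturb the comparison.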
    def _test_DistributedDataParallel(self, gpu_subset, rank, output_device=None):
        model_gpu = copy.deepcopy(model)
        model_gpu.cuda(gpu_subset[0])

        model_DDP = copy.deepcopy(model)
        model_DDP.cuda(gpu_subset[0])
        model_DDP = nn.parallel.DistributedDataParallel(
            model_DDP, device_ids=gpu_subset
        )

        if INIT_METHOD.startswith("file://"):
            _, filename = tempfile.mkstemp(prefix=FOLDER)
            torch.save(model_DDP, filename)
            model_DDP = torch.load(filename)

        local_bs = len(gpu_subset)
            input_cpu.cuda(gpu_subset[0]),
            target.cuda(gpu_subset[0]),
    @unittest.skipIf(
        BACKEND == "nccl", "nccl does not support DistributedDataParallelCPU"
    )
    def test_DistributedDataParallelCPU(self):
        model_base = DDP_NET

        model_DDP = copy.deepcopy(model_base)
        model_DDP = nn.parallel.DistributedDataParallelCPU(model_DDP)

            model_base, model_DDP, input_cpu, target, loss, local_bs, rank, global_bs, False

    @unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
                     "Only Nccl & Gloo backend support DistributedDataParallel")
    @skip_if_no_cuda_distributed
    def test_DistributedDataParallel(self):
        gpus = list(rank_to_GPU[rank])

        gpus = list(map(lambda i: torch.device('cuda:' + str(i)), gpus))
    def _test_DistributedDataParallel_SyncBatchNorm(self, gpu_subset, rank, output_device=None):
        model_gpu = copy.deepcopy(model)
        model_gpu.cuda(gpu_subset[0])

        model_DDP = nn.utils.convert_sync_batchnorm(copy.deepcopy(model))
        model_DDP.cuda(gpu_subset[0])
        model_DDP = nn.parallel.DistributedDataParallel(
            model_DDP, device_ids=gpu_subset
        )

        if INIT_METHOD.startswith("file://"):
            _, filename = tempfile.mkstemp(prefix=FOLDER)
            torch.save(model_DDP, filename)
            model_DDP = torch.load(filename)

        local_bs = len(gpu_subset)
            input_cpu.cuda(gpu_subset[0]),
            target.cuda(gpu_subset[0]),

    @unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
                     "Only Nccl & Gloo backend support DistributedDataParallel")
    @skip_if_no_cuda_distributed
    def test_DistributedDataParallel_SyncBatchNorm(self):
        gpus = list(rank_to_GPU[rank])

        gpus = list(map(lambda i: torch.device('cuda:' + str(i)), gpus))
if BACKEND == "gloo" or BACKEND == "nccl":
    WORLD_SIZE = os.environ["WORLD_SIZE"]

    MANAGER_PROCESS_RANK = -1

    def manager_join(fn):

    def setUpClass(cls):
        os.environ["MASTER_ADDR"] = str(MASTER_ADDR)
        os.environ["MASTER_PORT"] = str(MASTER_PORT)
        os.environ["WORLD_SIZE"] = str(WORLD_SIZE)
        for attr in dir(cls):
            if attr.startswith("test"):
                fn = getattr(cls, attr)

        if INIT_METHOD.startswith("file://"):
            _, filename = tempfile.mkstemp(prefix=FOLDER)
            INIT_METHOD = "file://{}".format(filename)

        for rank in range(int(WORLD_SIZE)):

    def _spawn_process(self, rank):
        os.environ["RANK"] = str(rank)
        name = "process " + str(rank)
        process = multiprocessing.Process(target=self._run, name=name, args=(rank,))

    def _run(self, rank):
        try:
            dist.init_process_group(
                init_method=INIT_METHOD,
                world_size=int(WORLD_SIZE),
            )
        except RuntimeError as e:
            if "recompile" in e.args[0]:
                sys.exit(SKIP_IF_BACKEND_UNAVAILABLE)

        getattr(self, self.id().split(".")[2])()
        dist.destroy_process_group()

    def _join_and_reduce(self, fn):
        (
            getattr(fn, "skip_if_no_cuda_distributed", False) or
            getattr(fn, "skip_if_no_gpu", False) or
            getattr(fn, "skip_if_small_worldsize", False)
        )
        join_timeout = get_timeout(self.id())
        for rank, process in enumerate(self.processes):
            process.join(join_timeout)
                "Timeout waiting for rank %d to terminate" % rank)

            self.assertEqual(p.exitcode, first_process.exitcode)

        if first_process.exitcode == SKIP_IF_BACKEND_UNAVAILABLE:
            raise unittest.SkipTest("Compiled without the " + BACKEND + " backend")

            first_process.exitcode == 0 or
            first_process.exitcode == SKIP_IF_NO_CUDA_EXIT_CODE or
            first_process.exitcode == SKIP_IF_NO_GPU_EXIT_CODE or
            first_process.exitcode == SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE

        if first_process.exitcode == SKIP_IF_NO_CUDA_EXIT_CODE:
            raise unittest.SkipTest("cuda is not available")
        if first_process.exitcode == SKIP_IF_NO_GPU_EXIT_CODE:
            raise unittest.SkipTest(
                "One unique gpu per process is not available"
            )
        if first_process.exitcode == SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE:
            raise unittest.SkipTest("worldsize is too small to run group tests")


elif BACKEND == "mpi":
    WORLD_SIZE = os.environ["WORLD_SIZE"]
    dist.init_process_group(init_method=INIT_METHOD, backend="mpi")
if __name__ == "__main__":
    assert (
        not torch.cuda._initialized
    ), "test_distributed must not have initialized CUDA context on main process"