from __future__ import absolute_import, division, print_function, unicode_literals
import copy
import fcntl
import multiprocessing
import os
import sys
import tempfile
import time
import unittest
from contextlib import contextmanager
from functools import reduce, wraps

import torch
import torch.cuda
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed.deprecated as dist  # matches the nn.parallel.deprecated wrappers used below

from common_utils import TestCase, run_tests

BACKEND = os.environ["BACKEND"]
TEMP_DIR = os.environ["TEMP_DIR"]
INIT_METHOD = os.getenv("INIT_METHOD", "env://")

# Rendezvous address/port exported to the workers in setUpClass below.
# The concrete values here are assumed defaults; any reachable local port works.
MASTER_ADDR = "127.0.0.1"
MASTER_PORT = "29500"

# Per-test join timeouts, in seconds.
DEFAULT_TIMEOUT = 300
CUSTOMIZED_TIMEOUT = {"test_DistributedDataParallel": 500}


def get_timeout(test_id):
    test_name = test_id.split(".")[-1]
    if test_name in CUSTOMIZED_TIMEOUT:
        return CUSTOMIZED_TIMEOUT[test_name]
    else:
        return DEFAULT_TIMEOUT


if not dist.is_available():
    print("Distributed not available, skipping tests")
    sys.exit(0)

SKIP_IF_NO_CUDA_EXIT_CODE = 75
SKIP_IF_NO_GPU_EXIT_CODE = 76
SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE = 77
SKIP_IF_BACKEND_UNAVAILABLE = 78
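

# The tests in this file run inside spawned worker processes, where unittest's
# skip machinery cannot reach the manager process. The decorators below make a
# worker exit with one of the codes above instead; _join_and_reduce() at the
# bottom of the file converts those exit codes back into unittest.SkipTest.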
def skip_if_no_cuda_distributed(func):
    func.skip_if_no_cuda_distributed = True

    @wraps(func)
    def wrapper(*args, **kwargs):
        if not torch.cuda.is_available():
            sys.exit(SKIP_IF_NO_CUDA_EXIT_CODE)

        return func(*args, **kwargs)

    return wrapper


def skip_if_no_gpu(func):
    """Nccl multigpu tests require at least 2 GPUs. Skip if this is not met."""
    func.skip_if_no_gpu = True

    @wraps(func)
    def wrapper(*args, **kwargs):
        if not torch.cuda.is_available():
            sys.exit(SKIP_IF_NO_CUDA_EXIT_CODE)
        if torch.cuda.device_count() < int(os.environ["WORLD_SIZE"]):
            sys.exit(SKIP_IF_NO_GPU_EXIT_CODE)

        return func(*args, **kwargs)

    return wrapper


def skip_if_small_worldsize(func):
    func.skip_if_small_worldsize = True

    @wraps(func)
    def wrapper(*args, **kwargs):
        if (os.environ["BACKEND"] != "mpi") and int(os.environ["WORLD_SIZE"]) <= 2:
            sys.exit(SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE)

        return func(*args, **kwargs)

    return wrapper


def apply_hack_for_nccl():
    # Limit NCCL to a single ring: several processes each driving multiple
    # GPUs can otherwise hit a known NCCL initialization issue. This slows
    # collectives down but keeps the tests stable.
    os.environ["NCCL_MAX_NRINGS"] = "1"


@contextmanager
def _lock():
    # File lock serializing access to the shared barrier directory across
    # the spawned test processes.
    lockfile = os.path.join(TEMP_DIR, "lockfile")
    with open(lockfile, "w") as lf:
        try:
            fcntl.flock(lf.fileno(), fcntl.LOCK_EX)
            yield
        finally:
            fcntl.flock(lf.fileno(), fcntl.LOCK_UN)
            lf.close()


def _build_tensor(size, value=None):
    if value is None:
        value = size
    return torch.FloatTensor(size, size, size).fill_(value)
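

# The spawned test processes synchronize through files in TEMP_DIR/barrier
# (guarded by _lock above) instead of relying on the backend under test, so a
# broken collective cannot mask a synchronization bug in the tests themselves.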
class Barrier(object):
    barrier_id = 0

    @classmethod
    def init(cls):
        cls.barrier_id = 0
        barrier_dir = os.path.join(TEMP_DIR, "barrier")
        for f_name in os.listdir(barrier_dir):
            os.unlink(os.path.join(barrier_dir, f_name))

    @classmethod
    def sync(cls, timeout=5):
        cls.barrier_id += 1
        barrier_dir = os.path.join(TEMP_DIR, "barrier")
        pid = str(os.getpid())
        barrier_file = os.path.join(barrier_dir, pid)
        with _lock():
            with open(barrier_file, "w") as f:
                f.write(str(cls.barrier_id))

        start_time = time.time()
        while True:
            arrived = 0
            with _lock():
                for f_name in os.listdir(barrier_dir):
                    with open(os.path.join(barrier_dir, f_name), "r") as f:
                        data = f.read()
                        if int(data) >= cls.barrier_id:
                            arrived += 1
            if arrived == dist.get_world_size():
                break

            if time.time() - start_time > timeout:
                raise RuntimeError("barrier timeout")
            time.sleep(0.1)
class _FC2(nn.Module):
    def __init__(self):
        super(_FC2, self).__init__()
        self.fc = nn.Linear(10, 50, bias=True)
        self.fc.bias.requires_grad = False

    def forward(self, x):
        x = self.fc(x)
        return x


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 10, bias=False)
        self.fc2 = _FC2()
        self.fc3 = nn.Linear(50, 4, bias=False)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return F.softmax(x, dim=1)
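

# _DistTestBase holds the actual test bodies. Each test method is executed
# inside every spawned worker process (see the harness at the bottom of the
# file), so every rank runs the same assertions.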
class _DistTestBase(object):
    def _barrier(self, *args, **kwargs):
        Barrier.sync(*args, **kwargs)

    def _init_group_test(self):
        group = [1, 2]
        group_id = dist.new_group(group)
        rank = dist.get_rank()
        if rank not in group:
            return ([], None, rank)

        return (group, group_id, rank)

    def _init_global_test(self):
        group = [i for i in range(0, dist.get_world_size())]
        group_id = dist.group.WORLD
        rank = dist.get_rank()
        return (group, group_id, rank)

    def _init_multigpu_helper(self):
        """Multigpu tests simulate multiple nodes, each with multiple GPUs.
        The Nccl backend requires the same number of GPUs in each process,
        so on a single node all visible GPUs are evenly divided into subsets
        and each process only uses its own subset.
        """
        nGPUs = torch.cuda.device_count()
        world_size = dist.get_world_size()
        visible_devices = range(nGPUs)

        if BACKEND == "nccl":
            apply_hack_for_nccl()

        nGPUs_per_process = nGPUs // world_size
        rank_to_GPU = {
            i: list(
                visible_devices[i * nGPUs_per_process: (i + 1) * nGPUs_per_process]
            )
            for i in range(world_size)
        }
        return rank_to_GPU
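
    # Example of the mapping built above: with 8 visible GPUs and a world
    # size of 4, nGPUs_per_process is 2 and rank_to_GPU becomes
    # {0: [0, 1], 1: [2, 3], 2: [4, 5], 3: [6, 7]}.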
    def test_get_rank(self):
        test_dir = os.path.join(TEMP_DIR, "test_dir")
        pid = str(os.getpid())
        num_processes = dist.get_world_size()
        with open(os.path.join(test_dir, pid), "w") as f:
            f.write(str(dist.get_rank()))

        self._barrier()

        all_ranks = set()
        for f_name in os.listdir(test_dir):
            with open(os.path.join(test_dir, f_name), "r") as f:
                all_ranks.add(int(f.read()))
        self.assertEqual(len(all_ranks), num_processes)

        self._barrier()

        if dist.get_rank() == 0:
            for f_name in os.listdir(test_dir):
                os.unlink(os.path.join(test_dir, f_name))

        self._barrier()

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support send/recv")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv")
    def test_send_recv(self):
        rank = dist.get_rank()
        tensor = _build_tensor(rank + 1)
        for dest in range(0, dist.get_world_size()):
            if dest == rank:
                continue
            dist.send(tensor, dest)

        for src in range(0, dist.get_world_size()):
            if src == rank:
                continue
            tensor = _build_tensor(src + 1, value=-1)
            expected_tensor = _build_tensor(src + 1)
            dist.recv(tensor, src)
            self.assertEqual(tensor, expected_tensor)

        self._barrier()

    @unittest.skipIf(
        BACKEND == "gloo", "Gloo does not support send/recv from any source"
    )
    @unittest.skipIf(
        BACKEND == "nccl", "Nccl does not support send/recv from any source"
    )
    def test_send_recv_any_source(self):
        rank = dist.get_rank()
        tensor = _build_tensor(10, rank)
        recv_ranks = set()

        for dest in range(0, dist.get_world_size()):
            if dest == rank:
                continue
            dist.send(tensor, dest)

        for src in range(0, dist.get_world_size()):
            if src == rank:
                continue
            tensor = _build_tensor(10, value=-1)
            sender = dist.recv(tensor)
            self.assertTrue(tensor.eq(sender).all())
            recv_ranks.add(sender)

        self.assertEqual(len(recv_ranks), dist.get_world_size() - 1)
        self._barrier()

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support isend")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support isend")
    def test_isend(self):
        rank = dist.get_rank()
        world_size = dist.get_world_size()

        if rank == 0:
            requests = [
                dist.isend(_build_tensor(dest, 10), dest)
                for dest in range(1, world_size)
            ]
            for request in requests:
                request.wait()
                self.assertTrue(request.is_completed())
        else:
            tensor = _build_tensor(rank, -1)
            dist.recv(tensor, 0)
            self.assertEqual(tensor, _build_tensor(rank, 10))

        self._barrier()

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support irecv")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support irecv")
    def test_irecv(self):
        rank = dist.get_rank()
        world_size = dist.get_world_size()

        if rank == 0:
            expected_tensors = [
                _build_tensor(src, -1) for src in range(1, world_size)
            ]
            requests = [
                dist.irecv(expected_tensors[src - 1], src)
                for src in range(1, world_size)
            ]

            for src in range(1, world_size):
                requests[src - 1].wait()
                self.assertTrue(requests[src - 1].is_completed())
                self.assertEqual(expected_tensors[src - 1], _build_tensor(src, 10))
        else:
            tensor = _build_tensor(rank, 10)
            dist.send(tensor, 0)

        self._barrier()
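
    # The collective tests below share one pattern: every rank in `group`
    # takes a turn as the source, the other ranks pre-fill their tensors with
    # a sentinel (-1), the collective runs on `group_id`, and the result is
    # compared against a tensor rebuilt with the expected value.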
    def _test_broadcast_helper(
        self, group, group_id, rank, cuda=False, rank_to_GPU=None
    ):
        for ttype, value, requires_cuda in [
            ("torch.FloatTensor", -1e-10, False),
            ("torch.DoubleTensor", -1e-100, False),
            ("torch.HalfTensor", -0.1, True),
            ("torch.CharTensor", -2, False),
            ("torch.ByteTensor", 129, False),
            ("torch.IntTensor", -1e5, False),
            ("torch.LongTensor", -1e15, False),
        ]:
            if requires_cuda and not cuda:
                continue
            for src in group:
                expected_tensor = _build_tensor(src + 1, value).type(ttype)
                if cuda:
                    expected_tensor = expected_tensor.cuda(rank_to_GPU[rank][0])
                if rank == src:
                    dist.broadcast(expected_tensor, src, group_id)
                else:
                    tensor = _build_tensor(src + 1, -1).type(ttype)
                    if cuda:
                        tensor = tensor.cuda(rank_to_GPU[rank][0])
                    dist.broadcast(tensor, src, group_id)
                    self.assertEqual(tensor.size(), expected_tensor.size())
                    self.assertEqual(tensor.ne(expected_tensor).max(), 0)

        self._barrier()

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_broadcast(self):
        group, group_id, rank = self._init_global_test()
        self._test_broadcast_helper(group, group_id, rank)

    @unittest.skipIf(
        BACKEND != "gloo" and BACKEND != "nccl",
        "Only Gloo and Nccl backend supports CUDA allReduce",
    )
    @skip_if_no_cuda_distributed
    @skip_if_no_gpu
    def test_broadcast_cuda(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        self._test_broadcast_helper(group, group_id, rank, True, rank_to_GPU)

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_broadcast_group(self):
        group, group_id, rank = self._init_group_test()
        self._test_broadcast_helper(group, group_id, rank)
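
    # The reduce helpers fill the source rank's tensor with `master_value` and
    # every other rank's tensor with `worker_value`, then compare against
    # `expected_value`. For SUM with master_value=2, worker_value=10 and a
    # group of 4 ranks the expectation is 2 + 10 * 3 = 32.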
    def _test_reduce_helper(
        self,
        group,
        group_id,
        rank,
        op,
        master_value,
        worker_value,
        expected_value,
        cuda=False,
        rank_to_GPU=None,
    ):
        for src in group:
            if rank == src:
                tensor = _build_tensor(src + 1).fill_(master_value)
                if cuda:
                    tensor = tensor.cuda(rank_to_GPU[rank][0])
                dist.reduce(tensor, src, op, group_id)
                self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
            else:
                tensor = _build_tensor(src + 1).fill_(worker_value)
                if cuda:
                    tensor = tensor.cuda(rank_to_GPU[rank][0])
                dist.reduce(tensor, src, op, group_id)

        self._barrier()

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_reduce_sum(self):
        group, group_id, rank = self._init_global_test()
        self._test_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.SUM,
            2,
            10,
            2 + (10 * (len(group) - 1)),
        )

    @unittest.skipIf(BACKEND != "nccl", "Only Nccl supports CUDA reduce")
    @skip_if_no_cuda_distributed
    @skip_if_no_gpu
    def test_reduce_sum_cuda(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        self._test_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.SUM,
            2,
            10,
            2 + 10 * (len(group) - 1),
            True,
            rank_to_GPU,
        )

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_reduce_product(self):
        group, group_id, rank = self._init_global_test()
        self._test_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.PRODUCT,
            2,
            10,
            reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
        )

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_reduce_min(self):
        group, group_id, rank = self._init_global_test()
        self._test_reduce_helper(group, group_id, rank, dist.reduce_op.MIN, 1010, 1, 1)

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_reduce_max(self):
        group, group_id, rank = self._init_global_test()
        self._test_reduce_helper(group, group_id, rank, dist.reduce_op.MAX, -1, 10, 10)

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_reduce_group_sum(self):
        group, group_id, rank = self._init_group_test()
        self._test_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.SUM,
            2,
            10,
            2 + (10 * (len(group) - 1)),
        )

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_reduce_group_product(self):
        group, group_id, rank = self._init_group_test()
        self._test_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.PRODUCT,
            2,
            10,
            reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
        )

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_reduce_group_min(self):
        group, group_id, rank = self._init_group_test()
        self._test_reduce_helper(group, group_id, rank, dist.reduce_op.MIN, 1010, 1, 1)

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_reduce_group_max(self):
        group, group_id, rank = self._init_group_test()
        self._test_reduce_helper(group, group_id, rank, dist.reduce_op.MAX, -1, 10, 10)

    def _test_all_reduce_helper(
        self,
        group,
        group_id,
        rank,
        op,
        master_value,
        worker_value,
        expected_value,
        cuda=False,
        rank_to_GPU=None,
    ):
        for src in group:
            if rank == src:
                tensor = _build_tensor(src + 1).fill_(master_value)
                if cuda:
                    tensor = tensor.cuda(rank_to_GPU[rank][0])
                dist.all_reduce(tensor, op, group_id)
                self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
            else:
                tensor = _build_tensor(src + 1).fill_(worker_value)
                if cuda:
                    tensor = tensor.cuda(rank_to_GPU[rank][0])
                dist.all_reduce(tensor, op, group_id)
                self.assertEqual(tensor, _build_tensor(src + 1, expected_value))

        self._barrier()

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_sum(self):
        group, group_id, rank = self._init_global_test()
        self._test_all_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.SUM,
            2,
            10,
            2 + (10 * (len(group) - 1)),
        )

    @unittest.skipIf(
        BACKEND != "gloo" and BACKEND != "nccl",
        "Only Gloo & Nccl backend support CUDA allReduce",
    )
    @skip_if_no_cuda_distributed
    @skip_if_no_gpu
    def test_all_reduce_sum_cuda(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        self._test_all_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.SUM,
            2,
            10,
            2 + (10 * (len(group) - 1)),
            True,
            rank_to_GPU,
        )

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_product(self):
        group, group_id, rank = self._init_global_test()
        self._test_all_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.PRODUCT,
            2,
            10,
            reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
        )

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_min(self):
        group, group_id, rank = self._init_global_test()
        self._test_all_reduce_helper(
            group, group_id, rank, dist.reduce_op.MIN, 1010, 1, 1
        )

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_reduce_max(self):
        group, group_id, rank = self._init_global_test()
        self._test_all_reduce_helper(
            group, group_id, rank, dist.reduce_op.MAX, -1, 10, 10
        )

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_all_reduce_group_sum(self):
        group, group_id, rank = self._init_group_test()
        self._test_all_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.SUM,
            2,
            10,
            2 + (10 * (len(group) - 1)),
        )

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_all_reduce_group_product(self):
        group, group_id, rank = self._init_group_test()
        self._test_all_reduce_helper(
            group,
            group_id,
            rank,
            dist.reduce_op.PRODUCT,
            2,
            10,
            reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
        )

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_all_reduce_group_min(self):
        group, group_id, rank = self._init_group_test()
        self._test_all_reduce_helper(
            group, group_id, rank, dist.reduce_op.MIN, 1010, 1, 1
        )

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_all_reduce_group_max(self):
        group, group_id, rank = self._init_group_test()
        self._test_all_reduce_helper(
            group, group_id, rank, dist.reduce_op.MAX, -1, 10, 10
        )
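
    # Scatter and gather are asymmetric: only the root rank (`dest`) supplies
    # the full scatter_list / receives into the full gather_list, while every
    # other rank passes an empty list for that argument.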
    def _test_scatter_helper(self, group, group_id, rank):
        for dest in group:
            tensor = _build_tensor(dest + 1, -1)
            expected_tensor = _build_tensor(dest + 1, rank)
            tensors = (
                [_build_tensor(dest + 1, i) for i in group] if rank == dest else []
            )
            dist.scatter(tensor, src=dest, scatter_list=tensors, group=group_id)
            self.assertEqual(tensor, expected_tensor)

        self._barrier()

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support scatter")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
    def test_scatter(self):
        group, group_id, rank = self._init_global_test()
        self._test_scatter_helper(group, group_id, rank)

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support scatter")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
    @skip_if_small_worldsize
    def test_scatter_group(self):
        group, group_id, rank = self._init_group_test()
        self._test_scatter_helper(group, group_id, rank)

    def _test_gather_helper(self, group, group_id, rank):
        for dest in group:
            tensor = _build_tensor(dest + 1, rank)
            tensors = (
                [_build_tensor(dest + 1, -1) for i in group] if rank == dest else []
            )
            dist.gather(tensor, dst=dest, gather_list=tensors, group=group_id)
            if rank == dest:
                expected_tensors = [_build_tensor(dest + 1, i) for i in group]
                for t1, t2 in zip(tensors, expected_tensors):
                    self.assertEqual(t1, t2)

        self._barrier()

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support gather")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_gather(self):
        group, group_id, rank = self._init_global_test()
        self._test_gather_helper(group, group_id, rank)

    @unittest.skipIf(BACKEND == "gloo", "Gloo does not support gather")
    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_gather_group(self):
        group, group_id, rank = self._init_group_test()
        self._test_gather_helper(group, group_id, rank)

    def _test_all_gather_helper(
        self, group, group_id, rank, cuda=False, rank_to_GPU=None
    ):
        for dest in group:
            tensor = _build_tensor(dest + 1, rank)
            tensors = [_build_tensor(dest + 1, -1) for i in group]
            if cuda:
                tensor = tensor.cuda(rank_to_GPU[rank][0])
                tensors = [t.cuda(rank_to_GPU[rank][0]) for t in tensors]
            dist.all_gather(tensors, tensor, group_id)

            expected_tensors = [_build_tensor(dest + 1, i) for i in group]
            for t1, t2 in zip(tensors, expected_tensors):
                self.assertEqual(t1, t2)

        self._barrier()

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_all_gather(self):
        group, group_id, rank = self._init_global_test()
        self._test_all_gather_helper(group, group_id, rank)

    @unittest.skipIf(BACKEND != "nccl", "Only Nccl supports CUDA all gather")
    @skip_if_no_cuda_distributed
    @skip_if_no_gpu
    def test_all_gather_cuda(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        self._test_all_gather_helper(group, group_id, rank, True, rank_to_GPU)

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_all_gather_group(self):
        group, group_id, rank = self._init_group_test()
        self._test_all_gather_helper(group, group_id, rank)

    def _test_barrier_helper(self, group, group_id, rank):
        WAIT_TIME = 0.3  # seconds

        for dest in group:
            expected_time = torch.DoubleTensor(1).fill_(0.0)
            if dest == rank:
                expected_time.fill_(time.time() + WAIT_TIME)
                dist.broadcast(expected_time, dest, group_id)
                time.sleep(WAIT_TIME + 0.1)  # sleep a little longer than advertised
                dist.barrier(group_id)
            else:
                dist.broadcast(expected_time, dest, group_id)
                dist.barrier(group_id)
                self.assertGreaterEqual(time.time(), expected_time[0])

        self._barrier()

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
    def test_barrier(self):
        group, group_id, rank = self._init_global_test()
        self._test_barrier_helper(group, group_id, rank)

    @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
    @skip_if_small_worldsize
    def test_barrier_group(self):
        group, group_id, rank = self._init_group_test()
        self._test_barrier_helper(group, group_id, rank)
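
    # The *_multigpu variants exercise the collectives that take a list of
    # tensors, one per GPU owned by the calling rank, so the expected value of
    # a SUM reduction also scales with len(rank_to_GPU[0]).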
    def _test_broadcast_multigpu_helper(self, group, group_id, rank, rank_to_GPU):
        for src in group:
            expected_tensor = _build_tensor(src + 1)
            tensors = [
                _build_tensor(src + 1, -1).cuda(device=i) for i in rank_to_GPU[rank]
            ]
            if rank == src:
                tensors[0] = expected_tensor.cuda(device=rank_to_GPU[rank][0])

            dist.broadcast_multigpu(tensors, src, group_id)
            for tensor in tensors:
                self.assertEqual(tensor, expected_tensor)
        self._barrier()

    @unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports broadcast multigpu")
    @skip_if_no_gpu
    def test_broadcast_multigpu(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        self._test_broadcast_multigpu_helper(group, group_id, rank, rank_to_GPU)

    def _test_all_reduce_multigpu_helper(
        self,
        group,
        group_id,
        rank,
        rank_to_GPU,
        op,
        master_value,
        worker_value,
        expected_value,
    ):
        for src in group:
            if rank == src:
                tensors = [
                    _build_tensor(src + 1, master_value).cuda(device=i)
                    for i in rank_to_GPU[rank]
                ]
            else:
                tensors = [
                    _build_tensor(src + 1, worker_value).cuda(device=i)
                    for i in rank_to_GPU[rank]
                ]

            dist.all_reduce_multigpu(tensors, op, group_id)
            expected_tensor = _build_tensor(src + 1, expected_value)
            for tensor in tensors:
                self.assertEqual(tensor, expected_tensor)

        self._barrier()

    @unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports allreduce multigpu")
    @skip_if_no_gpu
    def test_all_reduce_multigpu(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        self._test_all_reduce_multigpu_helper(
            group,
            group_id,
            rank,
            rank_to_GPU,
            dist.reduce_op.SUM,
            2,
            10,
            (2 + 10 * (len(group) - 1)) * len(rank_to_GPU[0]),
        )

    def _test_reduce_multigpu_helper(
        self,
        group,
        group_id,
        rank,
        rank_to_GPU,
        op,
        master_value,
        worker_value,
        expected_value,
    ):
        for src in group:
            if rank == src:
                tensors = [
                    _build_tensor(src + 1, master_value).cuda(device=i)
                    for i in rank_to_GPU[rank]
                ]
                dist.reduce_multigpu(tensors, src, op, group_id)
                expected_tensor = _build_tensor(src + 1, expected_value)
                self.assertEqual(tensors[0], expected_tensor)
            else:
                tensors = [
                    _build_tensor(src + 1, worker_value).cuda(device=i)
                    for i in rank_to_GPU[rank]
                ]
                dist.reduce_multigpu(tensors, src, op, group_id)

        self._barrier()

    @unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports reduce multigpu")
    @skip_if_no_gpu
    def test_reduce_multigpu(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        self._test_reduce_multigpu_helper(
            group,
            group_id,
            rank,
            rank_to_GPU,
            dist.reduce_op.SUM,
            2,
            10,
            (2 + 10 * (len(group) - 1)) * len(rank_to_GPU[0]),
        )

    def _test_all_gather_multigpu_helper(self, group, group_id, rank, rank_to_GPU):
        for dest in group:
            tensors = [
                _build_tensor(dest + 1).cuda(device=i) for i in rank_to_GPU[rank]
            ]

            # construct the expected output along with placeholders that will
            # receive the all_gather results
            output_tensors = []
            expected_output = []
            output_per_gpu = (
                [_build_tensor(dest + 1, -1)] * len(rank_to_GPU[0]) * len(group)
            )
            expected_per_gpu = (
                [_build_tensor(dest + 1)] * len(rank_to_GPU[0]) * len(group)
            )
            for gpu in rank_to_GPU[rank]:
                output_tensors.append([t.cuda(device=gpu) for t in output_per_gpu])
                expected_output.append([t.cuda(device=gpu) for t in expected_per_gpu])

            dist.all_gather_multigpu(output_tensors, tensors, group_id)
            self.assertEqual(output_tensors, expected_output)

        self._barrier()

    @unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports allgather multigpu")
    @skip_if_no_gpu
    def test_all_gather_multigpu(self):
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()
        self._test_all_gather_multigpu_helper(group, group_id, rank, rank_to_GPU)
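
    # The DistributedDataParallel tests train a plain copy of Net and a
    # DDP-wrapped copy for two iterations on the same data (the DDP copy only
    # sees its rank's shard of the global batch) and then require both models'
    # parameters to match exactly.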
    def _create_Net(self):
        return Net()

    def _model_step(self, model):
        for param in model.parameters():
            if param.grad is not None:
                param.data += param.grad
                param.grad = None

    def _prepare_dummy_data(self, local_bs):
        # The global batch size must be divisible by the world size so that
        # every rank gets an equally sized shard.
        global_bs = int(WORLD_SIZE) * local_bs
        input_cpu = torch.randn(global_bs, 2)
        target = torch.randn(global_bs, 4)
        loss = nn.MSELoss()
        return global_bs, input_cpu, target, loss

    def _test_DDP_helper(self, model, input_var, target, loss):
        model.train()
        output = model(input_var)
        l = loss(output, target)
        l.backward()

    def _assert_equal_param(self, param_gpu, param_DDP):
        self.assertEqual(len(param_gpu), len(param_DDP))
        for p_gpu, p_DDP in zip(param_gpu, param_DDP):
            self.assertEqual(p_gpu, p_DDP)

    def _test_DDP_2iter(
        self, model_base, model_DDP, input, target, loss, local_bs, rank, batch_size
    ):
        for _ in range(2):
            # single-model training on the full batch
            self._test_DDP_helper(model_base, input, target, loss)

            # DDP training: each rank trains on its shard of the global batch
            self._test_DDP_helper(
                model_DDP,
                input[rank * local_bs: (rank + 1) * local_bs],
                target[rank * local_bs: (rank + 1) * local_bs],
                loss,
            )

            # apply the accumulated gradients, then compare parameters
            self._model_step(model_base)
            self._model_step(model_DDP)
            self._assert_equal_param(
                list(model_base.parameters()), list(model_DDP.module.parameters())
            )

            # shuffle the input so the second iteration sees different shards
            input = input[torch.randperm(batch_size)]

        # saving and re-loading the DDP-wrapped module should round-trip
        with tempfile.TemporaryFile() as tmp_file:
            torch.save(model_DDP, tmp_file)
            tmp_file.seek(0)
            saved_model_DDP = torch.load(tmp_file)

    @unittest.skipIf(
        BACKEND != "nccl" and BACKEND != "gloo",
        "Only Nccl & Gloo backend support DistributedDataParallel",
    )
    @skip_if_no_cuda_distributed
    @skip_if_no_gpu
    def test_DistributedDataParallel(self):
        # Run a simple end-to-end DDP model and use the single-GPU model as
        # the baseline.
        group, group_id, rank = self._init_global_test()
        rank_to_GPU = self._init_multigpu_helper()

        # cpu training setup
        model = self._create_Net()

        # single-gpu training setup
        model_gpu = copy.deepcopy(model)
        gpu_subset = list(rank_to_GPU[rank])
        model_gpu.cuda(gpu_subset[0])

        # DDP training setup
        model_DDP = copy.deepcopy(model)
        model_DDP.cuda(gpu_subset[0])
        model_DDP = nn.parallel.deprecated.DistributedDataParallel(
            model_DDP, device_ids=gpu_subset
        )

        # dummy data initialization
        local_bs = len(gpu_subset)
        global_bs, input_cpu, target, loss = self._prepare_dummy_data(local_bs)

        # check that the two models stay in sync over 2 iterations
        self._test_DDP_2iter(
            model_gpu,
            model_DDP,
            input_cpu.cuda(gpu_subset[0]),
            target.cuda(gpu_subset[0]),
            loss,
            local_bs,
            rank,
            global_bs,
        )
        self._barrier()

    @unittest.skipIf(
        BACKEND == "nccl", "nccl does not support DistributedDataParallelCPU"
    )
    def test_DistributedDataParallelCPU(self):
        # Run a simple end-to-end DDP-CPU model and use the plain CPU model as
        # the baseline.
        group, group_id, rank = self._init_global_test()

        # cpu training setup
        model_base = self._create_Net()

        # DDP-CPU training setup
        model_DDP = copy.deepcopy(model_base)
        model_DDP = nn.parallel.deprecated.DistributedDataParallelCPU(model_DDP)

        # dummy data initialization
        local_bs = 2
        global_bs, input_cpu, target, loss = self._prepare_dummy_data(local_bs)

        # check that the two models stay in sync over 2 iterations
        self._test_DDP_2iter(
            model_base, model_DDP, input_cpu, target, loss, local_bs, rank, global_bs
        )
        self._barrier()
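

# Test harness. For the tcp/gloo/nccl backends the manager process
# (rank MANAGER_PROCESS_RANK) forks one worker per rank; every worker calls
# dist.init_process_group() and re-runs the selected test method, and the
# manager joins the workers and maps their exit codes to results or skips.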
if BACKEND == "tcp" or BACKEND == "gloo" or BACKEND == "nccl":
    WORLD_SIZE = os.environ["WORLD_SIZE"]

    class TestDistBackend(TestCase, _DistTestBase):
        MANAGER_PROCESS_RANK = -1

        @staticmethod
        def manager_join(fn):
            @wraps(fn)
            def wrapper(self):
                if self.rank == self.MANAGER_PROCESS_RANK:
                    self._join_and_reduce(fn)
                else:
                    fn(self)

            return wrapper

        @classmethod
        def setUpClass(cls):
            os.environ["MASTER_ADDR"] = MASTER_ADDR
            os.environ["MASTER_PORT"] = MASTER_PORT
            os.environ["WORLD_SIZE"] = WORLD_SIZE
            for attr in dir(cls):
                if attr.startswith("test"):
                    fn = getattr(cls, attr)
                    setattr(cls, attr, cls.manager_join(fn))

        def setUp(self):
            self.processes = []
            self.rank = self.MANAGER_PROCESS_RANK
            Barrier.init()
            for rank in range(int(WORLD_SIZE)):
                self._spawn_process(rank)

        def _spawn_process(self, rank):
            os.environ["RANK"] = str(rank)
            name = "process " + str(rank)
            process = multiprocessing.Process(
                target=self._run, name=name, args=(rank,)
            )
            process.start()
            self.processes.append(process)
            return process

        def _run(self, rank):
            self.rank = rank
            try:
                dist.init_process_group(
                    init_method=INIT_METHOD, backend=BACKEND, world_size=int(WORLD_SIZE)
                )
            except RuntimeError as e:
                if "recompile" in e.args[0]:
                    sys.exit(SKIP_IF_BACKEND_UNAVAILABLE)
                raise

            # self.id() is e.g. '__main__.TestDistBackend.test_get_rank';
            # the worker looks up that test method on itself and runs it.
            getattr(self, self.id().split(".")[2])()
            sys.exit(0)

        def _join_and_reduce(self, fn):
            skip_ok = (
                getattr(fn, "skip_if_no_cuda_distributed", False) or
                getattr(fn, "skip_if_no_gpu", False) or
                getattr(fn, "skip_if_small_worldsize", False)
            )
            join_timeout = get_timeout(self.id())
            for rank, process in enumerate(self.processes):
                process.join(join_timeout)
                self.assertFalse(
                    process.is_alive(),
                    "Timeout waiting for rank %d to terminate" % rank)

            first_process = self.processes[0]
            for p in self.processes:
                self.assertEqual(p.exitcode, first_process.exitcode)

            if first_process.exitcode == SKIP_IF_BACKEND_UNAVAILABLE:
                raise unittest.SkipTest(
                    "Compiled without the " + BACKEND + " backend")

            if skip_ok:
                # Check the exit code before reporting mismatches so a valid
                # skip does not surface as a confusing assertion failure.
                assert (
                    first_process.exitcode == 0 or
                    first_process.exitcode == SKIP_IF_NO_CUDA_EXIT_CODE or
                    first_process.exitcode == SKIP_IF_NO_GPU_EXIT_CODE or
                    first_process.exitcode == SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE
                )

                if first_process.exitcode == SKIP_IF_NO_CUDA_EXIT_CODE:
                    raise unittest.SkipTest("cuda is not available")
                if first_process.exitcode == SKIP_IF_NO_GPU_EXIT_CODE:
                    raise unittest.SkipTest(
                        "One unique gpu per process is not available"
                    )
                if first_process.exitcode == SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE:
                    raise unittest.SkipTest(
                        "worldsize is too small to run group tests"
                    )

            self.assertEqual(first_process.exitcode, 0)
elif BACKEND == "mpi":
    WORLD_SIZE = os.environ["WORLD_SIZE"]
    dist.init_process_group(init_method=INIT_METHOD, backend="mpi")

    class TestMPI(TestCase, _DistTestBase):
        pass


if __name__ == "__main__":
    assert (
        not torch.cuda._initialized
    ), "test_distributed must not have initialized CUDA context on main process"

    run_tests()