Caffe2 - Python API
A deep learning, cross-platform ML framework
test_distributed.py
1 from __future__ import absolute_import, division, print_function, unicode_literals
2 import copy
3 import fcntl
4 import multiprocessing
5 import os
6 import sys
7 import time
8 import tempfile
9 import unittest
10 from contextlib import contextmanager
11 from datetime import timedelta
12 from functools import reduce, wraps
13 
14 import torch
15 import torch.cuda
16 import torch.distributed as dist
17 import torch.nn as nn
18 import torch.nn.functional as F
19 import torch.optim as optim
20 from common_utils import TestCase, run_tests
21 from torch._utils_internal import TEST_MASTER_ADDR as MASTER_ADDR
22 from torch._utils_internal import TEST_MASTER_PORT as MASTER_PORT
23 import common_utils as common
24 
25 BACKEND = os.environ["BACKEND"]
26 TEMP_DIR = os.environ["TEMP_DIR"]
27 INIT_METHOD = os.getenv("INIT_METHOD", "env://")
28 
29 DEFAULT_TIMEOUT = 300
30 CUSTOMIZED_TIMEOUT = {"test_DistributedDataParallel": 500}
31 
32 
33 if INIT_METHOD.startswith("file://"):
34  FOLDER = INIT_METHOD[7:]
35 
36 
37 class _FC2(nn.Module):
38  def __init__(self):
39  super(_FC2, self).__init__()
40  self.fc = nn.Linear(10, 50, bias=True)
41  self.fc.bias.requires_grad = False
42 
43  def forward(self, x):
44  x = self.fc(x)
45  return x
46 
47 
48 class Net(nn.Module):
49  def __init__(self):
50  super(Net, self).__init__()
51  self.fc1 = nn.Linear(2, 10, bias=False)
52  self.fc2 = _FC2()
53  self.fc3 = nn.Linear(50, 4, bias=False)
54  self.relu = nn.ReLU()
55  self.no_grad_param = nn.Parameter(torch.Tensor([2, 2]).long(),
56  requires_grad=False)
57 
58  def forward(self, x):
59  x = self.relu(self.fc1(x))
60  x = self.relu(self.fc2(x))
61  x = self.fc3(x)
62  return F.softmax(x, dim=1)
63 
64 
65 class BatchNormNet(nn.Module):
66 
67  def __init__(self):
68  super(BatchNormNet, self).__init__()
69  self.fc1 = nn.Linear(2, 40, bias=False)
70  self.bn = nn.BatchNorm1d(4)
71  self.fc2 = nn.Linear(40, 4, bias=False)
72 
73  def forward(self, x):
74  x = torch.reshape(self.fc1(x), (-1, 4, 10))
75  x = self.bn(x)
76  x = torch.reshape(x, (-1, 40))
77  x = self.fc2(x)
78  return F.softmax(x, dim=1)
79 
80 
81 DDP_NET = Net()
82 BN_NET = BatchNormNet()
83 
84 
85 def get_timeout(test_id):
86  test_name = test_id.split(".")[-1]
87  if test_name in CUSTOMIZED_TIMEOUT:
88  return CUSTOMIZED_TIMEOUT[test_name]
89  else:
90  return DEFAULT_TIMEOUT
91 
92 
93 if not dist.is_available():
94  print("Distributed not available, skipping tests")
95  sys.exit(0)
96 
97 
98 SKIP_IF_NO_CUDA_EXIT_CODE = 75
99 SKIP_IF_NO_GPU_EXIT_CODE = 76
100 SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE = 77
101 SKIP_IF_BACKEND_UNAVAILABLE = 78
102 
103 
104 def skip_if_no_cuda_distributed(func):
105  func.skip_if_no_cuda_distributed = True
106 
107  @wraps(func)
108  def wrapper(*args, **kwargs):
109  if not torch.cuda.is_available():
110  sys.exit(SKIP_IF_NO_CUDA_EXIT_CODE)
111 
112  return func(*args, **kwargs)
113 
114  return wrapper
115 
116 
117 def skip_if_no_gpu(func):
118  """NCCL multi-GPU tests require at least 2 GPUs. Skip if this is not met."""
119  func.skip_if_no_gpu = True
120 
121  @wraps(func)
122  def wrapper(*args, **kwargs):
123  if not torch.cuda.is_available():
124  sys.exit(SKIP_IF_NO_CUDA_EXIT_CODE)
125  if torch.cuda.device_count() < int(os.environ["WORLD_SIZE"]):
126  sys.exit(SKIP_IF_NO_GPU_EXIT_CODE)
127 
128  return func(*args, **kwargs)
129 
130  return wrapper
131 
132 
133 def skip_if_small_worldsize(func):
134  func.skip_if_small_worldsize = True
135 
136  @wraps(func)
137  def wrapper(*args, **kwargs):
138  if (os.environ["BACKEND"] != "mpi") and int(os.environ["WORLD_SIZE"]) <= 2:
139  sys.exit(SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE)
140 
141  return func(*args, **kwargs)
142 
143  return wrapper
144 
145 
146 def apply_hack_for_nccl():
147  # This is a hack for a known NCCL issue: using multiprocessing
148  # in conjunction with multiple threads to manage different GPUs
149  # may cause ncclCommInitRank to fail.
150  # http://docs.nvidia.com/deeplearning/sdk/nccl-release-notes/rel_2.1.4.html#rel_2.1.4
151  # It slows down the performance of collective operations.
152  # Without this setting, NCCL might throw an unhandled error.
153  os.environ["NCCL_MAX_NRINGS"] = "1"
154 
155 
156 @contextmanager
157 def _lock():
158  lockfile = os.path.join(TEMP_DIR, "lockfile")
159  with open(lockfile, "w") as lf:
160  try:
161  fcntl.flock(lf.fileno(), fcntl.LOCK_EX)
162  yield
163  finally:
164  fcntl.flock(lf.fileno(), fcntl.LOCK_UN)
165  lf.close()
166 
167 
168 def _build_tensor(size, value=None):
169  if value is None:
170  value = size
171  return torch.FloatTensor(size, size, size).fill_(value)
172 
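# For illustration: _build_tensor(3) returns a 3x3x3 FloatTensor filled with 3,
# while _build_tensor(3, -1) returns the same shape filled with -1.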
173 
174 class Barrier(object):
175  barrier_id = 0
176 
177  @classmethod
178  def init(cls):
179  cls.barrier_id = 0
180  barrier_dir = os.path.join(TEMP_DIR, "barrier")
181  for f_name in os.listdir(barrier_dir):
182  os.unlink(os.path.join(barrier_dir, f_name))
183 
184  @classmethod
185  def sync(cls, wait_for=None, timeout=5):
186  if wait_for is None:
187  wait_for = dist.get_world_size()
188  cls.barrier_id += 1
189  barrier_dir = os.path.join(TEMP_DIR, "barrier")
190  pid = str(os.getpid())
191  barrier_file = os.path.join(barrier_dir, pid)
192  with _lock():
193  with open(barrier_file, "w") as f:
194  f.write(str(cls.barrier_id))
195 
196  start_time = time.time()
197  while True:
198  arrived = 0
199  with _lock():
200  for f_name in os.listdir(barrier_dir):
201  with open(os.path.join(barrier_dir, f_name), "r") as f:
202  data = f.read()
203  if int(data) >= cls.barrier_id:
204  arrived += 1
205  if arrived == wait_for:
206  break
207 
208  if time.time() - start_time > timeout:
209  raise RuntimeError("barrier timeout")
210  time.sleep(0.1)
211 
212 
213 class _DistTestBase(object):
214  def _barrier(self, *args, **kwargs):
215  Barrier.sync(*args, **kwargs)
216 
217  def _init_group_test(self, **kwargs):
218  group = [1, 2]
219  group_id = dist.new_group(group, **kwargs)
220  rank = dist.get_rank()
221  if rank not in group:
222  return ([], None, rank)
223 
224  return (group, group_id, rank)
225 
226  def _init_full_group_test(self, **kwargs):
227  group = [i for i in range(0, dist.get_world_size())]
228  group_id = dist.new_group(**kwargs)
229  rank = dist.get_rank()
230  return (group, group_id, rank)
231 
232  def _init_global_test(self):
233  group = [i for i in range(0, dist.get_world_size())]
234  group_id = dist.group.WORLD
235  rank = dist.get_rank()
236  return (group, group_id, rank)
237 
238  # HELPER FOR MULTIGPU TESTS
239  def _init_multigpu_helper(self):
240  """Multi-GPU tests simulate multiple nodes, each with multiple GPUs.
241  The NCCL backend requires an equal number of GPUs in each process.
242  On a single node, all visible GPUs are evenly divided into subsets,
243  and each process uses only its own subset.
244  """
245  nGPUs = torch.cuda.device_count()
246  world_size = dist.get_world_size()
247  visible_devices = range(nGPUs)
248 
249  if BACKEND == "nccl":
250  apply_hack_for_nccl()
251 
252  nGPUs_per_process = nGPUs // world_size
253  rank_to_GPU = {
254  i: list(
255  visible_devices[i * nGPUs_per_process: (i + 1) * nGPUs_per_process]
256  )
257  for i in range(world_size)
258  }
259  return rank_to_GPU
260 
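# For illustration: with WORLD_SIZE=2 and 8 visible GPUs, the helper above
# yields rank_to_GPU == {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]}, so each rank
# drives its own disjoint subset of devices.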
261  # GET RANK
262  def test_get_rank(self):
263  test_dir = os.path.join(TEMP_DIR, "test_dir")
264  pid = str(os.getpid())
265  num_processes = dist.get_world_size()
266  with open(os.path.join(test_dir, pid), "w") as f:
267  f.write(str(dist.get_rank()))
268 
269  self._barrier()
270 
271  all_ranks = set()
272  for f_name in os.listdir(test_dir):
273  with open(os.path.join(test_dir, f_name), "r") as f:
274  all_ranks.add(int(f.read()))
275  self.assertEqual(len(all_ranks), num_processes)
276 
277  self._barrier()
278 
279  if dist.get_rank() == 0:
280  for f_name in os.listdir(test_dir):
281  os.unlink(os.path.join(test_dir, f_name))
282 
283  self._barrier()
284 
285  def test_get_backend(self):
286  if dist.get_world_size() > 2:
287  group = [1, 2]
288  else:
289  group = [0, 1]
290  group_id = dist.new_group(group)
291  backend_str = BACKEND.lower()
292  self.assertEqual(dist.get_backend(), backend_str)
293  if dist.get_rank() in group:
294  self.assertEqual(dist.get_backend(group_id), backend_str)
295  else:
296  with self.assertRaisesRegex(RuntimeError, "Invalid process group specified"):
297  dist.get_backend(group_id)
298 
299  def test_Backend_enum_class(self):
300  # test parsing
301  backend = BACKEND.lower()
302  self.assertEqual(dist.Backend(BACKEND.upper()), backend)
303  self.assertEqual(dist.Backend(BACKEND), backend)
304  with self.assertRaisesRegex(ValueError, "Invalid backend: 'undefined'"):
305  dist.Backend("undefined")
306  with self.assertRaisesRegex(ValueError, "Invalid backend: 'xYz'"):
307  dist.Backend("xYz")
308  with self.assertRaises(ValueError):
309  dist.Backend(None)
310  with self.assertRaises(ValueError):
311  dist.Backend(3)
312  with self.assertRaises(ValueError):
313  dist.Backend(["gloo"])
314 
315  # Test destroy
316  def test_destroy_group(self):
317  if dist.get_world_size() > 2:
318  group = [1, 2]
319  else:
320  group = [0, 1]
321  group_id = dist.new_group(group)
322  self._barrier()
323  dist.destroy_process_group(group_id)
324 
325  # Test get rank and size of group
326  def test_get_rank_size_group(self):
327  if dist.get_world_size() > 2:
328  group = [1, 2]
329  else:
330  group = [0, 1]
331  group_id = dist.new_group(group)
332  if dist.get_rank() in group:
333  self.assertEqual(dist.get_world_size(group_id), 2)
334  self.assertTrue(dist.get_rank(group_id) in list(range(2)))
335  else:
336  self.assertEqual(dist.get_world_size(group_id), -1)
337  self.assertEqual(dist.get_rank(group_id), -1)
338 
339  # Test destroy full groups
340  def test_destroy_full_group(self):
341  _, group_id, _ = self._init_full_group_test()
342  self._barrier()
343  dist.destroy_process_group(group_id)
344 
345  # Test get rank and size of full group
346  def test_get_rank_size_full_group(self):
347  _, group_id, _ = self._init_full_group_test()
348  self.assertEqual(dist.get_world_size(group_id), dist.get_world_size())
349  self.assertEqual(dist.get_rank(group_id), dist.get_rank())
350 
351  def _test_barrier_timeout(self, group_id, timeout):
352  local_rank = dist.get_rank(group_id)
353 
354  # Only execute the barrier on rank == 0, causing it to time out
355  if local_rank == 0:
356  expected_time = time.time() + timeout.total_seconds()
357  with self.assertRaisesRegex(RuntimeError, " (Timed out|closed) "):
358  dist.barrier(group_id)
359  self.assertGreaterEqual(time.time(), expected_time)
360  else:
361  time.sleep(timeout.total_seconds())
362 
363  @unittest.skipIf(BACKEND != "gloo", "Only gloo backend supports timeouts")
364  @unittest.skipIf(
365  not INIT_METHOD.startswith("file://"),
366  "Requires file:// initialization method. " +
367  "Both tcp:// and env:// rely on the TCP store for which "
368  "reinitialization has proven racy."
369  )
370  def test_barrier_timeout_global(self):
371  dist.destroy_process_group()
372 
373  # Explicitly pass world size to the barrier because we've
374  # just destroyed any state in torch.distributed.
375  self._barrier(wait_for=int(WORLD_SIZE))
376 
377  # Reinitialize global process group
378  timeout = timedelta(seconds=0.2)
379  dist.init_process_group(
380  init_method=INIT_METHOD,
381  backend=BACKEND,
382  world_size=int(WORLD_SIZE),
383  rank=self.rank,
384  timeout=timeout,
385  )
386  self._test_barrier_timeout(dist.group.WORLD, timeout)
387 
388  @skip_if_small_worldsize
389  @unittest.skipIf(BACKEND != "gloo", "Only gloo backend supports timeouts")
390  def test_barrier_timeout_group(self):
391  timeout = timedelta(seconds=0.2)
392  _, group_id, _ = self._init_group_test(timeout=timeout)
393  if group_id is not None:
394  self._test_barrier_timeout(group_id, timeout)
395 
396  @unittest.skipIf(BACKEND != "gloo", "Only gloo backend supports timeouts")
397  def test_barrier_timeout_full_group(self):
398  timeout = timedelta(seconds=0.2)
399  _, group_id, _ = self._init_full_group_test(timeout=timeout)
400  if group_id is not None:
401  self._test_barrier_timeout(group_id, timeout)
402 
403  # SEND RECV
404  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv")
405  def test_send_recv(self):
406  rank = dist.get_rank()
407  tensor = _build_tensor(rank + 1)
408 
409  for src in range(0, dist.get_world_size()):
410  if src == rank:
411  # Send mode
412  for dst in range(0, dist.get_world_size()):
413  if dst == rank:
414  continue
415  dist.send(tensor, dst)
416  else:
417  # Recv mode
418  expected_tensor = _build_tensor(src + 1)
419  output_tensor = _build_tensor(src + 1, value=-1)
420  dist.recv(output_tensor, src)
421  self.assertEqual(output_tensor, expected_tensor)
422 
423  self._barrier()
424 
425  # SEND RECV ANY SOURCE
426  @unittest.skipIf(
427  BACKEND == "nccl", "Nccl does not support send/recv from any source"
428  )
429  def test_send_recv_any_source(self):
430  rank = dist.get_rank()
431  tensor = _build_tensor(10, value=rank)
432  recv_ranks = set()
433 
434  for dst in range(0, dist.get_world_size()):
435  if dst == rank:
436  # Recv mode
437  for dst in range(0, dist.get_world_size()):
438  if dst == rank:
439  continue
440  output_tensor = _build_tensor(10, value=-1)
441  sender = dist.recv(output_tensor)
442 
443  # The scalar "sender" returned by recv() is the rank of the
444  # sending process; assert that every value in the received
445  # tensor equals it.
446  self.assertTrue(output_tensor.eq(sender).all())
447  recv_ranks.add(sender)
448  else:
449  # Send mode
450  dist.send(tensor, dst)
451 
452  self.assertEqual(len(recv_ranks), dist.get_world_size() - 1)
453  self._barrier()
454 
455  # SEND RECV WITH TAG
456  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv")
457  def test_send_recv_with_tag(self):
458  rank = dist.get_rank()
459  world_size = dist.get_world_size()
460  tensor = _build_tensor(10, value=rank)
461 
462  for dst in range(0, world_size):
463  if dst == rank:
464  # Recv mode
465  for src in range(0, world_size):
466  if src == rank:
467  continue
468  output_tensor = _build_tensor(10, value=-1)
469  dist.recv(output_tensor, src, tag=src)
470  self.assertTrue(output_tensor.eq(src).all())
471  else:
472  # Send mode
473  dist.send(tensor, dst, tag=rank)
474 
475  # ISEND
476  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support isend")
477  def test_isend(self):
478  rank = dist.get_rank()
479  world_size = dist.get_world_size()
480 
481  if rank == 0:
482  requests = [
483  dist.isend(_build_tensor(dest, 10), dest)
484  for dest in range(1, world_size)
485  ]
486  for request in requests:
487  request.wait()
488  self.assertTrue(request.is_completed())
489  else:
490  tensor = _build_tensor(rank, -1)
491  dist.recv(tensor, 0)
492  self.assertEqual(tensor, _build_tensor(rank, 10))
493 
494  self._barrier()
495 
496  # IRECV
497  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support irecv")
498  def test_irecv(self):
499  rank = dist.get_rank()
500  world_size = dist.get_world_size()
501 
502  if rank == 0:
503  expected_tensors = [_build_tensor(src, -1) for src in range(1, world_size)]
504  requests = [
505  dist.irecv(expected_tensors[src - 1], src)
506  for src in range(1, world_size)
507  ]
508 
509  for src in range(1, world_size):
510  requests[src - 1].wait()
511  self.assertTrue(requests[src - 1].is_completed())
512  self.assertEqual(expected_tensors[src - 1], _build_tensor(src, 10))
513  else:
514  tensor = _build_tensor(rank, 10)
515  dist.send(tensor, 0)
516 
517  self._barrier()
518 
519  # BROADCAST
520  def _test_broadcast_helper(
521  self, group, group_id, rank, cuda=False, rank_to_GPU=None
522  ):
523  for ttype, value, requires_cuda in [
524  ("torch.FloatTensor", -1e-10, False),
525  ("torch.DoubleTensor", -1e-100, False),
526  ("torch.HalfTensor", -0.1, True),
527  ("torch.CharTensor", -2, False),
528  ("torch.ByteTensor", 129, False),
529  ("torch.IntTensor", -1e5, False),
530  ("torch.LongTensor", -1e15, False),
531  ]:
532  if requires_cuda and not cuda:
533  continue
534  for src in group:
535  expected_tensor = _build_tensor(src + 1, value).type(ttype)
536  if cuda:
537  expected_tensor = expected_tensor.cuda(rank_to_GPU[rank][0])
538  if rank == src:
539  dist.broadcast(expected_tensor, src, group_id)
540  else:
541  tensor = _build_tensor(src + 1, -1).type(ttype)
542  if cuda:
543  tensor = tensor.cuda(rank_to_GPU[rank][0])
544  dist.broadcast(tensor, src, group_id)
545  self.assertEqual(tensor.size(), expected_tensor.size())
546  self.assertEqual(tensor.ne(expected_tensor).max(), 0)
547 
548  self._barrier()
549 
550  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
551  def test_broadcast(self):
552  group, group_id, rank = self._init_global_test()
553  self._test_broadcast_helper(group, group_id, rank)
554 
555  @unittest.skipIf(
556  BACKEND != "gloo" and BACKEND != "nccl",
557  "Only Gloo and NCCL backends support CUDA broadcast",
558  )
559  @skip_if_no_cuda_distributed
560  @skip_if_no_gpu
561  def test_broadcast_cuda(self):
562  group, group_id, rank = self._init_global_test()
563  rank_to_GPU = self._init_multigpu_helper()
564  self._test_broadcast_helper(group, group_id, rank, True, rank_to_GPU)
565 
566  @skip_if_small_worldsize
567  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
568  def test_broadcast_group(self):
569  group, group_id, rank = self._init_group_test()
570  self._test_broadcast_helper(group, group_id, rank)
571 
572  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
573  def test_broadcast_full_group(self):
574  group, group_id, rank = self._init_full_group_test()
575  self._test_broadcast_helper(group, group_id, rank)
576 
577  # REDUCE
578  def _test_reduce_helper(
579  self,
580  group,
581  group_id,
582  rank,
583  op,
584  master_value,
585  worker_value,
586  expected_value,
587  cuda=False,
588  rank_to_GPU=None,
589  ):
590  for src in group:
591  if rank == src:
592  tensor = _build_tensor(src + 1).fill_(master_value)
593  if cuda:
594  tensor = tensor.cuda(rank_to_GPU[rank][0])
595  dist.reduce(tensor, src, op, group_id)
596  self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
597  else:
598  tensor = _build_tensor(src + 1).fill_(worker_value)
599  if cuda:
600  tensor = tensor.cuda(rank_to_GPU[rank][0])
601  dist.reduce(tensor, src, op, group_id)
602 
603  self._barrier()
604 
605  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
606  def test_reduce_sum(self):
607  group, group_id, rank = self._init_global_test()
608  self._test_reduce_helper(
609  group,
610  group_id,
611  rank,
612  dist.ReduceOp.SUM,
613  2,
614  10,
615  2 + (10 * (len(group) - 1)),
616  )
617 
618  @unittest.skipIf(BACKEND != "nccl", "Only Nccl supports CUDA reduce")
619  @skip_if_no_cuda_distributed
620  @skip_if_no_gpu
621  def test_reduce_sum_cuda(self):
622  group, group_id, rank = self._init_global_test()
623  rank_to_GPU = self._init_multigpu_helper()
624  self._test_reduce_helper(
625  group,
626  group_id,
627  rank,
628  dist.ReduceOp.SUM,
629  2,
630  10,
631  2 + 10 * (len(group) - 1),
632  True,
633  rank_to_GPU,
634  )
635 
636  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
637  def test_reduce_product(self):
638  group, group_id, rank = self._init_global_test()
639  self._test_reduce_helper(
640  group,
641  group_id,
642  rank,
643  dist.ReduceOp.PRODUCT,
644  2,
645  10,
646  reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
647  )
648 
649  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
650  def test_reduce_min(self):
651  group, group_id, rank = self._init_global_test()
652  self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1)
653 
654  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
655  def test_reduce_max(self):
656  group, group_id, rank = self._init_global_test()
657  self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10)
658 
659  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
660  @skip_if_small_worldsize
661  def test_reduce_group_sum(self):
662  group, group_id, rank = self._init_group_test()
663  self._test_reduce_helper(
664  group,
665  group_id,
666  rank,
667  dist.ReduceOp.SUM,
668  2,
669  10,
670  2 + (10 * (len(group) - 1)),
671  )
672 
673  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
674  @skip_if_small_worldsize
675  def test_reduce_group_product(self):
676  group, group_id, rank = self._init_group_test()
677  self._test_reduce_helper(
678  group,
679  group_id,
680  rank,
681  dist.ReduceOp.PRODUCT,
682  2,
683  10,
684  reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
685  )
686 
687  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
688  @skip_if_small_worldsize
689  def test_reduce_group_min(self):
690  group, group_id, rank = self._init_group_test()
691  self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1)
692 
693  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
694  @skip_if_small_worldsize
695  def test_reduce_group_max(self):
696  group, group_id, rank = self._init_group_test()
697  self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10)
698 
699  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
700  def test_reduce_full_group_sum(self):
701  group, group_id, rank = self._init_full_group_test()
702  self._test_reduce_helper(
703  group,
704  group_id,
705  rank,
706  dist.ReduceOp.SUM,
707  2,
708  10,
709  2 + (10 * (len(group) - 1)),
710  )
711 
712  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
713  def test_reduce_full_group_product(self):
714  group, group_id, rank = self._init_full_group_test()
715  self._test_reduce_helper(
716  group,
717  group_id,
718  rank,
719  dist.ReduceOp.PRODUCT,
720  2,
721  10,
722  reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
723  )
724 
725  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
726  def test_reduce_full_group_min(self):
727  group, group_id, rank = self._init_full_group_test()
728  self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1)
729 
730  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
731  def test_reduce_full_group_max(self):
732  group, group_id, rank = self._init_full_group_test()
733  self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10)
734 
735  # ALL REDUCE
736  def _test_all_reduce_helper(
737  self,
738  group,
739  group_id,
740  rank,
741  op,
742  master_value,
743  worker_value,
744  expected_value,
745  cuda=False,
746  rank_to_GPU=None,
747  ):
748  for src in group:
749  if rank == src:
750  tensor = _build_tensor(src + 1).fill_(master_value)
751  if cuda:
752  tensor = tensor.cuda(rank_to_GPU[rank][0])
753  dist.all_reduce(tensor, op, group_id)
754  self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
755  else:
756  tensor = _build_tensor(src + 1).fill_(worker_value)
757  if cuda:
758  tensor = tensor.cuda(rank_to_GPU[rank][0])
759  dist.all_reduce(tensor, op, group_id)
760  self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
761 
762  self._barrier()
763 
764  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
765  def test_all_reduce_sum(self):
766  group, group_id, rank = self._init_global_test()
767  self._test_all_reduce_helper(
768  group,
769  group_id,
770  rank,
771  dist.ReduceOp.SUM,
772  2,
773  10,
774  2 + (10 * (len(group) - 1)),
775  )
776 
777  @unittest.skipIf(
778  BACKEND != "gloo",
779  "Only Gloo backend will have CUDA allReduce tested",
780  )
781  @skip_if_no_cuda_distributed
782  @skip_if_no_gpu
783  def test_all_reduce_sum_cuda(self):
784  group, group_id, rank = self._init_global_test()
785  rank_to_GPU = self._init_multigpu_helper()
786  self._test_all_reduce_helper(
787  group,
788  group_id,
789  rank,
790  dist.ReduceOp.SUM,
791  2,
792  10,
793  2 + (10 * (len(group) - 1)),
794  True,
795  rank_to_GPU,
796  )
797 
798  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
799  def test_all_reduce_product(self):
800  group, group_id, rank = self._init_global_test()
801  self._test_all_reduce_helper(
802  group,
803  group_id,
804  rank,
805  dist.ReduceOp.PRODUCT,
806  2,
807  10,
808  reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
809  )
810 
811  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
812  def test_all_reduce_min(self):
813  group, group_id, rank = self._init_global_test()
814  self._test_all_reduce_helper(
815  group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1
816  )
817 
818  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
819  def test_all_reduce_max(self):
820  group, group_id, rank = self._init_global_test()
821  self._test_all_reduce_helper(
822  group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10
823  )
824 
825  @skip_if_small_worldsize
826  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
827  def test_all_reduce_group_sum(self):
828  group, group_id, rank = self._init_group_test()
829  self._test_all_reduce_helper(
830  group,
831  group_id,
832  rank,
833  dist.ReduceOp.SUM,
834  2,
835  10,
836  2 + (10 * (len(group) - 1)),
837  )
838 
839  @skip_if_small_worldsize
840  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
841  def test_all_reduce_group_product(self):
842  group, group_id, rank = self._init_group_test()
843  self._test_all_reduce_helper(
844  group,
845  group_id,
846  rank,
847  dist.ReduceOp.PRODUCT,
848  2,
849  10,
850  reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
851  )
852 
853  @skip_if_small_worldsize
854  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
855  def test_all_reduce_group_min(self):
856  group, group_id, rank = self._init_group_test()
857  self._test_all_reduce_helper(
858  group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1
859  )
860 
861  @skip_if_small_worldsize
862  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
863  def test_all_reduce_group_max(self):
864  group, group_id, rank = self._init_group_test()
865  self._test_all_reduce_helper(
866  group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10
867  )
868 
869  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
870  def test_all_reduce_full_group_sum(self):
871  group, group_id, rank = self._init_full_group_test()
872  self._test_all_reduce_helper(
873  group,
874  group_id,
875  rank,
876  dist.ReduceOp.SUM,
877  2,
878  10,
879  2 + (10 * (len(group) - 1)),
880  )
881 
882  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
883  def test_all_reduce_full_group_product(self):
884  group, group_id, rank = self._init_full_group_test()
885  self._test_all_reduce_helper(
886  group,
887  group_id,
888  rank,
889  dist.ReduceOp.PRODUCT,
890  2,
891  10,
892  reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
893  )
894 
895  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
896  def test_all_reduce_full_group_min(self):
897  group, group_id, rank = self._init_full_group_test()
898  self._test_all_reduce_helper(
899  group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1
900  )
901 
902  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
903  def test_all_reduce_full_group_max(self):
904  group, group_id, rank = self._init_full_group_test()
905  self._test_all_reduce_helper(
906  group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10
907  )
908 
909  # SCATTER
910  def _test_scatter_helper(self, group, group_id, rank):
911  for dest in group:
912  tensor = _build_tensor(dest + 1, -1)
913  expected_tensor = _build_tensor(dest + 1, rank)
914  tensors = (
915  [_build_tensor(dest + 1, i) for i in group] if rank == dest else []
916  )
917  dist.scatter(tensor, src=dest, scatter_list=tensors, group=group_id)
918  self.assertEqual(tensor, expected_tensor)
919 
920  self._barrier()
921 
922  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
923  def test_scatter(self):
924  group, group_id, rank = self._init_global_test()
925  self._test_scatter_helper(group, group_id, rank)
926 
927  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
928  @skip_if_small_worldsize
929  def test_scatter_group(self):
930  group, group_id, rank = self._init_group_test()
931  self._test_scatter_helper(group, group_id, rank)
932 
933  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
934  def test_scatter_full_group(self):
935  group, group_id, rank = self._init_full_group_test()
936  self._test_scatter_helper(group, group_id, rank)
937 
938  # GATHER
939  def _test_gather_helper(self, group, group_id, rank):
940  for dest in group:
941  tensor = _build_tensor(dest + 1, rank)
942  tensors = (
943  [_build_tensor(dest + 1, -1) for i in group] if rank == dest else []
944  )
945  dist.gather(tensor, dst=dest, gather_list=tensors, group=group_id)
946  if rank == dest:
947  expected_tensors = [_build_tensor(dest + 1, i) for i in group]
948  for t1, t2 in zip(tensors, expected_tensors):
949  self.assertEqual(t1, t2)
950 
951  self._barrier()
952 
953  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
954  def test_gather(self):
955  group, group_id, rank = self._init_global_test()
956  self._test_gather_helper(group, group_id, rank)
957 
958  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
959  @skip_if_small_worldsize
960  def test_gather_group(self):
961  group, group_id, rank = self._init_group_test()
962  self._test_gather_helper(group, group_id, rank)
963 
964  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
965  def test_gather_full_group(self):
966  group, group_id, rank = self._init_full_group_test()
967  self._test_gather_helper(group, group_id, rank)
968 
969  # ALL GATHER
970  def _test_all_gather_helper(
971  self, group, group_id, rank, cuda=False, rank_to_GPU=None
972  ):
973  for dest in group:
974  tensor = _build_tensor(dest + 1, rank)
975  tensors = [_build_tensor(dest + 1, -1) for i in group]
976  if cuda:
977  tensor = tensor.cuda(rank_to_GPU[rank][0])
978  tensors = [t.cuda(rank_to_GPU[rank][0]) for t in tensors]
979  dist.all_gather(tensors, tensor, group_id)
980 
981  expected_tensors = [_build_tensor(dest + 1, i) for i in group]
982  for t1, t2 in zip(tensors, expected_tensors):
983  self.assertEqual(t1, t2)
984 
985  self._barrier()
986 
987  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
988  def test_all_gather(self):
989  group, group_id, rank = self._init_global_test()
990  self._test_all_gather_helper(group, group_id, rank)
991 
992  @unittest.skipIf(BACKEND != "nccl", "Only Nccl supports CUDA all gather")
993  @unittest.skipIf(BACKEND == "nccl", "CUDA all gather skipped for NCCL")
994  @skip_if_no_cuda_distributed
995  @skip_if_no_gpu
996  def test_all_gather_cuda(self):
997  group, group_id, rank = self._init_global_test()
998  rank_to_GPU = self._init_multigpu_helper()
999  self._test_all_gather_helper(group, group_id, rank, True, rank_to_GPU)
1000 
1001  @skip_if_small_worldsize
1002  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
1003  def test_all_gather_group(self):
1004  group, group_id, rank = self._init_group_test()
1005  self._test_all_gather_helper(group, group_id, rank)
1006 
1007  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
1008  def test_all_gather_full_group(self):
1009  group, group_id, rank = self._init_full_group_test()
1010  self._test_all_gather_helper(group, group_id, rank)
1011 
1012  # BARRIER
1013  def _test_barrier_helper(
1014  self, group, group_id, rank, cuda=False, rank_to_GPU=None):
1015  WAIT_TIME = 0.3 # seconds
1016 
1017  for dest in group:
1018  expected_time = torch.DoubleTensor(1).fill_(0.0)
1019  if cuda:
1020  expected_time = expected_time.cuda(rank_to_GPU[rank][0])
1021  if dest == rank:
1022  expected_time.fill_(time.time() + WAIT_TIME)
1023  dist.broadcast(expected_time, dest, group_id)
1024  time.sleep(WAIT_TIME + 0.1) # sleep a little bit longer
1025  dist.barrier(group_id)
1026  else:
1027  dist.broadcast(expected_time, dest, group_id)
1028  dist.barrier(group_id)
1029  self.assertGreaterEqual(
1030  float(time.time()),
1031  float(expected_time[0]),
1032  "destination rank: %d, my rank: %d" % (dest, rank) +
1033  " (if you see this failure, please report in #14554)")
1034 
1035  # Use higher timeout for the instance where the test runs
1036  # against a subgroup and uses a CUDA tensor for expected time.
1037  # The CUDA initialization for the participating processes can
1038  # take long enough for the barrier timeout to trigger on the
1039  # process that doesn't participate in the group.
1040  self._barrier(timeout=20)
1041 
1042  @skip_if_no_gpu
1043  @unittest.skipIf(BACKEND == "mpi", "MPI doesn't support GPU barrier")
1044  def test_barrier_cuda(self):
1045  group, group_id, rank = self._init_global_test()
1046  rank_to_GPU = self._init_multigpu_helper()
1047  self._test_barrier_helper(group, group_id, rank, True, rank_to_GPU)
1048 
1049  @skip_if_small_worldsize
1050  @skip_if_no_gpu
1051  @unittest.skipIf(BACKEND == "mpi", "MPI doesn't support GPU barrier")
1052  def test_barrier_group_cuda(self):
1053  group, group_id, rank = self._init_group_test()
1054  rank_to_GPU = self._init_multigpu_helper()
1055  self._test_barrier_helper(group, group_id, rank, True, rank_to_GPU)
1056 
1057  @skip_if_small_worldsize
1058  @skip_if_no_gpu
1059  @unittest.skipIf(BACKEND == "mpi", "MPI doesn't support GPU barrier")
1060  def test_barrier_full_group_cuda(self):
1061  group, group_id, rank = self._init_full_group_test()
1062  rank_to_GPU = self._init_multigpu_helper()
1063  self._test_barrier_helper(group, group_id, rank, True, rank_to_GPU)
1064 
1065  @unittest.skipIf(BACKEND == "nccl", "NCCL does not support CPU barrier")
1066  def test_barrier(self):
1067  group, group_id, rank = self._init_global_test()
1068  self._test_barrier_helper(group, group_id, rank)
1069 
1070  @skip_if_small_worldsize
1071  @unittest.skipIf(BACKEND == "nccl", "NCCL does not support CPU barrier")
1072  def test_barrier_group(self):
1073  group, group_id, rank = self._init_group_test()
1074  self._test_barrier_helper(group, group_id, rank)
1075 
1076  @unittest.skipIf(BACKEND == "nccl", "NCCL does not support CPU barrier")
1077  def test_barrier_full_group(self):
1078  group, group_id, rank = self._init_full_group_test()
1079  self._test_barrier_helper(group, group_id, rank)
1080 
1081  def _test_broadcast_multigpu_helper(self, group, group_id, rank, rank_to_GPU):
1082  for src in group:
1083  expected_tensor = _build_tensor(src + 1)
1084  tensors = [
1085  _build_tensor(src + 1, -1).cuda(device=i) for i in rank_to_GPU[rank]
1086  ]
1087  if rank == src:
1088  tensors[0] = expected_tensor.cuda(device=rank_to_GPU[rank][0])
1089 
1090  dist.broadcast_multigpu(tensors, src, group_id)
1091  for tensor in tensors:
1092  self.assertEqual(tensor, expected_tensor)
1093  self._barrier()
1094 
1095  @unittest.skipIf(BACKEND == "mpi", "MPI doesn't support broadcast multigpu")
1096  @unittest.skipIf(BACKEND == "nccl", "NCCL broadcast multigpu skipped")
1097  @skip_if_no_gpu
1098  def test_broadcast_multigpu(self):
1099  group, group_id, rank = self._init_global_test()
1100  rank_to_GPU = self._init_multigpu_helper()
1101  self._test_broadcast_multigpu_helper(group, group_id, rank, rank_to_GPU)
1102 
1103  def _test_all_reduce_multigpu_helper(
1104  self,
1105  group,
1106  group_id,
1107  rank,
1108  rank_to_GPU,
1109  op,
1110  master_value,
1111  worker_value,
1112  expected_value,
1113  ):
1114  for src in group:
1115  if rank == src:
1116  tensors = [
1117  _build_tensor(src + 1, master_value).cuda(device=i)
1118  for i in rank_to_GPU[rank]
1119  ]
1120  else:
1121  tensors = [
1122  _build_tensor(src + 1, worker_value).cuda(device=i)
1123  for i in rank_to_GPU[rank]
1124  ]
1125 
1126  dist.all_reduce_multigpu(tensors, op, group_id)
1127  expected_tensor = _build_tensor(src + 1, expected_value)
1128  for tensor in tensors:
1129  self.assertEqual(tensor, expected_tensor)
1130 
1131  self._barrier()
1132 
1133  @unittest.skipIf(BACKEND == "mpi", "MPI doesn't support broadcast multigpu")
1134  @unittest.skipIf(BACKEND == "nccl", "CUDA all_reduce multigpu skipped for NCCL")
1135  @skip_if_no_gpu
1136  def test_all_reduce_multigpu(self):
1137  group, group_id, rank = self._init_global_test()
1138  rank_to_GPU = self._init_multigpu_helper()
1139  self._test_all_reduce_multigpu_helper(
1140  group,
1141  group_id,
1142  rank,
1143  rank_to_GPU,
1144  dist.ReduceOp.SUM,
1145  2,
1146  10,
1147  (2 + 10 * (len(group) - 1)) * len(rank_to_GPU[0]),
1148  )
1149 
1150  def _test_reduce_multigpu_helper(
1151  self,
1152  group,
1153  group_id,
1154  rank,
1155  rank_to_GPU,
1156  op,
1157  master_value,
1158  worker_value,
1159  expected_value,
1160  ):
1161  for src in group:
1162  if rank == src:
1163  tensors = [
1164  _build_tensor(src + 1, master_value).cuda(device=i)
1165  for i in rank_to_GPU[rank]
1166  ]
1167  dist.reduce_multigpu(tensors, src, op, group_id)
1168  expected_tensor = _build_tensor(src + 1, expected_value)
1169  self.assertEqual(tensors[0], expected_tensor)
1170  else:
1171  tensors = [
1172  _build_tensor(src + 1, worker_value).cuda(device=i)
1173  for i in rank_to_GPU[rank]
1174  ]
1175  dist.reduce_multigpu(tensors, src, op, group_id)
1176 
1177  self._barrier()
1178 
1179  @unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports reduce multigpu")
1180  @skip_if_no_gpu
1181  def test_reduce_multigpu(self):
1182  group, group_id, rank = self._init_global_test()
1183  rank_to_GPU = self._init_multigpu_helper()
1184  self._test_reduce_multigpu_helper(
1185  group,
1186  group_id,
1187  rank,
1188  rank_to_GPU,
1189  dist.ReduceOp.SUM,
1190  2,
1191  10,
1192  (2 + 10 * (len(group) - 1)) * len(rank_to_GPU[0]),
1193  )
1194 
1195  def _test_all_gather_multigpu_helper(self, group, group_id, rank, rank_to_GPU):
1196  for dest in group:
1197  tensors = [
1198  _build_tensor(dest + 1).cuda(device=i) for i in rank_to_GPU[rank]
1199  ]
1200 
1201  # construct the expected output along with
1202  # a placeholder to receive the all_gather results
1203  output_tensors = []
1204  expected_output = []
1205  output_per_gpu = (
1206  [_build_tensor(dest + 1, -1)] * len(rank_to_GPU[0]) * len(group)
1207  )
1208  expected_per_gpu = (
1209  [_build_tensor(dest + 1)] * len(rank_to_GPU[0]) * len(group)
1210  )
1211  for gpu in rank_to_GPU[rank]:
1212  output_tensors.append([t.cuda(device=gpu) for t in output_per_gpu])
1213  expected_output.append([t.cuda(device=gpu) for t in expected_per_gpu])
1214 
1215  dist.all_gather_multigpu(output_tensors, tensors, group_id)
1216  self.assertEqual(output_tensors, expected_output)
1217 
1218  self._barrier()
1219 
1220  @unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports allgather multigpu")
1221  @skip_if_no_gpu
1222  def test_all_gather_multigpu(self):
1223  group, group_id, rank = self._init_global_test()
1224  rank_to_GPU = self._init_multigpu_helper()
1225  self._test_all_gather_multigpu_helper(group, group_id, rank, rank_to_GPU)
1226 
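# Note: _model_step below applies a manual, deterministic parameter update
# (param.data += param.grad) instead of using an optimizer, so the
# single-process baseline and the DDP replica can be stepped identically
# and compared with _assert_equal_param after every iteration.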
1227  def _model_step(self, model):
1228  for param in model.parameters():
1229  if param.grad is not None:
1230  param.data += param.grad
1231  param.grad = None
1232 
1233  def _prepare_dummy_data(self, local_bs):
1234  # global_bs for DDP should be divisible by WORLD_SIZE
1235  global_bs = int(WORLD_SIZE) * local_bs
1236  input_cpu = torch.randn(global_bs, 2)
1237  target = torch.randn(global_bs, 4)
1238  loss = nn.MSELoss()
1239  return global_bs, input_cpu, target, loss
1240 
1241  # END TO END TEST FOR DISTRIBUTEDDATAPARALLEL
1242  def _test_DDP_helper(self, model, input_var, target, loss):
1243  model.train()
1244  output = model(input_var)
1245  l = loss(output, target)
1246  l.backward()
1247 
1248  def _assert_equal_param(self, param_gpu, param_DDP):
1249  self.assertEqual(len(param_gpu), len(param_DDP))
1250  for p_gpu, p_DDP in zip(param_gpu, param_DDP):
1251  self.assertEqual(p_gpu, p_DDP)
1252 
1253  def _test_DDP_5iter(
1254  self, model_base, model_DDP, input, target, loss, local_bs, rank, batch_size, test_save
1255  ):
1256  for idx in range(5):
1257  # single cpu/gpu training
1258  self._test_DDP_helper(model_base, input, target, loss)
1259 
1260  # DDP training, DDP scatters subsets of input_cpu to nodes/GPUs
1261  self._test_DDP_helper(
1262  model_DDP,
1263  input[rank * local_bs: (rank + 1) * local_bs],
1264  target[rank * local_bs: (rank + 1) * local_bs],
1265  loss,
1266  )
1267 
1268  # Update weights and run a second iteration to shake out errors
1269  self._model_step(model_base)
1270  self._model_step(model_DDP)
1271  self._assert_equal_param(
1272  list(model_base.parameters()), list(model_DDP.module.parameters())
1273  )
1274 
1275  # Shuffle the input so that DDP input is different
1276  input = input[torch.randperm(batch_size)]
1277 
1278  # save the model in the middle and reload
1279  if test_save and idx == 2 and INIT_METHOD.startswith("file://"):
1280  _, filename = tempfile.mkstemp(prefix=FOLDER)
1281  torch.save(model_DDP, filename)
1282  model_DDP = torch.load(filename)
1283 
1284  with tempfile.TemporaryFile() as tmp_file:
1285  torch.save(model_DDP, tmp_file)
1286  tmp_file.seek(0)
1287  saved_model = torch.load(tmp_file)
1288  for k in model_DDP.state_dict():
1289  self.assertEqual(model_DDP.state_dict()[k],
1290  saved_model.state_dict()[k])
1291 
1292  def _test_DistributedDataParallel(self, gpu_subset, rank, output_device=None):
1293  # Run a simple end-to-end DDP model, using the result of the
1294  # single-node model as the baseline
1295 
1296  # cpu training setup
1297  model = DDP_NET
1298 
1299  # single gpu training setup
1300  model_gpu = copy.deepcopy(model)
1301  model_gpu.cuda(gpu_subset[0])
1302 
1303  # DDP training setup
1304  model_DDP = copy.deepcopy(model)
1305  model_DDP.cuda(gpu_subset[0])
1306  model_DDP = nn.parallel.DistributedDataParallel(
1307  model_DDP, device_ids=gpu_subset
1308  )
1309 
1310  # test that the model is serializable/deserializable
1311  if INIT_METHOD.startswith("file://"):
1312  _, filename = tempfile.mkstemp(prefix=FOLDER)
1313  torch.save(model_DDP, filename)
1314  model_DDP = torch.load(filename)
1315 
1316  # dummy data initialization
1317  local_bs = len(gpu_subset)
1318  global_bs, input_cpu, target, loss = self._prepare_dummy_data(local_bs)
1319 
1320  # check two model parameters over 5 iterations
1321  self._test_DDP_5iter(
1322  model_gpu,
1323  model_DDP,
1324  input_cpu.cuda(gpu_subset[0]),
1325  target.cuda(gpu_subset[0]),
1326  loss,
1327  local_bs,
1328  rank,
1329  global_bs,
1330  True
1331  )
1332  self._barrier()
1333 
1334  @unittest.skipIf(
1335  BACKEND == "nccl", "nccl does not support DistributedDataParallelCPU"
1336  )
1337  def test_DistributedDataParallelCPU(self):
1338  # Run a simple end-to-end DDP-CPU model, using the result of the
1339  # single-node model as the baseline
1340  group, group_id, rank = self._init_global_test()
1341 
1342  # cpu training setup
1343  model_base = DDP_NET
1344 
1345  # DDP-CPU training setup
1346  model_DDP = copy.deepcopy(model_base)
1347  model_DDP = nn.parallel.DistributedDataParallelCPU(model_DDP)
1348 
1349  # dummy data initialization
1350  local_bs = 2
1351  global_bs, input_cpu, target, loss = self._prepare_dummy_data(local_bs)
1352 
1353  # check two model parameters over 5 iterations
1354  # TODO: add state pickling support for DistributedDataParallelCPU
1355  self._test_DDP_5iter(
1356  model_base, model_DDP, input_cpu, target, loss, local_bs, rank, global_bs, False
1357  )
1358  self._barrier()
1359 
1360  @unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
1361  "Only NCCL & Gloo backends support DistributedDataParallel")
1362  @skip_if_no_cuda_distributed
1363  @skip_if_no_gpu
1364  def test_DistributedDataParallel(self):
1365  group, group_id, rank = self._init_global_test()
1366  rank_to_GPU = self._init_multigpu_helper()
1367  gpus = list(rank_to_GPU[rank])
1368  self._test_DistributedDataParallel(gpu_subset=gpus, rank=rank)
1369 
1370  # test output_device
1371  self._test_DistributedDataParallel(gpu_subset=gpus, rank=rank, output_device=torch.device('cuda'))
1372 
1373  # test device_ids
1374  gpus = list(map(lambda i: torch.device('cuda:' + str(i)), gpus))
1375  self._test_DistributedDataParallel(gpu_subset=gpus, rank=rank, output_device=torch.device('cuda'))
1376 
1377  def _test_DistributedDataParallel_SyncBatchNorm(self, gpu_subset, rank, output_device=None):
1378  # Run a simple end-to-end DDP model, using the result of the
1379  # single-node model as the baseline
1380 
1381  # cpu training setup
1382  model = BN_NET
1383 
1384  # single gpu training setup
1385  model_gpu = copy.deepcopy(model)
1386  model_gpu.cuda(gpu_subset[0])
1387 
1388  # DDP training setup
1389  model_DDP = nn.utils.convert_sync_batchnorm(copy.deepcopy(model))
1390  model_DDP.cuda(gpu_subset[0])
1391  model_DDP = nn.parallel.DistributedDataParallel(
1392  model_DDP, device_ids=gpu_subset
1393  )
1394 
1395  # test that the model is serializable/deserializable
1396  if INIT_METHOD.startswith("file://"):
1397  _, filename = tempfile.mkstemp(prefix=FOLDER)
1398  torch.save(model_DDP, filename)
1399  model_DDP = torch.load(filename)
1400 
1401  # dummy data initialization
1402  local_bs = len(gpu_subset)
1403  global_bs, input_cpu, target, loss = self._prepare_dummy_data(local_bs)
1404 
1405  # check two model parameters over 5 iterations
1406  self._test_DDP_5iter(
1407  model_gpu,
1408  model_DDP,
1409  input_cpu.cuda(gpu_subset[0]),
1410  target.cuda(gpu_subset[0]),
1411  loss,
1412  local_bs,
1413  rank,
1414  global_bs,
1415  True
1416  )
1417  self._barrier()
1418 
1419  @unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
1420  "Only NCCL & Gloo backends support DistributedDataParallel")
1421  @skip_if_no_cuda_distributed
1422  @skip_if_no_gpu
1423  def test_DistributedDataParallel_SyncBatchNorm(self):
1424  group, group_id, rank = self._init_global_test()
1425  rank_to_GPU = self._init_multigpu_helper()
1426  gpus = list(rank_to_GPU[rank])
1427  self._test_DistributedDataParallel_SyncBatchNorm(gpu_subset=gpus, rank=rank)
1428 
1429  # test output_device
1430  self._test_DistributedDataParallel_SyncBatchNorm(gpu_subset=gpus, rank=rank, output_device=torch.device('cuda'))
1431 
1432  # test device_ids
1433  gpus = list(map(lambda i: torch.device('cuda:' + str(i)), gpus))
1434  self._test_DistributedDataParallel_SyncBatchNorm(gpu_subset=gpus, rank=rank, output_device=torch.device('cuda'))
1435 
1436 if BACKEND == "gloo" or BACKEND == "nccl":
1437  WORLD_SIZE = os.environ["WORLD_SIZE"]
1438 
1439  class TestDistBackend(TestCase, _DistTestBase):
1440  MANAGER_PROCESS_RANK = -1
1441 
1442  @staticmethod
1443  def manager_join(fn):
1444  @wraps(fn)
1445  def wrapper(self):
1446  if self.rank == self.MANAGER_PROCESS_RANK:
1447  self._join_and_reduce(fn)
1448  else:
1449  fn(self)
1450 
1451  return wrapper
1452 
1453  @classmethod
1454  def setUpClass(cls):
1455  os.environ["MASTER_ADDR"] = str(MASTER_ADDR)
1456  os.environ["MASTER_PORT"] = str(MASTER_PORT)
1457  os.environ["WORLD_SIZE"] = str(WORLD_SIZE)
1458  for attr in dir(cls):
1459  if attr.startswith("test"):
1460  fn = getattr(cls, attr)
1461  setattr(cls, attr, cls.manager_join(fn))
1462 
1463  def setUp(self):
1464  # Adding this hack until we fix the FileStore to delete its
1465  # content at the end
1466  global INIT_METHOD
1467  if INIT_METHOD.startswith("file://"):
1468  _, filename = tempfile.mkstemp(prefix=FOLDER)
1469  INIT_METHOD = "file://{}".format(filename)
1470 
1471  self.processes = []
1472  self.rank = self.MANAGER_PROCESS_RANK
1473  Barrier.init()
1474  for rank in range(int(WORLD_SIZE)):
1475  self.processes.append(self._spawn_process(rank))
1476 
1477  def tearDown(self):
1478  for p in self.processes:
1479  p.terminate()
1480 
1481  def _spawn_process(self, rank):
1482  os.environ["RANK"] = str(rank)
1483  name = "process " + str(rank)
1484  process = multiprocessing.Process(target=self._run, name=name, args=(rank,))
1485  process.start()
1486  return process
1487 
1488  def _run(self, rank):
1489  self.rank = rank
1490  try:
1491  dist.init_process_group(
1492  init_method=INIT_METHOD,
1493  backend=BACKEND,
1494  world_size=int(WORLD_SIZE),
1495  rank=self.rank
1496  )
1497  except RuntimeError as e:
1498  if "recompile" in e.args[0]:
1499  sys.exit(SKIP_IF_BACKEND_UNAVAILABLE)
1500  # sys.exit(0)
1501  raise
1502 
1503  # Execute barrier prior to running test to ensure that every process
1504  # has finished initialization and that the following test
1505  # immediately exiting due to a skip doesn't cause flakiness.
1506  self._barrier()
1507 
1508  # self.id() == e.g. '__main__.TestDistributed.test_get_rank'
1509  # We're retrieving the corresponding test and executing it.
1510  getattr(self, self.id().split(".")[2])()
1511  self._barrier()
1512  dist.destroy_process_group()
1513  sys.exit(0)
1514 
1515  def _join_and_reduce(self, fn):
1516  skip_ok = (
1517  getattr(fn, "skip_if_no_cuda_distributed", False) or
1518  getattr(fn, "skip_if_no_gpu", False) or
1519  getattr(fn, "skip_if_small_worldsize", False)
1520  )
1521  join_timeout = get_timeout(self.id())
1522  for rank, process in enumerate(self.processes):
1523  process.join(join_timeout)
1524  self.assertFalse(
1525  process.is_alive(),
1526  "Timeout waiting for rank %d to terminate" % rank)
1527 
1528  first_process = self.processes[0]
1529  for p in self.processes:
1530  self.assertEqual(p.exitcode, first_process.exitcode)
1531 
1532  if first_process.exitcode == SKIP_IF_BACKEND_UNAVAILABLE:
1533  raise unittest.SkipTest("Compiled without the " + BACKEND + " backend")
1534 
1535  if skip_ok:
1536  # do this first so we don't give an error message about
1537  # mismatched exit codes if the first isn't valid
1538  assert (
1539  first_process.exitcode == 0 or
1540  first_process.exitcode == SKIP_IF_NO_CUDA_EXIT_CODE or
1541  first_process.exitcode == SKIP_IF_NO_GPU_EXIT_CODE or
1542  first_process.exitcode == SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE
1543  )
1544 
1545  if first_process.exitcode == SKIP_IF_NO_CUDA_EXIT_CODE:
1546  raise unittest.SkipTest("cuda is not available")
1547  if first_process.exitcode == SKIP_IF_NO_GPU_EXIT_CODE:
1548  raise unittest.SkipTest(
1549  "One unique gpu per process is not available"
1550  )
1551  if first_process.exitcode == SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE:
1552  raise unittest.SkipTest("worldsize is too small to run group tests")
1553 
1554  self.assertEqual(first_process.exitcode, 0)
1555 
1556 
1557 elif BACKEND == "mpi":
1558  WORLD_SIZE = os.environ["WORLD_SIZE"]
1559  dist.init_process_group(init_method=INIT_METHOD, backend="mpi")
1560 
1561  class TestMPI(TestCase, _DistTestBase):
1562  pass
1563 
1564 
1565 if __name__ == "__main__":
1566  assert (
1567  not torch.cuda._initialized
1568  ), "test_distributed must not have initialized CUDA context on main process"
1569 
1570  run_tests()
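
The module above is driven entirely by environment variables (BACKEND, WORLD_SIZE, TEMP_DIR, and optionally INIT_METHOD), and TEMP_DIR must already contain the "barrier" and "test_dir" subdirectories used by Barrier and test_get_rank. Below is a minimal single-machine driver sketch; the directory layout, the "init_dir"/"shared_init" names, the backend choice, and the world size are illustrative assumptions rather than part of the test file itself (MPI runs are normally launched through mpirun instead).

import os
import subprocess
import sys
import tempfile

tmp = tempfile.mkdtemp()
# Create the directories the harness expects; "init_dir" is an assumed name
# used only for the file:// rendezvous below.
for sub in ("barrier", "test_dir", "init_dir"):
    os.mkdir(os.path.join(tmp, sub))

env = dict(
    os.environ,
    BACKEND="gloo",          # "nccl" also works here; MPI is launched via mpirun
    WORLD_SIZE="3",
    TEMP_DIR=tmp,
    # file:// initialization sidesteps the env:// default, which needs
    # MASTER_ADDR/MASTER_PORT to be resolvable.
    INIT_METHOD="file://" + os.path.join(tmp, "init_dir", "shared_init"),
)
subprocess.check_call([sys.executable, "test_distributed.py"], env=env)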