Caffe2 - Python API
A deep learning, cross-platform ML framework
test_thd_distributed.py
1 from __future__ import absolute_import, division, print_function, unicode_literals
2 import copy
3 import fcntl
4 import multiprocessing
5 import os
6 import sys
7 import time
8 import unittest
9 from contextlib import contextmanager
10 from functools import reduce, wraps
11 import tempfile
12 
13 import torch
14 import torch.cuda
15 import torch.distributed.deprecated as dist
16 import torch.nn as nn
17 import torch.nn.functional as F
18 import torch.optim as optim
19 from common_utils import TestCase, run_tests
20 from torch._utils_internal import TEST_MASTER_ADDR as MASTER_ADDR
21 
22 
23 BACKEND = os.environ["BACKEND"]
24 TEMP_DIR = os.environ["TEMP_DIR"]
25 INIT_METHOD = os.getenv("INIT_METHOD", "env://")
26 MASTER_PORT = "29500"
27 
28 DEFAULT_TIMEOUT = 300
29 CUSTOMIZED_TIMEOUT = {"test_DistributedDataParallel": 500}
30 
31 
32 def get_timeout(test_id):
33  test_name = test_id.split(".")[-1]
34  if test_name in CUSTOMIZED_TIMEOUT:
35  return CUSTOMIZED_TIMEOUT[test_name]
36  else:
37  return DEFAULT_TIMEOUT
38 
39 
40 if not dist.is_available():
41  print("Distributed not available, skipping tests")
42  sys.exit(0)
43 
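# NOTE: the skip_if_* decorators below run inside the spawned worker
# processes, so they signal "skip this test" by exiting with one of these
# dedicated codes; the manager process translates them back into
# unittest.SkipTest in _join_and_reduce() further down.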
44 SKIP_IF_NO_CUDA_EXIT_CODE = 75
45 SKIP_IF_NO_GPU_EXIT_CODE = 76
46 SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE = 77
47 SKIP_IF_BACKEND_UNAVAILABLE = 78
48 
49 
50 def skip_if_no_cuda_distributed(func):
51  func.skip_if_no_cuda_distributed = True
52 
53  @wraps(func)
54  def wrapper(*args, **kwargs):
55  if not torch.cuda.is_available():
56  sys.exit(SKIP_IF_NO_CUDA_EXIT_CODE)
57 
58  return func(*args, **kwargs)
59 
60  return wrapper
61 
62 
63 def skip_if_no_gpu(func):
64  """NCCL multi-GPU tests require at least 2 GPUs. Skip if this requirement is not met."""
65  func.skip_if_no_gpu = True
66 
67  @wraps(func)
68  def wrapper(*args, **kwargs):
69  if not torch.cuda.is_available():
70  sys.exit(SKIP_IF_NO_CUDA_EXIT_CODE)
71  if torch.cuda.device_count() < int(os.environ["WORLD_SIZE"]):
72  sys.exit(SKIP_IF_NO_GPU_EXIT_CODE)
73 
74  return func(*args, **kwargs)
75 
76  return wrapper
77 
78 
79 def skip_if_small_worldsize(func):
80  func.skip_if_small_worldsize = True
81 
82  @wraps(func)
83  def wrapper(*args, **kwargs):
84  if (os.environ["BACKEND"] != "mpi") and int(os.environ["WORLD_SIZE"]) <= 2:
85  sys.exit(SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE)
86 
87  return func(*args, **kwargs)
88 
89  return wrapper
90 
91 
92 def apply_hack_for_nccl():
93  # This is a workaround for a known NCCL issue: using multiprocessing
94  # in conjunction with multiple threads to manage different GPUs may
95  # cause ncclCommInitRank to fail.
96  # http://docs.nvidia.com/deeplearning/sdk/nccl-release-notes/rel_2.1.4.html#rel_2.1.4
97  # Limiting NCCL to a single ring slows down collective operations,
98  # but without this setting NCCL might throw an unhandled error.
99  os.environ["NCCL_MAX_NRINGS"] = "1"
100 
101 
102 @contextmanager
103 def _lock():
104  lockfile = os.path.join(TEMP_DIR, "lockfile")
105  with open(lockfile, "w") as lf:
106  try:
107  fcntl.flock(lf.fileno(), fcntl.LOCK_EX)
108  yield
109  finally:
110  fcntl.flock(lf.fileno(), fcntl.LOCK_UN)
111  lf.close()
112 
113 
114 def _build_tensor(size, value=None):
115  if value is None:
116  value = size
117  return torch.FloatTensor(size, size, size).fill_(value)
118 
119 
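# Barrier implements a simple file-based rendezvous across the spawned
# processes: sync() bumps a per-process barrier_id, writes it to
# TEMP_DIR/barrier/<pid> while holding an fcntl lock on TEMP_DIR/lockfile,
# and then polls until all world_size files report at least that id
# (raising RuntimeError if the timeout expires).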
120 class Barrier(object):
121  barrier_id = 0
122 
123  @classmethod
124  def init(cls):
125  cls.barrier_id = 0
126  barrier_dir = os.path.join(TEMP_DIR, "barrier")
127  for f_name in os.listdir(barrier_dir):
128  os.unlink(os.path.join(barrier_dir, f_name))
129 
130  @classmethod
131  def sync(cls, timeout=5):
132  cls.barrier_id += 1
133  barrier_dir = os.path.join(TEMP_DIR, "barrier")
134  pid = str(os.getpid())
135  barrier_file = os.path.join(barrier_dir, pid)
136  with _lock():
137  with open(barrier_file, "w") as f:
138  f.write(str(cls.barrier_id))
139 
140  start_time = time.time()
141  while True:
142  arrived = 0
143  with _lock():
144  for f_name in os.listdir(barrier_dir):
145  with open(os.path.join(barrier_dir, f_name), "r") as f:
146  data = f.read()
147  if int(data) >= cls.barrier_id:
148  arrived += 1
149  if arrived == dist.get_world_size():
150  break
151 
152  if time.time() - start_time > timeout:
153  raise RuntimeError("barrier timeout")
154  time.sleep(0.1)
155 
156 
157 # The test network must live at top level so we can test pickling it.
158 
159 
160 class _FC2(nn.Module):
161 
162  def __init__(self):
163  super(_FC2, self).__init__()
164  self.fc = nn.Linear(10, 50, bias=True)
165  self.fc.bias.requires_grad = False
166 
167  def forward(self, x):
168  x = self.fc(x)
169  return x
170 
171 
172 class Net(nn.Module):
173  def __init__(self):
174  super(Net, self).__init__()
175  self.fc1 = nn.Linear(2, 10, bias=False)
176  self.fc2 = _FC2()
177  self.fc3 = nn.Linear(50, 4, bias=False)
178  self.relu = nn.ReLU()
179 
180  def forward(self, x):
181  x = self.relu(self.fc1(x))
182  x = self.relu(self.fc2(x))
183  x = self.fc3(x)
184  return F.softmax(x, dim=1)
185 
186 
187 class _DistTestBase(object):
188  def _barrier(self, *args, **kwargs):
189  Barrier.sync(*args, **kwargs)
190 
191  def _init_group_test(self):
192  group = [1, 2]
193  group_id = dist.new_group(group)
194  rank = dist.get_rank()
195  if rank not in group:
196  return ([], None, rank)
197 
198  return (group, group_id, rank)
199 
200  def _init_global_test(self):
201  group = [i for i in range(0, dist.get_world_size())]
202  group_id = dist.group.WORLD
203  rank = dist.get_rank()
204  return (group, group_id, rank)
205 
206  # HELPER FOR MULTIGPU TESTS
207  def _init_multigpu_helper(self):
208  """Multi-GPU tests simulate multiple nodes, each with multiple GPUs.
209  The NCCL backend requires an equal number of GPUs per process.
210  On a single node, all visible GPUs are evenly divided into subsets,
211  and each process uses only its own subset.
212  """
213  nGPUs = torch.cuda.device_count()
214  world_size = dist.get_world_size()
215  visible_devices = range(nGPUs)
216 
217  if BACKEND == "nccl":
218  apply_hack_for_nccl()
219 
220  nGPUs_per_process = nGPUs // world_size
221  rank_to_GPU = {
222  i: list(
223  visible_devices[i * nGPUs_per_process: (i + 1) * nGPUs_per_process]
224  )
225  for i in range(world_size)
226  }
227  return rank_to_GPU
228 
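# Illustrative example (not part of the original file): with 8 visible GPUs
# and WORLD_SIZE=2, _init_multigpu_helper() yields
#   {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]},
# i.e. rank 0 drives GPUs 0-3 and rank 1 drives GPUs 4-7.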
229  # GET RANK
230  def test_get_rank(self):
231  test_dir = os.path.join(TEMP_DIR, "test_dir")
232  pid = str(os.getpid())
233  num_processes = dist.get_world_size()
234  with open(os.path.join(test_dir, pid), "w") as f:
235  f.write(str(dist.get_rank()))
236 
237  self._barrier()
238 
239  all_ranks = set()
240  for f_name in os.listdir(test_dir):
241  with open(os.path.join(test_dir, f_name), "r") as f:
242  all_ranks.add(int(f.read()))
243  self.assertEqual(len(all_ranks), num_processes)
244 
245  self._barrier()
246 
247  if dist.get_rank() == 0:
248  for f_name in os.listdir(test_dir):
249  os.unlink(os.path.join(test_dir, f_name))
250 
251  self._barrier()
252 
253  # SEND RECV
254  @unittest.skipIf(BACKEND == "gloo", "Gloo does not support send/recv")
255  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv")
256  def test_send_recv(self):
257  rank = dist.get_rank()
258  tensor = _build_tensor(rank + 1)
259  for dest in range(0, dist.get_world_size()):
260  if dest == rank:
261  continue
262  dist.send(tensor, dest)
263 
264  for src in range(0, dist.get_world_size()):
265  if src == rank:
266  continue
267  tensor = _build_tensor(src + 1, value=-1)
268  expected_tensor = _build_tensor(src + 1)
269  dist.recv(tensor, src)
270  self.assertEqual(tensor, expected_tensor)
271 
272  self._barrier()
273 
274  # SEND RECV ANY SOURCE
275  @unittest.skipIf(
276  BACKEND == "gloo", "Gloo does not support send/recv from any source"
277  )
278  @unittest.skipIf(
279  BACKEND == "nccl", "Nccl does not support send/recv from any source"
280  )
281  def test_send_recv_any_source(self):
282  rank = dist.get_rank()
283  tensor = _build_tensor(10, rank)
284  for dest in range(0, dist.get_world_size()):
285  if dest == rank:
286  continue
287  dist.send(tensor, dest)
288 
289  recv_ranks = set()
290  for src in range(0, dist.get_world_size()):
291  if src == rank:
292  continue
293  tensor = _build_tensor(10, value=-1)
294  sender = dist.recv(tensor)
295  self.assertTrue(tensor.eq(sender).all())
296  recv_ranks.add(sender)
297 
298  self.assertEqual(len(recv_ranks), dist.get_world_size() - 1)
299  self._barrier()
300 
301  # ISEND
302  @unittest.skipIf(BACKEND == "gloo", "Gloo does not support isend")
303  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support isend")
304  def test_isend(self):
305  rank = dist.get_rank()
306  world_size = dist.get_world_size()
307 
308  if rank == 0:
309  requests = [
310  dist.isend(_build_tensor(dest, 10), dest)
311  for dest in range(1, world_size)
312  ]
313  for request in requests:
314  request.wait()
315  self.assertTrue(request.is_completed())
316  else:
317  tensor = _build_tensor(rank, -1)
318  dist.recv(tensor, 0)
319  self.assertEqual(tensor, _build_tensor(rank, 10))
320 
321  self._barrier()
322 
323  # IRECV
324  @unittest.skipIf(BACKEND == "gloo", "Gloo does not support irecv")
325  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support irecv")
326  def test_irecv(self):
327  rank = dist.get_rank()
328  world_size = dist.get_world_size()
329 
330  if rank == 0:
331  expected_tensors = [_build_tensor(src, -1) for src in range(1, world_size)]
332  requests = [
333  dist.irecv(expected_tensors[src - 1], src)
334  for src in range(1, world_size)
335  ]
336 
337  for src in range(1, world_size):
338  requests[src - 1].wait()
339  self.assertTrue(requests[src - 1].is_completed())
340  self.assertEqual(expected_tensors[src - 1], _build_tensor(src, 10))
341  else:
342  tensor = _build_tensor(rank, 10)
343  dist.send(tensor, 0)
344 
345  self._barrier()
346 
347  # BROADCAST
348  def _test_broadcast_helper(
349  self, group, group_id, rank, cuda=False, rank_to_GPU=None
350  ):
351  for ttype, value, requires_cuda in [
352  ("torch.FloatTensor", -1e-10, False),
353  ("torch.DoubleTensor", -1e-100, False),
354  ("torch.HalfTensor", -0.1, True),
355  ("torch.CharTensor", -2, False),
356  ("torch.ByteTensor", 129, False),
357  ("torch.IntTensor", -1e5, False),
358  ("torch.LongTensor", -1e15, False),
359  ]:
360  if requires_cuda and not cuda:
361  continue
362  for src in group:
363  expected_tensor = _build_tensor(src + 1, value).type(ttype)
364  if cuda:
365  expected_tensor = expected_tensor.cuda(rank_to_GPU[rank][0])
366  if rank == src:
367  dist.broadcast(expected_tensor, src, group_id)
368  else:
369  tensor = _build_tensor(src + 1, -1).type(ttype)
370  if cuda:
371  tensor = tensor.cuda(rank_to_GPU[rank][0])
372  dist.broadcast(tensor, src, group_id)
373  self.assertEqual(tensor.size(), expected_tensor.size())
374  self.assertEqual(tensor.ne(expected_tensor).max(), 0)
375 
376  self._barrier()
377 
378  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
379  def test_broadcast(self):
380  group, group_id, rank = self._init_global_test()
381  self._test_broadcast_helper(group, group_id, rank)
382 
383  @unittest.skipIf(
384  BACKEND != "gloo" and BACKEND != "nccl",
385  "Only Gloo and Nccl backend supports CUDA allReduce",
386  )
387  @skip_if_no_cuda_distributed
388  @skip_if_no_gpu
389  def test_broadcast_cuda(self):
390  group, group_id, rank = self._init_global_test()
391  rank_to_GPU = self._init_multigpu_helper()
392  self._test_broadcast_helper(group, group_id, rank, True, rank_to_GPU)
393 
394  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
395  @skip_if_small_worldsize
396  def test_broadcast_group(self):
397  group, group_id, rank = self._init_group_test()
398  self._test_broadcast_helper(group, group_id, rank)
399 
400  # REDUCE
401  def _test_reduce_helper(
402  self,
403  group,
404  group_id,
405  rank,
406  op,
407  master_value,
408  worker_value,
409  expected_value,
410  cuda=False,
411  rank_to_GPU=None,
412  ):
413  for src in group:
414  if rank == src:
415  tensor = _build_tensor(src + 1).fill_(master_value)
416  if cuda:
417  tensor = tensor.cuda(rank_to_GPU[rank][0])
418  dist.reduce(tensor, src, op, group_id)
419  self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
420  else:
421  tensor = _build_tensor(src + 1).fill_(worker_value)
422  if cuda:
423  tensor = tensor.cuda(rank_to_GPU[rank][0])
424  dist.reduce(tensor, src, op, group_id)
425 
426  self._barrier()
427 
428  @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
429  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
430  def test_reduce_sum(self):
431  group, group_id, rank = self._init_global_test()
432  self._test_reduce_helper(
433  group,
434  group_id,
435  rank,
436  dist.reduce_op.SUM,
437  2,
438  10,
439  2 + (10 * (len(group) - 1)),
440  )
441 
442  @unittest.skipIf(BACKEND != "nccl", "Only Nccl supports CUDA reduce")
443  @skip_if_no_cuda_distributed
444  @skip_if_no_gpu
445  def test_reduce_sum_cuda(self):
446  group, group_id, rank = self._init_global_test()
447  rank_to_GPU = self._init_multigpu_helper()
448  self._test_reduce_helper(
449  group,
450  group_id,
451  rank,
452  dist.reduce_op.SUM,
453  2,
454  10,
455  2 + 10 * (len(group) - 1),
456  True,
457  rank_to_GPU,
458  )
459 
460  @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
461  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
462  def test_reduce_product(self):
463  group, group_id, rank = self._init_global_test()
464  self._test_reduce_helper(
465  group,
466  group_id,
467  rank,
468  dist.reduce_op.PRODUCT,
469  2,
470  10,
471  reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
472  )
473 
474  @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
475  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
476  def test_reduce_min(self):
477  group, group_id, rank = self._init_global_test()
478  self._test_reduce_helper(group, group_id, rank, dist.reduce_op.MIN, 1010, 1, 1)
479 
480  @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
481  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
482  def test_reduce_max(self):
483  group, group_id, rank = self._init_global_test()
484  self._test_reduce_helper(group, group_id, rank, dist.reduce_op.MAX, -1, 10, 10)
485 
486  @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
487  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
488  @skip_if_small_worldsize
489  def test_reduce_group_sum(self):
490  group, group_id, rank = self._init_group_test()
491  self._test_reduce_helper(
492  group,
493  group_id,
494  rank,
495  dist.reduce_op.SUM,
496  2,
497  10,
498  2 + (10 * (len(group) - 1)),
499  )
500 
501  @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
502  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
503  @skip_if_small_worldsize
504  def test_reduce_group_product(self):
505  group, group_id, rank = self._init_group_test()
506  self._test_reduce_helper(
507  group,
508  group_id,
509  rank,
510  dist.reduce_op.PRODUCT,
511  2,
512  10,
513  reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
514  )
515 
516  @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
517  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
518  @skip_if_small_worldsize
519  def test_reduce_group_min(self):
520  group, group_id, rank = self._init_group_test()
521  self._test_reduce_helper(group, group_id, rank, dist.reduce_op.MIN, 1010, 1, 1)
522 
523  @unittest.skipIf(BACKEND == "gloo", "Gloo does not support reduce")
524  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
525  @skip_if_small_worldsize
526  def test_reduce_group_max(self):
527  group, group_id, rank = self._init_group_test()
528  self._test_reduce_helper(group, group_id, rank, dist.reduce_op.MAX, -1, 10, 10)
529 
530  # ALL REDUCE
531  def _test_all_reduce_helper(
532  self,
533  group,
534  group_id,
535  rank,
536  op,
537  master_value,
538  worker_value,
539  expected_value,
540  cuda=False,
541  rank_to_GPU=None,
542  ):
543  for src in group:
544  if rank == src:
545  tensor = _build_tensor(src + 1).fill_(master_value)
546  if cuda:
547  tensor = tensor.cuda(rank_to_GPU[rank][0])
548  dist.all_reduce(tensor, op, group_id)
549  self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
550  else:
551  tensor = _build_tensor(src + 1).fill_(worker_value)
552  if cuda:
553  tensor = tensor.cuda(rank_to_GPU[rank][0])
554  dist.all_reduce(tensor, op, group_id)
555  self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
556 
557  self._barrier()
558 
559  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
560  def test_all_reduce_sum(self):
561  group, group_id, rank = self._init_global_test()
562  self._test_all_reduce_helper(
563  group,
564  group_id,
565  rank,
566  dist.reduce_op.SUM,
567  2,
568  10,
569  2 + (10 * (len(group) - 1)),
570  )
571 
572  @unittest.skipIf(
573  BACKEND != "gloo" and BACKEND != "nccl",
574  "Only Gloo & Nccl backend support CUDA allReduce",
575  )
576  @skip_if_no_cuda_distributed
577  @skip_if_no_gpu
578  def test_all_reduce_sum_cuda(self):
579  group, group_id, rank = self._init_global_test()
580  rank_to_GPU = self._init_multigpu_helper()
581  self._test_all_reduce_helper(
582  group,
583  group_id,
584  rank,
585  dist.reduce_op.SUM,
586  2,
587  10,
588  2 + (10 * (len(group) - 1)),
589  True,
590  rank_to_GPU,
591  )
592 
593  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
594  def test_all_reduce_product(self):
595  group, group_id, rank = self._init_global_test()
596  self._test_all_reduce_helper(
597  group,
598  group_id,
599  rank,
600  dist.reduce_op.PRODUCT,
601  2,
602  10,
603  reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
604  )
605 
606  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
607  def test_all_reduce_min(self):
608  group, group_id, rank = self._init_global_test()
609  self._test_all_reduce_helper(
610  group, group_id, rank, dist.reduce_op.MIN, 1010, 1, 1
611  )
612 
613  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
614  def test_all_reduce_max(self):
615  group, group_id, rank = self._init_global_test()
616  self._test_all_reduce_helper(
617  group, group_id, rank, dist.reduce_op.MAX, -1, 10, 10
618  )
619 
620  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
621  @skip_if_small_worldsize
622  def test_all_reduce_group_sum(self):
623  group, group_id, rank = self._init_group_test()
624  self._test_all_reduce_helper(
625  group,
626  group_id,
627  rank,
628  dist.reduce_op.SUM,
629  2,
630  10,
631  2 + (10 * (len(group) - 1)),
632  )
633 
634  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
635  @skip_if_small_worldsize
636  def test_all_reduce_group_product(self):
637  group, group_id, rank = self._init_group_test()
638  self._test_all_reduce_helper(
639  group,
640  group_id,
641  rank,
642  dist.reduce_op.PRODUCT,
643  2,
644  10,
645  reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
646  )
647 
648  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
649  @skip_if_small_worldsize
650  def test_all_reduce_group_min(self):
651  group, group_id, rank = self._init_group_test()
652  self._test_all_reduce_helper(
653  group, group_id, rank, dist.reduce_op.MIN, 1010, 1, 1
654  )
655 
656  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
657  @skip_if_small_worldsize
658  def test_all_reduce_group_max(self):
659  group, group_id, rank = self._init_group_test()
660  self._test_all_reduce_helper(
661  group, group_id, rank, dist.reduce_op.MAX, -1, 10, 10
662  )
663 
664  # SCATTER
665  def _test_scatter_helper(self, group, group_id, rank):
666  for dest in group:
667  tensor = _build_tensor(dest + 1, -1)
668  expected_tensor = _build_tensor(dest + 1, rank)
669  tensors = (
670  [_build_tensor(dest + 1, i) for i in group] if rank == dest else []
671  )
672  dist.scatter(tensor, src=dest, scatter_list=tensors, group=group_id)
673  self.assertEqual(tensor, expected_tensor)
674 
675  self._barrier()
676 
677  @unittest.skipIf(BACKEND == "gloo", "Gloo does not support scatter")
678  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
679  def test_scatter(self):
680  group, group_id, rank = self._init_global_test()
681  self._test_scatter_helper(group, group_id, rank)
682 
683  @unittest.skipIf(BACKEND == "gloo", "Gloo does not support scatter")
684  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
685  @skip_if_small_worldsize
686  def test_scatter_group(self):
687  group, group_id, rank = self._init_group_test()
688  self._test_scatter_helper(group, group_id, rank)
689 
690  # GATHER
691  def _test_gather_helper(self, group, group_id, rank):
692  for dest in group:
693  tensor = _build_tensor(dest + 1, rank)
694  tensors = (
695  [_build_tensor(dest + 1, -1) for i in group] if rank == dest else []
696  )
697  dist.gather(tensor, dst=dest, gather_list=tensors, group=group_id)
698  if rank == dest:
699  expected_tensors = [_build_tensor(dest + 1, i) for i in group]
700  for t1, t2 in zip(tensors, expected_tensors):
701  self.assertEqual(t1, t2)
702 
703  self._barrier()
704 
705  @unittest.skipIf(BACKEND == "gloo", "Gloo does not support gather")
706  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
707  def test_gather(self):
708  group, group_id, rank = self._init_global_test()
709  self._test_gather_helper(group, group_id, rank)
710 
711  @unittest.skipIf(BACKEND == "gloo", "Gloo does not support gather")
712  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
713  @skip_if_small_worldsize
714  def test_gather_group(self):
715  group, group_id, rank = self._init_group_test()
716  self._test_gather_helper(group, group_id, rank)
717 
718  # ALL GATHER
719  def _test_all_gather_helper(
720  self, group, group_id, rank, cuda=False, rank_to_GPU=None
721  ):
722  for dest in group:
723  tensor = _build_tensor(dest + 1, rank)
724  tensors = [_build_tensor(dest + 1, -1) for i in group]
725  if cuda:
726  tensor = tensor.cuda(rank_to_GPU[rank][0])
727  tensors = [t.cuda(rank_to_GPU[rank][0]) for t in tensors]
728  dist.all_gather(tensors, tensor, group_id)
729 
730  expected_tensors = [_build_tensor(dest + 1, i) for i in group]
731  for t1, t2 in zip(tensors, expected_tensors):
732  self.assertEqual(t1, t2)
733 
734  self._barrier()
735 
736  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
737  def test_all_gather(self):
738  group, group_id, rank = self._init_global_test()
739  self._test_all_gather_helper(group, group_id, rank)
740 
741  @unittest.skipIf(BACKEND != "nccl", "Only Nccl supports CUDA all gather")
742  @skip_if_no_cuda_distributed
743  @skip_if_no_gpu
744  def test_all_gather_cuda(self):
745  group, group_id, rank = self._init_global_test()
746  rank_to_GPU = self._init_multigpu_helper()
747  self._test_all_gather_helper(group, group_id, rank, True, rank_to_GPU)
748 
749  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
750  @skip_if_small_worldsize
751  def test_all_gather_group(self):
752  group, group_id, rank = self._init_group_test()
753  self._test_all_gather_helper(group, group_id, rank)
754 
755  # BARRIER
756  def _test_barrier_helper(self, group, group_id, rank):
757  WAIT_TIME = 0.3 # seconds
758 
759  for dest in group:
760  expected_time = torch.DoubleTensor(1).fill_(0.0)
761  if dest == rank:
762  expected_time.fill_(time.time() + WAIT_TIME)
763  dist.broadcast(expected_time, dest, group_id)
764  time.sleep(WAIT_TIME + 0.1) # sleep a little bit longer
765  dist.barrier(group_id)
766  else:
767  dist.broadcast(expected_time, dest, group_id)
768  dist.barrier(group_id)
769  self.assertGreaterEqual(time.time(), expected_time[0])
770 
771  self._barrier()
772 
773  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
774  def test_barrier(self):
775  group, group_id, rank = self._init_global_test()
776  self._test_barrier_helper(group, group_id, rank)
777 
778  @unittest.skipIf(BACKEND == "nccl", "Nccl does not support newGroup")
779  @skip_if_small_worldsize
780  def test_barrier_group(self):
781  group, group_id, rank = self._init_group_test()
782  self._test_barrier_helper(group, group_id, rank)
783 
784  def _test_broadcast_multigpu_helper(self, group, group_id, rank, rank_to_GPU):
785  for src in group:
786  expected_tensor = _build_tensor(src + 1)
787  tensors = [
788  _build_tensor(src + 1, -1).cuda(device=i) for i in rank_to_GPU[rank]
789  ]
790  if rank == src:
791  tensors[0] = expected_tensor.cuda(device=rank_to_GPU[rank][0])
792 
793  dist.broadcast_multigpu(tensors, src, group_id)
794  for tensor in tensors:
795  self.assertEqual(tensor, expected_tensor)
796  self._barrier()
797 
798  @unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports broadcast multigpu")
799  @skip_if_no_gpu
800  def test_broadcast_multigpu(self):
801  group, group_id, rank = self._init_global_test()
802  rank_to_GPU = self._init_multigpu_helper()
803  self._test_broadcast_multigpu_helper(group, group_id, rank, rank_to_GPU)
804 
805  def _test_all_reduce_multigpu_helper(
806  self,
807  group,
808  group_id,
809  rank,
810  rank_to_GPU,
811  op,
812  master_value,
813  worker_value,
814  expected_value,
815  ):
816  for src in group:
817  if rank == src:
818  tensors = [
819  _build_tensor(src + 1, master_value).cuda(device=i)
820  for i in rank_to_GPU[rank]
821  ]
822  else:
823  tensors = [
824  _build_tensor(src + 1, worker_value).cuda(device=i)
825  for i in rank_to_GPU[rank]
826  ]
827 
828  dist.all_reduce_multigpu(tensors, op, group_id)
829  expected_tensor = _build_tensor(src + 1, expected_value)
830  for tensor in tensors:
831  self.assertEqual(tensor, expected_tensor)
832 
833  self._barrier()
834 
835  @unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports allreduce multigpu")
836  @skip_if_no_gpu
837  def test_all_reduce_multigpu(self):
838  group, group_id, rank = self._init_global_test()
839  rank_to_GPU = self._init_multigpu_helper()
840  self._test_all_reduce_multigpu_helper(
841  group,
842  group_id,
843  rank,
844  rank_to_GPU,
845  dist.reduce_op.SUM,
846  2,
847  10,
848  (2 + 10 * (len(group) - 1)) * len(rank_to_GPU[0]),
849  )
850 
851  def _test_reduce_multigpu_helper(
852  self,
853  group,
854  group_id,
855  rank,
856  rank_to_GPU,
857  op,
858  master_value,
859  worker_value,
860  expected_value,
861  ):
862  for src in group:
863  if rank == src:
864  tensors = [
865  _build_tensor(src + 1, master_value).cuda(device=i)
866  for i in rank_to_GPU[rank]
867  ]
868  dist.reduce_multigpu(tensors, src, op, group_id)
869  expected_tensor = _build_tensor(src + 1, expected_value)
870  self.assertEqual(tensors[0], expected_tensor)
871  else:
872  tensors = [
873  _build_tensor(src + 1, worker_value).cuda(device=i)
874  for i in rank_to_GPU[rank]
875  ]
876  dist.reduce_multigpu(tensors, src, op, group_id)
877 
878  self._barrier()
879 
880  @unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports reduce multigpu")
881  @skip_if_no_gpu
882  def test_reduce_multigpu(self):
883  group, group_id, rank = self._init_global_test()
884  rank_to_GPU = self._init_multigpu_helper()
885  self._test_reduce_multigpu_helper(
886  group,
887  group_id,
888  rank,
889  rank_to_GPU,
890  dist.reduce_op.SUM,
891  2,
892  10,
893  (2 + 10 * (len(group) - 1)) * len(rank_to_GPU[0]),
894  )
895 
896  def _test_all_gather_multigpu_helper(self, group, group_id, rank, rank_to_GPU):
897  for dest in group:
898  tensors = [
899  _build_tensor(dest + 1).cuda(device=i) for i in rank_to_GPU[rank]
900  ]
901 
902  # construct the expected output along with
903  # a placeholder to receive the all_gather results
904  output_tensors = []
905  expected_output = []
906  output_per_gpu = (
907  [_build_tensor(dest + 1, -1)] * len(rank_to_GPU[0]) * len(group)
908  )
909  expected_per_gpu = (
910  [_build_tensor(dest + 1)] * len(rank_to_GPU[0]) * len(group)
911  )
912  for gpu in rank_to_GPU[rank]:
913  output_tensors.append([t.cuda(device=gpu) for t in output_per_gpu])
914  expected_output.append([t.cuda(device=gpu) for t in expected_per_gpu])
915 
916  dist.all_gather_multigpu(output_tensors, tensors, group_id)
917  self.assertEqual(output_tensors, expected_output)
918 
919  self._barrier()
920 
921  @unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports allgather multigpu")
922  @skip_if_no_gpu
923  def test_all_gather_multigpu(self):
924  group, group_id, rank = self._init_global_test()
925  rank_to_GPU = self._init_multigpu_helper()
926  self._test_all_gather_multigpu_helper(group, group_id, rank, rank_to_GPU)
927 
928  def _create_Net(self):
929  return Net()
930 
931  def _model_step(self, model):
932  for param in model.parameters():
933  if param.grad is not None:
934  param.data += param.grad
935  param.grad = None
936 
937  def _prepare_dummy_data(self, local_bs):
938  # global_bs for DDP should be divisible by WORLD_SIZE
939  global_bs = int(WORLD_SIZE) * local_bs
940  input_cpu = torch.randn(global_bs, 2)
941  target = torch.randn(global_bs, 4)
942  loss = nn.MSELoss()
943  return global_bs, input_cpu, target, loss
944 
945  # END TO END TEST FOR DISTRIBUTEDDATAPARALLEL
946  def _test_DDP_helper(self, model, input_var, target, loss):
947  model.train()
948  output = model(input_var)
949  l = loss(output, target)
950  l.backward()
951 
952  def _assert_equal_param(self, param_gpu, param_DDP):
953  self.assertEqual(len(param_gpu), len(param_DDP))
954  for p_gpu, p_DDP in zip(param_gpu, param_DDP):
955  self.assertEqual(p_gpu, p_DDP)
956 
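# _test_DDP_2iter runs the reference model on the full batch and the
# DDP-wrapped copy on this rank's shard for two iterations, applies the same
# naive update (param += grad) to both via _model_step, asserts that the
# parameters still match after each iteration, and finally round-trips the
# DDP model through torch.save/torch.load.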
957  def _test_DDP_2iter(
958  self, model_base, model_DDP, input, target, loss, local_bs, rank, batch_size
959  ):
960  for _ in range(2):
961  # single cpu/gpu training
962  self._test_DDP_helper(model_base, input, target, loss)
963 
964  # DDP training, DDP scatters subsets of input_cpu to nodes/GPUs
965  self._test_DDP_helper(
966  model_DDP,
967  input[rank * local_bs: (rank + 1) * local_bs],
968  target[rank * local_bs: (rank + 1) * local_bs],
969  loss,
970  )
971 
972  # Update weights and run a second iteration to shake out errors
973  self._model_step(model_base)
974  self._model_step(model_DDP)
975  self._assert_equal_param(
976  list(model_base.parameters()), list(model_DDP.module.parameters())
977  )
978 
979  # Shuffle the input so that DDP input is different
980  input = input[torch.randperm(batch_size)]
981 
982  # Test that saving and loading work
983  with tempfile.TemporaryFile() as tmp_file:
984  torch.save(model_DDP, tmp_file)
985  tmp_file.seek(0)
986  saved_model_DDP = torch.load(tmp_file)
987 
988  @unittest.skipIf(
989  BACKEND != "nccl" and BACKEND != "gloo",
990  "Only Nccl & Gloo backend support DistributedDataParallel",
991  )
992  @skip_if_no_cuda_distributed
993  @skip_if_no_gpu
994  def test_DistributedDataParallel(self):
995  # Run a simple end-to-end DDP model and use the result of the
996  # single-node model as the baseline
997  group, group_id, rank = self._init_global_test()
998  rank_to_GPU = self._init_multigpu_helper()
999 
1000  # cpu training setup
1001  model = self._create_Net()
1002 
1003  # single gpu training setup
1004  model_gpu = copy.deepcopy(model)
1005  gpu_subset = list(rank_to_GPU[rank])
1006  model_gpu.cuda(gpu_subset[0])
1007 
1008  # DDP training setup
1009  model_DDP = copy.deepcopy(model)
1010  model_DDP.cuda(gpu_subset[0])
1011  model_DDP = nn.parallel.deprecated.DistributedDataParallel(
1012  model_DDP, device_ids=gpu_subset
1013  )
1014 
1015  # dummy data initialization
1016  local_bs = len(gpu_subset)
1017  global_bs, input_cpu, target, loss = self._prepare_dummy_data(local_bs)
1018 
1019  # check two model parameters over 2 iterations
1020  self._test_DDP_2iter(
1021  model_gpu,
1022  model_DDP,
1023  input_cpu.cuda(gpu_subset[0]),
1024  target.cuda(gpu_subset[0]),
1025  loss,
1026  local_bs,
1027  rank,
1028  global_bs,
1029  )
1030  self._barrier()
1031 
1032  @unittest.skipIf(
1033  BACKEND == "nccl", "nccl does not support DistributedDataParallelCPU"
1034  )
1035  def test_DistributedDataParallelCPU(self):
1036  # Run a simple end-to-end DDP-CPU model and use the result of the
1037  # single-node model as the baseline
1038  group, group_id, rank = self._init_global_test()
1039 
1040  # cpu training setup
1041  model_base = self._create_Net()
1042 
1043  # DDP-CPU training setup
1044  model_DDP = copy.deepcopy(model_base)
1045  model_DDP = nn.parallel.deprecated.DistributedDataParallelCPU(model_DDP)
1046 
1047  # dummy data initialization
1048  local_bs = 2
1049  global_bs, input_cpu, target, loss = self._prepare_dummy_data(local_bs)
1050 
1051  # check two model parameters over 2 iterations
1052  self._test_DDP_2iter(
1053  model_base, model_DDP, input_cpu, target, loss, local_bs, rank, global_bs
1054  )
1055  self._barrier()
1056 
1057 
1058 if BACKEND == "tcp" or BACKEND == "gloo" or BACKEND == "nccl":
1059  WORLD_SIZE = os.environ["WORLD_SIZE"]
1060 
1061  class TestDistributed(TestCase, _DistTestBase):
1062  MANAGER_PROCESS_RANK = -1
1063 
1064  @staticmethod
1065  def manager_join(fn):
1066  @wraps(fn)
1067  def wrapper(self):
1068  if self.rank == self.MANAGER_PROCESS_RANK:
1069  self._join_and_reduce(fn)
1070  else:
1071  fn(self)
1072 
1073  return wrapper
1074 
1075  @classmethod
1076  def setUpClass(cls):
1077  os.environ["MASTER_ADDR"] = MASTER_ADDR
1078  os.environ["MASTER_PORT"] = MASTER_PORT
1079  os.environ["WORLD_SIZE"] = WORLD_SIZE
1080  for attr in dir(cls):
1081  if attr.startswith("test"):
1082  fn = getattr(cls, attr)
1083  setattr(cls, attr, cls.manager_join(fn))
1084 
1085  def setUp(self):
1086  self.processes = []
1087  self.rank = self.MANAGER_PROCESS_RANK
1088  Barrier.init()
1089  for rank in range(int(WORLD_SIZE)):
1090  self.processes.append(self._spawn_process(rank))
1091 
1092  def tearDown(self):
1093  for p in self.processes:
1094  p.terminate()
1095 
1096  def _spawn_process(self, rank):
1097  os.environ["RANK"] = str(rank)
1098  name = "process " + str(rank)
1099  process = multiprocessing.Process(target=self._run, name=name, args=(rank,))
1100  process.start()
1101  return process
1102 
1103  def _run(self, rank):
1104  self.rank = rank
1105  try:
1106  dist.init_process_group(
1107  init_method=INIT_METHOD, backend=BACKEND, world_size=int(WORLD_SIZE)
1108  )
1109  except RuntimeError as e:
1110  if "recompile" in e.args[0]:
1111  sys.exit(SKIP_IF_BACKEND_UNAVAILABLE)
1112  # sys.exit(0)
1113  raise
1114  # self.id() == e.g. '__main__.TestDistributed.test_get_rank'
1115  # We're retrieving the corresponding test and executing it.
1116  getattr(self, self.id().split(".")[2])()
1117  sys.exit(0)
1118 
1119  def _join_and_reduce(self, fn):
1120  skip_ok = (
1121  getattr(fn, "skip_if_no_cuda_distributed", False) or
1122  getattr(fn, "skip_if_no_gpu", False) or
1123  getattr(fn, "skip_if_small_worldsize", False)
1124  )
1125  join_timeout = get_timeout(self.id())
1126  for rank, process in enumerate(self.processes):
1127  process.join(join_timeout)
1128  self.assertFalse(
1129  process.is_alive(),
1130  "Timeout waiting for rank %d to terminate" % rank)
1131 
1132  first_process = self.processes[0]
1133  for p in self.processes:
1134  self.assertEqual(p.exitcode, first_process.exitcode)
1135 
1136  if first_process.exitcode == SKIP_IF_BACKEND_UNAVAILABLE:
1137  raise unittest.SkipTest("Compiled without the " + BACKEND + " backend")
1138 
1139  if skip_ok:
1140  # do this first so we don't give an error message about
1141  # mismatched exit codes if the first isn't valid
1142  assert (
1143  first_process.exitcode == 0 or
1144  first_process.exitcode == SKIP_IF_NO_CUDA_EXIT_CODE or
1145  first_process.exitcode == SKIP_IF_NO_GPU_EXIT_CODE or
1146  first_process.exitcode == SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE
1147  )
1148 
1149  if first_process.exitcode == SKIP_IF_NO_CUDA_EXIT_CODE:
1150  raise unittest.SkipTest("cuda is not available")
1151  if first_process.exitcode == SKIP_IF_NO_GPU_EXIT_CODE:
1152  raise unittest.SkipTest(
1153  "One unique gpu per process is not available"
1154  )
1155  if first_process.exitcode == SKIP_IF_SMALL_WORLDSIZE_EXIT_CODE:
1156  raise unittest.SkipTest("worldsize is too small to run group tests")
1157 
1158  self.assertEqual(first_process.exitcode, 0)
1159 
1160 
1161 elif BACKEND == "mpi":
1162  WORLD_SIZE = os.environ["WORLD_SIZE"]
1163  dist.init_process_group(init_method=INIT_METHOD, backend="mpi")
1164 
1165  class TestMPI(TestCase, _DistTestBase):
1166  pass
1167 
1168 
1169 if __name__ == "__main__":
1170  assert (
1171  not torch.cuda._initialized
1172  ), "test_distributed must not have initialized CUDA context on main process"
1173 
1174  run_tests()
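How the harness is driven: every path through the file above is controlled by environment variables. BACKEND selects the backend and the skip logic, WORLD_SIZE fixes the number of worker processes, TEMP_DIR must point at a scratch directory containing the barrier/ and test_dir/ subdirectories used by Barrier and test_get_rank, and INIT_METHOD defaults to "env://". The sketch below is a hypothetical launcher showing one way to prepare that environment before running the script; the "tcp" backend, the world size of 3, and the file path are illustrative assumptions, not requirements stated in the file.

# Hypothetical launcher sketch (assumptions: the listing above is saved as
# test_thd_distributed.py in the current directory, and "tcp" is one of the
# backends the local torch build supports).
import os
import subprocess
import tempfile

scratch = tempfile.mkdtemp()
os.makedirs(os.path.join(scratch, "barrier"))   # used by Barrier.init()/sync()
os.makedirs(os.path.join(scratch, "test_dir"))  # used by test_get_rank()

env = dict(os.environ)
env["BACKEND"] = "tcp"          # or "gloo", "nccl", "mpi"
env["WORLD_SIZE"] = "3"         # number of worker processes to spawn
env["TEMP_DIR"] = scratch
env["INIT_METHOD"] = "env://"   # the script's default when unset

# The script spawns WORLD_SIZE workers itself (see _spawn_process above).
subprocess.check_call(["python", "test_thd_distributed.py"], env=env)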