Caffe2 - Python API
A deep learning, cross-platform ML framework
test_c10d.py
1 import copy
2 import math
3 import multiprocessing
4 import os
5 import sys
6 import tempfile
7 import time
8 import unittest
9 from datetime import timedelta
10 
11 from functools import wraps
12 from collections import namedtuple
13 
14 import torch
15 import common_utils as common
16 from torch import nn
17 import torch.nn.functional as F
18 import torch.distributed as c10d
19 from torch.nn.parallel import DistributedDataParallel
20 
21 from common_utils import TestCase, load_tests, run_tests
22 from common_utils import retry_on_address_already_in_use_error
23 
24 # load_tests from common_utils is used to automatically filter tests for
25 # sharding on sandcastle. This line silences flake warnings
26 load_tests = load_tests
27 
28 if not c10d.is_available():
29  print('c10d not available, skipping tests')
30  sys.exit(0)
31 
32 
33 TIMEOUT_DEFAULT = 30
34 TIMEOUT_OVERRIDE = {}
35 
36 TestSkip = namedtuple('TestSkip', 'exit_code, message')
37 
38 TEST_SKIPS = {
39  "multi-gpu": TestSkip(75, "Need at least 2 CUDA devices"),
40  "nccl": TestSkip(76, "c10d not compiled with NCCL support"),
41  "known_issues": TestSkip(77, "Test skipped due to known issues")
42 }
43 
44 
45 def skip_if_not_multigpu(func):
46  """Multi-GPU tests requires at least 2 GPUS. Skip if this is not met."""
47  @wraps(func)
48  def wrapper(*args, **kwargs):
49  if torch.cuda.is_available() and torch.cuda.device_count() >= 2:
50  return func(*args, **kwargs)
51  sys.exit(TEST_SKIPS['multi-gpu'].exit_code)
52 
53  return wrapper
54 
55 
56 def skip_if_not_nccl(func):
57  """Skips a test if NCCL is not available (for c10d)."""
58  @wraps(func)
59  def wrapper(*args, **kwargs):
60  if hasattr(c10d, "ProcessGroupNCCL"):
61  return func(*args, **kwargs)
62  sys.exit(TEST_SKIPS['nccl'].exit_code)
63 
64  return wrapper
65 
66 
67 def skip_for_known_issues(func):
68  """Skips a test due to known issues (for c10d)."""
69  @wraps(func)
70  def wrapper(*args, **kwargs):
71  sys.exit(TEST_SKIPS['known_issues'].exit_code)
72 
73  return wrapper
74 
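# A note on how these decorators interact with the multiprocess harness below: they
# run inside the spawned test subprocesses, so instead of raising unittest.SkipTest
# they exit with one of the reserved codes in TEST_SKIPS, and
# MultiProcessTestCase._check_return_codes() in the parent process translates that
# exit code back into a SkipTest with the matching message.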
75 
76 def get_timeout(test_id):
77  return TIMEOUT_OVERRIDE.get(test_id.split('.')[-1], TIMEOUT_DEFAULT)
78 
79 
80 def gpus_for_rank(world_size):
81  """Multigpu tests are designed to simulate the multi nodes with multi
82  GPUs on each node. Nccl backend requires equal #GPUs in each process.
83  On a single node, all visible GPUs are evenly
84  divided to subsets, each process only uses a subset.
85  """
86  visible_devices = list(range(torch.cuda.device_count()))
87  gpus_per_process = torch.cuda.device_count() // world_size
88  gpus_for_rank = []
89  for rank in range(world_size):
90  gpus_for_rank.append(visible_devices[rank * gpus_per_process: (rank + 1) * gpus_per_process])
91  return gpus_for_rank
92 
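For illustration, a minimal sketch of the split computed by gpus_for_rank(), under the assumption of a host with 4 visible CUDA devices and a world size of 2; the helper name and device count below are hypothetical and exist only so the example runs without a GPU:

def _example_gpus_for_rank(device_count, world_size):
    # Same arithmetic as gpus_for_rank() above, with the device count passed in.
    visible_devices = list(range(device_count))
    gpus_per_process = device_count // world_size
    return [visible_devices[rank * gpus_per_process:(rank + 1) * gpus_per_process]
            for rank in range(world_size)]

# With 4 visible GPUs and 2 ranks, each rank gets a disjoint pair of devices;
# leftover devices (when the count is not divisible by world_size) go unused.
assert _example_gpus_for_rank(4, 2) == [[0, 1], [2, 3]]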
93 
94 def simple_reduce_tests(rank, world_size):
95  return [
96  (
97  c10d.ReduceOp.SUM,
98  torch.Tensor([rank + 1.0]),
99  torch.Tensor([float(world_size * (world_size + 1) / 2)]),
100  ),
101  (
102  c10d.ReduceOp.PRODUCT,
103  torch.Tensor([rank + 1.0]),
104  torch.Tensor([float(math.factorial(world_size))]),
105  ),
106  (
107  c10d.ReduceOp.MIN,
108  torch.Tensor([rank + 1.0]),
109  torch.Tensor([1.0]),
110  ),
111  (
112  c10d.ReduceOp.MAX,
113  torch.Tensor([rank + 1.0]),
114  torch.Tensor([world_size]),
115  ),
116  ]
117 
118 
119 def simple_multi_input_reduce_tests(rank, world_size):
120  return [
121  (
122  c10d.ReduceOp.SUM,
123  [torch.Tensor([2 * rank + 0.0]), torch.Tensor([2 * rank + 1.0])],
124  torch.Tensor([float(world_size * (2 * world_size - 1))]),
125  ),
126  (
127  c10d.ReduceOp.PRODUCT,
128  [torch.Tensor([2 * rank + 1.0]), torch.Tensor([2 * rank + 2.0])],
129  torch.Tensor([float(math.factorial(2 * world_size))]),
130  ),
131  (
132  c10d.ReduceOp.MIN,
133  [torch.Tensor([2 * rank + 1.0]), torch.Tensor([2 * rank + 2.0])],
134  torch.Tensor([1.0]),
135  ),
136  (
137  c10d.ReduceOp.MAX,
138  [torch.Tensor([2 * rank + 1.0]), torch.Tensor([2 * rank + 2.0])],
139  torch.Tensor([2 * world_size]),
140  ),
141  ]
142 
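As a sanity check on the expected values returned above, here is the arithmetic worked out for a world size of 4 (the world size used by MultiProcessTestCase further down); this is only a sketch restating the formulas in the two helpers:

import math

world_size = 4
# simple_reduce_tests: each rank contributes rank + 1, i.e. the values 1..4.
assert world_size * (world_size + 1) / 2 == 10.0     # SUM
assert math.factorial(world_size) == 24              # PRODUCT
# simple_multi_input_reduce_tests: the SUM case contributes 2*rank and 2*rank + 1
# (the values 0..7); the other cases contribute 2*rank + 1 and 2*rank + 2 (1..8).
assert world_size * (2 * world_size - 1) == 28       # SUM of 0..7
assert math.factorial(2 * world_size) == 40320       # PRODUCT of 1..8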
143 
144 class StoreTestBase(object):
145  def _create_store(self, i):
146  raise RuntimeError("not implemented")
147 
148  def _test_set_get(self, fs):
149  fs.add("key", 1)
150  fs.add("key", 2)
151  fs.add("key", 3)
152  fs.set("key0", "value0")
153  fs.add("key3", 1)
154  fs.set("key1", "value1")
155  fs.add("key3", 2)
156  fs.set("key2", "value2")
157  fs.add("key3", 3)
158  fs.add("key3", 4)
159  fs.add("key3", 5)
160  fs.add("key3", 6)
161  self.assertEqual(b"6", fs.get("key"))
162  self.assertEqual(b"value0", fs.get("key0"))
163  self.assertEqual(b"value1", fs.get("key1"))
164  self.assertEqual(b"value2", fs.get("key2"))
165  self.assertEqual(b"21", fs.get("key3"))
166 
167  def test_set_get(self):
168  self._test_set_get(self._create_store())
169 
170 
171 class FileStoreTest(TestCase, StoreTestBase):
172  def setUp(self):
173  self.file = tempfile.NamedTemporaryFile(delete=False)
174 
175  def tearDown(self):
176  pass
177 
178  def _create_store(self):
179  store = c10d.FileStore(self.file.name, 1)
180  store.set_timeout(timedelta(seconds=300))
181  return store
182 
183 
184 class PrefixFileStoreTest(TestCase, StoreTestBase):
185  def setUp(self):
186  self.file = tempfile.NamedTemporaryFile(delete=False)
187  self.filestore = c10d.FileStore(self.file.name, 1)
188  self.prefix = "test_prefix"
189  self.filestore.set_timeout(timedelta(seconds=300))
190 
191  def tearDown(self):
192  pass
193 
194  def _create_store(self):
195  return c10d.PrefixStore(self.prefix, self.filestore)
196 
197 
198 def create_tcp_store(addr):
199  """
200  Creates a TCP store. Retries if the chosen port is already in use.
201  """
202  ports = []
203  for _ in range(10):
204  try:
205  port = common.find_free_port()
206  ports.append(port)
207  return c10d.TCPStore(addr, port, 1, True)
208  except RuntimeError as error:
209  if str(error) == "Address already in use":
210  continue
211  raise
212  raise RuntimeError("Unable to find free port (tried %s)" % ", ".join(str(p) for p in ports))
213 
214 
215 class TCPStoreTest(TestCase, StoreTestBase):
216  def _create_store(self):
217  store = create_tcp_store('localhost')
218  store.set_timeout(timedelta(seconds=300))
219  return store
220 
221  def test_address_already_in_use(self):
222  with self.assertRaisesRegex(RuntimeError, "^Address already in use$"):
223  addr = 'localhost'
224  port = common.find_free_port()
225 
226  # Use noqa to silence flake8.
227  # Need to store in an unused variable here to ensure the first
228  # object is not destroyed before the second object is created.
229  store1 = c10d.TCPStore(addr, port, 1, True) # noqa: F841
230  store2 = c10d.TCPStore(addr, port, 1, True) # noqa: F841
231 
232 
233 class PrefixTCPStoreTest(TestCase, StoreTestBase):
234  def setUp(self):
235  self.tcpstore = create_tcp_store('localhost')
236  self.prefix = "test_prefix"
237  self.tcpstore.set_timeout(timedelta(seconds=300))
238 
239  def _create_store(self):
240  return c10d.PrefixStore(self.prefix, self.tcpstore)
241 
242 
243 class RendezvousTest(TestCase):
244  def test_unknown_handler(self):
245  with self.assertRaisesRegex(RuntimeError, "^No rendezvous handler"):
246  c10d.rendezvous('invalid://')
247 
248 
249 class RendezvousEnvTest(TestCase):
250  @retry_on_address_already_in_use_error
251  def test_common_errors(self):
252  # TODO remove this hack
253  if not hasattr(c10d, "ProcessGroupNCCL"):
254  raise unittest.SkipTest("C10D is not built with NCCL process group,"
255  " skipping test")
256  vars = {
257  "WORLD_SIZE": "1",
258  "RANK": "0",
259  "MASTER_ADDR": "127.0.0.1",
260  "MASTER_PORT": common.find_free_port(),
261  }
262 
263  class Env(object):
264  def __init__(self, vars):
265  self.vars = vars
266 
267  def __enter__(self):
268  for key, value in self.vars.items():
269  os.environ[key] = str(value)
270 
271  def __exit__(self, type, value, traceback):
272  for key in self.vars.keys():
273  del os.environ[key]
274 
275  def without(d, key):
276  d = d.copy()
277  d.pop(key)
278  return d
279 
280  def withouts(d, keys):
281  d = d.copy()
282  for key in keys:
283  d.pop(key)
284  return d
285 
286  with Env(without(vars, 'WORLD_SIZE')):
287  with self.assertRaisesRegex(ValueError, 'WORLD_SIZE expected'):
288  gen = c10d.rendezvous('env://')
289  next(gen)
290  c10d.init_process_group(backend='nccl', world_size=1)
291  self.assertEqual(c10d.get_rank(), 0)
292  self.assertEqual(c10d.get_world_size(), 1)
293  c10d.destroy_process_group()
294 
295  with Env(without(vars, 'RANK')):
296  with self.assertRaisesRegex(ValueError, 'RANK expected'):
297  gen = c10d.rendezvous('env://')
298  next(gen)
299  c10d.init_process_group(backend='nccl', rank=0)
300  self.assertEqual(c10d.get_rank(), 0)
301  self.assertEqual(c10d.get_world_size(), 1)
302  c10d.destroy_process_group()
303 
304  with Env(withouts(vars, ['RANK', 'WORLD_SIZE'])):
305  c10d.init_process_group(backend='nccl', rank=0, world_size=1)
306  self.assertEqual(c10d.get_rank(), 0)
307  self.assertEqual(c10d.get_world_size(), 1)
308  c10d.destroy_process_group()
309 
310  with Env(vars):
311  c10d.init_process_group(backend='nccl')
312  self.assertEqual(c10d.get_rank(), 0)
313  self.assertEqual(c10d.get_world_size(), 1)
314  c10d.destroy_process_group()
315 
316  with Env(without(vars, 'MASTER_ADDR')):
317  with self.assertRaisesRegex(ValueError, 'MASTER_ADDR expected'):
318  gen = c10d.rendezvous('env://')
319  next(gen)
320 
321  with Env(without(vars, 'MASTER_PORT')):
322  with self.assertRaisesRegex(ValueError, 'MASTER_PORT expected'):
323  gen = c10d.rendezvous('env://')
324  next(gen)
325 
326  with Env(without(vars, 'WORLD_SIZE')):
327  gen = c10d.rendezvous('env://?world_size={}'.format(1))
328  _, _, size = next(gen)
329  self.assertEqual(size, 1)
330 
331  with Env(without(vars, 'RANK')):
332  gen = c10d.rendezvous('env://?rank={}'.format(0))
333  _, rank, _ = next(gen)
334  self.assertEqual(rank, 0)
335 
336  with Env(withouts(vars, ['RANK', 'WORLD_SIZE'])):
337  gen = c10d.rendezvous('env://?rank={}&world_size={}'.format(0, 1))
338  _, rank, size = next(gen)
339  self.assertEqual(rank, 0)
340  self.assertEqual(size, 1)
341 
342  @retry_on_address_already_in_use_error
343  def test_nominal(self):
344  os.environ['WORLD_SIZE'] = '1'
345  os.environ['MASTER_ADDR'] = '127.0.0.1'
346  os.environ['MASTER_PORT'] = str(common.find_free_port())
347 
348  # Single rank
349  os.environ['RANK'] = '0'
350  gen0 = c10d.rendezvous('env://')
351  store0, rank0, size0 = next(gen0)
352  self.assertEqual(0, rank0)
353  self.assertEqual(1, size0)
354 
355  store0.set("key0", "value0")
356 
357  # check with get
358  self.assertEqual(b"value0", store0.get("key0"))
359 
360 
361 class RendezvousFileTest(TestCase):
362  def test_common_errors(self):
363  with self.assertRaisesRegex(ValueError, 'path missing'):
364  gen = c10d.rendezvous('file://?rank=0&world_size=1')
365  next(gen)
366  with self.assertRaisesRegex(ValueError, 'rank parameter missing'):
367  gen = c10d.rendezvous('file:///tmp/foo?world_size=1')
368  next(gen)
369  with self.assertRaisesRegex(ValueError, 'size parameter missing'):
370  gen = c10d.rendezvous('file:///tmp/foo?rank=0')
371  next(gen)
372 
373  def test_nominal(self):
374  with tempfile.NamedTemporaryFile(delete=False) as file:
375  url = 'file://%s?world_size=%d' % (file.name, 2)
376  gen0 = c10d.rendezvous(url + "&rank=0")
377  store0, rank0, size0 = next(gen0)
378  self.assertEqual(0, rank0)
379  self.assertEqual(2, size0)
380  gen1 = c10d.rendezvous(url + "&rank=1")
381  store1, rank1, size1 = next(gen1)
382  self.assertEqual(1, rank1)
383  self.assertEqual(2, size1)
384 
385  # Set value on both stores
386  store0.set("key0", "value0")
387  store1.set("key1", "value1")
388 
389  # Cross check with get
390  self.assertEqual(b"value0", store1.get("key0"))
391  self.assertEqual(b"value1", store0.get("key1"))
392 
393 
394 class RendezvousTCPTest(TestCase):
395  def test_common_errors(self):
396  with self.assertRaisesRegex(ValueError, 'port number missing'):
397  gen = c10d.rendezvous('tcp://127.0.0.1?rank=0&world_size=1')
398  next(gen)
399  with self.assertRaisesRegex(ValueError, 'rank parameter missing'):
400  gen = c10d.rendezvous('tcp://127.0.0.1:23456?world_size=1')
401  next(gen)
402  with self.assertRaisesRegex(ValueError, 'size parameter missing'):
403  gen = c10d.rendezvous('tcp://127.0.0.1:23456?rank=0')
404  next(gen)
405 
406  @retry_on_address_already_in_use_error
407  def test_nominal(self):
408  addr = 'localhost'
409  port = common.find_free_port()
410  url = 'tcp://%s:%d?world_size=%d' % (addr, port, 1)
411  gen0 = c10d.rendezvous(url + "&rank=0")
412  store0, rank0, size0 = next(gen0)
413  self.assertEqual(0, rank0)
414  self.assertEqual(1, size0)
415 
416  # Set value on the single store
417  store0.set("key0", "value0")
418 
419  # check with get
420  self.assertEqual(b"value0", store0.get("key0"))
421 
422 
423 class MultiProcessTestCase(TestCase):
424  MAIN_PROCESS_RANK = -1
425 
426  @property
427  def world_size(self):
428  return 4
429 
430  @staticmethod
431  def join_or_run(fn):
432  @wraps(fn)
433  def wrapper(self):
434  if self.rank == self.MAIN_PROCESS_RANK:
435  self._join_processes(fn)
436  else:
437  fn(self)
438  return wrapper
439 
440  # The main process spawns N subprocesses that run the test.
441  # This method patches every test function so that it either
442  # assumes the role of the main process and joins its subprocesses,
443  # or runs the underlying test function.
444  @classmethod
445  def setUpClass(cls):
446  for attr in dir(cls):
447  if attr.startswith('test'):
448  fn = getattr(cls, attr)
449  setattr(cls, attr, cls.join_or_run(fn))
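 # In practice a subclass such as ProcessGroupGlooTest below only defines ordinary
 # test_* methods; setUpClass() wraps each of them so that the main process merely
 # spawns and joins world_size subprocesses while the subprocesses run the test body.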
450 
451  def setUp(self):
452  self.rank = self.MAIN_PROCESS_RANK
453  self.file = tempfile.NamedTemporaryFile(delete=False)
454  self.processes = [self._spawn_process(rank) for rank in range(int(self.world_size))]
455 
456  def tearDown(self):
457  for p in self.processes:
458  p.terminate()
459 
460  def _spawn_process(self, rank):
461  name = 'process ' + str(rank)
462  process = multiprocessing.Process(target=self._run, name=name, args=(rank,))
463  process.start()
464  return process
465 
466  def _run(self, rank):
467  self.rank = rank
468 
469  # self.id() == e.g. '__main__.TestDistributed.test_get_rank'
470  # We're retrieving the corresponding test method and executing it.
471  getattr(self, self.id().split(".")[2])()
472  sys.exit(0)
473 
474  def _join_processes(self, fn):
475  timeout = get_timeout(self.id())
476  start_time = time.time()
477  for p in self.processes:
478  p.join(timeout)
479  elapsed_time = time.time() - start_time
480  self._check_return_codes(elapsed_time)
481 
482  def _check_return_codes(self, elapsed_time):
483  """
484  Checks that the return codes of all spawned processes match, and skips
485  tests if they returned a return code indicating a skipping condition.
486  """
487  first_process = self.processes[0]
488  for i, p in enumerate(self.processes):
489  if p.exitcode is None:
490  raise RuntimeError('Process {} terminated or timed out after {} seconds'.format(i, elapsed_time))
491  self.assertEqual(p.exitcode, first_process.exitcode)
492  for skip in TEST_SKIPS.values():
493  if first_process.exitcode == skip.exit_code:
494  raise unittest.SkipTest(skip.message)
495  self.assertEqual(first_process.exitcode, 0)
496 
497  @property
498  def is_master(self):
499  return self.rank == 0
500 
501 
502 class ProcessGroupGlooTest(MultiProcessTestCase):
503  def opts(self, threads=2):
504  opts = c10d.ProcessGroupGloo.Options()
505  opts.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")]
506  opts.timeout = 1.0
507  opts.threads = threads
508  return opts
509 
510  def test_broadcast_checks(self):
511  store = c10d.FileStore(self.file.name, self.world_size)
512  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
513 
514  t1 = torch.zeros([1], dtype=torch.float32)
515  t2 = torch.zeros([1], dtype=torch.float64)
516  t3 = torch.zeros([2], dtype=torch.float32)
517 
518  with self.assertRaisesRegex(ValueError, "invalid root rank"):
519  opts = c10d.BroadcastOptions()
520  opts.rootRank = -1
521  opts.rootTensor = 0
522  pg.broadcast([t1], opts)
523 
524  with self.assertRaisesRegex(ValueError, "invalid root rank"):
525  opts = c10d.BroadcastOptions()
526  opts.rootRank = self.world_size
527  opts.rootTensor = 0
528  pg.broadcast([t1], opts)
529 
530  with self.assertRaisesRegex(ValueError, "invalid root tensor"):
531  opts = c10d.BroadcastOptions()
532  opts.rootRank = self.rank
533  opts.rootTensor = -1
534  pg.broadcast([t1], opts)
535 
536  with self.assertRaisesRegex(ValueError, "invalid root tensor"):
537  opts = c10d.BroadcastOptions()
538  opts.rootRank = self.rank
539  opts.rootTensor = 1
540  pg.broadcast([t1], opts)
541 
542  with self.assertRaisesRegex(ValueError, "invalid root tensor"):
543  opts = c10d.BroadcastOptions()
544  opts.rootRank = self.rank
545  opts.rootTensor = 0
546  pg.broadcast([], opts)
547 
548  with self.assertRaisesRegex(ValueError, "invalid tensor type"):
549  opts = c10d.BroadcastOptions()
550  opts.rootRank = self.rank
551  opts.rootTensor = 0
552  pg.broadcast([t1, t2], opts)
553 
554  with self.assertRaisesRegex(ValueError, "invalid tensor size"):
555  opts = c10d.BroadcastOptions()
556  opts.rootRank = self.rank
557  opts.rootTensor = 0
558  pg.broadcast([t1, t3], opts)
559 
560  def _test_broadcast_basics(self, fn):
561  store = c10d.FileStore(self.file.name, self.world_size)
562  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
563 
564  def broadcast(xs, rootRank, rootTensor):
565  opts = c10d.BroadcastOptions()
566  opts.rootRank = rootRank
567  opts.rootTensor = rootTensor
568  work = pg.broadcast(xs, opts)
569  work.wait()
570 
571  # Every rank is root once
572  for i in range(self.world_size):
573  # Run with 1 input tensor
574  x = fn(torch.Tensor([self.rank]))
575  broadcast([x], i, 0)
576  self.assertEqual(torch.Tensor([i]), x)
577 
578  # Run with 2 input tensors
579  num = 2
580  for j in range(num):
581  xs = [
582  fn(torch.Tensor([self.rank * num + 0.0])),
583  fn(torch.Tensor([self.rank * num + 1.0])),
584  ]
585 
586  broadcast(xs, i, j)
587  self.assertEqual(torch.Tensor([i * num + j]), xs[0])
588  self.assertEqual(torch.Tensor([i * num + j]), xs[1])
589 
590  # Test overloaded convenience function
591  x = torch.Tensor([self.rank + 1.0])
592  work = pg.broadcast(x, root=0)
593  work.wait()
594  self.assertEqual(torch.Tensor([1.0]), x)
595 
596  def test_broadcast_basics(self):
597  self._test_broadcast_basics(lambda t: t.clone())
598 
599  @skip_if_not_multigpu
600  def test_broadcast_basics_cuda(self):
601  self._test_broadcast_basics(lambda t: t.clone().cuda())
602 
603  def _test_broadcast_stress(self, inputs):
604  store = c10d.FileStore(self.file.name, self.world_size)
605  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8))
606  work_handles = [
607  pg.broadcast(inputs[i], root=(i % self.world_size))
608  for i in range(len(inputs))
609  ]
610  for i, work_handle in enumerate(work_handles):
611  work_handle.wait()
612  self.assertEqual(
613  torch.Tensor([
614  (i * self.world_size) + (i % self.world_size)
615  ]),
616  inputs[i],
617  "Mismatch in iteration %d" % i,
618  )
619 
620  def test_broadcast_stress(self):
621  inputs = [torch.Tensor([i * self.world_size + self.rank]) for i in range(1000)]
622  self._test_broadcast_stress(inputs)
623 
624  @skip_if_not_multigpu
625  def test_broadcast_stress_cuda(self):
626  inputs = [torch.Tensor([i * self.world_size + self.rank]).cuda() for i in range(1000)]
627  self._test_broadcast_stress(inputs)
628 
629  def test_allreduce_checks(self):
630  store = c10d.FileStore(self.file.name, self.world_size)
631  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
632 
633  t1 = torch.zeros([1], dtype=torch.float32)
634  t2 = torch.zeros([1], dtype=torch.float64)
635  t3 = torch.zeros([2], dtype=torch.float32)
636 
637  with self.assertRaisesRegex(ValueError, "requires non-empty tensor list"):
638  opts = c10d.AllreduceOptions()
639  pg.allreduce([], opts)
640 
641  with self.assertRaisesRegex(ValueError, "invalid tensor type"):
642  opts = c10d.AllreduceOptions()
643  pg.allreduce([t1, t2], opts)
644 
645  with self.assertRaisesRegex(ValueError, "invalid tensor size"):
646  opts = c10d.AllreduceOptions()
647  pg.allreduce([t1, t3], opts)
648 
649  def _test_allreduce_basics(self, fn):
650  store = c10d.FileStore(self.file.name, self.world_size)
651  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
652 
653  # Single input tests
654  tests = simple_reduce_tests(self.rank, self.world_size)
655  for (op, input, output) in tests:
656  opts = c10d.AllreduceOptions()
657  opts.reduceOp = op
658  tensor = fn(input)
659  work = pg.allreduce([tensor], opts)
660  work.wait()
661  self.assertEqual(output, tensor)
662 
663  # Multi input tests
664  tests = simple_multi_input_reduce_tests(self.rank, self.world_size)
665  for (op, inputs, output) in tests:
666  opts = c10d.AllreduceOptions()
667  opts.reduceOp = op
668  tensors = [fn(input) for input in inputs]
669  work = pg.allreduce(tensors, opts)
670  work.wait()
671  for tensor in tensors:
672  self.assertEqual(output, tensor)
673 
674  # Test overloaded convenience function (defaults to using sum)
675  x = fn(torch.Tensor([self.rank + 1.0]))
676  work = pg.allreduce(x)
677  work.wait()
678  self.assertEqual(torch.Tensor([float(self.world_size * (self.world_size + 1) / 2)]), x)
679 
680  def test_allreduce_basics(self):
681  self._test_allreduce_basics(lambda t: t.clone())
682 
683  @skip_if_not_multigpu
684  def test_allreduce_basics_cuda(self):
685  self._test_allreduce_basics(lambda t: t.clone().cuda())
686 
687  def _test_allreduce_stress(self, inputs):
688  store = c10d.FileStore(self.file.name, self.world_size)
689  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8))
690  work_handles = [pg.allreduce(inputs[i]) for i in range(len(inputs))]
691  for i, work_handle in enumerate(work_handles):
692  work_handle.wait()
693  self.assertEqual(
694  torch.Tensor([
695  (i * self.world_size) +
696  (self.world_size * (self.world_size - 1) / 2)
697  ]),
698  inputs[i],
699  "Mismatch in iteration %d" % i,
700  )
701 
702  def test_allreduce_stress(self):
703  inputs = [torch.Tensor([i + self.rank]) for i in range(1000)]
704  self._test_allreduce_stress(inputs)
705 
706  @skip_if_not_multigpu
707  def test_allreduce_stress_cuda(self):
708  inputs = [torch.Tensor([i + self.rank]).cuda() for i in range(1000)]
709  self._test_allreduce_stress(inputs)
710 
711  def test_scatter_checks(self):
712  store = c10d.FileStore(self.file.name, self.world_size)
713  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
714 
715  t1 = torch.zeros([1], dtype=torch.float32)
716  t2 = torch.zeros([1], dtype=torch.float64)
717  t3 = torch.zeros([2], dtype=torch.float32)
718 
719  with self.assertRaisesRegex(ValueError, "invalid root rank"):
720  opts = c10d.ScatterOptions()
721  opts.rootRank = -1
722  pg.scatter([t1], [], opts)
723 
724  with self.assertRaisesRegex(ValueError, "invalid root rank"):
725  opts = c10d.ScatterOptions()
726  opts.rootRank = self.world_size
727  pg.scatter([t1], [], opts)
728 
729  with self.assertRaisesRegex(ValueError, "requires a single-element output tensor list"):
730  opts = c10d.ScatterOptions()
731  opts.rootRank = 0
732  pg.scatter([], [], opts)
733 
734  with self.assertRaisesRegex(ValueError, "requires a single-element output tensor list"):
735  opts = c10d.ScatterOptions()
736  opts.rootRank = 0
737  pg.scatter([t1, t1], [], opts)
738 
739  with self.assertRaisesRegex(ValueError, "requires a single-element input list"):
740  opts = c10d.ScatterOptions()
741  opts.rootRank = self.rank
742  pg.scatter([t1], [], opts)
743 
744  with self.assertRaisesRegex(ValueError, "requires a single-element input list"):
745  opts = c10d.ScatterOptions()
746  opts.rootRank = self.rank
747  pg.scatter([t1], [[t1] * self.world_size, [t1] * self.world_size], opts)
748 
749  with self.assertRaisesRegex(ValueError, "requires a single-element input list"):
750  opts = c10d.ScatterOptions()
751  opts.rootRank = self.rank
752  pg.scatter([t1], [[t1] * (self.world_size - 1)], opts)
753 
754  with self.assertRaisesRegex(ValueError, "requires a single-element input list"):
755  opts = c10d.ScatterOptions()
756  opts.rootRank = self.rank
757  pg.scatter([t1], [[t1] * (self.world_size + 1)], opts)
758 
759  with self.assertRaisesRegex(ValueError, "requires a single-element input list"):
760  opts = c10d.ScatterOptions()
761  opts.rootRank = self.rank
762  pg.scatter([t1], [[t1] * (self.world_size + 1)], opts)
763 
764  with self.assertRaisesRegex(ValueError, "invalid tensor type"):
765  opts = c10d.ScatterOptions()
766  opts.rootRank = self.rank
767  pg.scatter([t1], [[t2] * self.world_size], opts)
768 
769  with self.assertRaisesRegex(ValueError, "invalid tensor size"):
770  opts = c10d.ScatterOptions()
771  opts.rootRank = self.rank
772  pg.scatter([t1], [[t3] * self.world_size], opts)
773 
774  with self.assertRaisesRegex(ValueError, "requires empty input on non-root"):
775  opts = c10d.ScatterOptions()
776  opts.rootRank = (self.rank + 1) % self.world_size
777  pg.scatter([t1], [[t1] * self.world_size], opts)
778 
779  def _test_scatter_basics(self, fn):
780  store = c10d.FileStore(self.file.name, self.world_size)
781  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
782 
783  # Preallocate tensors for input/output
784  input = [fn(torch.Tensor([self.rank])) for _ in range(self.world_size)]
785  outputs = [fn(torch.Tensor([-1])) for _ in range(self.world_size)]
786 
787  # Take turns being the scatter root and accumulate work items
788  work = []
789  for i in range(self.world_size):
790  opts = c10d.ScatterOptions()
791  opts.rootRank = i
792  if i == self.rank:
793  work.append(pg.scatter([outputs[i]], [input], opts))
794  else:
795  work.append(pg.scatter([outputs[i]], [], opts))
796 
797  # Wait for work to complete
798  for i in range(self.world_size):
799  work[i].wait()
800  self.assertEqual(torch.Tensor([i]), outputs[i])
801 
802  def test_scatter_basics(self):
803  self._test_scatter_basics(lambda t: t.clone())
804 
805  @skip_if_not_multigpu
806  def test_scatter_basics_cuda(self):
807  self._test_scatter_basics(lambda t: t.clone().cuda())
808 
809  def _test_scatter_stress(self, inputs, fn):
810  store = c10d.FileStore(self.file.name, self.world_size)
811  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8))
812  outputs = [
813  [fn(torch.Tensor([-1])) for _ in range(self.world_size)]
814  for _ in range(len(inputs))
815  ]
816  work_handles = []
817  for i in range(len(inputs)):
818  for root in range(self.world_size):
819  opts = c10d.ScatterOptions()
820  opts.rootRank = root
821  if root == self.rank:
822  work = pg.scatter([outputs[i][root]], [[fn(e) for e in inputs[i]]], opts)
823  else:
824  work = pg.scatter([outputs[i][root]], [], opts)
825  work_handles.append(work)
826 
827  for i, work_handle in enumerate(work_handles):
828  work_handle.wait()
829  iter = i // self.world_size
830  root = i % self.world_size
831 
832  self.assertEqual(
833  torch.Tensor([iter + root]),
834  outputs[iter][root],
835  "Mismatch in iteration %d for rank %d" % (iter, root)
836  )
837 
838  def test_scatter_stress(self):
839  inputs = [
840  [torch.Tensor([i + self.rank]) for _ in range(self.world_size)]
841  for i in range(1000)
842  ]
843  self._test_scatter_stress(inputs, lambda t: t.clone())
844 
845  @unittest.skip("Test is flaky, see https://github.com/pytorch/pytorch/issues/15963")
846  @skip_if_not_multigpu
847  def test_scatter_stress_cuda(self):
848  inputs = [
849  [torch.Tensor([i + self.rank]) for _ in range(self.world_size)]
850  for i in range(1000)
851  ]
852  self._test_scatter_stress(inputs, lambda t: t.clone().cuda())
853 
854  def test_gather_checks(self):
855  store = c10d.FileStore(self.file.name, self.world_size)
856  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
857 
858  t1 = torch.zeros([1], dtype=torch.float32)
859  t2 = torch.zeros([1], dtype=torch.float64)
860  t3 = torch.zeros([2], dtype=torch.float32)
861 
862  with self.assertRaisesRegex(ValueError, "invalid root rank"):
863  opts = c10d.GatherOptions()
864  opts.rootRank = -1
865  pg.gather([], [t1], opts)
866 
867  with self.assertRaisesRegex(ValueError, "invalid root rank"):
868  opts = c10d.GatherOptions()
869  opts.rootRank = self.world_size
870  pg.gather([], [t1], opts)
871 
872  with self.assertRaisesRegex(ValueError, "requires a single-element input tensor list"):
873  opts = c10d.GatherOptions()
874  opts.rootRank = 0
875  pg.gather([], [], opts)
876 
877  with self.assertRaisesRegex(ValueError, "requires a single-element input tensor list"):
878  opts = c10d.GatherOptions()
879  opts.rootRank = 0
880  pg.gather([], [t1, t1], opts)
881 
882  with self.assertRaisesRegex(ValueError, "requires a single-element output list"):
883  opts = c10d.GatherOptions()
884  opts.rootRank = self.rank
885  pg.gather([], [t1], opts)
886 
887  with self.assertRaisesRegex(ValueError, "requires a single-element output list"):
888  opts = c10d.GatherOptions()
889  opts.rootRank = self.rank
890  pg.gather([[t1] * self.world_size, [t1] * self.world_size], [t1], opts)
891 
892  with self.assertRaisesRegex(ValueError, "requires a single-element output list"):
893  opts = c10d.GatherOptions()
894  opts.rootRank = self.rank
895  pg.gather([[t1] * (self.world_size - 1)], [t1], opts)
896 
897  with self.assertRaisesRegex(ValueError, "requires a single-element output list"):
898  opts = c10d.GatherOptions()
899  opts.rootRank = self.rank
900  pg.gather([[t1] * (self.world_size + 1)], [t1], opts)
901 
902  with self.assertRaisesRegex(ValueError, "invalid tensor type"):
903  opts = c10d.GatherOptions()
904  opts.rootRank = self.rank
905  pg.gather([[t2] * self.world_size], [t1], opts)
906 
907  with self.assertRaisesRegex(ValueError, "invalid tensor size"):
908  opts = c10d.GatherOptions()
909  opts.rootRank = self.rank
910  pg.gather([[t3] * self.world_size], [t1], opts)
911 
912  with self.assertRaisesRegex(ValueError, "requires empty output on non-root"):
913  opts = c10d.GatherOptions()
914  opts.rootRank = (self.rank + 1) % self.world_size
915  pg.gather([[t1] * self.world_size], [t1], opts)
916 
917  def _test_gather_basics(self, fn):
918  store = c10d.FileStore(self.file.name, self.world_size)
919  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
920 
921  # Preallocate tensors for input/output
922  input = [fn(torch.Tensor([self.rank]))]
923  outputs = [fn(torch.Tensor([-1])) for _ in range(self.world_size)]
924 
925  # Take turns being the gather root and accumulate work items
926  work = []
927  for i in range(self.world_size):
928  opts = c10d.GatherOptions()
929  opts.rootRank = i
930  if i == self.rank:
931  work.append(pg.gather([outputs], input, opts))
932  else:
933  work.append(pg.gather([], input, opts))
934 
935  # Wait for work to complete
936  expected = [torch.Tensor([rank]) for rank in range(self.world_size)]
937  for i in range(self.world_size):
938  work[i].wait()
939  if i == self.rank:
940  self.assertEqual(expected, outputs)
941 
942  def test_gather_basics(self):
943  self._test_gather_basics(lambda t: t.clone())
944 
945  @skip_if_not_multigpu
946  def test_gather_basics_cuda(self):
947  self._test_gather_basics(lambda t: t.clone().cuda())
948 
949  def _test_gather_stress(self, inputs, fn):
950  store = c10d.FileStore(self.file.name, self.world_size)
951  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8))
952  work_handles = []
953  outputs = [
954  [
955  [fn(torch.Tensor([-1])) for _ in range(self.world_size)]
956  ] for _ in range(len(inputs))
957  ]
958  expected_outputs = [
959  [
960  [torch.Tensor([i + j]) for j in range(self.world_size)]
961  ] for i in range(len(inputs))
962  ]
963  for i in range(len(inputs)):
964  for root in range(self.world_size):
965  opts = c10d.GatherOptions()
966  opts.rootRank = root
967  if root == self.rank:
968  work = pg.gather(outputs[i], [fn(inputs[i])], opts)
969  else:
970  work = pg.gather([], [fn(inputs[i])], opts)
971  work_handles.append(work)
972 
973  for i, work_handle in enumerate(work_handles):
974  work_handle.wait()
975  iter = i // self.world_size
976  root = i % self.world_size
977  if root == self.rank:
978  self.assertEqual(
979  expected_outputs[iter],
980  outputs[iter],
981  "Mismatch in iteration %d for root %d" % (iter, root)
982  )
983 
984  def test_gather_stress(self):
985  inputs = [torch.Tensor([i + self.rank]) for i in range(1000)]
986  self._test_gather_stress(inputs, lambda t: t.clone())
987 
988  @skip_if_not_multigpu
989  def test_gather_stress_cuda(self):
990  inputs = [torch.Tensor([i + self.rank]).cuda() for i in range(1000)]
991  self._test_gather_stress(inputs, lambda t: t.clone().cuda())
992 
993  def test_allgather_checks(self):
994  store = c10d.FileStore(self.file.name, self.world_size)
995  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
996 
997  t1 = torch.zeros([1], dtype=torch.float32)
998  t2 = torch.zeros([1], dtype=torch.float64)
999  t3 = torch.zeros([2], dtype=torch.float32)
1000 
1001  with self.assertRaisesRegex(ValueError, "requires non-empty input tensor list"):
1002  pg.allgather([], [])
1003 
1004  with self.assertRaisesRegex(ValueError, "requires input/output tensor lists to have the same length"):
1005  pg.allgather([], [t1])
1006 
1007  with self.assertRaisesRegex(ValueError, "requires input/output tensor lists to have the same length"):
1008  pg.allgather([[t1] * self.world_size, [t1] * self.world_size], [t1])
1009 
1010  with self.assertRaisesRegex(ValueError, "invalid output tensor list"):
1011  pg.allgather([[t1] * (self.world_size - 1)], [t1])
1012 
1013  with self.assertRaisesRegex(ValueError, "invalid output tensor list"):
1014  pg.allgather([[t1] * (self.world_size + 1)], [t1])
1015 
1016  with self.assertRaisesRegex(ValueError, "invalid tensor type"):
1017  pg.allgather([[t1, t1] * (self.world_size), [t1, t1] * (self.world_size)], [t1, t2])
1018 
1019  with self.assertRaisesRegex(ValueError, "invalid tensor size"):
1020  pg.allgather([[t1, t1] * (self.world_size), [t1, t1] * (self.world_size)], [t1, t3])
1021 
1022  with self.assertRaisesRegex(ValueError, "invalid tensor type"):
1023  pg.allgather([([t1, t2] * (self.world_size))[:self.world_size]], [t1])
1024 
1025  with self.assertRaisesRegex(ValueError, "invalid tensor size"):
1026  pg.allgather([([t1, t3] * (self.world_size))[:self.world_size]], [t1])
1027 
1028  def _test_allgather_basics(self, fn):
1029  store = c10d.FileStore(self.file.name, self.world_size)
1030  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
1031 
1032  # Run with N input tensor per rank
1033  for n in [1, 2, 3]:
1034  input = [
1035  fn(torch.Tensor([n * self.rank + i])) for i in range(n)
1036  ]
1037  output = [
1038  [
1039  fn(torch.Tensor([-1])) for _ in range(n * self.world_size)
1040  ] for _ in range(n)
1041  ]
1042  expected_output = [
1043  [
1044  torch.Tensor([i]) for i in range(n * self.world_size)
1045  ] for _ in range(n)
1046  ]
1047  work = pg.allgather(output, input)
1048  work.wait()
1049  self.assertEqual(expected_output, output)
1050 
1051  def test_allgather_basics(self):
1052  self._test_allgather_basics(lambda t: t.clone())
1053 
1054  @skip_if_not_multigpu
1055  def test_allgather_basics_cuda(self):
1056  self._test_allgather_basics(lambda t: t.clone().cuda())
1057 
1058  def _test_allgather_stress(self, inputs, fn):
1059  store = c10d.FileStore(self.file.name, self.world_size)
1060  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8))
1061  work_handles = []
1062  outputs = [
1063  [
1064  [fn(torch.Tensor([-1])) for _ in range(self.world_size)]
1065  ] for _ in range(len(inputs))
1066  ]
1067  expected_outputs = [
1068  [
1069  [torch.Tensor([i + j]) for j in range(self.world_size)]
1070  ] for i in range(len(inputs))
1071  ]
1072  for i in range(len(inputs)):
1073  work = pg.allgather(outputs[i], [fn(inputs[i])])
1074  work_handles.append(work)
1075 
1076  for i, work_handle in enumerate(work_handles):
1077  work_handle.wait()
1078  self.assertEqual(
1079  expected_outputs[i],
1080  outputs[i],
1081  "Mismatch in iteration %d" % i
1082  )
1083 
1084  def test_allgather_stress(self):
1085  inputs = [torch.Tensor([i + self.rank]) for i in range(1000)]
1086  self._test_allgather_stress(inputs, lambda t: t.clone())
1087 
1088  @skip_if_not_multigpu
1089  def test_allgather_stress_cuda(self):
1090  inputs = [torch.Tensor([i + self.rank]).cuda() for i in range(1000)]
1091  self._test_allgather_stress(inputs, lambda t: t.clone().cuda())
1092 
1093  def test_reduce_checks(self):
1094  store = c10d.FileStore(self.file.name, self.world_size)
1095  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
1096 
1097  t1 = torch.zeros([1], dtype=torch.float32)
1098 
1099  with self.assertRaisesRegex(ValueError, "invalid root rank"):
1100  opts = c10d.ReduceOptions()
1101  opts.rootRank = -1
1102  opts.rootTensor = 0
1103  pg.reduce([t1], opts)
1104 
1105  with self.assertRaisesRegex(ValueError, "invalid root rank"):
1106  opts = c10d.ReduceOptions()
1107  opts.rootRank = self.world_size
1108  opts.rootTensor = 0
1109  pg.reduce([t1], opts)
1110 
1111  with self.assertRaisesRegex(ValueError, "invalid root tensor"):
1112  opts = c10d.ReduceOptions()
1113  opts.rootRank = self.rank
1114  opts.rootTensor = 1
1115  pg.reduce([t1], opts)
1116 
1117  with self.assertRaisesRegex(ValueError, "requires a single-element tensor list"):
1118  opts = c10d.ReduceOptions()
1119  opts.rootRank = self.rank
1120  opts.rootTensor = 0
1121  pg.reduce([t1, t1], opts)
1122 
1123  def _test_reduce_basics(self, fn):
1124  store = c10d.FileStore(self.file.name, self.world_size)
1125  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
1126  for (op, input, output) in simple_reduce_tests(self.rank, self.world_size):
1127  for root in range(self.world_size):
1128  opts = c10d.ReduceOptions()
1129  opts.reduceOp = op
1130  opts.rootRank = root
1131  tmp = fn(input)
1132  work = pg.reduce([tmp], opts)
1133  work.wait()
1134  if root == self.rank:
1135  self.assertEqual(output, tmp)
1136 
1137  def test_reduce_basics(self):
1138  self._test_reduce_basics(lambda t: t.clone())
1139 
1140  @skip_if_not_multigpu
1141  def test_reduce_basics_cuda(self):
1142  self._test_reduce_basics(lambda t: t.clone().cuda())
1143 
1144  def _test_reduce_stress(self, inputs):
1145  store = c10d.FileStore(self.file.name, self.world_size)
1146  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts(threads=8))
1147  work_handles = []
1148  outputs = []
1149  for i in range(len(inputs)):
1150  for root in range(self.world_size):
1151  opts = c10d.ReduceOptions()
1152  opts.rootRank = root
1153  tmp = inputs[i].clone()
1154  outputs.append(tmp)
1155  work = pg.reduce([tmp], opts)
1156  work_handles.append(work)
1157 
1158  for i, work_handle in enumerate(work_handles):
1159  work_handle.wait()
1160  iter = i // self.world_size
1161  root = i % self.world_size
1162  if root == self.rank:
1163  self.assertEqual(
1164  torch.Tensor([
1165  (iter * self.world_size) +
1166  (self.world_size * (self.world_size - 1) / 2)
1167  ]),
1168  outputs[i],
1169  "Mismatch in iteration %d with root rank %d" % (iter, root),
1170  )
1171 
1172  def test_reduce_stress(self):
1173  inputs = [torch.Tensor([i + self.rank]) for i in range(1000)]
1174  self._test_reduce_stress(inputs)
1175 
1176  @skip_if_not_multigpu
1177  def test_reduce_stress_cuda(self):
1178  inputs = [torch.Tensor([i + self.rank]).cuda() for i in range(1000)]
1179  self._test_reduce_stress(inputs)
1180 
1181  def test_send_recv_all_to_all(self):
1182  store = c10d.FileStore(self.file.name, self.world_size)
1183  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
1184 
1185  # Preallocate tensors for input/output
1186  inputs = [torch.Tensor([self.rank]) for _ in range(self.world_size)]
1187  outputs = [torch.Tensor([-1]) for _ in range(self.world_size)]
1188 
1189  # Issue sends
1190  send_work = []
1191  for i in range(self.world_size):
1192  if i == self.rank:
1193  continue
1194  send_work.append(pg.send([inputs[i]], i, 0))
1195 
1196  # Issue recvs
1197  recv_work = []
1198  for i in range(self.world_size):
1199  if i == self.rank:
1200  continue
1201  recv_work.append(pg.recv([outputs[i]], i, 0))
1202 
1203  # Wait for sends to complete
1204  for work in send_work:
1205  work.wait()
1206  self.assertTrue(work.is_completed())
1207 
1208  # Wait for recvs to complete
1209  for work in recv_work:
1210  work.wait()
1211  self.assertTrue(work.is_completed())
1212 
1213  # Test that every output other than our own contains the respective rank
1214  for i in range(self.world_size):
1215  if i == self.rank:
1216  continue
1217  self.assertEqual(torch.Tensor([i]), outputs[i])
1218 
1219  def test_timeout_kwarg(self):
1220  store = c10d.FileStore(self.file.name, self.world_size)
1221  pg = c10d.ProcessGroupGloo(
1222  store,
1223  self.rank,
1224  self.world_size,
1225  timeout=timedelta(seconds=0.5))
1226 
1227  # Wait on barrier
1228  pg.barrier().wait()
1229 
1230  # Sleep on one of the processes to trigger barrier timeout
1231  if self.rank == 0:
1232  time.sleep(1.0)
1233 
1234  # The barrier will now time out
1235  with self.assertRaisesRegex(RuntimeError, " (Timed out|closed) "):
1236  pg.barrier().wait()
1237 
1238  def test_barrier_implies_wait(self):
1239  store = c10d.FileStore(self.file.name, self.world_size)
1240  pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size)
1241 
1242  # Kick off allreduce operations
1243  size = (100, 100)
1244  num = 16
1245  tensors = [torch.full(size, float(i)) for i in range(num)]
1246  for tensor in tensors:
1247  # Note: leak the returned work handle
1248  pg.allreduce(tensor)
1249 
1250  # Barrier should ensure all previous work has completed
1251  pg.barrier().wait()
1252 
1253  for i, tensor in enumerate(tensors):
1254  self.assertEqual(torch.full(size, float(i * self.world_size)), tensor)
1255 
1256 
1257 class ProcessGroupNCCLTest(TestCase):
1258  MAIN_PROCESS_RANK = 0
1259 
1260  def setUp(self):
1261  if not hasattr(c10d, "ProcessGroupNCCL"):
1262  raise unittest.SkipTest("C10D is not built with NCCL process group,"
1263  " skipping test")
1264 
1265  self.rank = self.MAIN_PROCESS_RANK
1266  self.world_size = 1
1267  self.file = tempfile.NamedTemporaryFile(delete=False)
1268  self.num_gpus = torch.cuda.device_count()
1269  if self.num_gpus < 2:
1270  raise unittest.SkipTest("NCCL test requires 2+ GPUs")
1271 
1272  def tearDown(self):
1273  pass
1274 
1275  def test_broadcast_ops(self):
1276  store = c10d.FileStore(self.file.name, self.world_size)
1277  pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
1278 
1279  def broadcast(xs, rootRank, rootTensor):
1280  opts = c10d.BroadcastOptions()
1281  opts.rootRank = rootRank
1282  opts.rootTensor = rootTensor
1283  work = pg.broadcast(xs, opts)
1284  work.wait()
1285 
1286  # for every root tensor
1287  for rt in range(self.num_gpus):
1288  tensors = []
1289  for i in range(self.num_gpus):
1290  tensors.append(torch.Tensor([i]).cuda(i))
1291 
1292  broadcast(tensors, self.rank, rt)
1293 
1294  for i in range(self.num_gpus):
1295  self.assertEqual(tensors[i], tensors[rt])
1296 
1297  def test_allreduce_ops(self):
1298  store = c10d.FileStore(self.file.name, self.world_size)
1299  pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
1300 
1301  def allreduce(tensors, op):
1302  opts = c10d.AllreduceOptions()
1303  opts.reduceOp = op
1304  work = pg.allreduce(tensors, opts)
1305  work.wait()
1306 
1307  # Sum
1308  tensors = []
1309  for i in range(self.num_gpus):
1310  tensors.append(torch.Tensor([i + 1]).cuda(i))
1311 
1312  allreduce(tensors, c10d.ReduceOp.SUM)
1313 
1314  for i in range(self.num_gpus):
1315  self.assertEqual(
1316  torch.Tensor([float(self.num_gpus * (self.num_gpus + 1) / 2)]),
1317  tensors[i])
1318 
1319  # Product
1320  tensors = []
1321  for i in range(self.num_gpus):
1322  tensors.append(torch.Tensor([i + 1]).cuda(i))
1323 
1324  allreduce(tensors, c10d.ReduceOp.PRODUCT)
1325 
1326  for i in range(self.num_gpus):
1327  self.assertEqual(
1328  torch.Tensor([float(math.factorial(self.num_gpus))]),
1329  tensors[i])
1330 
1331  # Min
1332  tensors = []
1333  for i in range(self.num_gpus):
1334  tensors.append(torch.Tensor([i + 1]).cuda(i))
1335 
1336  allreduce(tensors, c10d.ReduceOp.MIN)
1337 
1338  for i in range(self.num_gpus):
1339  self.assertEqual(torch.Tensor([1.0]), tensors[i])
1340 
1341  # Max
1342  tensors = []
1343  for i in range(self.num_gpus):
1344  tensors.append(torch.Tensor([i + 1]).cuda(i))
1345 
1346  allreduce(tensors, c10d.ReduceOp.MAX)
1347 
1348  for i in range(self.num_gpus):
1349  self.assertEqual(torch.Tensor([self.num_gpus]), tensors[i])
1350 
1351  def test_reduce_ops(self):
1352  store = c10d.FileStore(self.file.name, self.world_size)
1353  pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
1354 
1355  def reduce(xs, rootRank, rootTensor):
1356  opts = c10d.ReduceOptions()
1357  opts.rootRank = rootRank
1358  opts.rootTensor = rootTensor
1359  work = pg.reduce(xs, opts)
1360  work.wait()
1361 
1362  # for every root tensor
1363  for rt in range(self.num_gpus):
1364  tensors = []
1365  for i in range(self.num_gpus):
1366  tensors.append(torch.Tensor([i + 1]).cuda(i))
1367 
1368  reduce(tensors, self.rank, rt)
1369 
1370  self.assertEqual(
1371  torch.Tensor([float(self.num_gpus * (self.num_gpus + 1) / 2)]),
1372  tensors[rt])
1373 
1374  def test_allgather_ops(self):
1375  store = c10d.FileStore(self.file.name, self.world_size)
1376  pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
1377 
1378  def allgather(output_ts, input_ts):
1379  work = pg.allgather(output_ts, input_ts)
1380  work.wait()
1381 
1382  tensors = []
1383  output_ts = [[] for _ in range(self.num_gpus)]
1384 
1385  for idx, ls in enumerate(output_ts):
1386  for _ in range(self.world_size * self.num_gpus):
1387  ls.append(torch.Tensor([0]).cuda(idx))
1388 
1389  for i in range(self.num_gpus):
1390  tensors.append(torch.Tensor([i]).cuda(i))
1391 
1392  allgather(output_ts, tensors)
1393 
1394  # Verification
1395  for device_ts in output_ts:
1396  for s_idx, t in enumerate(device_ts):
1397  self.assertEqual(torch.Tensor([s_idx]), t)
1398 
1399  def test_barrier(self):
1400  store = c10d.FileStore(self.file.name, self.world_size)
1401  pg = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
1402 
1403  def allreduce(tensors):
1404  opts = c10d.AllreduceOptions()
1405  work = pg.allreduce(tensors, opts)
1406  return work
1407 
1408  # Make the collectives operate on
1409  # 2, 3, ..., self.num_gpus GPUs at a time
1410  tensors_list = [[] for _ in range(2, self.num_gpus + 1)]
1411  for i in range(2, self.num_gpus + 1):
1412  for j in range(i):
1413  tensors_list[i - 2].append(torch.Tensor([j + 1]).cuda(j))
1414 
1415  works = []
1416  for tensors in tensors_list:
1417  work = allreduce(tensors)
1418  works.append(work)
1419 
1420  # Barrier will ensure that all previous work is completed
1421  pg.barrier().wait()
1422 
1423  for i in range(2, self.num_gpus + 1):
1424  for j in range(i):
1425  self.assertEqual(
1426  torch.Tensor([float(i * (i + 1) / 2)]),
1427  tensors_list[i - 2][j])
1428 
1429 
1430 class Net(nn.Module):
1431  def __init__(self):
1432  super(Net, self).__init__()
1433  self.fc1 = nn.Linear(2, 10, bias=False)
1434  self.fc2 = nn.Linear(10, 50, bias=False)
1435  self.fc3 = nn.Linear(50, 4, bias=False)
1436  self.relu = nn.ReLU()
1437 
1438  def forward(self, x):
1439  x = self.relu(self.fc1(x))
1440  x = self.relu(self.fc2(x))
1441  x = self.fc3(x)
1442  return F.softmax(x, dim=1)
1443 
1444 
1445 class DistributedDataParallelTest(MultiProcessTestCase):
1446 
1447  def tearDown(self):
1448  # The DistributedDataParallel tests don't seem to call the FileStore destructor.
1449  # TODO: investigate why; these tests are known to have issues.
1450  # Use this hack to remove the files created for those tests.
1451  try:
1452  os.remove(self.file.name)
1453  except OSError:
1454  pass
1455 
1456  @property
1457  def world_size(self):
1458  return 2
1459 
1460  def _test_ddp_with_process_group(self, process_group, gpus):
1461  model = Net()
1462  ddp_model = DistributedDataParallel(
1463  copy.deepcopy(model).cuda(gpus[0]),
1464  device_ids=gpus,
1465  process_group=process_group,
1466  bucket_cap_mb=0.001)
1467 
1468  model.cuda(gpus[0])
1469 
1470  local_batch_size = len(gpus)
1471  global_batch_size = self.world_size * local_batch_size
1472  input = torch.randn(global_batch_size, 2).cuda(gpus[0])
1473  target = torch.randn(global_batch_size, 4).cuda(gpus[0])
1474 
1475  def step_model(model, input, target):
1476  model.train()
1477  output = model(input)
1478  loss = F.mse_loss(output, target)
1479  loss.backward()
1480 
1481  def update_parameters(model):
1482  for param in model.parameters():
1483  param.data -= param.grad
1484  param.grad = None
1485 
1486  # Compare the parameters of the two models over 2 iterations
1487  for iteration in range(2):
1488  # single cpu/gpu training
1489  step_model(model, input, target)
1490 
1491  # DDP training: DDP scatters subsets of the input to nodes/GPUs
1492  step_model(ddp_model,
1493  input[self.rank * local_batch_size: (self.rank + 1) * local_batch_size],
1494  target[self.rank * local_batch_size: (self.rank + 1) * local_batch_size])
1495 
1496  # Update weights and run a second iteration to shake out errors
1497  update_parameters(model)
1498  update_parameters(ddp_model)
1499  self.assertEqual(len(list(model.parameters())), len(list(ddp_model.parameters())))
1500  for i, j in zip(model.parameters(), ddp_model.parameters()):
1501  self.assertEqual(i, j)
1502 
1503  # Shuffle the input so that DDP input is different
1504  torch.manual_seed(1337 + iteration)
1505  input = input[torch.randperm(global_batch_size)]
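 # Note: bucket_cap_mb=0.001 above is far below the default bucket size; presumably
 # it forces DistributedDataParallel to split even this tiny model into many small
 # gradient buckets so that the bucketed allreduce path is exercised.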
1506 
1507  @skip_if_not_multigpu
1508  def test_gloo_backend(self):
1509  store = c10d.FileStore(self.file.name, self.world_size)
1510  options = c10d.ProcessGroupGloo.Options()
1511  options.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")]
1512  process_group = c10d.ProcessGroupGloo(store, self.rank, self.world_size, options)
1513  gpus = gpus_for_rank(self.world_size)[self.rank]
1514  self._test_ddp_with_process_group(process_group, gpus)
1515  self._test_ddp_with_process_group(process_group, list(map(lambda i: torch.device('cuda:' + str(i)), gpus)))
1516 
1517  @skip_if_not_multigpu
1518  @skip_if_not_nccl
1519  def test_nccl_backend(self):
1520  store = c10d.FileStore(self.file.name, self.world_size)
1521  process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
1522  gpus = gpus_for_rank(self.world_size)[self.rank]
1523  self._test_ddp_with_process_group(process_group, gpus)
1524  self._test_ddp_with_process_group(process_group, list(map(lambda i: torch.device('cuda:' + str(i)), gpus)))
1525 
1526  @skip_if_not_multigpu
1527  @skip_if_not_nccl
1528  @skip_for_known_issues
1529  def test_dist_broadcast_coalesced_nccl(self):
1530  store = c10d.FileStore(self.file.name, self.world_size)
1531  process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
1532 
1533  device = torch.device('cuda')
1534 
1535  for fine_grained in [False, True]:
1536  target = torch.arange(60, dtype=torch.float16, device=device).chunk(5)
1537  target += torch.arange(60, dtype=torch.float32, device=device).chunk(5)
1538  target += torch.arange(60, dtype=torch.float16, device=device).chunk(5)
1539  target += torch.arange(60, dtype=torch.float64, device=device).chunk(5)
1540  target += torch.arange(60, dtype=torch.float16, device=device).chunk(5)
1541  target += torch.arange(60, dtype=torch.float32, device=device).chunk(5)
1542 
1543  if self.is_master:
1544  # All processes should have these tensors in the end.
1545  tensors = target
1546  else:
1547  # Non-master processes start with zero-filled tensors and should end
1548  # up with the tensors broadcast from the master.
1549  tensors = torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
1550  tensors += torch.zeros(60, dtype=torch.float32, device=device).chunk(5)
1551  tensors += torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
1552  tensors += torch.zeros(60, dtype=torch.float64, device=device).chunk(5)
1553  tensors += torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
1554  tensors += torch.zeros(60, dtype=torch.float32, device=device).chunk(5)
1555 
1556  c10d._dist_broadcast_coalesced(
1557  process_group,
1558  tensors,
1559  buffer_size=256,
1560  fine_grained=fine_grained)
1561 
1562  self.assertEqual(tensors, target)
1563 
1564  @skip_if_not_multigpu
1565  def test_dist_broadcast_coalesced_gloo(self):
1566  store = c10d.FileStore(self.file.name, self.world_size)
1567  options = c10d.ProcessGroupGloo.Options()
1568  options.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")]
1569  process_group = c10d.ProcessGroupGloo(store, self.rank, self.world_size, options)
1570 
1571  device = torch.device('cuda')
1572 
1573  for fine_grained in [False, True]:
1574  target = torch.arange(60, dtype=torch.float16, device=device).chunk(5)
1575  target += torch.arange(60, dtype=torch.float32, device=device).chunk(5)
1576  target += torch.arange(60, dtype=torch.float16, device=device).chunk(5)
1577  target += torch.arange(60, dtype=torch.float64, device=device).chunk(5)
1578  target += torch.arange(60, dtype=torch.float16, device=device).chunk(5)
1579  target += torch.arange(60, dtype=torch.float32, device=device).chunk(5)
1580 
1581  if self.is_master:
1582  # All processes should have these tensors in the end.
1583  tensors = target
1584  else:
1585  # Non-master processes start with zero-filled tensors and should end
1586  # up with the tensors broadcast from the master.
1587  tensors = torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
1588  tensors += torch.zeros(60, dtype=torch.float32, device=device).chunk(5)
1589  tensors += torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
1590  tensors += torch.zeros(60, dtype=torch.float64, device=device).chunk(5)
1591  tensors += torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
1592  tensors += torch.zeros(60, dtype=torch.float32, device=device).chunk(5)
1593 
1594  c10d._dist_broadcast_coalesced(
1595  process_group,
1596  tensors,
1597  buffer_size=128,
1598  fine_grained=fine_grained)
1599 
1600  self.assertEqual(tensors, target)
1601 
1602  @skip_if_not_multigpu
1603  def test_sync_params_no_buffers(self):
1604  store = c10d.FileStore(self.file.name, self.world_size)
1605  options = c10d.ProcessGroupGloo.Options()
1606  options.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")]
1607  process_group = c10d.ProcessGroupGloo(store, self.rank, self.world_size, options)
1608 
1609  # Use all available devices on every process here (data is small, so should be fine).
1610  devices = gpus_for_rank(self.world_size)[self.rank]
1611  target = torch.arange(10, dtype=torch.float64, device='cuda:{}'.format(devices[0])).chunk(5)
1612  parameter_data = [target]
1613  parameter_data += [torch.zeros(10, device=torch.device('cuda', d)).chunk(5) for d in devices[1:]]
1614  buffer_data = [[]] * len(parameter_data)
1615 
1616  c10d._sync_params(
1617  process_group,
1618  parameter_data=parameter_data,
1619  buffer_data=buffer_data,
1620  devices=devices,
1621  broadcast_bucket_size=10,
1622  broadcast_buffers=False)
1623 
1624  for device_data in parameter_data:
1625  for i, parameter in enumerate(device_data):
1626  self.assertEqual(parameter, target[i])
1627 
1628  @skip_if_not_multigpu
1629  def test_sync_params_with_buffers(self):
1630  store = c10d.FileStore(self.file.name, self.world_size)
1631  options = c10d.ProcessGroupGloo.Options()
1632  options.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")]
1633  process_group = c10d.ProcessGroupGloo(store, self.rank, self.world_size, options)
1634 
1635  devices = gpus_for_rank(self.world_size)[self.rank]
1636  target = torch.arange(10, dtype=torch.float64, device='cuda:{}'.format(devices[0])).chunk(5)
1637  parameter_data = [target]
1638  parameter_data += [torch.zeros(10, device=torch.device('cuda', d)).chunk(5) for d in devices[1:]]
1639 
1640  # sync_params should do a dist_broadcast for buffers, so we only populate the master buffers and
1641  # then check that other processes' tensors end up matching.
1642 
1643  if self.is_master:
1644  buffer_data = [target]
1645  buffer_data += [torch.zeros(10, device=torch.device('cuda', d)).chunk(5) for d in devices[1:]]
1646  else:
1647  buffer_data = [torch.zeros(10, device=torch.device('cuda', d)).chunk(5) for d in devices]
1648 
1649  c10d._sync_params(
1650  process_group,
1651  parameter_data=parameter_data,
1652  buffer_data=buffer_data,
1653  devices=devices,
1654  broadcast_bucket_size=10,
1655  broadcast_buffers=True)
1656 
1657  for device_data in parameter_data:
1658  for i, parameter in enumerate(device_data):
1659  self.assertEqual(parameter, target[i])
1660 
1661  for device_data in buffer_data:
1662  for i, buffer in enumerate(device_data):
1663  self.assertEqual(buffer, target[i])
1664 
1665  @skip_if_not_multigpu
1666  @skip_if_not_nccl
1667  def test_fp16(self):
1668  store = c10d.FileStore(self.file.name, self.world_size)
1669  process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
1670 
1671  gpus = gpus_for_rank(self.world_size)[self.rank]
1672  model = nn.Linear(1, 1, bias=False).cuda(gpus[0]).half()
1673  nn.init.constant_(model.weight, 1)
1674  ddp_model = DistributedDataParallel(
1675  model,
1676  device_ids=[gpus[0]],
1677  process_group=process_group,
1678  bucket_cap_mb=0.001,
1679  )
1680 
1681  # The input is 2**15, so that the gradients will overflow with a
1682  # world_size of 2 (2**15 * 2 = 65536 exceeds the float16 maximum of 65504),
1683  # unless we normalize the gradient by the world_size before the reduction.
1684  input = torch.Tensor([[2**15]]).cuda(gpus[0]).half()
1685 
1686  # Step model
1687  ddp_model.train()
1688  output = ddp_model(input)
1689  loss = output.sum()
1690  loss.backward()
1691 
1692  self.assertFalse(
1693  any(torch.isinf(p.grad).any() for p in ddp_model.parameters())
1694  )
1695 
1696  @skip_if_not_nccl
1697  @skip_if_not_multigpu
1698  def test_queue_reduction(self):
1699  # Set up process group.
1700  store = c10d.FileStore(self.file.name, self.world_size)
1701  process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
1702 
1703  # Get this process' split of devices.
1704  devices = gpus_for_rank(self.world_size)[self.rank]
1705  grads_batch = [(torch.ones(10, device=torch.device('cuda', d)) *
1706  (self.rank + 1)).chunk(5)
1707  for d in devices]
1708 
1709  work, local_grad_sum = c10d._queue_reduction(process_group,
1710  grads_batch,
1711  devices)
1712  # The first return value should be the allreduce work item.
1713  self.assertTrue(isinstance(work, c10d.Work))
1714  # The second return value will be the finished allreduced gradients.
1715  self.assertTrue(isinstance(local_grad_sum, torch.Tensor))
1716 
1717  # Wait for the allreduce to finish.
1718  work.wait()
1719 
1720  # The expected result of the allreduce should be the average
1721  self.assertEqual(local_grad_sum,
1722  torch.ones(10) * (self.world_size + 1) * len(devices) / 2.0)
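 # The asserted value is consistent with each rank queueing gradients of
 # ones * (rank + 1) on each of its len(devices) GPUs and the allreduced total
 # being averaged over world_size:
 #   len(devices) * world_size * (world_size + 1) / 2 / world_size
 #     == (world_size + 1) * len(devices) / 2.0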
1723 
1724  @skip_if_not_nccl
1725  @skip_if_not_multigpu
1726  def test_sync_reduction(self):
1727  # Set up process group.
1728  store = c10d.FileStore(self.file.name, self.world_size)
1729  process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
1730 
1731  # Get this process' split of devices.
1732  devices = gpus_for_rank(self.world_size)[self.rank]
1733  grads_batch = [(torch.ones(10, device=torch.device('cuda', d)) *
1734  (self.rank + 1)).chunk(5)
1735  for d in devices]
1736  work, local_grad_sum = c10d._queue_reduction(process_group,
1737  grads_batch,
1738  devices)
1739  c10d._sync_reduction(work, grads_batch[0], local_grad_sum)
1740  # The expected result of the allreduce should be the average
1741  self.assertEqual(grads_batch[0], (torch.ones(10) * (self.world_size + 1) * len(devices) / 2.0).chunk(5))
1742 
1743 
1744 if __name__ == '__main__':
1745  assert not torch.cuda._initialized, "test_distributed must not have initialized CUDA context on main process"
1746 
1747  run_tests()