test_multiprocessing.py
import contextlib
import gc
import os
import sys
import time
import unittest
from sys import platform

import torch
import torch.cuda
import torch.multiprocessing as mp
import torch.utils.hooks
from torch.nn import Parameter
from common_utils import (TestCase, run_tests, IS_WINDOWS, NO_MULTIPROCESSING_SPAWN, TEST_WITH_ASAN,
                          load_tests)
from multiprocessing.reduction import ForkingPickler

# load_tests from common_utils is used to automatically filter tests for
# sharding on sandcastle. This line silences flake warnings
load_tests = load_tests

TEST_REPEATS = 30
HAS_SHM_FILES = os.path.isdir('/dev/shm')
TEST_CUDA_IPC = torch.cuda.is_available() and \
    sys.version_info[0] == 3 and \
    sys.platform != 'darwin' and \
    sys.platform != 'win32'
TEST_MULTIGPU = TEST_CUDA_IPC and torch.cuda.device_count() > 1


class SubProcess(mp.Process):
    def __init__(self, tensor):
        super(SubProcess, self).__init__()
        self.tensor = tensor
        self.daemon = True

    def run(self):
        self.tensor.add_(3)


def simple_fill(queue, event):
    data = queue.get()
    data[0][:] = 4
    event.set()


def simple_pool_fill(tensor):
    tensor.fill_(4)
    return tensor.add(1)


def send_tensor(queue, event, tp):
    t = torch.ones(5, 5).type(tp)
    queue.put(t)
    queue.put(t)
    event.wait()


def call_backward():
    x = torch.randn(3, 3, requires_grad=True)
    x.sum().backward()


def sum_tensors(inq, outq):
    with torch.cuda.device(1):
        tensors = inq.get()
        for tensor in tensors:
            outq.put((tensor.sum().item(), tensor.get_device(),
                      tensor.numel(), tensor.storage().size()))


def queue_get_exception(inqueue, outqueue):
    os.close(2)  # hide expected error message
    try:
        torch.zeros(5, 5).cuda()
    except Exception as e:
        outqueue.put(e)
    else:
        outqueue.put('no exception')


# Multiply by two in a separate stream
def cuda_multiply_two(queue, ready, done):
    ready.set()
    with torch.cuda.stream(torch.cuda.Stream()):
        cuda_event, tensor = queue.get()
        cuda_event.wait()
        tensor.mul_(2)
        cuda_event.record()
        done.set()
        del cuda_event


def requires_grad_variable_sharing(queue, ready):
    var = queue.get()
    ready.set()
    queue.put(var.requires_grad)


def autograd_sharing(queue, ready, master_modified, device, is_parameter):
    var = queue.get()
    ready.set()
    master_modified.wait()

    expected_var = torch.arange(1., 26, device=device).view(5, 5)
    expected_var[0, 0] = 1000
    is_ok = var.data.equal(expected_var)
    var.data[:] = torch.ones(5, 5, device=device)

    is_ok &= var.grad is None
    is_ok &= not var._backward_hooks
    if is_parameter:
        is_ok &= type(var) == Parameter
    else:
        is_ok &= type(var) == torch.Tensor
    var._grad = torch.ones(5, 5, device=device)

    queue.put(is_ok)


def mixed_type_producer(queue, event):
    for _ in range(10):
        float_tensor = torch.ones(2, 2).float().cuda()
        byte_tensor = torch.zeros(2, 2).byte().cuda()

        queue.put(float_tensor)
        queue.put(byte_tensor)
        event.wait()
        event.clear()


@contextlib.contextmanager
def fs_sharing():
    prev_strategy = mp.get_sharing_strategy()
    mp.set_sharing_strategy('file_system')
    try:
        yield
    finally:
        mp.set_sharing_strategy(prev_strategy)

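
# Illustrative sketch (not part of the original test suite): how the
# fs_sharing() helper above is typically used. Inside the context, shared CPU
# tensors are backed by files (e.g. under /dev/shm on Linux) rather than file
# descriptors, and the previous strategy is restored on exit. This helper is
# hypothetical and is never invoked by the tests.
def _example_fs_sharing_usage():
    prev = mp.get_sharing_strategy()
    with fs_sharing():
        t = torch.zeros(5, 5)
        t.share_memory_()  # shared via the 'file_system' strategy
        assert t.is_shared()
    assert mp.get_sharing_strategy() == prev  # previous strategy restored
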
class leak_checker(object):

    def __init__(self, test_case):
        self.checked_pids = [os.getpid()]
        self.test_case = test_case

    def __enter__(self):
        self.next_fds = self._get_next_fds(10)
        return self

    def __exit__(self, *args):
        if args[0] is None:
            # Check that the 10th available file-descriptor at the end of the
            # test is no more than 4 higher than the 10th available at the
            # start. This attempts to catch file descriptor leaks, but allows
            # one-off initialization that may use up a file descriptor
            # TODO: Disabled because this check is too flaky
            # available_fds = self._get_next_fds(10)
            # self.test_case.assertLessEqual(
            #     available_fds[-1] - self.next_fds[-1], 5)
            self.test_case.assertFalse(self.has_shm_files())
        return False

    def check_pid(self, pid):
        self.checked_pids.append(pid)

    def _get_next_fds(self, n=1):
        # dup uses the lowest-numbered unused descriptor for the new descriptor
        fds = [os.dup(0) for i in range(n)]
        for fd in fds:
            os.close(fd)
        return fds

    def has_shm_files(self, wait=True):
        if not HAS_SHM_FILES:
            return False
        result = self._has_shm_files()
        if result and mp.get_sharing_strategy() == 'file_system' and wait:
            time.sleep(0.5)
            return self._has_shm_files()
        return result

    def _has_shm_files(self):
        gc.collect()
        names = ['torch_' + str(pid) for pid in self.checked_pids]
        for filename in os.listdir('/dev/shm'):
            for name in names:
                if filename.startswith(name):
                    return True
        return False

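
# Illustrative sketch (not in the original file): the fd-leak heuristic in
# leak_checker._get_next_fds relies on os.dup() returning the lowest-numbered
# free descriptor, so probing with dup-and-close before and after a test gives
# a rough count of descriptors that were left open. This helper is hypothetical
# and is never called by the tests.
def _example_fd_probe():
    probe = os.dup(0)  # lowest free descriptor right now
    os.close(probe)
    leaked = os.open(os.devnull, os.O_RDONLY)  # simulate a leaked descriptor
    probe_after = os.dup(0)  # at least one higher while `leaked` stays open
    os.close(probe_after)
    os.close(leaked)
    return probe_after - probe  # >= 1
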
class TestMultiprocessing(TestCase):

    def _test_sharing(self, ctx=mp, type=torch.FloatTensor, repeat=1):
        def test_fill():
            x = torch.zeros(5, 5).type(type)
            q = ctx.Queue()
            e = ctx.Event()
            data = [x, x[:, 1]]
            q.put(data)
            p = ctx.Process(target=simple_fill, args=(q, e))
            p.daemon = True
            lc.check_pid(p.pid)
            p.start()
            e.wait(10)
            self.assertTrue(e.is_set())
            self.assertTrue(data[0].eq(4).all())
            self.assertTrue(data[1].eq(4).all())
            p.join(1)
            self.assertFalse(p.is_alive())

        def test_receive():
            q = ctx.Queue()
            e = ctx.Event()
            p = ctx.Process(target=send_tensor, args=(q, e, type))
            p.daemon = True
            lc.check_pid(p.pid)
            p.start()
            t1 = q.get()
            t2 = q.get()
            self.assertTrue(t1.eq(1).all())
            self.assertTrue(id(t1.storage()) == id(t2.storage()))
            e.set()
            p.join(1)
            self.assertFalse(p.is_alive())

        with leak_checker(self) as lc:
            for _ in range(repeat):
                test_fill()
                test_receive()

    def _test_preserve_sharing(self, ctx=mp, repeat=1):
        def do_test():
            x = torch.randn(5, 5)
            data = [x.storage(), x, x[2], x[:, 1]]
            q = ctx.Queue()
            q.put(data)
            new_data = q.get(timeout=1)
            self.assertEqual(new_data, data, 0)
            storage_cdata = data[0]._cdata
            self.assertEqual(new_data[0]._cdata, storage_cdata)
            for t in new_data[1:]:
                self.assertEqual(t.storage()._cdata, storage_cdata)

        with leak_checker(self):
            for _ in range(repeat):
                do_test()

    def _test_pool(self, ctx=mp, repeat=1):
        def do_test():
            p = ctx.Pool(2)
            for proc in p._pool:
                lc.check_pid(proc.pid)

            buffers = [torch.zeros(2, 2) for i in range(4)]
            results = p.map(simple_pool_fill, buffers, 1)
            self.assertEqual(len(results), len(buffers))
            for r in results:
                self.assertEqual(r, torch.ones(2, 2) * 5, 0)
            for b in buffers:
                self.assertEqual(b, torch.ones(2, 2) * 4, 0)

            p.close()
            p.join()

        with leak_checker(self) as lc:
            for _ in range(repeat):
                do_test()

    @unittest.skipIf(platform == 'darwin', "file descriptor strategy is not supported on macOS")
    @unittest.skipIf(TEST_WITH_ASAN,
                     "seems to hang with ASAN, see https://github.com/pytorch/pytorch/issues/5326")
    def test_fd_sharing(self):
        self._test_sharing(repeat=TEST_REPEATS)

    @unittest.skipIf(platform == 'darwin', "file descriptor strategy is not supported on macOS")
    def test_fd_preserve_sharing(self):
        self._test_preserve_sharing(repeat=TEST_REPEATS)

    @unittest.skipIf(platform == 'darwin', "file descriptor strategy is not supported on macOS")
    def test_fd_pool(self):
        self._test_pool(repeat=TEST_REPEATS)

    @unittest.skipIf(TEST_WITH_ASAN,
                     "seems to hang with ASAN, see https://github.com/pytorch/pytorch/issues/5326")
    def test_fs_sharing(self):
        with fs_sharing():
            self._test_sharing(repeat=TEST_REPEATS)

    def test_fs_preserve_sharing(self):
        with fs_sharing():
            self._test_preserve_sharing(repeat=TEST_REPEATS)

    def test_fs_pool(self):
        with fs_sharing():
            self._test_pool(repeat=TEST_REPEATS)

    @unittest.skipIf(not HAS_SHM_FILES, "don't know how to check if shm files exist")
    def test_fs(self):
        def queue_put():
            x = torch.DoubleStorage(4)
            q = mp.Queue()
            self.assertFalse(lc.has_shm_files())
            q.put(x)
            time.sleep(0.05)  # queue serializes asynchronously
            self.assertTrue(lc.has_shm_files(wait=False))
            q.get()

        with fs_sharing(), leak_checker(self) as lc:
            for _ in range(TEST_REPEATS):
                queue_put()

    def test_inherit_tensor(self):
        t = torch.zeros(5, 5)
        p = SubProcess(t.share_memory_())
        p.start()
        p.join(1)
        self.assertEqual(t, torch.ones(5, 5) * 3, 0)

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that \
don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    def test_cuda(self):
        torch.cuda.FloatTensor([1])  # initialize CUDA outside of leak checker
        self._test_sharing(mp.get_context('spawn'), torch.cuda.FloatTensor)

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that \
don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    @unittest.skipIf(not TEST_MULTIGPU, 'found only 1 GPU')
    def test_cuda_small_tensors(self):
        # Check multiple small tensors which will likely use the same
        # underlying cached allocation
        ctx = mp.get_context('spawn')
        tensors = []
        for i in range(5):
            device = i % 2
            tensors += [torch.arange(i * 5., (i + 1) * 5).cuda(device)]

        inq = ctx.Queue()
        outq = ctx.Queue()
        inq.put(tensors)
        p = ctx.Process(target=sum_tensors, args=(inq, outq))
        p.start()

        results = []
        for _ in range(5):
            results.append(outq.get())
        p.join()

        for i, _tensor in enumerate(tensors):
            v, device, tensor_size, storage_size = results[i]
            self.assertEqual(v, torch.arange(i * 5., (i + 1) * 5).sum())
            self.assertEqual(device, i % 2)
            self.assertEqual(tensor_size, 5)
            # You might think this should be the case, but it's not! After
            # data from the CUDA caching allocator goes through IPC, the
            # size of the storage is the size of the *cached cudaMalloc for
            # the entire memory block* of the storage, not just the storage.
            # See Note [CUDA IPC and the caching allocator] for more info
            #
            # self.assertEqual(storage_size, 5)

    @unittest.skipIf(IS_WINDOWS, 'not applicable to Windows (only fails with fork)')
    @unittest.skipIf(not torch.cuda.is_available(), 'CUDA not available')
    def test_cuda_bad_call(self):
        # Initialize CUDA
        t = torch.zeros(5, 5).cuda().cpu()
        inq = mp.Queue()
        outq = mp.Queue()
        p = mp.Process(target=queue_get_exception, args=(inq, outq))
        p.start()
        inq.put(t)
        p.join()
        self.assertIsInstance(outq.get(), RuntimeError)

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that \
don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    def test_event(self):
        ctx = mp.get_context('spawn')
        queue = ctx.Queue()
        ready = ctx.Event()
        done = ctx.Event()
        p = ctx.Process(target=cuda_multiply_two, args=(queue, ready, done))
        p.start()

        ready.wait()
        with torch.cuda.stream(torch.cuda.Stream()):
            tensor = torch.cuda.FloatTensor([1, 1, 1, 1])
            # Use a sleep kernel to test events. Without the event, the
            # multiply happens before the add.
            event = torch.cuda.Event(interprocess=True)
            torch.cuda._sleep(20000000)  # about 30 ms
            tensor.add_(1)
            event.record()
            queue.put((event, tensor))
            done.wait()  # must wait until subprocess records event
            event.synchronize()
            self.assertEqual(list(tensor), [4, 4, 4, 4])
        p.join()

    def _test_event_multiprocess_child(event, p2c, c2p):
        c2p.put(0)  # notify parent child is ready
        p2c.get()  # wait for record in parent
        event.synchronize()
        c2p.put(1)  # notify parent synchronization is done

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that \
don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    def test_event_multiprocess(self):
        event = torch.cuda.Event(enable_timing=False, interprocess=True)
        self.assertTrue(event.query())

        ctx = mp.get_context('spawn')
        p2c = ctx.SimpleQueue()
        c2p = ctx.SimpleQueue()
        p = ctx.Process(
            target=TestMultiprocessing._test_event_multiprocess_child,
            args=(event, p2c, c2p))
        p.start()

        c2p.get()  # wait until child process is ready
        torch.cuda._sleep(50000000)  # spin for about 50 ms
        event.record()
        p2c.put(0)  # notify child event is recorded

        self.assertFalse(event.query())
        c2p.get()  # wait for synchronization in child
        self.assertTrue(event.query())
        p.join()

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that \
don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    @unittest.skipIf(not TEST_MULTIGPU, 'found only 1 GPU')
    def test_event_handle_multi_gpu(self):
        d0 = torch.device('cuda:0')
        d1 = torch.device('cuda:1')
        with torch.cuda.device(d0):
            e0 = torch.cuda.Event(enable_timing=False, interprocess=True)

        with torch.cuda.device(d1):
            # create handle on different device from un-recorded event
            e0.ipc_handle()

        with torch.cuda.device(d0):
            e1 = torch.cuda.Event(enable_timing=False, interprocess=True)
            stream = torch.cuda.Stream()
            torch.cuda._sleep(50000000)  # spin for about 50 ms
            e1.record(stream)

        with torch.cuda.device(d1):
            # create handle on different device from recorded event
            e1.ipc_handle()

    def _test_event_handle_importer_consumer(handle, p2c, c2p):
        e1 = torch.cuda.Event.from_ipc_handle(0, handle)
        c2p.put(0)  # notify parent child is ready
        p2c.get()  # wait for record in parent
        e1.synchronize()
        c2p.put(1)  # notify parent that synchronization is done in child
        p2c.get()  # wait for parent to finish before destructing child event

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that \
don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    def test_event_handle_importer(self):
        e0 = torch.cuda.Event(enable_timing=False, interprocess=True)
        self.assertTrue(e0.query())

        ctx = mp.get_context('spawn')
        p2c = ctx.SimpleQueue()
        c2p = ctx.SimpleQueue()
        p = ctx.Process(
            target=TestMultiprocessing._test_event_handle_importer_consumer,
            args=(e0.ipc_handle(), p2c, c2p))
        p.start()

        c2p.get()  # wait for child to become ready
        torch.cuda._sleep(50000000)  # spin for about 50 ms
        e0.record()
        p2c.put(0)  # notify child event is recorded

        self.assertFalse(e0.query())
        c2p.get()  # wait for synchronization in child
        self.assertTrue(e0.query())
        p2c.put(1)  # notify child that parent is done
        p.join()

    def _test_event_handle_exporter_consumer(handle, p2c, c2p):
        stream = torch.cuda.Stream()
        with torch.cuda.stream(stream):
            e1 = torch.cuda.Event.from_ipc_handle(
                torch.cuda.current_device(), handle)
            torch.cuda._sleep(50000000)  # spin for about 50 ms
            e1.record()
            c2p.put(0)
            # wait for the parent process to finish synchronization before
            # destructing e1
            p2c.get()

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that \
don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    def test_event_handle_exporter(self):
        e0 = torch.cuda.Event(enable_timing=False, interprocess=True)

        ctx = mp.get_context('spawn')
        p2c = ctx.SimpleQueue()
        c2p = ctx.SimpleQueue()
        p = ctx.Process(
            target=TestMultiprocessing._test_event_handle_exporter_consumer,
            args=(e0.ipc_handle(), p2c, c2p))
        p.start()
        # wait until the event is recorded in the child process
        c2p.get()

        self.assertFalse(e0.query())
        e0.synchronize()
        self.assertTrue(e0.query())
        p2c.put(0)
        p.join()

    def _test_empty_tensor_sharing(self, dtype, device):
        q = mp.Queue()
        empty = torch.tensor([], dtype=dtype, device=device)
        q.put(empty)
        out = q.get(timeout=1)
        self.assertEqual(out, empty)

    def test_empty_tensor_sharing(self):
        self._test_empty_tensor_sharing(torch.float32, torch.device('cpu'))
        self._test_empty_tensor_sharing(torch.int64, torch.device('cpu'))

    @unittest.skipIf(not torch.cuda.is_available(), 'CUDA not available')
    def test_empty_tensor_sharing_cuda(self):
        self._test_empty_tensor_sharing(torch.float32, torch.device('cuda'))
        self._test_empty_tensor_sharing(torch.int64, torch.device('cuda'))

    def _test_autograd_sharing(self, var, ctx=mp, is_parameter=False):
        device = 'cuda' if var.is_cuda else 'cpu'

        ready = ctx.Event()
        master_modified = ctx.Event()
        queue = ctx.Queue()
        p = ctx.Process(target=autograd_sharing, args=(queue, ready, master_modified, device, is_parameter))
        p.daemon = True
        p.start()

        # This would cause an error if we tried to serialize the hooks,
        # because it's a closure and pickle doesn't support closures.
        @torch.utils.hooks.unserializable_hook
        def hook(*unused):
            pass

        if var.requires_grad:
            var.register_hook(hook)
            var._grad = torch.zeros(5, 5, device=device)
        queue.put(var)

        ready.wait()
        var.data[0, 0] = 1000
        var.grad.data[:] = torch.ones(5, 5, device=device) * 4
        master_modified.set()

        worker_ok = queue.get()
        self.assertTrue(worker_ok)

        self.assertEqual(var.data, torch.ones(5, 5, device=device))
        self.assertEqual(var.grad.data, torch.ones(5, 5, device=device) * 4)
        p.join(1)
        self.assertFalse(p.is_alive())
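
    # Illustrative sketch (not part of the original tests): why the hook above
    # must be marked with @torch.utils.hooks.unserializable_hook. Pickle cannot
    # serialize a function defined inside another function (a "local object"),
    # which is what sending a tensor with such a hook attached would attempt.
    # This helper is hypothetical and is never called by the test suite.
    def _example_closure_is_not_picklable(self):
        import pickle

        def local_hook(*unused):  # local function, like hook() above
            pass

        try:
            pickle.dumps(local_hook)
        except (pickle.PicklingError, AttributeError):
            return True  # expected: local objects cannot be pickled
        return False
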
    # Check sharing a cudaMalloc allocation with different types of storage.
    # (Issue #11422)
    def _test_mixed_types_cuda_sharing(self, ctx=mp):
        all_ones = torch.ones(2, 2).float()
        all_zeros = torch.zeros(2, 2).byte()
        queue = ctx.Queue()
        event = ctx.Event()

        p = ctx.Process(target=mixed_type_producer, args=(queue, event))

        p.start()

        for _ in range(10):
            float_tensor = queue.get()
            byte_tensor = queue.get()
            self.assertEqual(float_tensor, all_ones)
            self.assertEqual(byte_tensor, all_zeros)
            del float_tensor, byte_tensor
            event.set()

        time.sleep(5)
        p.join()

    def test_variable_sharing(self):
        for requires_grad in [True, False]:
            var = torch.arange(1., 26).view(5, 5).requires_grad_(requires_grad)
            self._test_autograd_sharing(var)

    # See https://github.com/pytorch/pytorch/issues/14997
    @unittest.skipIf(TEST_WITH_ASAN,
                     "non-deterministically hangs with ASAN")
    def test_leaf_variable_sharing(self):
        devices = ['cpu']
        if torch.cuda.is_available() and not NO_MULTIPROCESSING_SPAWN and TEST_CUDA_IPC:
            devices.append('cuda')
        for device in devices:
            for requires_grad in [True, False]:
                var = torch.arange(1., 26, device=device).view(5, 5).requires_grad_(requires_grad)
                self.assertTrue(var.is_leaf)
                ctx = mp.get_context('spawn') if device == 'cuda' else mp
                ready = ctx.Event()
                queue = ctx.Queue()
                p = ctx.Process(target=requires_grad_variable_sharing, args=(queue, ready))
                p.daemon = True
                p.start()
                queue.put(var)
                ready.wait()
                worker_requires_grad = queue.get()
                self.assertTrue(worker_requires_grad == requires_grad)

    def test_non_leaf_variable_sharing(self):
        devices = ['cpu'] if not torch.cuda.is_available() else ['cpu', 'cuda']
        for device in devices:
            var0 = torch.arange(1., 26, device=device).view(5, 5).requires_grad_(True)
            var = var0 * 2
            # Don't use a regular Queue; it uses a background thread (which
            # means we can't catch the exceptions)
            queue = mp.SimpleQueue()
            self.assertRaisesRegex(RuntimeError, r'requires_grad', lambda: queue.put(var))

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that \
don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    def test_cuda_variable_sharing(self):
        for requires_grad in [True, False]:
            var = torch.arange(1., 26, device='cuda').view(5, 5).requires_grad_(requires_grad)
            self._test_autograd_sharing(var, mp.get_context('spawn'))

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that \
don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    def test_mixed_types_cuda_sharing(self):
        self._test_mixed_types_cuda_sharing(mp.get_context('spawn'))

    def test_parameter_sharing(self):
        param = Parameter(torch.arange(1., 26).view(5, 5))
        self._test_autograd_sharing(param, is_parameter=True)

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that \
don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    def test_cuda_parameter_sharing(self):
        param = Parameter(torch.arange(1., 26, device='cuda').view(5, 5))
        self._test_autograd_sharing(param, mp.get_context('spawn'), is_parameter=True)

    def test_empty_shared(self):
        t = torch.Tensor()
        t.share_memory_()

    def _test_is_shared(self):
        t = torch.randn(5, 5)
        self.assertFalse(t.is_shared())
        t.share_memory_()
        self.assertTrue(t.is_shared())

    @unittest.skipIf(platform == 'darwin', "file descriptor strategy is not supported on macOS")
    def test_is_shared(self):
        self._test_is_shared()

    def test_fs_is_shared(self):
        with fs_sharing():
            self._test_is_shared()

    @unittest.skipIf(not torch.cuda.is_available(), 'CUDA not available')
    def test_is_shared_cuda(self):
        t = torch.randn(5, 5).cuda()
        self.assertTrue(t.is_shared())

    @unittest.skip('this test occasionally fails and deadlocks; see https://github.com/pytorch/pytorch/issues/5834')
    def test_backwards_fork(self):
        r"backwards() should succeed when called before and after a fork"
        call_backward()
        p = mp.Process(target=call_backward)
        p.start()
        p.join(1)
        self.assertFalse(p.is_alive())


if __name__ == '__main__':
    run_tests()