import contextlib
import os
import sys
import unittest
from sys import platform

import torch
import torch.multiprocessing as mp
import torch.utils.hooks
from torch.nn import Parameter
from common_utils import (TestCase, run_tests, IS_WINDOWS, NO_MULTIPROCESSING_SPAWN, TEST_WITH_ASAN,
                          load_tests)
from multiprocessing.reduction import ForkingPickler

# load_tests from common_utils is used to automatically filter tests for
# sharding; this assignment silences flake warnings about the unused import.
load_tests = load_tests

HAS_SHM_FILES = os.path.isdir('/dev/shm')

TEST_CUDA_IPC = torch.cuda.is_available() and \
    sys.version_info[0] == 3 and \
    sys.platform != 'darwin' and \
    sys.platform != 'win32'

TEST_MULTIGPU = TEST_CUDA_IPC and torch.cuda.device_count() > 1


class SubProcess(mp.Process):
    def __init__(self, tensor):
        super(SubProcess, self).__init__()
        self.tensor = tensor

    def run(self):
        self.tensor.add_(3)


def simple_fill(queue, event):
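    """Worker for _test_sharing: fills the shared tensors received via the
    queue with 4 and sets the event (body elided in this excerpt)."""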


def simple_pool_fill(tensor):
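    """Pool worker for _test_pool: fills the shared buffer with 4 in place and
    returns a tensor of fives (body elided in this excerpt)."""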


def send_tensor(queue, event, tp):
    t = torch.ones(5, 5).type(tp)
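    # (elided: the worker puts t on the queue twice and then waits on the
    # event, so the parent can check that both received tensors share storage)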


def call_backward():
    x = torch.randn(3, 3, requires_grad=True)
    x.sum().backward()


def sum_tensors(inq, outq):
    tensors = inq.get()
    for tensor in tensors:
        outq.put((tensor.sum().item(), tensor.get_device(),
                  tensor.numel(), tensor.storage().size()))


def queue_get_exception(inqueue, outqueue):
    try:
        torch.zeros(5, 5).cuda()
    except Exception as e:
        outqueue.put(e)
    else:
        outqueue.put('no exception')


def cuda_multiply_two(queue, ready, done):
    cuda_event, tensor = queue.get()
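    # (elided: the worker signals `ready`, waits on the received CUDA event,
    # multiplies the shared tensor by two, and signals `done`)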


def requires_grad_variable_sharing(queue, ready):
    var = queue.get()
    ready.set()
    queue.put(var.requires_grad)


def autograd_sharing(queue, ready, master_modified, device, is_parameter):
    var = queue.get()
    ready.set()
    master_modified.wait()

    expected_var = torch.arange(1., 26, device=device).view(5, 5)
    expected_var[0, 0] = 1000
    is_ok = var.data.equal(expected_var)
    var.data[:] = torch.ones(5, 5, device=device)

    is_ok &= var.grad is None
    is_ok &= not var._backward_hooks
    if is_parameter:
        is_ok &= type(var) == Parameter
    else:
        is_ok &= type(var) == torch.Tensor
    var._grad = torch.ones(5, 5, device=device)

    queue.put(is_ok)


def mixed_type_producer(queue, event):
    float_tensor = torch.ones(2, 2).float().cuda()
    byte_tensor = torch.zeros(2, 2).byte().cuda()
    queue.put(float_tensor)
    queue.put(byte_tensor)
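    # (elided: the producer presumably uses `event` to wait until the consumer
    # in _test_mixed_types_cuda_sharing has checked and released these tensors)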


@contextlib.contextmanager
def fs_sharing():
    prev_strategy = mp.get_sharing_strategy()
    mp.set_sharing_strategy('file_system')
    try:
        yield
    finally:
        mp.set_sharing_strategy(prev_strategy)
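
# The test_fs_* tests below presumably wrap their bodies in this context
# manager, e.g. "with fs_sharing(): self._test_sharing(repeat=TEST_REPEATS)",
# so that they exercise the file_system strategy instead of file descriptors.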


class leak_checker(object):

    def __init__(self, test_case):
        self.checked_pids = [os.getpid()]
        self.test_case = test_case

    def __exit__(self, *args):
        # (body elided: presumably checks that the child processes recorded
        # via check_pid did not leak shared-memory files)
        pass

    def check_pid(self, pid):
        self.checked_pids.append(pid)

    def _get_next_fds(self, n=1):
        # dup() returns the lowest-numbered unused descriptor
        fds = [os.dup(0) for i in range(n)]
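        # (elided: the duplicated descriptors are presumably closed again and
        # their numbers returned, so free-descriptor numbers can be compared
        # before and after a test to detect fd leaks)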

    def has_shm_files(self, wait=True):
        if not HAS_SHM_FILES:
            return False
        result = self._has_shm_files()
        if result and mp.get_sharing_strategy() == 'file_system' and wait:
            # (elided: wait briefly for the file_system strategy to clean up,
            # then re-check)
            return self._has_shm_files()
        return result

    def _has_shm_files(self):
        names = ['torch_' + str(pid) for pid in self.checked_pids]
        for filename in os.listdir('/dev/shm'):
            for name in names:
                if filename.startswith(name):
                    return True
        return False


class TestMultiprocessing(TestCase):

    def _test_sharing(self, ctx=mp, type=torch.FloatTensor, repeat=1):
        # Phase 1: share a zeroed tensor with a child running simple_fill and
        # check that the parent sees the values written by the child.
        # (elided: `q`, `e` and the shared `data` list are set up here)
        x = torch.zeros(5, 5).type(type)

        p = ctx.Process(target=simple_fill, args=(q, e))

        self.assertTrue(e.is_set())
        self.assertTrue(data[0].eq(4).all())
        self.assertTrue(data[1].eq(4).all())

        self.assertFalse(p.is_alive())

        # Phase 2: receive tensors created by a send_tensor child and check
        # that both queue entries alias the same storage.
        p = ctx.Process(target=send_tensor, args=(q, e, type))

        self.assertTrue(t1.eq(1).all())
        self.assertTrue(id(t1.storage()) == id(t2.storage()))

        self.assertFalse(p.is_alive())

        for _ in range(repeat):
            pass  # (loop body elided: both phases above are re-run)

    def _test_preserve_sharing(self, ctx=mp, repeat=1):
        x = torch.randn(5, 5)
        data = [x.storage(), x, x[2], x[:, 1]]
        q = ctx.Queue()
        q.put(data)
        new_data = q.get(timeout=1)
        self.assertEqual(new_data, data, 0)
        storage_cdata = data[0]._cdata
        self.assertEqual(new_data[0]._cdata, storage_cdata)
        for t in new_data[1:]:
            self.assertEqual(t.storage()._cdata, storage_cdata)

        for _ in range(repeat):
            pass  # (loop body elided: the round-trip check above is repeated)

    def _test_pool(self, ctx=mp, repeat=1):
        # (elided: a ctx.Pool is created as `p` and each worker's pid is
        # registered with a leak checker `lc`)
        lc.check_pid(proc.pid)

        buffers = [torch.zeros(2, 2) for i in range(4)]
        results = p.map(simple_pool_fill, buffers, 1)
        self.assertEqual(len(results), len(buffers))
        for r in results:
            self.assertEqual(r, torch.ones(2, 2) * 5, 0)
        for b in buffers:
            self.assertEqual(b, torch.ones(2, 2) * 4, 0)

        for _ in range(repeat):
            pass  # (loop body elided)

    @unittest.skipIf(platform == 'darwin', "file descriptor strategy is not supported on macOS")
    @unittest.skipIf(TEST_WITH_ASAN,
                     "seems to hang with ASAN, see https://github.com/pytorch/pytorch/issues/5326")
    def test_fd_sharing(self):
        """(body elided: runs self._test_sharing with the default file_descriptor strategy)"""

    @unittest.skipIf(platform == 'darwin', "file descriptor strategy is not supported on macOS")
    def test_fd_preserve_sharing(self):
        """(body elided: runs self._test_preserve_sharing with the file_descriptor strategy)"""

    @unittest.skipIf(platform == 'darwin', "file descriptor strategy is not supported on macOS")
    def test_fd_pool(self):
        """(body elided: runs self._test_pool with the file_descriptor strategy)"""

    @unittest.skipIf(TEST_WITH_ASAN,
                     "seems to hang with ASAN, see https://github.com/pytorch/pytorch/issues/5326")
    def test_fs_sharing(self):
        """(body elided: runs self._test_sharing under the file_system strategy; see fs_sharing above)"""

    def test_fs_preserve_sharing(self):
        """(body elided: runs self._test_preserve_sharing under the file_system strategy)"""

    def test_fs_pool(self):
        """(body elided: runs self._test_pool under the file_system strategy)"""

    @unittest.skipIf(not HAS_SHM_FILES, "don't know how to check if shm files exist")
    def test_fs(self):
        # (elided: under the file_system strategy and a leak_checker `lc`, a
        # storage is put on an mp.Queue; there should be no shm files before
        # the put and at least one right after)
        self.assertFalse(lc.has_shm_files())

        self.assertTrue(lc.has_shm_files(wait=False))

        for _ in range(TEST_REPEATS):
            pass  # (loop body elided: the check above is repeated)

    def test_inherit_tensor(self):
        t = torch.zeros(5, 5)
        p = SubProcess(t.share_memory_())
        p.start()
        p.join(1)
        self.assertEqual(t, torch.ones(5, 5) * 3, 0)
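        # The child inherits the shared tensor at construction time (rather
        # than receiving it over a queue) and its run() adds 3 to it, so the
        # parent observes all threes after join().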

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that "
                     "don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    def test_cuda(self):
        torch.cuda.FloatTensor([1])  # initialize CUDA in the parent
        self._test_sharing(mp.get_context('spawn'), torch.cuda.FloatTensor)

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that "
                     "don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    @unittest.skipIf(not TEST_MULTIGPU, 'found only 1 GPU')
    def test_cuda_small_tensors(self):
        ctx = mp.get_context('spawn')
        # (elided: a list `tensors` of small 5-element tensors is built,
        # alternating between the two GPUs; for each index i:)
        tensors += [torch.arange(i * 5., (i + 1) * 5).cuda(device)]

        # (elided: the tensors are sent to a sum_tensors worker over `inq` and
        # one result tuple per tensor is collected from `outq` into `results`)
        p = ctx.Process(target=sum_tensors, args=(inq, outq))
        results.append(outq.get())

        for i, _tensor in enumerate(tensors):
            v, device, tensor_size, storage_size = results[i]
            self.assertEqual(v, torch.arange(i * 5., (i + 1) * 5).sum())
            self.assertEqual(device, i % 2)
            self.assertEqual(tensor_size, 5)

    @unittest.skipIf(IS_WINDOWS, 'not applicable to Windows (only fails with fork)')
    def test_cuda_bad_call(self):
        # Initialize CUDA in the parent; using CUDA again in a forked child
        # should then surface a RuntimeError instead of crashing.
        t = torch.zeros(5, 5).cuda().cpu()
        inq = mp.Queue()
        outq = mp.Queue()
        p = mp.Process(target=queue_get_exception, args=(inq, outq))
        p.start()
        inq.put(t)
        p.join()
        self.assertIsInstance(outq.get(), RuntimeError)

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that "
                     "don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    def test_event(self):
        ctx = mp.get_context('spawn')
        queue = ctx.Queue()
        ready = ctx.Event()
        done = ctx.Event()
        p = ctx.Process(target=cuda_multiply_two, args=(queue, ready, done))
        p.start()

        tensor = torch.cuda.FloatTensor([1, 1, 1, 1])

        event = torch.cuda.Event(interprocess=True)
        queue.put((event, tensor))

        self.assertEqual(list(tensor), [4, 4, 4, 4])
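        # The worker doubles the tensor once it can see the recorded event, and
        # the parent presumably doubles it once as well before queueing it,
        # which is how the initial ones end up as fours.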

    @staticmethod
    def _test_event_multiprocess_child(event, p2c, c2p):
        """Child side of test_event_multiprocess: synchronizes on the event
        received from the parent (body elided in this excerpt)."""

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that "
                     "don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    def test_event_multiprocess(self):
        event = torch.cuda.Event(enable_timing=False, interprocess=True)
        self.assertTrue(event.query())

        ctx = mp.get_context('spawn')
        p2c = ctx.SimpleQueue()
        c2p = ctx.SimpleQueue()
        p = ctx.Process(
            target=TestMultiprocessing._test_event_multiprocess_child,
            args=(event, p2c, c2p))
        p.start()

        # (elided: the parent queues work on the GPU, records the event, and
        # tells the child to synchronize on it)
        self.assertFalse(event.query())

        self.assertTrue(event.query())
        p.join()

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that "
                     "don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    @unittest.skipIf(not TEST_MULTIGPU, 'found only 1 GPU')
    def test_event_handle_multi_gpu(self):
        d0 = torch.device('cuda:0')
        d1 = torch.device('cuda:1')

        e0 = torch.cuda.Event(enable_timing=False, interprocess=True)

        e1 = torch.cuda.Event(enable_timing=False, interprocess=True)
        stream = torch.cuda.Stream()
        # (elided: the events' IPC handles are exported while the other GPU is
        # the current device, which is what this test exercises)

    @staticmethod
    def _test_event_handle_importer_consumer(handle, p2c, c2p):
        e1 = torch.cuda.Event.from_ipc_handle(0, handle)
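        # from_ipc_handle(device, handle) rebuilds the event in this process;
        # the first argument is the device index to associate it with.
        # (rest of body elided: the child presumably synchronizes on e1 and
        # reports back to the parent over c2p)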

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that "
                     "don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    def test_event_handle_importer(self):
        e0 = torch.cuda.Event(enable_timing=False, interprocess=True)
        self.assertTrue(e0.query())

        ctx = mp.get_context('spawn')
        p2c = ctx.SimpleQueue()
        c2p = ctx.SimpleQueue()
        p = ctx.Process(
            target=TestMultiprocessing._test_event_handle_importer_consumer,
            args=(e0.ipc_handle(), p2c, c2p))
        p.start()

        # (elided: the parent queues GPU work, records e0, and signals the
        # child, which imports the handle and synchronizes on it)
        self.assertFalse(e0.query())

        self.assertTrue(e0.query())
        p.join()

    @staticmethod
    def _test_event_handle_exporter_consumer(handle, p2c, c2p):
        stream = torch.cuda.Stream()
        e1 = torch.cuda.Event.from_ipc_handle(
            torch.cuda.current_device(), handle)
        # (elided: the child records e1 on its stream and waits for the parent
        # to finish synchronizing before exiting)

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that "
                     "don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    def test_event_handle_exporter(self):
        e0 = torch.cuda.Event(enable_timing=False, interprocess=True)

        ctx = mp.get_context('spawn')
        p2c = ctx.SimpleQueue()
        c2p = ctx.SimpleQueue()
        p = ctx.Process(
            target=TestMultiprocessing._test_event_handle_exporter_consumer,
            args=(e0.ipc_handle(), p2c, c2p))
        p.start()

        # (elided: wait until the child has recorded the event it imported)
        self.assertFalse(e0.query())

        self.assertTrue(e0.query())
        p.join()

    def _test_empty_tensor_sharing(self, dtype, device):
        q = mp.Queue()
        empty = torch.tensor([], dtype=dtype, device=device)
        q.put(empty)
        out = q.get(timeout=1)
        self.assertEqual(out, empty)

    def test_empty_tensor_sharing(self):
        """(body elided: round-trips empty CPU tensors of a few dtypes through _test_empty_tensor_sharing)"""

    def test_empty_tensor_sharing_cuda(self):
        """(body elided: same as above with a CUDA device)"""

    def _test_autograd_sharing(self, var, ctx=mp, is_parameter=False):
        device = 'cuda' if var.is_cuda else 'cpu'

        ready = ctx.Event()
        master_modified = ctx.Event()
        queue = ctx.Queue()
        p = ctx.Process(target=autograd_sharing, args=(queue, ready, master_modified, device, is_parameter))
        p.start()

        # A closure hook cannot be pickled, so it is marked unserializable to
        # keep the tensor shareable.
        @torch.utils.hooks.unserializable_hook
        def hook(*unused):
            pass

        if var.requires_grad:
            var.register_hook(hook)
        var._grad = torch.zeros(5, 5, device=device)
        queue.put(var)

        ready.wait()
        var.data[0, 0] = 1000
        var.grad.data[:] = torch.ones(5, 5, device=device) * 4
        master_modified.set()

        worker_ok = queue.get()
        self.assertTrue(worker_ok)

        self.assertEqual(var.data, torch.ones(5, 5, device=device))
        self.assertEqual(var.grad.data, torch.ones(5, 5, device=device) * 4)
        p.join(1)
        self.assertFalse(p.is_alive())

    def _test_mixed_types_cuda_sharing(self, ctx=mp):
        all_ones = torch.ones(2, 2).float()
        all_zeros = torch.zeros(2, 2).byte()
        queue = ctx.Queue()
        event = ctx.Event()
        p = ctx.Process(target=mixed_type_producer, args=(queue, event))
        p.start()

        float_tensor = queue.get()
        byte_tensor = queue.get()
        self.assertEqual(float_tensor, all_ones)
        self.assertEqual(byte_tensor, all_zeros)
        del float_tensor, byte_tensor
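        # (elided: the consumer signals `event` after dropping its references,
        # presumably so the producer can reuse the cached CUDA allocation for
        # the next tensors it sends)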

    def test_variable_sharing(self):
        for requires_grad in [True, False]:
            var = torch.arange(1., 26).view(5, 5).requires_grad_(requires_grad)
            self._test_autograd_sharing(var)

    @unittest.skipIf(TEST_WITH_ASAN,
                     "non-deterministically hangs with ASAN")
    def test_leaf_variable_sharing(self):
        devices = ['cpu']
        if TEST_CUDA_IPC and not NO_MULTIPROCESSING_SPAWN:
            devices.append('cuda')
        for device in devices:
            for requires_grad in [True, False]:
                var = torch.arange(1., 26, device=device).view(5, 5).requires_grad_(requires_grad)
                self.assertTrue(var.is_leaf)
                ctx = mp.get_context('spawn') if device == 'cuda' else mp
                ready = ctx.Event()
                queue = ctx.Queue()
                p = ctx.Process(target=requires_grad_variable_sharing, args=(queue, ready))
                p.start()
                queue.put(var)
                ready.wait()
                worker_requires_grad = queue.get()
                self.assertTrue(worker_requires_grad == requires_grad)

    def test_non_leaf_variable_sharing(self):
        devices = ['cpu', 'cuda'] if torch.cuda.is_available() else ['cpu']
        for device in devices:
            var0 = torch.arange(1., 26, device=device).view(5, 5).requires_grad_(True)
            var = var0 * 2
            # Use a SimpleQueue rather than a Queue: a regular Queue serializes
            # in a background thread, so the exception could not be caught here.
            queue = mp.SimpleQueue()
            self.assertRaisesRegex(RuntimeError, r'requires_grad',
                                   lambda: queue.put(var))
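        # Sharing a non-leaf tensor that requires grad is rejected because its
        # autograd graph cannot be sent across process boundaries.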

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that "
                     "don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    def test_cuda_variable_sharing(self):
        for requires_grad in [True, False]:
            var = torch.arange(1., 26, device='cuda').view(5, 5).requires_grad_(requires_grad)
            self._test_autograd_sharing(var, mp.get_context('spawn'))

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that "
                     "don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    def test_mixed_types_cuda_sharing(self):
        self._test_mixed_types_cuda_sharing(mp.get_context('spawn'))

    def test_parameter_sharing(self):
        param = Parameter(torch.arange(1., 26).view(5, 5))
        self._test_autograd_sharing(param, is_parameter=True)

    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that "
                     "don't support multiprocessing with spawn start method")
    @unittest.skipIf(not TEST_CUDA_IPC, 'CUDA IPC not available')
    def test_cuda_parameter_sharing(self):
        param = Parameter(torch.arange(1., 26, device='cuda').view(5, 5))
        self._test_autograd_sharing(param, mp.get_context('spawn'), is_parameter=True)

    def test_empty_shared(self):
        """(body elided: calling share_memory_() on an empty tensor should succeed)"""

    def _test_is_shared(self):
        t = torch.randn(5, 5)
        self.assertFalse(t.is_shared())
        t.share_memory_()
        self.assertTrue(t.is_shared())

    @unittest.skipIf(platform == 'darwin', "file descriptor strategy is not supported on macOS")
    def test_is_shared(self):
        self._test_is_shared()

    def test_fs_is_shared(self):
        with fs_sharing():
            self._test_is_shared()

    def test_is_shared_cuda(self):
        t = torch.randn(5, 5).cuda()
        self.assertTrue(t.is_shared())
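        # CUDA tensors always report is_shared() == True: CUDA memory can be
        # shared with other processes via IPC handles without an explicit
        # share_memory_() call.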

    @unittest.skip('this test occasionally fails and deadlocks; see https://github.com/pytorch/pytorch/issues/5834')
    def test_backwards_fork(self):
        r"backwards() should succeed when called before and after a fork"
        call_backward()
        p = mp.Process(target=call_backward)
        p.start()
        p.join()
        self.assertFalse(p.is_alive())


if __name__ == '__main__':
    run_tests()