Caffe2 - Python API
A deep learning, cross platform ML framework
worker.py
1 r""""Contains definitions of the methods used by the _DataLoaderIter workers.
2 
3 These **needs** to be in global scope since Py2 doesn't support serializing
4 static methods.
5 """
6 
7 import torch
8 import random
9 import sys
10 import os
11 from torch._six import queue
12 from . import collate, signal_handling, MP_STATUS_CHECK_INTERVAL, \
13  ExceptionWrapper, IS_WINDOWS
14 
15 if IS_WINDOWS:
16  import ctypes
17  from ctypes.wintypes import DWORD, BOOL, HANDLE
18 
19  # On Windows, the parent ID of the worker process remains unchanged when the manager process
20  # is gone, and the only way to check it through OS is to let the worker have a process handle
21  # of the manager and ask if the process status has changed.
22  class ManagerWatchdog(object):
23  def __init__(self):
24  self.manager_pid = os.getppid()
25 
26  self.kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
27  self.kernel32.OpenProcess.argtypes = (DWORD, BOOL, DWORD)
28  self.kernel32.OpenProcess.restype = HANDLE
29  self.kernel32.WaitForSingleObject.argtypes = (HANDLE, DWORD)
30  self.kernel32.WaitForSingleObject.restype = DWORD
31 
32  # Value obtained from https://msdn.microsoft.com/en-us/library/ms684880.aspx
33  SYNCHRONIZE = 0x00100000
34  self.manager_handle = self.kernel32.OpenProcess(SYNCHRONIZE, 0, self.manager_pid)
35 
36  if not self.manager_handle:
37  raise ctypes.WinError(ctypes.get_last_error())
38 
39  self.manager_dead = False
40 
41  def is_alive(self):
42  if not self.manager_dead:
43  # Value obtained from https://msdn.microsoft.com/en-us/library/windows/desktop/ms687032.aspx
44  self.manager_dead = self.kernel32.WaitForSingleObject(self.manager_handle, 0) == 0
45  return not self.manager_dead
else:
    class ManagerWatchdog(object):
        def __init__(self):
            self.manager_pid = os.getppid()
            self.manager_dead = False

        def is_alive(self):
            if not self.manager_dead:
                self.manager_dead = os.getppid() != self.manager_pid
            return not self.manager_dead


def _worker_loop(dataset, index_queue, data_queue, done_event, collate_fn, seed, init_fn, worker_id):
    # See NOTE [ Data Loader Multiprocessing Shutdown Logic ] for details on the
    # logic of this function.

    try:
        collate._use_shared_memory = True

        # Initialize C side signal handlers for SIGBUS and SIGSEGV. Python signal
        # module's handlers are executed after Python returns from C low-level
        # handlers, likely when the same fatal signal had already happened
        # again.
        # https://docs.python.org/3/library/signal.html#execution-of-python-signal-handlers
        signal_handling._set_worker_signal_handlers()

        torch.set_num_threads(1)
        random.seed(seed)
        torch.manual_seed(seed)

        # Do not let the queue's feeder thread block worker shutdown if the main
        # process has already stopped reading from data_queue.
        data_queue.cancel_join_thread()

        if init_fn is not None:
            init_fn(worker_id)

        watchdog = ManagerWatchdog()

        while watchdog.is_alive():
            try:
                r = index_queue.get(timeout=MP_STATUS_CHECK_INTERVAL)
            except queue.Empty:
                continue
            if r is None:
                # Received the final signal
                assert done_event.is_set()
                return
            elif done_event.is_set():
                # Done event is set, but I haven't received the final signal
                # (None) yet. Keep polling until I get it, and skip the
                # processing steps.
                continue
            idx, batch_indices = r
            try:
                samples = collate_fn([dataset[i] for i in batch_indices])
            except Exception:
                # It is important that we don't store exc_info in a variable,
                # see NOTE [ Python Traceback Reference Cycle Problem ]
                data_queue.put((idx, ExceptionWrapper(sys.exc_info())))
            else:
                data_queue.put((idx, samples))
                del samples
    except KeyboardInterrupt:
        # Main process will raise KeyboardInterrupt anyway.
        pass
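For context, _worker_loop is driven entirely by the two queues and the done_event passed to it: the parent puts (idx, batch_indices) tuples on index_queue, reads (idx, samples) or (idx, ExceptionWrapper) tuples off data_queue, and finally sets done_event and sends None to tell the worker to exit. The sketch below wires one worker up by hand to illustrate that protocol; it is a minimal approximation, not how _DataLoaderIter actually constructs its workers. The toy ListDataset, simple_collate, and worker_init names are illustrative, and the import path of this module is assumed and varies across PyTorch versions.

# Illustrative parent-side wiring for _worker_loop; module path assumed.
import torch.multiprocessing as multiprocessing
from torch.utils.data._utils.worker import _worker_loop


class ListDataset(object):
    """Toy dataset whose items are the integers 0..9."""
    def __getitem__(self, i):
        return list(range(10))[i]

    def __len__(self):
        return 10


def simple_collate(samples):
    # Stand-in for the default collate_fn: keep the batch as a plain list.
    return list(samples)


def worker_init(worker_id):
    # Example init_fn: runs inside the worker before any batch is processed.
    print('worker %d started' % worker_id)


if __name__ == '__main__':
    index_queue = multiprocessing.Queue()
    data_queue = multiprocessing.Queue()
    done_event = multiprocessing.Event()

    worker = multiprocessing.Process(
        target=_worker_loop,
        args=(ListDataset(), index_queue, data_queue, done_event,
              simple_collate, 0, worker_init, 0))  # seed=0, worker_id=0
    worker.start()

    # Ask the worker to build one batch and read the result back.
    index_queue.put((0, [1, 3, 5]))
    idx, samples = data_queue.get()
    print(idx, samples)  # -> 0 [1, 3, 5]

    # Shutdown: set done_event first, then send the final None signal.
    done_event.set()
    index_queue.put(None)
    worker.join()

The shutdown order matters: the worker asserts that done_event is set when it receives the final None, so the real loader (and this sketch) sets the event before enqueueing the sentinel.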