Caffe2 - Python API
A deep learning, cross platform ML framework
timeout_guard.py
1 ## @package timeout_guard
2 # Module caffe2.python.timeout_guard
3 from __future__ import absolute_import
4 from __future__ import division
5 from __future__ import print_function
6 from __future__ import unicode_literals
7 
8 import contextlib
9 import threading
10 import os
11 import time
12 import signal
13 import logging
14 from future.utils import viewitems
15 
16 
'''
Sometimes CUDA devices can get stuck ("deadlock"). In that case it is often
better to just kill the process automatically. Use this guard to set a
maximum timespan for a Python call, such as RunNet(). If the call does not
complete in time, the process is killed.

Example usage:
    with timeout_guard.CompleteInTimeOrDie(10.0):
        core.RunNet(...)
'''
27 
28 
class WatcherThread(threading.Thread):
    """Daemon thread that kills the process if a guarded call overruns.

    The thread waits up to ``timeout_secs`` seconds for ``completed`` to be
    set to True (with ``condition`` notified). If the deadline passes first:

    - it logs an error;
    - it prints the stacks of all threads;
    - it sends SIGINT to its own process.

    It also starts a second daemon thread as a backstop. If the process is
    still alive 10 seconds later, that thread prints the caller thread's
    stack again and calls ``os.abort()``.

    Args:
        timeout_secs: Number of seconds to wait before killing the process.
    """

    def __init__(self, timeout_secs):
        threading.Thread.__init__(self)
        self.timeout_secs = timeout_secs
        # Set to True by the guarded code once it is done; checked by run()
        # while holding ``condition``.
        self.completed = False
        self.condition = threading.Condition()
        # Daemon, so the watcher never keeps the interpreter alive by itself.
        self.daemon = True
        # The thread that created the guard; its stack is singled out in the
        # final dump before aborting.
        self.caller_thread = threading.current_thread()

    @staticmethod
    def _format_stacks(only_thread_id=None):
        """Return a printable dump of the current stacks of live threads.

        Args:
            only_thread_id: If given, include only the thread with this
                ident; otherwise include every thread.

        Returns:
            The dump as a single newline-joined string.
        """
        import sys
        import traceback
        code = []
        for thread_id, frame in sys._current_frames().items():
            if only_thread_id is not None and thread_id != only_thread_id:
                continue
            code.append("\n# ThreadID: %s" % thread_id)
            for filename, lineno, name, line in traceback.extract_stack(frame):
                code.append(
                    'File: "%s", line %d, in %s' % (filename, lineno, name))
                if line:
                    code.append(" %s" % (line.strip()))
        return "\n".join(code)

    def run(self):
        # Measure the deadline on a monotonic clock, so wall-clock
        # adjustments (NTP steps, manual changes) can neither kill a healthy
        # process early nor postpone a real timeout. time.monotonic needs
        # Python 3.3+; fall back to time.time on older interpreters.
        now = getattr(time, "monotonic", time.time)
        deadline = now() + self.timeout_secs
        with self.condition:
            while not self.completed:
                remaining = deadline - now()
                if remaining <= 0:
                    break
                self.condition.wait(remaining)
        if self.completed:
            return

        log = logging.getLogger("timeout_guard")
        log.error("Call did not finish in time. Timeout:{}s PID: {}".format(
            self.timeout_secs,
            os.getpid(),
        ))

        # First try dying cleanly, but in 10 secs, exit properly
        def forcequit():
            time.sleep(10.0)
            log.info("Prepared output, dumping threads. ")
            print("Caller thread was: {}".format(self.caller_thread))
            print("-----After force------")
            print(self._format_stacks(self.caller_thread.ident))
            log.error("Process did not terminate cleanly in 10 s, forcing")
            os.abort()

        forcet = threading.Thread(target=forcequit, args=())
        forcet.daemon = True
        forcet.start()
        print("Caller thread was: {}".format(self.caller_thread))
        print("-----Before forcing------")
        print(self._format_stacks())
        # Raises KeyboardInterrupt in the main thread, giving the program a
        # chance to shut down cleanly before forcequit() aborts it.
        os.kill(os.getpid(), signal.SIGINT)
90 
91 
@contextlib.contextmanager
def CompleteInTimeOrDie(timeout_secs):
    """Kill the process if the ``with`` body does not finish in time.

    Starts a WatcherThread armed for ``timeout_secs`` seconds before the
    body runs, and disarms it when the body exits, whether it returns
    normally or raises.

    Args:
        timeout_secs: Maximum number of seconds the body may run.
    """
    watcher = WatcherThread(timeout_secs)
    watcher.start()
    try:
        yield
    finally:
        # Disarm even when the body raises: the guarded call has finished
        # (unsuccessfully), so the process must not be killed later merely
        # because an exception escaped the ``with`` block. Setting the flag
        # under the lock is the canonical condition-variable handshake.
        with watcher.condition:
            watcher.completed = True
            watcher.condition.notify()
101 
102 
def EuthanizeIfNecessary(timeout_secs=120):
    """Arm a one-shot watchdog that kills this process after a timeout.

    Use this when the process tends to hang during shutdown. The watcher
    created here is never marked completed, so if the process is still
    running after ``timeout_secs`` seconds it is killed. The watcher is a
    daemon thread, so a process that exits on its own is unaffected.

    Args:
        timeout_secs: Seconds to wait before killing the process.
    """
    WatcherThread(timeout_secs).start()