Caffe2 - Python API
A deep learning, cross platform ML framework
timeout_guard.py
1 # Copyright (c) 2016-present, Facebook, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 ##############################################################################
15 
16 ## @package timeout_guard
17 # Module caffe2.python.timeout_guard
18 from __future__ import absolute_import
19 from __future__ import division
20 from __future__ import print_function
21 from __future__ import unicode_literals
22 
23 import contextlib
24 import threading
25 import os
26 import time
27 import signal
28 import logging
29 from future.utils import viewitems
30 
31 
32 '''
33 Sometimes CUDA devices can get stuck ('deadlock'). In this case it is often
34 better to just kill the process automatically. Use this guard to set a
35 maximum timespan for a Python call, such as RunNet(). If it does not complete
36 in time, the process is killed.
37 
38 Example usage:
39  with timeout_guard.CompleteInTimeOrDie(10.0):
40  core.RunNet(...)
41 '''
42 
43 
class WatcherThread(threading.Thread):
    """Daemon watchdog that kills the process when a deadline is missed.

    After ``start()``, the thread waits on ``condition`` until either
    ``completed`` becomes true or ``timeout_secs`` seconds have elapsed.
    On timeout it logs an error, prints the stack of every thread, sends
    SIGINT to its own process, and arms a second daemon thread that calls
    ``os.abort()`` 10 seconds later in case SIGINT did not end the process.

    To disarm the watcher, set ``completed = True`` and notify ``condition``.
    """

    def __init__(self, timeout_secs):
        """Create the watcher; the countdown begins when ``run`` starts.

        Args:
            timeout_secs: number of seconds to wait for ``completed``.
        """
        threading.Thread.__init__(self)
        # Seconds the watched work is allowed to take.
        self.timeout_secs = timeout_secs
        # Set to True by the owner to disarm the watcher.
        self.completed = False
        # Signalled by the owner so the watcher wakes up before the deadline.
        self.condition = threading.Condition()
        # Daemon thread: an armed watcher never keeps the interpreter alive.
        self.daemon = True
        # Thread that constructed the watcher; its stack is dumped on timeout.
        self.caller_thread = threading.current_thread()

    @staticmethod
    def _format_stacks(only_thread_id=None):
        """Return a printable stack dump of all threads, or of one thread."""
        import sys
        import traceback
        lines = []
        for thread_id, frame in sys._current_frames().items():
            if only_thread_id is not None and thread_id != only_thread_id:
                continue
            lines.append("\n# ThreadID: %s" % thread_id)
            for filename, lineno, name, line in traceback.extract_stack(frame):
                lines.append(
                    'File: "%s", line %d, in %s' % (filename, lineno, name)
                )
                if line:
                    lines.append(" %s" % (line.strip()))
        return "\n".join(lines)

    def _force_quit(self, log):
        """Abort the process 10 seconds after the SIGINT attempt."""
        time.sleep(10.0)
        log.info("Prepared output, dumping threads. ")
        print("Caller thread was: {}".format(self.caller_thread))
        print("-----After force------")
        # Only the stack of the thread that created the watcher is repeated.
        print(self._format_stacks(self.caller_thread.ident))
        log.error("Process did not terminate cleanly in 10 s, forcing")
        os.abort()

    def run(self):
        """Wait for completion or the deadline; kill the process on timeout."""
        # Prefer a monotonic clock so wall-clock adjustments cannot shorten
        # or extend the deadline; fall back to time.time where unavailable.
        clock = getattr(time, "monotonic", time.time)
        deadline = clock() + self.timeout_secs
        with self.condition:
            while not self.completed:
                remaining = deadline - clock()
                if remaining <= 0:
                    break
                self.condition.wait(remaining)
        if self.completed:
            return

        log = logging.getLogger("timeout_guard")
        log.error("Call did not finish in time. Timeout:{}s PID: {}".format(
            self.timeout_secs,
            os.getpid(),
        ))

        # First try dying cleanly via SIGINT, but arm a fallback that aborts
        # the process if it is still alive 10 seconds from now.
        forcet = threading.Thread(target=self._force_quit, args=(log,))
        forcet.daemon = True
        forcet.start()
        print("Caller thread was: {}".format(self.caller_thread))
        print("-----Before forcing------")
        print(self._format_stacks())
        os.kill(os.getpid(), signal.SIGINT)
105 
106 
@contextlib.contextmanager
def CompleteInTimeOrDie(timeout_secs):
    """Context manager that kills the process if its body overruns.

    A ``WatcherThread`` is started on entry. If the body has not finished
    within ``timeout_secs`` seconds, the watcher sends SIGINT to the process
    and aborts it 10 seconds later. When the body finishes, whether normally
    or by raising, the watcher is disarmed.

    Args:
        timeout_secs: maximum number of seconds the body may run.

    Example:
        with timeout_guard.CompleteInTimeOrDie(10.0):
            core.RunNet(...)
    """
    watcher = WatcherThread(timeout_secs)
    watcher.start()
    try:
        yield
    finally:
        # Disarm in ``finally``: a body that raised has still completed, and
        # leaving the watcher armed would kill the process at the deadline
        # even though the caller may have handled the exception.
        with watcher.condition:
            watcher.completed = True
            watcher.condition.notify()
116 
117 
def EuthanizeIfNecessary(timeout_secs=120):
    """Arm a watchdog for a process that may hang during shutdown.

    Starts a ``WatcherThread`` that is never marked completed, so the
    process is killed if it is still running after ``timeout_secs`` seconds.

    Args:
        timeout_secs: seconds to allow before the process is killed.
    """
    WatcherThread(timeout_secs).start()