Caffe2 - Python API
A deep learning, cross platform ML framework
pool.py
1 import multiprocessing
2 import multiprocessing.pool
3 import multiprocessing.util as util
4 
5 from .queue import SimpleQueue
6 
7 
8 def clean_worker(*args, **kwargs):
9  import gc
10  multiprocessing.pool.worker(*args, **kwargs)
11  # Regular multiprocessing workers don't fully clean up after themselves,
12  # so we have to explicitly trigger garbage collection to make sure that all
13  # destructors are called...
14  gc.collect()
15 
16 
18  """Pool implementation which uses our version of SimpleQueue.
19  This lets us pass tensors in shared memory across processes instead of
20  serializing the underlying data."""
21 
22  def _setup_queues(self):
23  self._inqueue = SimpleQueue()
24  self._outqueue = SimpleQueue()
25  self._quick_put = self._inqueue._writer.send
26  self._quick_get = self._outqueue._reader.recv
27 
28  def _repopulate_pool(self):
29  """Bring the number of pool processes up to the specified number,
30  for use after reaping workers which have exited.
31  """
32  for i in range(self._processes - len(self._pool)):
33  # changed worker -> clean_worker
34  args = (self._inqueue, self._outqueue,
35  self._initializer,
36  self._initargs, self._maxtasksperchild)
37  if hasattr(self, '_wrap_exception'):
38  args += (self._wrap_exception,)
39  w = self.Process(target=clean_worker, args=args)
40  self._pool.append(w)
41  w.name = w.name.replace('Process', 'PoolWorker')
42  w.daemon = True
43  w.start()
44  util.debug('added worker')