Caffe2 - Python API
A deep learning, cross platform ML framework
queue.py
1 import io
2 import multiprocessing
3 import multiprocessing.queues
4 from multiprocessing.reduction import ForkingPickler
5 import pickle
6 
7 
8 class ConnectionWrapper(object):
9  """Proxy class for _multiprocessing.Connection which uses ForkingPickler to
10  serialize objects"""
11 
12  def __init__(self, conn):
13  self.conn = conn
14 
15  def send(self, obj):
16  buf = io.BytesIO()
17  ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
18  self.send_bytes(buf.getvalue())
19 
20  def recv(self):
21  buf = self.recv_bytes()
22  return pickle.loads(buf)
23 
24  def __getattr__(self, name):
25  if 'conn' in self.__dict__:
26  return getattr(self.conn, name)
27  raise AttributeError("'{}' object has no attribute '{}'".format(
28  type(self).__name__, 'conn'))
29 
30 
31 class Queue(multiprocessing.queues.Queue):
32 
33  def __init__(self, *args, **kwargs):
34  super(Queue, self).__init__(*args, **kwargs)
35  self._reader = ConnectionWrapper(self._reader)
36  self._writer = ConnectionWrapper(self._writer)
37  self._send = self._writer.send
38  self._recv = self._reader.recv
39 
40 
41 class SimpleQueue(multiprocessing.queues.SimpleQueue):
42 
43  def _make_methods(self):
44  if not isinstance(self._reader, ConnectionWrapper):
45  self._reader = ConnectionWrapper(self._reader)
46  self._writer = ConnectionWrapper(self._writer)
47  super(SimpleQueue, self)._make_methods()