Caffe2 - Python API
A deep learning, cross-platform ML framework
reductions.py
import torch
import torch.utils.hooks
import os
import weakref
import threading
import multiprocessing
from multiprocessing.reduction import ForkingPickler
import sys
try:
    # Early load resource_sharer to prevent a partially initialized instance
    # from being inherited in a forked child process. The reduce_storage method
    # requires this module indirectly through DupFd(). The built-in mp.Queue
    # class pickles arguments in a background thread which may overlap with the
    # fork.
    import multiprocessing.resource_sharer
except ImportError:
    pass


class StorageWeakRef(object):
    r"""A weak reference to a Storage.

    The cdata member is a Python number containing the integer representation of
    the Storage pointer."""

    def __init__(self, storage):
        self.cdata = storage._weak_ref()
        # Save a direct reference to _free_weak_ref because the `torch` module
        # might be cleared during Python shutdown before this module is cleared.
        self._free_weak_ref = torch.Storage._free_weak_ref

    def expired(self):
        return torch.Storage._expired(self.cdata)

    def __del__(self):
        self._free_weak_ref(self.cdata)
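

# Illustrative sketch (hypothetical helper, not used by the module): a
# StorageWeakRef observes a storage without keeping it alive, which is what
# lets SharedCache below hold many entries safely.
def _storage_weakref_example():
    storage = torch.FloatStorage(16)
    ref = StorageWeakRef(storage)
    alive = not ref.expired()   # True while a strong reference exists
    del storage                 # drop the last strong reference
    # expired() should now report True, so a cache holding `ref` can evict it.
    return alive, ref.expired()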


class SharedCache(dict):
    """dictionary from multiprocessing handles to StorageWeakRef"""

    def __init__(self):
        # free_dead_references() is called if the len exceeds the current
        # limit. The limit scales with the number of remaining live objects.
        self.limit = 128
        self.lock = threading.Lock()

    def __setitem__(self, key, storage_ref):
        dict.__setitem__(self, key, storage_ref)
        if len(self) > self.limit:
            self.free_dead_references()

    def free_dead_references(self):
        # Multiple Python threads may call free_dead_references() concurrently.
        # Without a lock, they may try deleting the same entry multiple times.
        with self.lock:
            live = 0
            for key, storage_ref in list(self.items()):
                if storage_ref.expired():
                    del self[key]
                else:
                    live += 1
            self.limit = max(128, live * 2)


# mapping from handles to StorageWeakRef objects
shared_cache = SharedCache()
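

# Illustrative sketch (hypothetical helper; the string key stands in for a real
# fd/handle key): SharedCache only ever stores weak references, so it never
# extends a storage's lifetime, and free_dead_references() trims entries whose
# storage has already been freed.
def _shared_cache_example():
    storage = torch.FloatStorage(16)
    shared_cache["example-handle"] = StorageWeakRef(storage)
    del storage                          # the cache does not keep it alive
    shared_cache.free_dead_references()  # dead entries are evicted here
    return "example-handle" in shared_cache   # expected to be False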


def rebuild_event(device, handle):
    return torch.cuda.Event.from_ipc_handle(device, handle)


def reduce_event(event):
    handle = event.ipc_handle()
    return (rebuild_event, (event.device, handle))


def rebuild_tensor(cls, storage, metadata):
    storage_offset, size, stride, requires_grad = metadata
    t = torch._utils._rebuild_tensor(storage, storage_offset, size, stride)
    if cls == torch.nn.parameter.Parameter:
        t = torch.nn.parameter.Parameter(t)
    t.requires_grad = requires_grad
    return t


def rebuild_cuda_tensor(tensor_cls, tensor_size, tensor_stride, tensor_offset,
                        storage_cls, storage_device, storage_handle, storage_size_bytes,
                        storage_offset_bytes, requires_grad):
    # If storage_handle is None, storage points to nullptr.
    if storage_handle is None or storage_size_bytes == 0:
        storage = storage_cls(0)
    else:
        storage = storage_from_cache(storage_cls, (storage_handle, storage_offset_bytes))
        if storage is None:
            torch.cuda._lazy_init()
            storage = storage_cls._new_shared_cuda(
                storage_device,
                storage_handle,
                storage_size_bytes,
                storage_offset_bytes)
            shared_cache[(storage_handle, storage_offset_bytes)] = StorageWeakRef(storage)

    t = torch._utils._rebuild_tensor(storage, tensor_offset, tensor_size, tensor_stride)
    if tensor_cls == torch.nn.parameter.Parameter:
        t = torch.nn.parameter.Parameter(t)
    t.requires_grad = requires_grad
    return t


def reduce_tensor(tensor):
    storage = tensor.storage()

    if tensor.requires_grad and not tensor.is_leaf:
        raise RuntimeError("Cowardly refusing to serialize non-leaf tensor which requires_grad, "
                           "since autograd does not support crossing process boundaries. "
                           "If you just want to transfer the data, call detach() on the tensor "
                           "before serializing (e.g., putting it on the queue).")

    torch.utils.hooks.warn_if_has_hooks(tensor)

    # Note [CUDA IPC and the caching allocator]
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # When you send a CUDA tensor over IPC, you might expect that you will
    # get out the same storage from the other end. However, the CUDA caching
    # allocator makes it difficult to preserve this invariant. Consider
    # the following situation: a tensor of size 0x40 points to offset 0x20 of
    # a storage at 0xA100 of size 0x100. (For simplicity, all of these
    # sizes are given in bytes). HOWEVER, with the caching allocator, this storage
    # might be part of a larger cudaMalloc allocation 0xA000 of size 0x4000.
    #
    # When we want to send this CUDA tensor over IPC, we must send the
    # *entire* cudaMalloc allocation, i.e., the 0xA000 region, not just
    # the storage 0xA100 (because that is what CUDA supports). So, on the
    # other end, there simply isn't any way to say, "Wait, you gave me
    # a bigger region (0xA000) than the one I wanted (0xA100)".
    #
    # OK, so if you sent the cudaMalloc allocation, can you just wrap that up as
    # one storage itself? No, because this cudaMalloc allocation might contain
    # storages of mixed types: float, bytes, double... If you make the entire
    # allocation a single storage of type A, we'll hit an error when constructing
    # a tensor of type B on top of it.
    #
    # A cudaIpcMemHandle is an identifier that lets the receiver access the
    # sender's cudaMalloc allocation. However, cudaIpcMemHandles from each device
    # in a given process may only be opened by one context per device per other
    # process. If we open and close a memory handle multiple times in a process,
    # CUDA is allowed to give it a different address; similarly, once we close
    # the memory, we're not allowed to access it (or the storage/tensor built on
    # top of it), even if it is still live in the original process. Since we
    # cannot map a cudaMalloc allocation onto a single storage in one go, this
    # requires us to cache the device pointer for each cudaIpcMemHandle on the
    # C++ side so that storages of different types can be reconstructed while
    # keeping the old ones alive.
    # See https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__DEVICE.html
    #
    # This is fine, because all we need to do is to save our position in the
    # allocation, and reconstruct the storage and tensor from it.
    #
    # 0xA000 -> -------CUDA Allocation------
    #           |                          |
    #           |                          |
    #           |                          |
    #           |                          |
    # 0xA100 -> --------storage1 begin------
    #           |                          |
    # 0xA120 -> --------tensor1 begin ------
    #           |                          |
    #           |                          |
    #           |                          |
    #           |                          |
    #           |                          |
    # 0xA160 -> --------tensor1 end---------
    #           |                          |
    #           |                          |
    #           |                          |
    # 0xA200 -> --------storage1 end--------
    #           |                          |
    # 0xE000 -> --------CUDA allocation-----
    #
    # To send tensor1, the sender must pass the following info to the receiver
    # for storage reconstruction:
    #   1. The cudaIpcMemHandle of 0xA000 (which can be mapped to a basePtr in
    #      the receiver process; basePtr may not be exactly 0xA000, since it's
    #      a different process).
    #   2. The offset of storage1 within the CUDA allocation (0x100 = 0xA100 - 0xA000).
    #   3. The size of storage1 (0x100).
    #
    # On the receiver side:
    #   1. Get the devPtr of the MemHandle to access the memory, and reconstruct
    #      a storage of the same type using (basePtr, offset, size).
    #   2. Reconstruct the tensor on top of the reconstructed storage:
    #      Tensor(size=0x040, offset=0x020, storage=Storage(data=basePtr+0x100, size=0x100))
    #
    # This strategy has a few implications:
    #
    # 1. When we serialize a CUDA tensor for IPC, we cannot do it all in one
    #    go (non-compositionally), and this requires us to have a global map
    #    memHandle -> devPtr for each process.
    #
    # 2. We MUST NOT let the new IPC tensor be resizable. Originally, a resize
    #    of the storage beyond 0x100 would merely have caused us to do a
    #    reallocation. You don't really want to do this, but if you did,
    #    all that would happen is that you would lose IPC sharing. But if
    #    you do this in the new world, we will happily let you write out of
    #    bounds of your "allocation", clobbering unrelated data in the cached
    #    allocator block. BAD!
    #
    # By the way, in old versions of PyTorch, we supported this situation
    # natively using a "storage view", which permitted multiple storages to be
    # views on each other. But this was the *only* use of storage views, so we
    # eliminated it so that we could just use tensor views to implement the same
    # thing.
    #
    if storage.is_cuda:
        (device, handle, storage_size_bytes, storage_offset_bytes) = storage._share_cuda_()
        tensor_offset = tensor.storage_offset()

        shared_cache[handle] = StorageWeakRef(storage)

        # _backward_hooks purposely omitted here, see
        # Note [Don't serialize hooks]
        return (rebuild_cuda_tensor,
                (type(tensor),
                 tensor.size(),
                 tensor.stride(),
                 tensor_offset,  # tensor offset in its storage
                 type(storage),
                 device,
                 handle,  # identifier which CUDA allocation is the storage in.
                 storage_size_bytes,  # size (in bytes) of the storage
                 storage_offset_bytes,  # offset (in bytes) of the storage in the CUDA allocation
                 tensor.requires_grad))

    # _backward_hooks purposely omitted here, see Note [Don't serialize hooks]
    metadata = (tensor.storage_offset(), tensor.size(), tensor.stride(), tensor.requires_grad)
    return (rebuild_tensor, (type(tensor), storage, metadata))
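

# Illustrative sketch (hypothetical helper): the reducer returns a
# (callable, args) pair, which is exactly the __reduce__-style contract
# ForkingPickler expects, and applying the callable reproduces the tensor.
# Across processes the storage argument is itself pickled via reduce_storage,
# so no tensor data travels through the pickle stream.
def _reduce_tensor_roundtrip_example():
    t = torch.arange(4.)
    rebuild, args = reduce_tensor(t)   # (rebuild_tensor, (cls, storage, metadata))
    t2 = rebuild(*args)                # same storage, offset, size and stride
    return torch.equal(t, t2)          # True; t2 is a view of the same storage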


def fd_id(fd):
    # Returns a tuple which uniquely identifies a file descriptor. On Mac OS,
    # this doesn't work with shared memory handles, which is why we don't
    # support the "file_descriptor" sharing method on that platform.
    stat = os.fstat(fd)
    return (stat.st_ino, stat.st_dev)


def storage_from_cache(cls, key):
    storage_ref = shared_cache.get(key)
    if storage_ref is None:
        return None
    return cls._new_with_weak_ptr(storage_ref.cdata)
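

# Illustrative sketch (hypothetical helper; plain pipe fds used purely as an
# example): two descriptors for the same underlying file report the same
# (st_ino, st_dev) pair, which is how duplicated descriptors arriving over IPC
# collapse onto a single shared_cache entry.
def _fd_id_example():
    r, w = os.pipe()
    r_dup = os.dup(r)
    try:
        return fd_id(r) == fd_id(r_dup)   # True: same underlying open file
    finally:
        os.close(r)
        os.close(w)
        os.close(r_dup)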


def rebuild_storage_fd(cls, df, size):
    if sys.version_info[0] == 2:
        fd = multiprocessing.reduction.rebuild_handle(df)
    else:
        fd = df.detach()
    try:
        storage = storage_from_cache(cls, fd_id(fd))
        if storage is not None:
            return storage
        storage = cls._new_shared_fd(fd, size)
        shared_cache[fd_id(fd)] = StorageWeakRef(storage)
        return storage
    finally:
        os.close(fd)


def rebuild_storage_filename(cls, manager, handle, size):
    storage = storage_from_cache(cls, handle)
    if storage is not None:
        return storage._shared_decref()
    storage = cls._new_shared_filename(manager, handle, size)
    shared_cache[handle] = StorageWeakRef(storage)
    return storage._shared_decref()


def rebuild_storage_empty(cls):
    return cls()


def reduce_storage(storage):
    from . import get_sharing_strategy
    if storage.is_cuda:
        raise RuntimeError("Cannot pickle CUDA storage; try pickling a CUDA tensor instead")
    elif get_sharing_strategy() == 'file_system':
        metadata = storage._share_filename_()
        cache_key = metadata[1]
        rebuild = rebuild_storage_filename
        storage._shared_incref()
    elif storage.size() == 0:
        # This is special-cased because empty tensors
        # (with size 0) cannot be mmapped.
        return (rebuild_storage_empty, (type(storage),))
    else:
        fd, size = storage._share_fd_()
        if sys.version_info[0] == 2:
            df = multiprocessing.reduction.reduce_handle(fd)
        else:
            df = multiprocessing.reduction.DupFd(fd)
        cache_key = fd_id(fd)
        metadata = (df, size)
        rebuild = rebuild_storage_fd

    shared_cache[cache_key] = StorageWeakRef(storage)
    return (rebuild, (type(storage),) + metadata)
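

# Illustrative sketch (hypothetical helper): what the storage reducer hands to
# ForkingPickler for a non-empty CPU storage under the 'file_descriptor'
# strategy (the default on Linux). _share_fd_() moves the storage into shared
# memory, and the rebuild args carry a duplicated fd plus a size instead of the
# actual data.
def _reduce_storage_example():
    storage = torch.FloatStorage(16)
    rebuild, args = reduce_storage(storage)
    # rebuild is rebuild_storage_fd and args is (torch.FloatStorage, df, size);
    # the unpickling process calls rebuild(*args), detaches the fd, and maps
    # the same shared-memory block (deduplicated through fd_id / shared_cache).
    return rebuild, args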


def init_reductions():
    ForkingPickler.register(torch.cuda.Event, reduce_event)

    for t in torch._storage_classes:
        ForkingPickler.register(t, reduce_storage)

    for t in torch._tensor_classes:
        ForkingPickler.register(t, reduce_tensor)

    # TODO: Maybe this should be in tensor_classes? :)
    ForkingPickler.register(torch.Tensor, reduce_tensor)
    ForkingPickler.register(torch.nn.parameter.Parameter, reduce_tensor)
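

# Illustrative sketch (hypothetical helper; assumes the default 'fork' start
# method on Linux): once init_reductions() has run -- torch.multiprocessing
# calls it at import time -- any ForkingPickler-based channel such as a
# multiprocessing Queue pickles tensors through the reducers above, so the
# child maps the same shared memory instead of receiving a copy.
def _send_tensor_to_child_example():
    import torch.multiprocessing as mp

    def consumer(queue):
        t = queue.get()   # rebuilt via rebuild_tensor / rebuild_storage_fd
        t.add_(1)         # in-place writes are visible to the parent

    queue = mp.Queue()
    process = mp.Process(target=consumer, args=(queue,))
    process.start()
    shared = torch.ones(3)
    queue.put(shared)     # reduced via reduce_tensor / reduce_storage
    process.join()
    return shared         # now tensor([2., 2., 2.]) because memory was shared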