Caffe2 - Python API
A deep learning, cross-platform ML framework
cached_reader.py
1 ## @package cached_reader
2 # Module caffe2.python.cached_reader
3 from __future__ import absolute_import
4 from __future__ import division
5 from __future__ import print_function
6 from __future__ import unicode_literals
7 
8 import os
9 
10 from caffe2.python import core
11 from caffe2.python.db_file_reader import DBFileReader
12 from caffe2.python.pipeline import pipe
13 from caffe2.python.task import Cluster, TaskGroup
14 
15 
17 
18  default_name_suffix = 'cached_reader'
19 
20  """Reader with persistent in-file cache.
21 
22  Example usage:
23  cached_reader = CachedReader(
24  reader,
25  db_path='/tmp/cache.db',
26  db_type='LevelDB',
27  )
28  build_cache_step = cached_reader.build_cache_step()
29  with LocalSession() as session:
30  session.run(build_cache_step)
31 
32  Every time new CachedReader is created, it's expected that
33  db_path exists before calling .setup_ex(...) and .read(...).
34 
35  If db_path doesn't exist, it's expected build_cache_step to be called
36  first to build a cache at db_path.
37 
38  build_cache_step will check existence of provided db_path and in case
39  it's missing will initialize it by reading data from original reader.
40  All consequent attempts to read will ignore original reader
41  (i.e. no additional data will be read from it).
42 
43  Args:
44  original_reader: Reader.
45  If provided, it's the original reader used to build the cache file.
46  db_path: str.
47  db_type: str. DB type of file. A db_type is registed by
48  `REGISTER_CAFFE2_DB(<db_type>, <DB Class>)`.
49  Default to 'LevelDB'.
50  name: str or None. Name of CachedReader.
51  Optional name to prepend to blobs that will store the data.
52  Default to '<db_name>_<default_name_suffix>'.
53  batch_size: int.
54  How many examples are read for each time the read_net is run.
55  """
56  def __init__(
57  self,
58  original_reader,
59  db_path,
60  db_type='LevelDB',
61  name=None,
62  batch_size=100,
63  ):
64  assert original_reader is not None, "original_reader can't be None"
65  self.original_reader = original_reader
66 
67  super(CachedReader, self).__init__(
68  db_path,
69  db_type,
70  name,
71  batch_size,
72  )
73 
74  def _init_reader_schema(self, *args, **kwargs):
75  """Prepare the reader schema.
76 
77  Since an original reader is given,
78  use it's schema as ground truth.
79 
80  Returns:
81  schema: schema.Struct. Used in Reader.__init__(...).
82  """
83  return self.original_reader._schema
84 
85  def build_cache_step(self, overwrite=False):
86  """Build a step for generating cache DB file.
87 
88  If self.db_path exists and not overwritting, build an empty step.
89  Overwise, build a step as follows.
90  Pipe original reader to the _DatasetWriter,
91  so that dataset field blobs are populated.
92  Then save these blobs into a file.
93 
94  Args:
95  overwrite: bool. If true, ignore the existing file
96  and build a new one overwritting the existing one anyway.
97 
98  Returns:
99  build_cache_step: ExcutionStep.
100  The step to be run for building a cache DB file.
101  """
102  if os.path.exists(self.db_path) and not overwrite:
103  # cache already exists, no need to rebuild it
104  return core.execution_step('build_step', [])
105 
106  init_net = core.Net('init')
107  self._init_field_blobs_as_empty(init_net)
108  with Cluster(), core.NameScope(self.name), TaskGroup() as copy_tg:
109  pipe(self.original_reader, self.ds.writer(), num_threads=16)
110  copy_step = copy_tg.to_task().get_step()
111  save_net = core.Net('save')
112  self._save_field_blobs_to_db_file(save_net)
113 
114  return core.execution_step('build_cache', [init_net, copy_step, save_net])
115 
116  def _save_field_blobs_to_db_file(self, net):
117  """Save dataset field blobs to a DB file at db_path"""
118  net.Save(
119  self.ds.get_blobs(),
120  [],
121  db=self.db_path,
122  db_type=self.db_type,
123  blob_name_overrides=self.ds.field_names(),
124  absolute_path=True,
125  )