Caffe2 - Python API
A deep learning, cross-platform ML framework
dataset.py
1 ## @package dataset
2 # Module caffe2.python.dataset
3 """
4 Implementation of an in-memory dataset with structured schema.
5 
6 Use this to store and iterate through datasets with complex schema that
7 fit in memory.
8 
9 Iterating through entries of this dataset is very fast since the dataset
10 is stored as a set of native Caffe2 tensors, thus no type conversion or
11 deserialization is necessary.
12 """
13 from __future__ import absolute_import
14 from __future__ import division
15 from __future__ import print_function
16 from __future__ import unicode_literals
17 
18 from caffe2.python import core, workspace
19 from caffe2.python.dataio import Reader, Writer
20 from caffe2.python.schema import (
21  Struct, from_blob_list, from_column_list, InitEmptyRecord)
22 import numpy as np
23 
24 
25 class _DatasetReader(Reader):
26  def __init__(self, dataset, name, batch_size=1, enforce_batch_size=False):
27  """Don't call this directly. Instead, use dataset.reader()"""
28  Reader.__init__(self, dataset.content())
29  self.dataset = dataset
30  self.name = name or (dataset.name + '_cursor')
31  self.batch_size = batch_size
32  self.enforce_batch_size = enforce_batch_size
33  self.cursor = None
34 
35  def setup_ex(self, init_net, exit_net):
36  if self.cursor is None:
37  self.cursor = init_net.CreateTreeCursor(
38  [],
39  init_net.NextScopedBlob(self.name),
40  fields=self.dataset.fields)
41 
42  def read(self, read_net):
43  assert self.cursor, 'setup not called.'
44  content = self.dataset.content()
45  with core.NameScope(read_net.NextName(self.name)):
46  fields = read_net.ReadNextBatch(
47  [self.cursor] + content.field_blobs(),
48  content.field_names(),
49  batch_size=self.batch_size,
50  enforce_batch_size=self.enforce_batch_size)
51  fields = core.output_to_list(fields)
52  return (read_net.IsEmpty([fields[0]]), fields)
53 
54  def reset(self, net):
55  net.ResetCursor([self.cursor], [])
56 
57 
58 class _DatasetRandomReader(Reader):
59  def __init__(self, dataset, name, indices, batch_size=1, loop_over=False,
60  enforce_batch_size=False):
61  """Don't call this directly. Instead, use dataset.random_reader()"""
62  Reader.__init__(self, dataset.content())
63  self.dataset = dataset
64  self.cursor = None
65  self.name = name or (dataset.name + '_cursor')
66  self.indices = indices
67  self.batch_size = batch_size
68  self.loop_over = loop_over
69  self.enforce_batch_size = enforce_batch_size
70 
71  def setup_ex(self, init_net, exit_net):
72  if self.cursor is None:
73  self.cursor = init_net.CreateTreeCursor(
74  [],
75  init_net.NextScopedBlob(self.name),
76  fields=self.dataset.fields)
77 
78  def reset(self, net):
79  net.ResetCursor([self.cursor], [])
80 
81  def computeoffset(self, net):
82  self.reset(net)
83  offsets = net.ComputeOffset(
84  [self.cursor] + self.dataset.content().field_blobs(),
85  'offsets')
86  self.offsets = offsets
87 
88  def sort_and_shuffle(self, net, sort_by_field=None,
89  shuffle_size=1, batch_size=1):
90  # no sorting by default
91  content = self.dataset.content()
92  sort_by_field_idx = -1
93  if sort_by_field:
94  assert sort_by_field in content.field_names(), (
95  'Must be valid field.')
96  sort_by_field_idx = content.field_names().index(sort_by_field)
97  self.reset(net)
98 
99  indices = net.SortAndShuffle(
100  [self.cursor] + content.field_blobs(),
101  'indices',
102  sort_by_field_idx=sort_by_field_idx,
103  shuffle_size=shuffle_size,
104  batch_size=batch_size)
105  self.indices = indices
106 
107  def read(self, read_net):
108  assert self.cursor, 'setup_ex not called'
109  assert self.indices, 'sort_and_shuffle not called'
110  assert self.offsets, 'computeoffset not called'
111  content = self.dataset.content()
112  with core.NameScope(read_net.NextName(self.name)):
113  fields = read_net.ReadRandomBatch(
114  [self.cursor, self.indices, self.offsets] + (
115  content.field_blobs()),
116  content.field_names(),
117  batch_size=self.batch_size,
118  enforce_batch_size=self.enforce_batch_size,
119  loop_over=self.loop_over)
120  fields = core.output_to_list(fields)
121  return (read_net.IsEmpty([fields[0]]), fields)
122 
123 
124 class _DatasetWriter(Writer):
125  def __init__(self, content):
126  """Don't call this directly. Use dataset.writer() instead."""
127  self._content = content
128  self.mutex = None
129 
130  def setup_ex(self, init_net, exit_net):
131  if self.mutex is None:
132  self.mutex = init_net.CreateMutex([])
133 
134  def write(self, writer_net, fields):
135  """
136  Add operations to `writer_net` that append the blobs in `fields` to the end
137  of the dataset. An additional operator will also be added that checks
138  the consistency of the data in `fields` against the dataset schema.
139 
140  Args:
141  writer_net: The net that will contain the Append operators.
142  fields: A list of BlobReference to be appended to this dataset.
143  """
144  assert self.mutex is not None, 'setup not called.'
145  field_blobs = self._content.field_blobs()
146  assert len(fields) == len(field_blobs), (
147  'Expected %s fields, got %s.' % (len(field_blobs), len(fields)))
148  writer_net.CheckDatasetConsistency(
149  fields, [], fields=self._content.field_names())
150  writer_net.AtomicAppend(
151  [self.mutex] + field_blobs + list(fields),
152  field_blobs)
153 
154  def commit(self, finish_net):
155  """Commit is a no-op for an in-memory dataset."""
156  pass
157 
158 
159 def Const(net, value, dtype=None, name=None):
160  """
161  Create a 'constant' by first creating an external input in the given
162  net, and then feeding the corresponding blob with its provided value
163  in the current workspace. The name is automatically generated in order
164  to avoid clashes with existing blob names.
165  """
166  assert isinstance(net, core.Net), 'net must be a core.Net instance.'
167  value = np.array(value, dtype=dtype)
168  blob = net.AddExternalInput(net.NextName(prefix=name))
169  workspace.FeedBlob(str(blob), value)
170  return blob
171 
172 
173 def execution_step_with_progress(name, init_net, substeps, rows_read):
174  # progress reporter
175  report_net = core.Net('report_net')
176  report_net.Print([rows_read], [])
177  return core.execution_step(
178  name,
179  substeps,
180  report_net=report_net,
181  concurrent_substeps=True,
182  report_interval=5)
183 
184 
185 class Dataset(object):
186  """Represents an in-memory dataset with fixed schema.
187 
188  Use this to store and iterate through datasets with complex schema that
189  fit in memory.
190 
191  Iterating through entries of this dataset is very fast since the dataset
192  is stored as a set of native Caffe2 tensors, thus no type conversion or
193  deserialization is necessary.
194  """
195 
196  def __init__(self, fields, name=None):
197  """Create an un-initialized dataset with schema provided by `fields`.
198 
199  Before this dataset can be used, it must be initialized, either by
200  `init_empty` or `init_from_dataframe`.
201 
202  Args:
203  fields: either a schema.Struct or a list of field names in a format
204  compatible with the one described in schema.py.
205  name: optional name to prepend to blobs that will store the data.
206  """
207  assert isinstance(fields, list) or isinstance(fields, Struct), (
208  'fields must be either a Struct or a list of raw field names.')
209  if isinstance(fields, list):
210  fields = from_column_list(fields)
211  self.schema = fields
212  self.fields = fields.field_names()
213  self.field_types = fields.field_types()
214  self.name = name or 'dataset'
215  self.field_blobs = fields.field_blobs() if fields.has_blobs() else None
216 
217  def trim(self, net, multiple_of):
218  """
219  Trim the contents of this dataset so that the number of records is a
220  multiple of the given argument.
221  """
222  net.TrimDataset(
223  self.field_blobs,
224  self.field_blobs,
225  fields=self.fields,
226  multiple_of=multiple_of)
227 
228  def init_empty(self, init_net):
229  """Initialize the blobs for this dataset with empty values.
230 
231  Empty arrays will be immediately fed into the current workspace,
232  and `init_net` will take those blobs as external inputs.
233  """
234  self.field_blobs = InitEmptyRecord(
235  init_net, self.schema.clone_schema()).field_blobs()
236 
237  def init_from_dataframe(self, net, dataframe):
238  """Initialize the blobs for this dataset from a Pandas dataframe.
239 
240  Each column of the dataframe will be immediately fed into the current
241  workspace, and `net` will take these blobs as external inputs.
242  """
243  assert len(self.fields) == len(dataframe.columns)
244  self.field_blobs = [
245  Const(net, dataframe.as_matrix([col]).flatten(), name=field)
246  for col, field in enumerate(self.fields)]
247 
248  def get_blobs(self):
249  """
250  Return the list of BlobReference pointing to the blobs that contain
251  the data for this dataset.
252  """
253  assert self
254  return self.field_blobs
255 
256  def content(self):
257  """
258  Return a Record of BlobReferences pointing to the full content of
259  this dataset.
260  """
261  return from_blob_list(self.schema, self.field_blobs)
262 
263  def field_names(self):
264  """Return the list of field names for this dataset."""
265  return self.fields
266 
267  def field_types(self):
268  """
269  Return the list of field dtypes for this dataset.
270 
271  If a list of strings, not a schema.Struct, was passed to the
272  constructor, this will return a list of dtype(np.void).
273  """
274  return self.field_types
275 
276  def reader(self, init_net=None, cursor_name=None, batch_size=1,
277  enforce_batch_size=False):
278  """Create a Reader object that is used to iterate through the dataset.
279 
280  This will append operations to `init_net` that create a TreeCursor,
281  used to iterate through the data.
282 
283  NOTE: Currently, it is not safe to append to a dataset while reading.
284 
285  Args:
286  init_net: net that will be run once to create the cursor.
287  cursor_name: optional name for the blob containing a pointer
288  to the cursor.
289  batch_size: how many samples to read per iteration.
290 
291  Returns:
292  A _DatasetReader that can be used to create operators that will
293  iterate through the dataset.
294  """
295  assert self.field_blobs, 'Dataset not initialized.'
296  reader = _DatasetReader(self, cursor_name, batch_size,
297  enforce_batch_size)
298  if init_net is not None:
299  reader.setup_ex(init_net, None)
300  return reader
301 
302  def random_reader(self, init_net=None, indices=None, cursor_name=None,
303  batch_size=1, loop_over=False, enforce_batch_size=False):
304  """Create a Reader object that is used to iterate through the dataset.
305 
306  NOTE: The read order depends on the order of `indices`.
307 
308  Args:
309  init_net: net that will be run once to create the cursor.
310  indices: blob of reading order
311  cursor_name: optional name for the blob containing a pointer
312  to the cursor.
313  batch_size: how many samples to read per iteration.
314  loop_over: repeat the dataset indefinitely (in the same order)
315 
316  Returns:
317  A DatasetReader that can be used to create operators that will
318  iterate through the dataset according to indices.
319  """
320  assert self.field_blobs, 'Dataset not initialized.'
321  reader = _DatasetRandomReader(
322  self, cursor_name, indices, batch_size, loop_over,
323  enforce_batch_size)
324  if init_net is not None:
325  reader.setup_ex(init_net, None)
326  return reader
327 
328  def writer(self, init_net=None):
329  """Create a Writer that can be used to append entries into the dataset.
330 
331  NOTE: Currently, it is not safe to append to a dataset
332  while reading from it.
333  NOTE: The current implementation of the writer is not thread safe.
334  TODO: fixme
335 
336  Args:
337  init_net: net that will be run once in order to create the writer.
338  (currently not used)
339  """
340  assert self.field_blobs, 'Dataset not initialized.'
341  writer = _DatasetWriter(self.content())
342  if init_net is not None:
343  writer.setup_ex(init_net, None)
344  return writer
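
Usage sketch (not part of dataset.py): a minimal, hedged example of building, populating, and reading a Dataset with the classes above. The net names, field names, and values ('example_ds', 'label', 'weight', the appended numbers) are illustrative assumptions, not part of the API.

import numpy as np
from caffe2.python import core, workspace
from caffe2.python.dataset import Dataset, Const
from caffe2.python.schema import Struct, Scalar

# Schema with two scalar fields; names and dtypes are assumptions.
ds = Dataset(Struct(
    ('label', Scalar(np.int32)),
    ('weight', Scalar(np.float32)),
), name='example_ds')

init_net = core.Net('example_init')
ds.init_empty(init_net)                      # create empty blobs for the dataset
writer = ds.writer(init_net)                 # creates the mutex used by AtomicAppend
reader = ds.reader(init_net, batch_size=1)   # creates the TreeCursor

# Append three records; Const() feeds the values into the current workspace.
write_net = core.Net('example_write')
writer.write(write_net, [
    Const(write_net, np.array([0, 1, 2], dtype=np.int32), name='label'),
    Const(write_net, np.array([1.0, 0.5, 2.0], dtype=np.float32), name='weight'),
])

# Read one record per run of read_net.
read_net = core.Net('example_read')
should_stop, fields = reader.read(read_net)

workspace.RunNetOnce(init_net)
workspace.RunNetOnce(write_net)
workspace.RunNetOnce(read_net)
print(workspace.FetchBlob(str(fields[0])))   # label of the first record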
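A second hedged sketch for the random-access path, assuming the dataset `ds` from the example above has already been initialized and populated. sort_and_shuffle() and computeoffset() are run once in a setup net before reading; the net names are illustrative.

setup_net = core.Net('example_random_setup')
rreader = ds.random_reader(setup_net, batch_size=2)
rreader.sort_and_shuffle(setup_net)   # builds the 'indices' blob (shuffled order)
rreader.computeoffset(setup_net)      # builds the 'offsets' blob used by ReadRandomBatch

random_read_net = core.Net('example_random_read')
should_stop, fields = rreader.read(random_read_net)

# Run these after init_net and write_net from the previous sketch.
workspace.RunNetOnce(setup_net)
workspace.RunNetOnce(random_read_net)
print(workspace.FetchBlob(str(fields[0])))   # labels of the first random batch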