Caffe2 - Python API
A deep learning, cross platform ML framework
dataset.py
# Copyright (c) 2016-present, Facebook, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##############################################################################

## @package dataset
# Module caffe2.python.dataset
"""
Implementation of an in-memory dataset with structured schema.

Use this to store and iterate through datasets with complex schema that
fit in memory.

Iterating through entries of this dataset is very fast since the dataset
is stored as a set of native Caffe2 tensors, thus no type conversion or
deserialization is necessary.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from caffe2.python import core, workspace
from caffe2.python.dataio import Reader, Writer
from caffe2.python.schema import (
    Struct, from_blob_list, from_column_list, InitEmptyRecord)
import numpy as np


class _DatasetReader(Reader):
    def __init__(self, dataset, name, batch_size=1, enforce_batch_size=False):
        """Don't call this directly. Instead, use dataset.reader()"""
        Reader.__init__(self, dataset.content())
        self.dataset = dataset
        self.name = name or (dataset.name + '_cursor')
        self.batch_size = batch_size
        self.enforce_batch_size = enforce_batch_size
        self.cursor = None

    def setup_ex(self, init_net, exit_net):
        if self.cursor is None:
            self.cursor = init_net.CreateTreeCursor(
                [],
                init_net.NextScopedBlob(self.name),
                fields=self.dataset.fields)

    def read(self, read_net):
        assert self.cursor, 'setup not called.'
        content = self.dataset.content()
        with core.NameScope(read_net.NextName(self.name)):
            fields = read_net.ReadNextBatch(
                [self.cursor] + content.field_blobs(),
                content.field_names(),
                batch_size=self.batch_size,
                enforce_batch_size=self.enforce_batch_size)
            if type(fields) is core.BlobReference:
                fields = [fields]
            return (read_net.IsEmpty([fields[0]]), fields)

    def reset(self, net):
        net.ResetCursor([self.cursor], [])
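
A `_DatasetReader` is normally obtained through `Dataset.reader()` (defined further down in this file) rather than constructed directly. The following is a minimal, illustrative read loop, not part of dataset.py: the schema, field names ('label', 'data'), net names, and values are all hypothetical, and the dataset blobs are fed directly from Python for brevity.

import numpy as np
from caffe2.python import core, schema, workspace
from caffe2.python.dataset import Dataset

# Illustrative sketch; schema, field names, and net names are hypothetical.
example_schema = schema.Struct(
    ('label', schema.Scalar(np.int32)),
    ('data', schema.Scalar(np.float32)),
)

init_net = core.Net('example_init')
ds = Dataset(example_schema, name='example_ds')
ds.init_empty(init_net)
reader = ds.reader(init_net, batch_size=2)
workspace.RunNetOnce(init_net)

# Fill the dataset blobs directly; three records in total.
label_blob, data_blob = ds.get_blobs()
workspace.FeedBlob(str(label_blob), np.array([0, 1, 2], dtype=np.int32))
workspace.FeedBlob(str(data_blob), np.array([0.5, 1.5, 2.5], dtype=np.float32))

# Each run of read_net advances the cursor by one batch of (at most) 2 records.
read_net = core.Net('example_read')
should_stop, batch = reader.read(read_net)
workspace.CreateNet(read_net)
while True:
    workspace.RunNet(read_net.Proto().name)
    if workspace.FetchBlob(str(should_stop)):
        break
    print([workspace.FetchBlob(str(b)) for b in batch])
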

class _DatasetRandomReader(Reader):
    def __init__(self, dataset, name, indices, batch_size=1, loop_over=False,
                 enforce_batch_size=False):
        """Don't call this directly. Instead, use dataset.random_reader()"""
        Reader.__init__(self, dataset.content())
        self.dataset = dataset
        self.cursor = None
        self.name = name or (dataset.name + '_cursor')
        self.indices = indices
        self.batch_size = batch_size
        self.loop_over = loop_over
        self.enforce_batch_size = enforce_batch_size

    def setup_ex(self, init_net, exit_net):
        if self.cursor is None:
            self.cursor = init_net.CreateTreeCursor(
                [],
                [self.name],
                fields=self.dataset.fields)

    def reset(self, net):
        net.ResetCursor([self.cursor], [])

    def computeoffset(self, net):
        self.reset(net)
        offsets = net.ComputeOffset(
            [self.cursor] + self.dataset.content().field_blobs(),
            'offsets')
        self.offsets = offsets

    def sort_and_shuffle(self, net, sort_by_field=None,
                         shuffle_size=1, batch_size=1):
        # no sorting by default
        content = self.dataset.content()
        sort_by_field_idx = -1
        if sort_by_field:
            assert sort_by_field in content.field_names(), (
                'Must be valid field.')
            sort_by_field_idx = content.field_names().index(sort_by_field)
        self.reset(net)

        indices = net.SortAndShuffle(
            [self.cursor] + content.field_blobs(),
            'indices',
            sort_by_field_idx=sort_by_field_idx,
            shuffle_size=shuffle_size,
            batch_size=batch_size)
        self.indices = indices

    def read(self, read_net):
        with core.NameScope(read_net.NextName(self.name)):
            fields = read_net.ReadRandomBatch(
                [self.cursor, self.indices, self.offsets] + (
                    self.dataset.content().field_blobs()),
                self.dataset.content().field_names(),
                batch_size=self.batch_size,
                enforce_batch_size=self.enforce_batch_size,
                loop_over=self.loop_over)
            return (read_net.IsEmpty([fields[0]]), fields)
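
`_DatasetRandomReader` is likewise obtained through `Dataset.random_reader()` (defined below). Note that `sort_and_shuffle()` and `computeoffset()` must run before `read()`, since they produce the indices and offsets blobs that `ReadRandomBatch` consumes. A rough sketch continuing the hypothetical example above:

# Illustrative sketch; continues the hypothetical `ds` example above.
shuffle_net = core.Net('example_shuffle')
random_reader = ds.random_reader(
    shuffle_net, cursor_name='example_random_cursor', batch_size=2)
random_reader.sort_and_shuffle(shuffle_net, shuffle_size=1, batch_size=2)
random_reader.computeoffset(shuffle_net)
workspace.RunNetOnce(shuffle_net)

# Read the shuffled records until the indices are exhausted.
rand_read_net = core.Net('example_random_read')
should_stop, batch = random_reader.read(rand_read_net)
workspace.CreateNet(rand_read_net)
while True:
    workspace.RunNet(rand_read_net.Proto().name)
    if workspace.FetchBlob(str(should_stop)):
        break
    print([workspace.FetchBlob(str(b)) for b in batch])
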

class _DatasetWriter(Writer):
    def __init__(self, content):
        """Don't call this directly. Use dataset.writer() instead."""
        self._content = content
        self.mutex = None

    def setup_ex(self, init_net, exit_net):
        if self.mutex is None:
            self.mutex = init_net.CreateMutex([])

    def write(self, writer_net, fields):
        """
        Add operations to `net` that append the blobs in `fields` to the end
        of the dataset. An additional operator will also be added that checks
        the consistency of the data in `fields` against the dataset schema.

        Args:
            writer_net: The net that will contain the Append operators.
            fields: A list of BlobReference to be appended to this dataset.
        """
        assert self.mutex is not None, 'setup not called.'
        field_blobs = self._content.field_blobs()
        assert len(fields) == len(field_blobs), (
            'Expected %s fields, got %s.' % (len(field_blobs), len(fields)))
        writer_net.CheckDatasetConsistency(
            fields, [], fields=self._content.field_names())
        writer_net.AtomicAppend(
            [self.mutex] + field_blobs + list(fields),
            field_blobs)

    def commit(self, finish_net):
        """Commit is a no-op for an in-memory dataset."""
        pass

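
A `_DatasetWriter` is obtained through `Dataset.writer()` (defined below). The sketch that follows, again purely illustrative, appends two more records to the hypothetical `ds` dataset from the earlier examples, using `schema.NewRecord` and `schema.FeedRecord` to produce input blobs with the same layout as the dataset.

# Illustrative sketch; continues the hypothetical `ds` example above.
write_init_net = core.Net('example_write_init')
writer = ds.writer(write_init_net)
workspace.RunNetOnce(write_init_net)

write_net = core.Net('example_write')
new_record = schema.NewRecord(write_net, ds.content())
schema.FeedRecord(new_record, [
    np.array([3, 4], dtype=np.int32),
    np.array([3.5, 4.5], dtype=np.float32),
])
writer.write(write_net, new_record.field_blobs())
workspace.RunNetOnce(write_net)

# ds now holds the three original records plus the two appended ones.
print(workspace.FetchBlob(str(ds.get_blobs()[0])))
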

def Const(net, value, dtype=None, name=None):
    """
    Create a 'constant' by first creating an external input in the given
    net, and then feeding the corresponding blob with its provided value
    in the current workspace. The name is automatically generated in order
    to avoid clashes with existing blob names.
    """
    assert isinstance(net, core.Net), 'net must be a core.Net instance.'
    value = np.array(value, dtype=dtype)
    blob = net.AddExternalInput(net.NextName(prefix=name))
    workspace.FeedBlob(str(blob), value)
    return blob


def execution_step_with_progress(name, init_net, substeps, rows_read):
    # progress reporter
    report_net = core.Net('report_net')
    report_net.Print([rows_read], [])
    return core.execution_step(
        name,
        substeps,
        report_net=report_net,
        concurrent_substeps=True,
        report_interval=5)

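
Const is a convenience helper rather than a graph-time constant: the value is fed into the current workspace at the time Const is called, and the resulting blob is only registered as an external input of `net`. A small illustrative sketch (names hypothetical):

import numpy as np
from caffe2.python import core, workspace
from caffe2.python.dataset import Const

# Illustrative sketch; the net and blob names are hypothetical.
net = core.Net('const_example')
weights = Const(net, [0.25, 0.5, 0.25], dtype=np.float32, name='weights')
# The blob is already present in the workspace, even before `net` is run.
print(workspace.FetchBlob(str(weights)))
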

class Dataset(object):
    """Represents an in-memory dataset with fixed schema.

    Use this to store and iterate through datasets with complex schema that
    fit in memory.

    Iterating through entries of this dataset is very fast since the dataset
    is stored as a set of native Caffe2 tensors, thus no type conversion or
    deserialization is necessary.
    """

    def __init__(self, fields, name=None):
        """Create an un-initialized dataset with schema provided by `fields`.

        Before this dataset can be used, it must be initialized, either by
        `init_empty` or `init_from_dataframe`.

        Args:
            fields: either a schema.Struct or a list of field names in a
                format compatible with the one described in schema.py.
            name: optional name to prepend to blobs that will store the data.
        """
        assert isinstance(fields, list) or isinstance(fields, Struct), (
            'fields must be either a Struct or a list of raw field names.')
        if isinstance(fields, list):
            fields = from_column_list(fields)
        self.schema = fields
        self.fields = fields.field_names()
        self.field_types = fields.field_types()
        self.name = name or 'dataset'
        self.field_blobs = fields.field_blobs() if fields.has_blobs() else None

    def trim(self, net, multiple_of):
        """
        Trims the contents of this dataset so that the number of records is
        a multiple of the given argument.
        """
        net.TrimDataset(
            self.field_blobs,
            self.field_blobs,
            fields=self.fields,
            multiple_of=multiple_of)

    def init_empty(self, init_net):
        """Initialize the blobs for this dataset with empty values.

        Empty arrays will be immediately fed into the current workspace,
        and `init_net` will take those blobs as external inputs.
        """
        self.field_blobs = InitEmptyRecord(
            init_net, self.schema.clone_schema()).field_blobs()

    def init_from_dataframe(self, net, dataframe):
        """Initialize the blobs for this dataset from a Pandas dataframe.

        Each column of the dataframe will be immediately fed into the current
        workspace, and the `net` will take these blobs as external inputs.
        """
        assert len(self.fields) == len(dataframe.columns)
        self.field_blobs = [
            Const(net, dataframe.as_matrix([col]).flatten(), name=field)
            for col, field in enumerate(self.fields)]

    def get_blobs(self):
        """
        Return the list of BlobReference pointing to the blobs that contain
        the data for this dataset.
        """
        assert self
        return self.field_blobs

    def content(self):
        """
        Return a Record of BlobReferences pointing to the full content of
        this dataset.
        """
        return from_blob_list(self.schema, self.field_blobs)

    def field_names(self):
        """Return the list of field names for this dataset."""
        return self.fields

    def field_types(self):
        """
        Return the list of field dtypes for this dataset.

        If a list of strings, not a schema.Struct, was passed to the
        constructor, this will return a list of dtype(np.void).
        """
        return self.field_types

    def reader(self, init_net=None, cursor_name=None, batch_size=1,
               enforce_batch_size=False):
        """Create a Reader object that is used to iterate through the dataset.

        This will append operations to `init_net` that create a TreeCursor,
        used to iterate through the data.

        NOTE: Currently, it is not safe to append to a dataset while reading.

        Args:
            init_net: net that will be run once to create the cursor.
            cursor_name: optional name for the blob containing a pointer
                to the cursor.
            batch_size: how many samples to read per iteration.

        Returns:
            A _DatasetReader that can be used to create operators that will
            iterate through the dataset.
        """
        assert self.field_blobs, 'Dataset not initialized.'
        reader = _DatasetReader(self, cursor_name, batch_size,
                                enforce_batch_size)
        if init_net is not None:
            reader.setup_ex(init_net, None)
        return reader

    def random_reader(self, init_net=None, indices=None, cursor_name=None,
                      batch_size=1, loop_over=False, enforce_batch_size=False):
        """Create a Reader object that is used to iterate through the dataset.

        NOTE: The reader order depends on the order in indices.

        Args:
            init_net: net that will be run once to create the cursor.
            indices: blob of reading order
            cursor_name: optional name for the blob containing a pointer
                to the cursor.
            batch_size: how many samples to read per iteration.
            loop_over: repeat the dataset indefinitely (in the same order)

        Returns:
            A _DatasetRandomReader that can be used to create operators that
            will iterate through the dataset according to indices.
        """
        assert self.field_blobs, 'Dataset not initialized.'
        reader = _DatasetRandomReader(
            self, cursor_name, indices, batch_size, loop_over,
            enforce_batch_size)
        if init_net is not None:
            reader.setup_ex(init_net, None)
        return reader

    def writer(self, init_net=None):
        """Create a Writer that can be used to append entries into the dataset.

        NOTE: Currently, it is not safe to append to a dataset
        while reading from it.
        NOTE: Currently, the implementation of the writer is not thread safe.
        TODO: fixme

        Args:
            init_net: net that will be run once in order to create the writer.
                (currently not used)
        """
        assert self.field_blobs, 'Dataset not initialized.'
        writer = _DatasetWriter(self.content())
        if init_net is not None:
            writer.setup_ex(init_net, None)
        return writer
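
As an end note, the sketch below (illustrative, not part of dataset.py) initializes a Dataset from a Pandas dataframe and trims it to a multiple of the batch size. Since init_from_dataframe selects dataframe columns by position, the dataframe uses default integer column labels; the sketch also assumes a pandas version that still provides DataFrame.as_matrix(), which init_from_dataframe relies on.

import numpy as np
import pandas as pd
from caffe2.python import core, workspace
from caffe2.python.dataset import Dataset

# Illustrative sketch; field names, net names, and values are hypothetical.
# Two columns (a label-like and a weight-like field), five records,
# default integer column labels so positional selection works.
df = pd.DataFrame(np.array([
    [0., 1.0],
    [1., 0.5],
    [2., 0.25],
    [3., 0.125],
    [4., 0.0625],
]))

init_net = core.Net('df_init')
ds = Dataset(['label', 'weight'], name='df_ds')
# init_from_dataframe feeds each column into the workspace immediately;
# `init_net` only records the resulting blobs as its external inputs.
ds.init_from_dataframe(init_net, df)

# Drop trailing records so the number of records is a multiple of 2.
trim_net = core.Net('df_trim')
ds.trim(trim_net, multiple_of=2)
workspace.RunNetOnce(trim_net)
print(workspace.FetchBlob(str(ds.get_blobs()[0])))  # first column, 4 records left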