Caffe2 - Python API
A deep learning, cross-platform ML framework
dataio.py
1 # Copyright (c) 2016-present, Facebook, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 ##############################################################################
15 
16 ## @package dataio
17 # Module caffe2.python.dataio
18 """
19 Defines the base interface for reading and writing operations.
20 
21 Readers/Writers are objects that produce operations that read/write sequences
22 of data. Each operation reads or writes a list of BlobReferences.
23 
24 Readers and Writers must be implemented such that read and write operations
25 are atomic and thread safe.
26 
27 Examples of possible Readers and Writers:
28  QueueReader, QueueWriter,
29  DatasetReader, DatasetWriter,
30 
31 See `dataset.py` for an example implementation.
32 """
33 from __future__ import absolute_import
34 from __future__ import division
35 from __future__ import print_function
36 from __future__ import unicode_literals
37 
38 from caffe2.python import core
39 from caffe2.python.schema import Field, Struct, from_blob_list
40 import numpy as np
41 
42 
43 class Reader(object):
44  """
45  Reader is an abstract class to be implemented in order to provide
46  operations capable of iterating through a dataset or stream of data.
47 
48  A Reader must implement at least one operation, `read`, which
49  adds operations to a net that read the next batch of data. Readers can
50  optionally support the `reset` operation, which is useful when multiple
51  passes over the data are required.
52  """
53  def __init__(self, schema=None):
54  if schema is not None:
55  assert isinstance(schema, Field)
56  self._schema = schema
57 
58  def schema(self):
59  assert self._schema is not None, 'Schema not provided for this reader.'
60  return self._schema
61 
62  def _set_schema(self, schema):
63  self._schema = schema
64 
65  def setup_ex(self, init_net, finish_net):
66  """Setup nets to run at task initialization and cleanup time.
67 
68  Args:
69  init_net: A net invoked at task init time.
70  finish_net: A net invoked at task cleanup time.
71  """
72  pass
73 
74  def read_ex(self, local_init_net, local_finish_net):
75  read_net = core.Net('reader_body')
76  return ([read_net], ) + self.read(read_net)
77 
78  def read_record_ex(self, local_init_net, local_finish_net):
79  nets, should_stop, fields = self.read_ex(
80  local_init_net, local_finish_net)
81  if self._schema:
82  fields = from_blob_list(self._schema, fields)
83  return nets, should_stop, fields
84 
85  def read(self, read_net):
86  """Append operations to read_net that will read a batch from the
87  underlying data source.
88 
89  Operations added to `read_net` must be thread safe and atomic, that is,
90  it should be possible to clone `read_net` and run multiple instances of
91  it in parallel.
92 
93  Args:
94  read_net: the net that will be appended with read operations
95 
96  Returns:
97  A tuple (should_stop, fields), with:
98  should_stop: BlobReference pointing to a boolean scalar
99  blob that indicates whether the read operation
100  was successful or whether the end of data has
101  been reached.
102  fields: A tuple of BlobReference containing the latest batch
103  of data that was read.
104  """
105  raise NotImplementedError('Readers must implement `read`.')
106 
107  def reset(self, net):
108  """Append operations to `net` that will reset the reader.
109 
110  This can be used to read the data multiple times.
111  Not all readers support this operation.
112  """
113  raise NotImplementedError('This reader cannot be reset.')
114 
115  def read_record(self, read_net):
116  should_stop, fields = self.read(read_net)
117  if self._schema:
118  fields = from_blob_list(self._schema, fields)
119  return should_stop, fields
120 
121  def execution_step(self, reader_net_name=None, external_should_stop=None):
122  """Create an execution step with a net containing read operators.
123 
124  The execution step will contain a `stop_blob` that knows how to stop
125  the execution loop when the end of data is reached.
126 
127  E.g.:
128 
129  read_step, fields = reader.execution_step()
130  consume_net = core.Net('consume')
131  consume_net.Print(fields[0], [])
132  p = core.Plan('reader')
133  p.AddStep(read_step.AddNet(consume_net))
134  core.RunPlan(p)
135 
136  Args:
137  reader_net_name: (optional) the name of the reader_net to be created.
138  The execution step will be named accordingly.
139  external_should_stop: (optional) a blob ORed with the reader's stop blob.
140 
141  Returns:
142  A tuple (read_step, fields), with:
143  read_step: A newly created execution step containing a net with
144  read operations. The step will have `stop_blob` set,
145  in order to stop the loop on end of data.
146  fields: A tuple of BlobReference containing the latest batch
147  of data that was read.
148  """
149  reader_net = core.Net(reader_net_name or 'reader')
150  should_stop, fields = self.read_record(reader_net)
151  if external_should_stop is not None:
152  should_stop = reader_net.Or([external_should_stop, should_stop])
153  read_step = core.execution_step(
154  '{}_step'.format(reader_net_name),
155  reader_net,
156  should_stop_blob=should_stop)
157  return (read_step, fields)
158 
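# --- Illustrative sketch (not part of the original dataio.py): a minimal
# Reader implementation. It emits a constant batch and never signals end of
# data; the class name and blob shapes are assumptions made for this example.
class ConstantReader(Reader):
    def read(self, read_net):
        # Stop blob is always False, so the batch below is produced forever.
        should_stop = read_net.ConstantFill(
            [], 1, shape=[], value=False, dtype=core.DataType.BOOL)
        data = read_net.ConstantFill([], 1, shape=[10], value=1.0)
        return should_stop, [data]
# With a `read` like this, `execution_step()` above would loop indefinitely
# unless an `external_should_stop` blob is supplied.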
159 
160 class Writer(object):
161  """
162  Writer is an abstract class to be implemented in order to provide
163  operations capable of feeding a data stream or a dataset.
164 
165  A Writer must implement 2 operations:
166  `write`, which adds operations to a net that write the next batch of
167  data, and `commit`, which adds operations to a net in order to indicate
168  that no more data will be written.
169  """
170  _schema = None
171 
172  def schema(self):
173  return self._schema
174 
175  def write(self, writer_net, fields):
176  """Add operations to `writer_net` that write the next batch of data.
177 
178  Operations added to the net must be thread-safe and unique, that is:
179  multiple writers must be able to write to the dataset in parallel.
180 
181  Args:
182  fields: a tuple of BlobReference containing the batch of data to
183  write.
184  """
185  raise NotImplementedError('Writers must implement write.')
186 
187  def write_record(self, writer_net, fields):
188  if isinstance(fields, Field):
189  self._schema = fields
190  fields = fields.field_blobs()
191  self.write(writer_net, fields)
192 
193  def setup_ex(self, init_net, finish_net):
194  """Experimental, don't use yet"""
195  self.commit(finish_net)
196 
197  def write_ex(self, fields, local_init_net, local_finish_net, stop_blob):
198  """Experimental extension to the interface. Don't use yet"""
199  write_net = core.Net('write_net')
200  self.write(write_net, fields)
201  return [write_net]
202 
203  def write_record_ex(
204  self, fields, local_init_net, local_finish_net, stop_blob=None):
205  """Experimental extension to the interface. Don't use yet."""
206  if isinstance(fields, Field):
207  self._schema = fields
208  fields = fields.field_blobs()
209  if stop_blob is None:
210  stop_blob = local_init_net.NextName("dequeue_status")
211  write_nets = self.write_ex(
212  fields, local_init_net, local_finish_net, stop_blob)
213  return (write_nets, stop_blob)
214 
215  def commit(self, finish_net):
216  """Add operations to `finish_net` that signal end of data.
217 
218  This must be implemented by all Writers, but may be no-op for some
219  of them.
220  """
221  pass
222 
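# --- Illustrative sketch (not part of the original dataio.py): a trivial
# Writer that prints every blob it is asked to write. The class name is an
# assumption; `Print` with no outputs logs the blob contents when the net runs.
class PrintWriter(Writer):
    def write(self, writer_net, fields):
        # One Print op per blob in the batch.
        for field in fields:
            writer_net.Print(field, [])
    # `commit` is inherited as a no-op, which is all this writer needs.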
223 
224 class ReaderBuilder(object):
225  """ Allow usage of a reader in distributed fashion. """
226  def schema(self):
227  raise NotImplementedError()
228 
229  def splits(self, net):
230  raise NotImplementedError()
231 
232  def new_reader(self, split_reader=None, **kwargs):
233  raise NotImplementedError()
234 
235 
237  """ReaderBuilder that modifies underlying builder by calling `piper`
238  function on each new reader produced, and return the result of
239  the function. This way, it is possible to append data processing
240  pipelines that will be replicated for each reader that gets created.
241 
242  E.g.:
243 
244  PipedReaderBuilder(
245  ReaderBuilder(...),
246  lambda reader: pipe(reader, processor=my_proc))
247  """
248 
249  def __init__(self, builder, piper):
250  self._builder = builder
251  self._piper = piper
252 
253  def schema(self):
254  return self._builder.schema()
255 
256  def splits(self, net):
257  return self._builder.splits(net)
258 
259  def new_reader(self, split_reader=None, **kwargs):
260  output = self._piper(self._builder.new_reader(split_reader), **kwargs)
261  return output if isinstance(output, Reader) else output.reader()
262 
263 
264 class Pipe(object):
265  def __init__(self, schema=None, obj_key=None):
266  self._num_writers = 0
267  self._num_readers = 0
268  self._schema = schema
269  self._obj_key = obj_key
270 
271  def schema(self):
272  return self._schema
273 
274  def setup(self, global_init_net):
275  pass
276 
277  def reader(self):
278  raise NotImplementedError()
279 
280  def writer(self):
281  raise NotImplementedError()
282 
283  def num_readers(self):
284  return self._num_readers
285 
286  def num_writers(self):
287  return self._num_writers
288 
289  def _new_writer(self, writer_schema, writer_init_net):
290  if writer_schema is not None and self._schema is None:
291  self._schema = writer_schema
292  self._num_writers += 1
293  if self._obj_key is not None:
294  writer_init_net.add_attribute(self._obj_key, self)
295 
296  def _new_reader(self, reader_init_net):
297  self._num_readers += 1
298  if self._obj_key is not None:
299  reader_init_net.add_attribute(self._obj_key, self)
300 
301 
303  """ Reader that produces increasing integers. """
304  def __init__(self):
305  Reader.__init__(self, schema=Struct(('iter', np.int64)))
306  self.counter = None
307  self.should_stop = None
308 
309  def setup_ex(self, global_init_net, global_finish_net):
310  if self.counter is None:
311  self.counter = global_init_net.CreateCounter([], init_count=0)
312  self.should_stop = global_init_net.ConstantFill(
313  [], shape=[], dtype=core.DataType.BOOL, value=False)
314 
315  def read_ex(self, local_init_net, local_finish_net):
316  count_net = core.Net('limited_reader_counter')
317  value = count_net.CountUp([self.counter], 1)
318  return [count_net], self.should_stop, [value]
319 
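def _counter_record_example():
    """Illustrative sketch, not part of the original dataio.py: driving the
    CounterReader above through the record interface. Net names are arbitrary
    assumptions made for this example."""
    reader = CounterReader()
    reader.setup_ex(core.Net('counter_init'), core.Net('counter_finish'))
    nets, should_stop, record = reader.read_record_ex(
        core.Net('local_init'), core.Net('local_finish'))
    # `record` is a Struct following the ('iter', np.int64) schema; its blob
    # can be retrieved with record.field_blobs()[0].
    return nets, should_stop, record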
320 
322  """Abstract Reader constrained by certain conditions.
323 
324  Base class for Reader classes which check for certain conditions to stop
325  further processing (e.g. max number of iterations or time limit).
326  Also produces a boolean blob (data_finished) that can be used to see if
327  the reader exhausted all input data (true) or stopped for another reason
328  (false).
329  """
330 
331  def __init__(self, reader):
332  Reader.__init__(self, schema=reader._schema)
333  self.reader = reader
334  self.net = core.Net('reader_with_limit')
335  self._data_finished = self.net.AddExternalInput(
336  self.net.NextName('data_finished'))
337  self.should_stop = None
338 
339  def setup_ex(self, global_init_net, global_finish_net):
340  global_init_net.ConstantFill(
341  [], [self._data_finished],
342  shape=[], value=False, dtype=core.DataType.BOOL)
343  self.reader.setup_ex(global_init_net, global_finish_net)
344  self.setup_limiter(global_init_net, global_finish_net)
345 
346  def read_ex(self, local_init_net, local_finish_net):
347  """Reads from an underlying Reader class, but may stop due to additional
348  constraints.
349 
350  Build and return network(s) to read data from a Reader with
351  additional constraints, depending on which derived class is used.
352  Derived classes implement setup_limiter and check_limiter_condition
353  which determine the nature of the constraint imposed on the reader,
354  e.g. iteration limits or time limit.
355 
356  Args:
357  local_init_net: A net invoked at task instance init time (Once per
358  parallel thread).
359  local_finish_net: A net invoked at task instance cleanup time (Once
360  per parallel thread).
361  """
362 
363  # Check if limiting constraint is met.
364  stop_condition_net = core.Net('limited_reader_condition')
365  should_stop = self.check_limiter_condition(stop_condition_net)
366 
367  # Call original reader.
368  nets, local_data_finished, fields = self.reader.read_ex(
369  local_init_net, local_finish_net)
370  self._set_schema(self.reader._schema)
371 
372  # Check if original reader is done.
373  check_done_net = core.Net('limited_reader_post')
374  # Copy to the same blob as the counter output to trigger reader
375  # stopping - this is ok because execution will check should_stop_blob
376  # after every single operation, so it has already been checked on this
377  # iteration by this point.
378  check_done_net.Copy(local_data_finished, should_stop)
379  # Update externally-accessible flag indicating if reader is done
380  check_done_net.Or([self._data_finished, local_data_finished],
381  [self._data_finished])
382 
383  return [stop_condition_net] + nets + [check_done_net], should_stop, fields
384 
385  def setup_limiter(self, global_init_net, global_finish_net):
386  """Configure task level init/cleanup nets required to implement limit
387  condition. Must be implemented by subclass.
388 
389  Args:
390  global_init_net: A net invoked at task init time.
391  global_finish_net: A net invoked at task cleanup time.
392  """
393  raise NotImplementedError("Subclass must implement `setup_limiter`")
394 
395  def check_limiter_condition(self, stop_condition_net):
396  """Configure a net that is invoked between reading batches to see if
397  limit condition is met. Must be implemented by subclass.
398 
399  Args:
400  stop_condition_net: A net invoked to evaluate an early termination
401  condition.
402  """
403  raise NotImplementedError("Subclass must implement `check_limiter_condition`")
404 
405  def data_finished(self):
406  """
407  Return a blob that can be checked after the end of the reading task,
408  which will contain a boolean scalar indicating whether the underlying
409  reader has been exhausted (True) or whether we stopped because we reached
410  the limit of iterations (False).
411  """
412  return self._data_finished
413 
414 
416  """Reader that stops after `num_iter` batches.
417 
418  If `num_iter` <= 0 or is None, reverts to an unconstrained reader that
419  exports a boolean blob indicating that the reader has exhausted
420  the data stream.
421  """
422  def __init__(self, reader, num_iter=1):
423  """Class initializer.
424 
425  Args:
426  reader: The underlying reader object doing the actual read.
427  num_iter: Number of batches to read. If `None`,
428  the class reverts to a normal reader except that it also
429  produces a data_finished blob as a side effect to indicate
430  whether the input stream is exhausted.
431  """
432  super(ReaderWithLimit, self).__init__(reader)
433  self.counter = None
434  self.num_iter = num_iter
435  if self.num_iter is not None:
436  self.counter = self.net.AddExternalInput(
437  self.net.NextName('counter'))
438 
439  def setup_limiter(self, global_init_net, global_finish_net):
440  if self.counter:
441  global_init_net.CreateCounter(
442  [], [self.counter], init_count=int(self.num_iter))
443 
444  def check_limiter_condition(self, stop_condition_net):
445  if self.counter:
446  return stop_condition_net.CountDown([self.counter], 1)
447  else:
448  return stop_condition_net.ConstantFill(
449  [], 1,
450  shape=[], value=False, dtype=core.DataType.BOOL)
451 
452 
453 def CountUntil(num_iter):
454  return ReaderWithLimit(CounterReader(), num_iter)
455 
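def _count_until_example():
    """Illustrative sketch, not part of the original dataio.py: wiring a
    CountUntil reader by hand through the `_ex` interface. In practice these
    nets are assembled by higher-level task/pipeline helpers."""
    reader = CountUntil(5)  # CounterReader limited to 5 batches
    init_net, finish_net = core.Net('init'), core.Net('finish')
    reader.setup_ex(init_net, finish_net)
    read_nets, should_stop, fields = reader.read_ex(
        core.Net('local_init'), core.Net('local_finish'))
    # `read_nets` would run inside an execution step created with
    # core.execution_step(..., should_stop_blob=should_stop), looping until
    # the counter reaches zero.
    return init_net, read_nets, should_stop, fields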
456 
458  """Reader that stops after `duration` seconds.
459 
460  If `duration` <= 0 or is None, reverts to an unconstrained reader that
461  exports a boolean blob indicating that the reader has exhausted
462  the data stream.
463  """
464  def __init__(self, reader, duration=0):
465  """Class initializer.
466 
467  Args:
468  reader: The underlying reader object doing the actual read.
469  duration: Number of seconds to read. If unspecified, None, or <= 0,
470  the class reverts to a normal reader except that it also
471  produces a data_finished blob as a side effect to indicate
472  whether the input stream is exhausted.
473  """
474  super(ReaderWithTimeLimit, self).__init__(reader)
475 
476  self.timer = None
477  self.duration = duration
478  self.duration_ns_blob = None
479 
480  def setup_limiter(self, global_init_net, global_finish_net):
481  if self.duration is not None and self.duration > 0:
482  duration_ns = int(self.duration * (10**9))
483 
484  self.timer = global_init_net.TimerBegin(
485  [], counter_name='epoch_timer')
486  start_time = global_init_net.TimerGet(self.timer)
487  self.duration_ns_blob = global_init_net.ConstantFill(
488  [start_time], value=duration_ns)
489 
490  global_finish_net.TimerEnd([self.timer], [])
491 
492  def check_limiter_condition(self, stop_condition_net):
493  if self.duration:
494  time_elapsed = stop_condition_net.TimerGet(self.timer)
495  return stop_condition_net.GE(
496  [time_elapsed, self.duration_ns_blob], str(self.should_stop))
497  else:
498  return stop_condition_net.ConstantFill(
499  [], 1, shape=[], value=False, dtype=core.DataType.BOOL
500  )
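
def _time_limited_reader_example(dataset_reader):
    """Illustrative sketch, not part of the original dataio.py: limit a reader
    (supplied by the caller) to 30 minutes of reading. Net names are
    assumptions made for this example."""
    timed_reader = ReaderWithTimeLimit(dataset_reader, duration=30 * 60)
    init_net, finish_net = core.Net('timed_init'), core.Net('timed_finish')
    timed_reader.setup_ex(init_net, finish_net)
    read_nets, should_stop, fields = timed_reader.read_ex(
        core.Net('local_init'), core.Net('local_finish'))
    # After the plan has run, the blob returned by data_finished() holds True
    # only if `dataset_reader` was exhausted before the time budget ran out.
    return read_nets, should_stop, timed_reader.data_finished()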