Caffe2 - Python API
A deep learning, cross platform ML framework
dataio.py
## @package dataio
# Module caffe2.python.dataio
"""
Defines the base interface for reading and writing operations.

Readers/Writers are objects that produce operations that read/write sequences
of data. Each operation reads or writes a list of BlobReferences.

Readers and Writers must be implemented such that read and write operations
are atomic and thread safe.

Examples of possible Readers and Writers:
    QueueReader, QueueWriter,
    DatasetReader, DatasetWriter.

See `dataset.py` for an example implementation.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from caffe2.python import core
from caffe2.python.schema import Field, Struct, from_blob_list
import numpy as np
import time

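# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): the core contract
# tying Readers and Writers together. `reader` and `writer` stand for any
# concrete implementations; `read` returns a stop blob plus data blobs,
# which `write` then consumes.
def _example_copy_batch(reader, writer):
    net = core.Net('copy_batch')
    should_stop, fields = reader.read(net)
    writer.write(net, fields)
    return net, should_stop
# ---------------------------------------------------------------------------
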
class Reader(object):
    """
    Reader is an abstract class to be implemented in order to provide
    operations capable of iterating through a dataset or stream of data.

    A Reader must implement at least one operation, `read`, which
    adds operations to a net that read the next batch of data. Readers can
    optionally support the `reset` operation, which is useful when multiple
    passes over the data are required.
    """
    def __init__(self, schema=None):
        if schema is not None:
            assert isinstance(schema, Field)
        self._schema = schema

    def schema(self):
        assert self._schema is not None, 'Schema not provided for this reader.'
        return self._schema

    def _set_schema(self, schema):
        self._schema = schema
    def setup_ex(self, init_net, finish_net):
        """Setup nets to run at task initialization and cleanup time.

        Args:
            init_net: A net invoked at task init time.
            finish_net: A net invoked at task cleanup time.
        """
        pass
    def read_ex(self, local_init_net, local_finish_net):
        read_net = core.Net('reader_body')
        return ([read_net], ) + self.read(read_net)

    def read_record_ex(self, local_init_net, local_finish_net):
        nets, should_stop, fields = self.read_ex(
            local_init_net, local_finish_net)
        if self._schema:
            fields = from_blob_list(self._schema, fields)
        return nets, should_stop, fields
    def read(self, read_net):
        """Append operations to read_net that will read a batch from the
        underlying data source.

        Operations added to `read_net` must be thread safe and atomic, that is,
        it should be possible to clone `read_net` and run multiple instances of
        it in parallel.

        Args:
            read_net: the net that will be appended with read operations.

        Returns:
            A tuple (should_stop, fields), with:
                should_stop: BlobReference pointing to a boolean scalar
                             blob that indicates whether the read operation
                             was successful or whether the end of data has
                             been reached.
                fields: A tuple of BlobReference containing the latest batch
                        of data that was read.
        """
        raise NotImplementedError('Readers must implement `read`.')
    def reset(self, net):
        """Append operations to `net` that will reset the reader.

        This can be used to read the data multiple times.
        Not all readers support this operation.
        """
        raise NotImplementedError('This reader cannot be reset.')
    def read_record(self, read_net):
        should_stop, fields = self.read(read_net)
        if self._schema:
            fields = from_blob_list(self._schema, fields)
        return should_stop, fields
    def execution_step(self, reader_net_name=None, external_should_stop=None):
        """Create an execution step with a net containing read operators.

        The execution step will contain a `stop_blob` that knows how to stop
        the execution loop when end of data was reached.

        E.g.:

            read_step, fields = reader.execution_step()
            consume_net = core.Net('consume')
            consume_net.Print(fields[0], [])
            p = core.Plan('reader')
            p.AddStep(read_step.AddNet(consume_net))
            core.RunPlan(p)

        Args:
            reader_net_name: (optional) the name of the reader_net to be
                             created. The execution step will
                             be named accordingly.
            external_should_stop: (optional) a boolean blob that, when True,
                                  also stops the execution loop.

        Returns:
            A tuple (read_step, fields), with:
                read_step: A newly created execution step containing a net
                           with read operations. The step will have
                           `stop_blob` set, in order to stop the loop on
                           end of data.
                fields: A tuple of BlobReference containing the latest batch
                        of data that was read.
        """
        reader_net = core.Net(reader_net_name or 'reader')
        should_stop, fields = self.read_record(reader_net)
        if external_should_stop is not None:
            should_stop = reader_net.Or([external_should_stop, should_stop])
        read_step = core.execution_step(
            '{}_step'.format(reader_net_name or 'reader'),
            reader_net,
            should_stop_blob=should_stop)
        return (read_step, fields)

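# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): a minimal Reader
# that yields a single, pre-existing blob once and then signals end of
# data. The class and blob names are hypothetical; it only shows the
# (should_stop, fields) contract documented in `Reader.read` above.
class _OneShotReader(Reader):
    def __init__(self, blob):
        Reader.__init__(self)
        self._blob = blob

    def read(self, read_net):
        # The loop checks `should_stop` after each pass, so the single
        # batch is consumed before the constant True stops the loop.
        should_stop = read_net.ConstantFill(
            [], 1, shape=[], value=True, dtype=core.DataType.BOOL)
        return should_stop, [self._blob]
# ---------------------------------------------------------------------------
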
class Writer(object):
    """
    Writer is an abstract class to be implemented in order to provide
    operations capable of feeding a data stream or a dataset.

    A Writer must implement 2 operations:
    `write`, which adds operations to a net that write the next batch of
    data, and `commit`, which adds operations to a net in order to indicate
    that no more data will be written.
    """
    _schema = None

    def schema(self):
        return self._schema
    def write(self, writer_net, fields):
        """Add operations to `writer_net` that write the next batch of data.

        Operations added to the net must be thread-safe and unique, that is:
        multiple writers must be able to write to the dataset in parallel.

        Args:
            writer_net: the net that will be appended with write operations.
            fields: a tuple of BlobReference containing the batch of data to
                    write.
        """
        raise NotImplementedError('Writers must implement `write`.')
    def write_record(self, writer_net, fields):
        if isinstance(fields, Field):
            self._schema = fields
            fields = fields.field_blobs()
        self.write(writer_net, fields)

    def setup_ex(self, init_net, finish_net):
        """Experimental, don't use yet."""
        self.commit(finish_net)

    def write_ex(self, fields, local_init_net, local_finish_net, stop_blob):
        """Experimental extension to the interface. Don't use yet."""
        write_net = core.Net('write_net')
        self.write(write_net, fields)
        return [write_net]

    def write_record_ex(
            self, fields, local_init_net, local_finish_net, stop_blob=None):
        """Experimental extension to the interface. Don't use yet."""
        if isinstance(fields, Field):
            self._schema = fields
            fields = fields.field_blobs()
        if stop_blob is None:
            stop_blob = local_init_net.NextName("dequeue_status")
        write_nets = self.write_ex(
            fields, local_init_net, local_finish_net, stop_blob)
        return (write_nets, stop_blob)

    def commit(self, finish_net):
        """Add operations to `finish_net` that signal end of data.

        This must be implemented by all Writers, but may be a no-op for
        some of them.
        """
        pass

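# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): a minimal Writer
# that prints each batch instead of storing it, with a no-op `commit`.
# The class name is hypothetical; it only shows the write/commit contract
# documented above.
class _PrintWriter(Writer):
    def write(self, writer_net, fields):
        for field in fields:
            writer_net.Print(field, [])

    def commit(self, finish_net):
        # Nothing to flush; printing happens as batches arrive.
        pass
# ---------------------------------------------------------------------------
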
class ReaderBuilder(object):
    """Allow usage of a reader in a distributed fashion."""
    def schema(self):
        raise NotImplementedError()

    def setup(self, **kwargs):
        """
        Optionally, perform one-time setup before calling new_reader().
        Subclasses should make sure this function is only called once.
        """
        raise NotImplementedError()

    def new_reader(self, **kwargs):
        raise NotImplementedError()

class PipedReaderBuilder(ReaderBuilder):
    """ReaderBuilder that modifies the underlying builder by calling the
    `piper` function on each new reader produced, and returns the result of
    the function. This way, it is possible to append data processing
    pipelines that will be replicated for each reader that gets created.

    E.g.:

        PipedReaderBuilder(
            ReaderBuilder(...),
            lambda reader: pipe(reader, processor=my_proc))
    """

    def __init__(self, builder, piper):
        self._builder = builder
        self._piper = piper

    def schema(self):
        return self._builder.schema()

    def setup(self, **kwargs):
        return self._builder.setup(**kwargs)

    def new_reader(self, **kwargs):
        # Passing everything down since you could wrap a PipedReaderBuilder
        # in another PipedReaderBuilder.
        output = self._piper(
            reader=self._builder.new_reader(**kwargs),
            **kwargs
        )
        return output if isinstance(output, Reader) else output.reader()

class Pipe(object):
    def __init__(self, schema=None, obj_key=None):
        self._num_writers = 0
        self._num_readers = 0
        self._schema = schema
        self._obj_key = obj_key

    def schema(self):
        return self._schema

    def setup(self, global_init_net):
        pass

    def reader(self):
        raise NotImplementedError()

    def writer(self):
        raise NotImplementedError()

    def num_readers(self):
        return self._num_readers

    def num_writers(self):
        return self._num_writers

    def _new_writer(self, writer_schema, writer_init_net):
        if writer_schema is not None and self._schema is None:
            self._schema = writer_schema
        self._num_writers += 1
        if self._obj_key is not None:
            writer_init_net.add_attribute(self._obj_key, self)

    def _new_reader(self, reader_init_net):
        self._num_readers += 1
        if self._obj_key is not None:
            reader_init_net.add_attribute(self._obj_key, self)

298  """ Reader that produces increasing integers. """
299  def __init__(self):
300  Reader.__init__(self, schema=Struct(('iter', np.int64)))
301  self.counter = None
302  self.should_stop = None
303 
304  def setup_ex(self, global_init_net, global_finish_net):
305  if self.counter is None:
306  self.counter = global_init_net.CreateCounter([], init_count=0)
307  self.should_stop = global_init_net.ConstantFill(
308  [], shape=[], dtype=core.DataType.BOOL, value=False)
309 
310  def read_ex(self, local_init_net, local_finish_net):
311  count_net = core.Net('limited_reader_counter')
312  value = count_net.CountUp([self.counter], 1)
313  return [count_net], self.should_stop, [value]
314 
315 
317  """Abstract Reader constrained by certain conditions.
318 
319  Base class for Reader classes which check for certain conditions to stop
320  further processing (e.g. max number of iterations or time limit).
321  Also produces a boolean blob (data_finished) that can be used to see if
322  the reader exausted all input data (true) or stopped for another reason
323  (false).
324  """
325 
326  def __init__(self, reader):
327  Reader.__init__(self, schema=reader._schema)
328  self.reader = reader
329  self.net = core.Net('reader_with_limit')
330  self._data_finished = self.net.AddExternalInput(
331  self.net.NextName('data_finished'))
332  self.should_stop = None
333 
    def setup_ex(self, global_init_net, global_finish_net):
        global_init_net.ConstantFill(
            [], [self._data_finished],
            shape=[], value=False, dtype=core.DataType.BOOL)
        self.reader.setup_ex(global_init_net, global_finish_net)
        self.setup_limiter(global_init_net, global_finish_net)
    def read_ex(self, local_init_net, local_finish_net):
        """Reads from an underlying Reader class, but may stop due to
        additional constraints.

        Build and return network(s) to read data from a Reader with
        additional constraints, depending on which derived class is used.
        Derived classes implement setup_limiter and check_limiter_condition,
        which determine the nature of the constraint imposed on the reader,
        e.g. iteration limits or time limit.

        Args:
            local_init_net: A net invoked at task instance init time (once
                            per parallel thread).
            local_finish_net: A net invoked at task instance cleanup time
                              (once per parallel thread).
        """
        # Check if the limiting constraint is met.
        stop_condition_net = core.Net('limited_reader_condition')
        should_stop = self.check_limiter_condition(stop_condition_net)

        # Call the original reader.
        nets, local_data_finished, fields = self.reader.read_ex(
            local_init_net, local_finish_net)
        self._set_schema(self.reader._schema)

        # Check if the original reader is done.
        check_done_net = core.Net('limited_reader_post')
        # Copy to the same blob as the counter output to trigger reader
        # stopping - this is ok because execution will check should_stop_blob
        # after every single operation, so it has already been checked on this
        # iteration by this point.
        check_done_net.Copy(local_data_finished, should_stop)
        # Update the externally-accessible flag indicating whether the reader
        # is done.
        check_done_net.Or([self._data_finished, local_data_finished],
                          [self._data_finished])

        return [stop_condition_net] + nets + [check_done_net], should_stop, fields
    def setup_limiter(self, global_init_net, global_finish_net):
        """Configure task level init/cleanup nets required to implement the
        limit condition. Must be implemented by subclass.

        Args:
            global_init_net: A net invoked at task init time.
            global_finish_net: A net invoked at task cleanup time.
        """
        raise NotImplementedError("Subclass must implement `setup_limiter`")

    def check_limiter_condition(self, stop_condition_net):
        """Configure a net that is invoked between reading batches to see if
        the limit condition is met. Must be implemented by subclass.

        Args:
            stop_condition_net: A net invoked to evaluate an early termination
                                condition.
        """
        raise NotImplementedError(
            "Subclass must implement `check_limiter_condition`")
    def data_finished(self):
        """
        Return a blob that can be checked after the end of the reading task,
        which will contain a scalar boolean indicating whether the underlying
        reader has been exhausted (True) or whether we stopped because the
        limit was reached (False).
        """
        return self._data_finished
409 
411  """Reader that stops after `num_iter` batches.
412 
413  If `num_iter` <= 0 or is None, reverts to an unconstrained reader that
414  exports a boolean blob indicating that the reader has exhausted
415  the data steam.
416  """
417  def __init__(self, reader, num_iter=1):
418  """Class initializer.
419 
420  Args:
421  reader: The underlying reader object doing the actual read.
422  num_iter: Number of batches to read. If `None`,
423  the class reverts to a normal reader except that it also
424  produces a data_finished blob as a side effect to indicate
425  whether the input stream is exhausted.
426  """
427  super(ReaderWithLimit, self).__init__(reader)
428  self.counter = None
429  self.num_iter = num_iter
430  if self.num_iter is not None:
431  self.counter = self.net.AddExternalInput(
432  self.net.NextName('counter'))
433 
434  def setup_limiter(self, global_init_net, global_finish_net):
435  if self.counter:
436  global_init_net.CreateCounter(
437  [], [self.counter], init_count=int(self.num_iter))
438 
439  def check_limiter_condition(self, stop_condition_net):
440  if self.counter:
441  return stop_condition_net.CountDown([self.counter], 1)
442  else:
443  return stop_condition_net.ConstantFill(
444  [], 1,
445  shape=[], value=False, dtype=core.DataType.BOOL)
446 
def CountUntil(num_iter):
    return ReaderWithLimit(CounterReader(), num_iter)

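# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): CountUntil builds
# a reader that produces increasing integers and stops after `num_iter`
# batches. The limited reader implements the `*_ex` interface, so it is
# meant to be consumed by the task/pipeline machinery; the net names below
# are hypothetical.
#
#   reader = CountUntil(10)
#   g_init, g_finish = core.Net('g_init'), core.Net('g_finish')
#   reader.setup_ex(g_init, g_finish)
#   nets, should_stop, fields = reader.read_ex(
#       core.Net('l_init'), core.Net('l_finish'))
# ---------------------------------------------------------------------------
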
class ReaderWithTimeLimit(ReaderWithLimitBase):
    """Reader that stops after `duration` seconds.

    If `duration` <= 0 or is None, reverts to an unconstrained reader that
    exports a boolean blob indicating that the reader has exhausted
    the data stream.
    """
    def __init__(self, reader, duration=0):
        """Class initializer.

        Args:
            reader: The underlying reader object doing the actual read.
            duration: Number of seconds to read. If unspecified, None, or
                      <= 0, the class reverts to a normal reader except that
                      it also produces a data_finished blob as a side effect
                      to indicate whether the input stream is exhausted.
        """
        super(ReaderWithTimeLimit, self).__init__(reader)

        self.timer = None
        self.duration = duration
        self.duration_ns_blob = None

    def setup_limiter(self, global_init_net, global_finish_net):
        if self.duration is not None and self.duration > 0:
            duration_ns = int(self.duration * (10**9))

            self.timer = global_init_net.TimerBegin(
                [], counter_name='epoch_timer')
            start_time = global_init_net.TimerGet(self.timer)
            self.duration_ns_blob = global_init_net.ConstantFill(
                [start_time], value=duration_ns)

            global_finish_net.TimerEnd([self.timer], [])

    def check_limiter_condition(self, stop_condition_net):
        if self.duration:
            time_elapsed = stop_condition_net.TimerGet(self.timer)
            return stop_condition_net.GE(
                [time_elapsed, self.duration_ns_blob], str(self.should_stop))
        else:
            return stop_condition_net.ConstantFill(
                [], 1, shape=[], value=False, dtype=core.DataType.BOOL
            )

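# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): wrap any reader so
# that reading stops after roughly ten minutes, then inspect afterwards
# whether the stop was due to the time limit or to end of data:
#
#   reader = ReaderWithTimeLimit(CounterReader(), duration=10 * 60)
#   ...  # build and run the reading task
#   flag = reader.data_finished()  # blob holding True iff data ran out
# ---------------------------------------------------------------------------
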
class ReaderWithDelay(Reader):
    """Test reader class that inserts a delay between reading batches."""

    def __init__(self, reader, delay):
        Reader.__init__(self, schema=reader._schema)
        self.reader = reader
        self.delay = delay

    def setup_ex(self, global_init_net, global_finish_net):
        self.reader.setup_ex(global_init_net, global_finish_net)

    def read_ex(self, local_init_net, local_finish_net):
        read_net = core.Net("reader_body")

        def sleep_op(*args, **argd):
            time.sleep(self.delay)

        read_net.Python(sleep_op)([], [])
        return ([read_net],) + self.reader.read(read_net)

520  """
521  Base class for a reader that wrap multiple readers, e.g., reading from
522  multiple sources simultaneously.
523  """
524  def __init__(self, names, readers):
525  """
526  Args:
527  names: list[str] names of readers; used as schema keys
528  readers: list[Reader] Reader instances, must have schema
529  """
530  assert len(names) == len(readers)
531  super(CompositeReader, self).__init__(schema=Struct(*[
532  (name, reader.schema()) for name, reader in zip(names, readers)
533  ]))
534  self._names = names
535  self._readers = readers
536 
537  def setup_ex(self, init_net, finish_net):
538  for reader in self._readers:
539  reader.setup_ex(init_net, finish_net)
540 
541  def read_ex(self, local_init_net, local_finish_net):
542  """
543  Stops when one of the reader finished
544  """
545  local_should_stop = local_init_net.ConstantFill(
546  [], shape=[], dtype=core.DataType.BOOL, value=False)
547  read_nets = []
548  fields = []
549  for name, reader in zip(self._names, self._readers):
550  sub_read_nets, should_stop, record = reader.read_record_ex(
551  local_init_net, local_finish_net)
552  stop_net = core.Net("{}_stop".format(name))
553  stop_net.Copy(should_stop, local_should_stop)
554  sub_read_nets.append(stop_net)
555  read_nets.extend(sub_read_nets)
556  fields.extend(record.field_blobs())
557  return read_nets, local_should_stop, fields
558 
559  def reset(self, net):
560  for reader in self._readers:
561  reader.reset(net)
562 
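# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): reading two
# counters side by side. The composite schema nests each reader's schema
# under its name, and reading stops as soon as either sub-reader does:
#
#   composite = CompositeReader(
#       ['left', 'right'], [CounterReader(), CounterReader()])
#   composite.schema()  # Struct(('left', ...), ('right', ...))
# ---------------------------------------------------------------------------
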
class CompositeReaderBuilder(ReaderBuilder):
    """
    A reader builder for CompositeReader.
    """
    def __init__(self, names, reader_builders):
        """
        Args:
            names: list[str] names of readers; used as schema keys
            reader_builders: list[ReaderBuilder] ReaderBuilder instances;
                             must have schema
        """
        super(CompositeReaderBuilder, self).__init__()
        self._names = names
        self._reader_builders = reader_builders
        self._schema = Struct(*[
            (name, reader_builder.schema())
            for name, reader_builder in zip(names, reader_builders)
        ])

    def schema(self):
        return self._schema

    def setup(self, **kwargs):
        for reader_builder in self._reader_builders:
            reader_builder.setup(**kwargs)
        # The limiter is stateful; it can only be used once. Since
        # CompositeReader stops when one of the readers stops,
        # this is fine.
        if "limiter" in kwargs:
            kwargs.pop("limiter")

    def new_reader(self, **kwargs):
        readers = []
        for reader_builder in self._reader_builders:
            reader = reader_builder.new_reader(**kwargs)
            if isinstance(reader, Reader):
                pass
            elif hasattr(reader, 'reader'):
                reader = reader.reader()
            else:
                raise ValueError('reader must be an instance of Reader or Pipe')
            readers.append(reader)

        multi_reader = CompositeReader(self._names, readers)
        assert multi_reader.schema() == self._schema
        return multi_reader