4 Defines the base interface for reading and writing operations.     6 Readers/Writers are objects that produce operations that read/write sequences     7 of data. Each operation reads or writes a list of BlobReferences.     9 Readers and Writers must be implemented such that read and write operations    10 are atomic and thread safe.    12 Examples of possible Readers and Writers:    13     QueueReader, QueueWriter,    14     DatasetReader, DatasetWriter,    16 See `dataset.py` for an example of implementation.    18 from __future__ 
import absolute_import
    19 from __future__ 
import division
    20 from __future__ 
import print_function
    21 from __future__ 
import unicode_literals
    31     Reader is an abstract class to be implemented in order to provide    32     operations capable of iterating through a dataset or stream of data.    34     A Reader must implement at least one operation, `read`, which    35     adds operations to a net that read the next batch of data. Readers can    36     optionally support the `reset` operation, which is useful when multiple    37     passes over the data are required.    39     def __init__(self, schema=None):
    40         if schema 
is not None:
    41             assert isinstance(schema, Field)
    45         assert self.
_schema is not None, 
'Schema not provided for this reader.'    48     def _set_schema(self, schema):
    52         """Setup nets to run at task initialization and cleanup time.    55             global_init_net: A net invoked at task init time.    56             global_finish_net: A net invoked at task cleanup time.    60     def read_ex(self, local_init_net, local_finish_net):
    61         read_net = core.Net(
'reader_body')
    62         return ([read_net], ) + self.read(read_net)
    64     def read_record_ex(self, local_init_net, local_finish_net):
    65         nets, should_stop, fields = self.read_ex(
    66             local_init_net, local_finish_net)
    68             fields = from_blob_list(self._schema, fields)
    69         return nets, should_stop, fields
    72         """Append operations to read_net that will read a batch from the    73         underlying data soruce.    75         Operations added to `read_net` must be thread safe and atomic, that is,    76         it should be possible to clone `read_net` and run multiple instances of    80             read_net: the net that will be appended with read operations    83             A tuple (should_stop, fields), with:    84                 should_stop: BlobReference pointing to a boolean scalar    85                     blob that indicates whether the read operation    86                     was succesfull or whether the end of data has    88                 fields: A tuple of BlobReference containing the latest batch    89                     of data that was read.    91         raise NotImplementedError(
'Readers must implement `read`.')
    94         """Append operations to `net` that will reset the reader.    96         This can be used to read the data multiple times.    97         Not all readers support this operation.    99         raise NotImplementedError(
'This reader cannot be resetted.')
   101     def read_record(self, read_net):
   102         should_stop, fields = self.
read(read_net)
   104             fields = from_blob_list(self.
_schema, fields)
   105         return should_stop, fields
   108         """Create an execution step with a net containing read operators.   110         The execution step will contain a `stop_blob` that knows how to stop   111         the execution loop when end of data was reached.   115             read_step, fields = reader.execution_step()   116             consume_net = core.Net('consume')   117             consume_net.Print(fields[0], [])   118             p = core.Plan('reader')   119             p.AddStep(read_step.AddNet(consume_net))   123             reader_net_name: (optional) the name of the reader_net to be   124                              created. The execution step will   125                              be named accordingly.   128             A tuple (read_step, fields), with:   129                 read_step: A newly created execution step containing a net with   130                            read operations. The step will have `stop_blob` set,   131                            in order to stop the loop on end of data.   132                 fields: A tuple of BlobReference containing the latest batch   133                         of data that was read.   135         reader_net = 
core.Net(reader_net_name 
or 'reader')
   137         if external_should_stop 
is not None:
   138             should_stop = reader_net.Or([external_should_stop, should_stop])
   139         read_step = core.execution_step(
   140             '{}_step'.format(reader_net_name),
   142             should_stop_blob=should_stop)
   143         return (read_step, fields)
   148     Writer is an abstract class to be implemented in order to provide   149     operations capable of feeding a data stream or a dataset.   151     A Writer must implement 2 operations:   152     `write`, which adds operations to a net that write the write batch of   153     data, and `commit`, which adds operations to a net in order to indicate   154     that no more data will be written.   161     def write(self, writer_net, fields):
   162         """Add operations to `writer_net` that write the next batch of data.   164         Operations added to the net must be thread-safe and unique, that is:   165         multiple writers must be able to write to the dataset in parallel.   168             fields: a tuple of BlobReference containing the batch of data to   171         raise NotImplementedError(
'Writers must implement write.')
   173     def write_record(self, writer_net, fields):
   174         if isinstance(fields, Field):
   176             fields = fields.field_blobs()
   177         self.
write(writer_net, fields)
   180         """Experimental, don't use yet"""   183     def write_ex(self, fields, local_init_net, local_finish_net, stop_blob):
   184         """Experimental extension to the interface. Don't use yet"""   186         self.
write(write_net, fields)
   190             self, fields, local_init_net, local_finish_net, stop_blob=
None):
   191         """Experimental extension to the interface. Don't use yet."""   192         if isinstance(fields, Field):
   194             fields = fields.field_blobs()
   195         if stop_blob 
is None:
   196             stop_blob = local_init_net.NextName(
"dequeue_status")
   198             fields, local_init_net, local_finish_net, stop_blob)
   199         return (write_nets, stop_blob)
   202         """Add operations to `finish_net` that signal end of data.   204         This must be implemented by all Writers, but may be no-op for some   210 class ReaderBuilder(object):
   211     """ Allow usage of a reader in distributed fashion. """   213         raise NotImplementedError()
   217         Optionally, perform one-time setup before calling new_reader().   218         Subclass should make sure this function is only called once.   220         raise NotImplementedError()
   222     def new_reader(self, **kwargs):
   223         raise NotImplementedError()
   227     """ReaderBuilder that modifies underlying builder by calling `piper`   228     function on each new reader produced, and return the result of   229     the function. This way, it is possible to append data processing   230     pipelines that will be replicated for each reader that gets created.   236         lambda reader: pipe(reader, processor=my_proc))   239     def __init__(self, builder, piper):
   244         return self._builder.schema()
   246     def setup(self, **kwargs):
   247         return self._builder.setup(**kwargs)
   249     def new_reader(self, **kwargs):
   253             reader=self._builder.new_reader(**kwargs),
   256         return output 
if isinstance(output, Reader) 
else output.reader()
   260     def __init__(self, schema=None, obj_key=None):
   269     def setup(self, global_init_net):
   273         raise NotImplementedError()
   276         raise NotImplementedError()
   def num_readers(self):
       """Return the reader count stored in ``self._num_readers``."""
       count = self._num_readers
       return count
   def num_writers(self):
       """Return the writer count stored in ``self._num_writers``."""
       count = self._num_writers
       return count
   def _new_writer(self, writer_schema, writer_init_net):
       """Book-keeping performed whenever a writer is attached.

       Args:
           writer_schema: schema offered by the new writer. It is adopted
               only if one is given and ``self._schema`` is still None.
           writer_init_net: net on which this object is registered via
               ``add_attribute`` when ``self._obj_key`` is not None.
       """
       # The first writer that supplies a schema wins; later ones are ignored.
       adopt_schema = self._schema is None and writer_schema is not None
       if adopt_schema:
           self._schema = writer_schema
       self._num_writers += 1
       key = self._obj_key
       if key is None:
           return
       writer_init_net.add_attribute(key, self)
   def _new_reader(self, reader_init_net):
       """Book-keeping performed whenever a reader is attached.

       Increments ``self._num_readers``. When ``self._obj_key`` is not
       None, also registers this object on ``reader_init_net`` via
       ``add_attribute`` under that key.
       """
       self._num_readers += 1
       key = self._obj_key
       if key is not None:
           reader_init_net.add_attribute(key, self)
   298     """ Reader that produces increasing integers. """   300         Reader.__init__(self, schema=
Struct((
'iter', np.int64)))
   304     def setup_ex(self, global_init_net, global_finish_net):
   306             self.
counter = global_init_net.CreateCounter([], init_count=0)
   308                 [], shape=[], dtype=core.DataType.BOOL, value=
False)
   310     def read_ex(self, local_init_net, local_finish_net):
   311         count_net = 
core.Net(
'limited_reader_counter')
   312         value = count_net.CountUp([self.
counter], 1)
   317     """Abstract Reader constrained by certain conditions.   319     Base class for Reader classes which check for certain conditions to stop   320     further processing (e.g. max number of iterations or time limit).   321     Also produces a boolean blob (data_finished) that can be used to see if   322     the reader exausted all input data (true) or stopped for another reason   326     def __init__(self, reader):
   327         Reader.__init__(self, schema=reader._schema)
   331             self.net.NextName(
'data_finished'))
   334     def setup_ex(self, global_init_net, global_finish_net):
   335         global_init_net.ConstantFill(
   337             shape=[], value=
False, dtype=core.DataType.BOOL)
   338         self.reader.setup_ex(global_init_net, global_finish_net)
   341     def read_ex(self, local_init_net, local_finish_net):
   342         """Reads from an underlying Reader class, but may stop due to additional   345         Build and return network(s) to read data from a Reader with   346         additional constraints, depending on which derived class is used.   347         Derived classes implement setup_limited and check_limiter_condition   348         which determine the nature of the constraint imposed on the reader,   349         e.g. iteration limits or time limit.   352             local_init_net: A net invoked at task instance init time (Once per   354             local_finish_net: A net invoked at task instance cleanup time (Once   355                 per parallel thread).   359         stop_condition_net = 
core.Net(
'limited_reader_condition')
   363         nets, local_data_finished, fields = self.reader.read_ex(
   364             local_init_net, local_finish_net)
   368         check_done_net = 
core.Net(
'limited_reader_post')
   373         check_done_net.Copy(local_data_finished, should_stop)
   378         return [stop_condition_net] + nets + [check_done_net], should_stop, fields
   381         """Configure task level init/cleanup nets required to implement limit   382         condition. Must be implemented by subclass.   385             global_init_net: A net invoked at task init time.   386             global_finish_net: A net invoked at task cleanup time.   388         raise NotImplementedError(
"Subclass must implement `setup_limiter`")
   391         """Configure a net that is invoked between reading batches to see if   392         limit condition is met. Must be implemented by subclass.   395             stop_condition_net: A net invoked to evaluate an early termination   398         raise NotImplementedError(
"Subclass must implement `check_limiter_condition")
   402         Return a blob that can be checked after the end of the reading task,   403         which will contain a scalar float indicating whether the underlying   404         reader has been exhausted (True) or whether we stopped because reached   405         the limit of iterations (False).   411     """Reader that stops after `num_iter` batches.   413     If `num_iter` <= 0 or is None, reverts to an unconstrained reader that   414     exports a boolean blob indicating that the reader has exhausted   418         """Class initializer.   421             reader: The underlying reader object doing the actual read.   422             num_iter: Number of batches to read. If `None`,   423                 the class reverts to a normal reader except that it also   424                 produces a data_finished blob as a side effect to indicate   425                 whether the input stream is exhausted.   427         super(ReaderWithLimit, self).
__init__(reader)
   431             self.
counter = self.net.AddExternalInput(
   432                 self.net.NextName(
'counter'))
   434     def setup_limiter(self, global_init_net, global_finish_net):
   436             global_init_net.CreateCounter(
   439     def check_limiter_condition(self, stop_condition_net):
   441             return stop_condition_net.CountDown([self.
counter], 1)
   443             return stop_condition_net.ConstantFill(
   445                 shape=[], value=
False, dtype=core.DataType.BOOL)
   448 def CountUntil(num_iter):
   453     """Reader that stops after `duration` seconds.   455     If `duration` <= 0 or is None, reverts to an unconstrained reader that   456     exports a boolean blob indicating that the reader has exhausted   460         """Class initializer.   463             reader: The underlying reader object doing the actual read.   464             duration: Number of seconds to read. If un-specified, None, or <= 0,   465                 the class reverts to a normal reader except that it also   466                 produces a data_finished blob as a side effect to indicate   467                 whether the input stream is exhausted.   469         super(ReaderWithTimeLimit, self).
__init__(reader)
   475     def setup_limiter(self, global_init_net, global_finish_net):
   477             duration_ns = int(self.
duration * (10**9))
   479             self.
timer = global_init_net.TimerBegin(
   480                 [], counter_name=
'epoch_timer')
   481             start_time = global_init_net.TimerGet(self.
timer)
   483                 [start_time], value=duration_ns)
   485             global_finish_net.TimerEnd([self.
timer], [])
   487     def check_limiter_condition(self, stop_condition_net):
   489             time_elapsed = stop_condition_net.TimerGet(self.
timer)
   490             return stop_condition_net.GE(
   493             return stop_condition_net.ConstantFill(
   494                 [], 1, shape=[], value=
False, dtype=core.DataType.BOOL
   499     """Test reader class that inserts a delay between reading batches."""   501     def __init__(self, reader, delay):
   502         Reader.__init__(self, schema=reader._schema)
   def setup_ex(self, global_init_net, global_finish_net):
       """Delegate task-level setup unchanged to the wrapped ``self.reader``."""
       inner_reader = self.reader
       inner_reader.setup_ex(global_init_net, global_finish_net)
   509     def read_ex(self, local_init_net, local_finish_net):
   512         def sleep_op(*args, **argd):
   513             time.sleep(self.
delay)
   515         read_net.Python(sleep_op)([], [])
   516         return ([read_net],) + self.reader.read(read_net)
   521     Base class for a reader that wrap multiple readers, e.g., reading from   522     multiple sources simultaneously.   527             names: list[str] names of readers; used as schema keys   528             readers: list[Reader] Reader instances, must have schema   530         assert len(names) == len(readers)
   532             (name, reader.schema()) 
for name, reader 
in zip(names, readers)
   537     def setup_ex(self, init_net, finish_net):
   539             reader.setup_ex(init_net, finish_net)
   541     def read_ex(self, local_init_net, local_finish_net):
   543         Stops when one of the reader finished   545         local_should_stop = local_init_net.ConstantFill(
   546             [], shape=[], dtype=core.DataType.BOOL, value=
False)
   550             sub_read_nets, should_stop, record = reader.read_record_ex(
   551                 local_init_net, local_finish_net)
   552             stop_net = 
core.Net(
"{}_stop".format(name))
   553             stop_net.Copy(should_stop, local_should_stop)
   554             sub_read_nets.append(stop_net)
   555             read_nets.extend(sub_read_nets)
   556             fields.extend(record.field_blobs())
   557         return read_nets, local_should_stop, fields
   559     def reset(self, net):
   566     A reader builder for CompositeReader   571             names: list[str] names of readers; used as schema keys   572             reader_builders: list[ReaderBuilder] ReaderBuilder instances;   575         super(CompositeReaderBuilder, self).
__init__()
   579             (name, reader_builder.schema())
   580             for name, reader_builder 
in zip(names, reader_builders)
   586     def setup(self, **kwargs):
   588             reader_builder.setup(**kwargs)
   592             if "limiter" in kwargs:
   593                 kwargs.pop(
"limiter")
   595     def new_reader(self, **kwargs):
   598             reader = reader_builder.new_reader(**kwargs)
   599             if isinstance(reader, Reader):
   601             elif hasattr(reader, 
'reader'):
   602                 reader = reader.reader()
   604                 raise ValueError(
'reader must be an instance of Reader or Pipe')
   605             readers.append(reader)
   608         assert multi_reader.schema() == self.
_schema 
Module caffe2.python.schema. 
 
def _set_schema(self, schema)
 
def __init__(self, names, reader_builders)
 
def write_record_ex(self, fields, local_init_net, local_finish_net, stop_blob=None)
 
def __init__(self, reader, duration=0)
 
def setup_ex(self, init_net, finish_net)
 
def read_ex(self, local_init_net, local_finish_net)
 
def commit(self, finish_net)
 
def __init__(self, names, readers)
 
def execution_step(self, reader_net_name=None, external_should_stop=None)
 
def check_limiter_condition(self, stop_condition_net)
 
def read_record(self, read_net)
 
def setup_limiter(self, global_init_net, global_finish_net)
 
def read_ex(self, local_init_net, local_finish_net)
 
def write(self, writer_net, fields)
 
def __init__(self, reader, num_iter=1)
 
def write_ex(self, fields, local_init_net, local_finish_net, stop_blob)
 
def setup_ex(self, init_net, finish_net)