# Module caffe2.python.dataio
"""
Defines the base interface for reading and writing operations.

Readers/Writers are objects that produce operations that read/write sequences
of data. Each operation reads or writes a list of BlobReferences.

Readers and Writers must be implemented such that read and write operations
are atomic and thread safe.

Examples of possible Readers and Writers:
    QueueReader, QueueWriter,
    DatasetReader, DatasetWriter,

See `dataset.py` for an example of implementation.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from caffe2.python import core
from caffe2.python.schema import Field, Struct, from_blob_list

import numpy as np
import time


class Reader(object):
    """
    Reader is an abstract class to be implemented in order to provide
    operations capable of iterating through a dataset or stream of data.

    A Reader must implement at least one operation, `read`, which
    adds operations to a net that read the next batch of data. Readers can
    optionally support the `reset` operation, which is useful when multiple
    passes over the data are required.
    """
    def __init__(self, schema=None):
        if schema is not None:
            assert isinstance(schema, Field)
        self._schema = schema

    def schema(self):
        assert self._schema is not None, 'Schema not provided for this reader.'
        return self._schema

    def _set_schema(self, schema):
        self._schema = schema

    def setup_ex(self, init_net, finish_net):
        """Setup nets to run at task initialization and cleanup time.

        Args:
            init_net: A net invoked at task init time.
            finish_net: A net invoked at task cleanup time.
        """
        pass

    def read_ex(self, local_init_net, local_finish_net):
        read_net = core.Net('reader_body')
        return ([read_net], ) + self.read(read_net)

    def read_record_ex(self, local_init_net, local_finish_net):
        nets, should_stop, fields = self.read_ex(
            local_init_net, local_finish_net)
        if self._schema:
            fields = from_blob_list(self._schema, fields)
        return nets, should_stop, fields

    def read(self, read_net):
        """Append operations to `read_net` that will read a batch from the
        underlying data source.

        Operations added to `read_net` must be thread safe and atomic, that is,
        it should be possible to clone `read_net` and run multiple instances of
        it in parallel.

        Args:
            read_net: the net that will be appended with read operations.

        Returns:
            A tuple (should_stop, fields), with:
                should_stop: BlobReference pointing to a boolean scalar
                             blob that indicates whether the read operation
                             was successful or whether the end of data has
                             been reached.
                fields: A tuple of BlobReference containing the latest batch
                        of data that was read.
        """
        raise NotImplementedError('Readers must implement `read`.')

    def reset(self, net):
        """Append operations to `net` that will reset the reader.

        This can be used to read the data multiple times.
        Not all readers support this operation.
        """
        raise NotImplementedError('This reader cannot be reset.')

    def read_record(self, read_net):
        should_stop, fields = self.read(read_net)
        if self._schema:
            fields = from_blob_list(self._schema, fields)
        return should_stop, fields
108 """Create an execution step with a net containing read operators. 110 The execution step will contain a `stop_blob` that knows how to stop 111 the execution loop when end of data was reached. 115 read_step, fields = reader.execution_step() 116 consume_net = core.Net('consume') 117 consume_net.Print(fields[0], []) 118 p = core.Plan('reader') 119 p.AddStep(read_step.AddNet(consume_net)) 123 reader_net_name: (optional) the name of the reader_net to be 124 created. The execution step will 125 be named accordingly. 128 A tuple (read_step, fields), with: 129 read_step: A newly created execution step containing a net with 130 read operations. The step will have `stop_blob` set, 131 in order to stop the loop on end of data. 132 fields: A tuple of BlobReference containing the latest batch 133 of data that was read. 135 reader_net =
core.Net(reader_net_name
or 'reader')
137 if external_should_stop
is not None:
138 should_stop = reader_net.Or([external_should_stop, should_stop])
139 read_step = core.execution_step(
140 '{}_step'.format(reader_net_name),
142 should_stop_blob=should_stop)
143 return (read_step, fields)
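
# A minimal sketch of a concrete Reader (illustrative only; `OnesReader` is a
# hypothetical name, not part of this module). It emits a constant batch and
# never signals end of data:
#
#     class OnesReader(Reader):
#         def __init__(self):
#             Reader.__init__(self, schema=Struct(('val', np.float32)))
#
#         def read(self, read_net):
#             should_stop = read_net.ConstantFill(
#                 [], 1, shape=[], value=False, dtype=core.DataType.BOOL)
#             val = read_net.ConstantFill([], 1, shape=[1], value=1.0)
#             return should_stop, [val]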


class Writer(object):
    """
    Writer is an abstract class to be implemented in order to provide
    operations capable of feeding a data stream or a dataset.

    A Writer must implement 2 operations:
    `write`, which adds operations to a net that write the next batch of
    data, and `commit`, which adds operations to a net in order to indicate
    that no more data will be written.
    """
    _schema = None

    def schema(self):
        return self._schema

    def write(self, writer_net, fields):
        """Add operations to `writer_net` that write the next batch of data.

        Operations added to the net must be thread-safe and unique, that is:
        multiple writers must be able to write to the dataset in parallel.

        Args:
            fields: a tuple of BlobReference containing the batch of data to
                    write.
        """
        raise NotImplementedError('Writers must implement write.')

    def write_record(self, writer_net, fields):
        if isinstance(fields, Field):
            self._schema = fields
            fields = fields.field_blobs()
        self.write(writer_net, fields)

    def setup_ex(self, init_net, finish_net):
        """Experimental, don't use yet."""
        self.commit(finish_net)

    def write_ex(self, fields, local_init_net, local_finish_net, stop_blob):
        """Experimental extension to the interface. Don't use yet."""
        write_net = core.Net('write_net')
        self.write(write_net, fields)
        return [write_net]

    def write_record_ex(
            self, fields, local_init_net, local_finish_net, stop_blob=None):
        """Experimental extension to the interface. Don't use yet."""
        if isinstance(fields, Field):
            self._schema = fields
            fields = fields.field_blobs()
        if stop_blob is None:
            stop_blob = local_init_net.NextName("dequeue_status")
        write_nets = self.write_ex(
            fields, local_init_net, local_finish_net, stop_blob)
        return (write_nets, stop_blob)

    def commit(self, finish_net):
        """Add operations to `finish_net` that signal end of data.

        This must be implemented by all Writers, but may be a no-op for some
        of them.
        """
        pass
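
# A minimal sketch of a concrete Writer (illustrative only; `PrintWriter` is a
# hypothetical name, not part of this module). It "writes" by printing each
# blob, and `commit` is a no-op:
#
#     class PrintWriter(Writer):
#         def write(self, writer_net, fields):
#             for blob in fields:
#                 writer_net.Print(blob, [])
#
#         def commit(self, finish_net):
#             pass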


class ReaderBuilder(object):
    """Allow usage of a reader in distributed fashion."""
    def schema(self):
        raise NotImplementedError()

    def setup(self, **kwargs):
        """
        Optionally, perform one-time setup before calling new_reader().
        Subclass should make sure this function is only called once.
        """
        raise NotImplementedError()

    def new_reader(self, **kwargs):
        raise NotImplementedError()


class PipedReaderBuilder(ReaderBuilder):
    """ReaderBuilder that modifies the underlying builder by calling `piper`
    on each new reader produced, and returning the result of the call. This
    way, it is possible to append data processing pipelines that will be
    replicated for each reader that gets created.

    E.g.:
        PipedReaderBuilder(
            ReaderBuilder(...),
            lambda reader: pipe(reader, processor=my_proc))
    """

    def __init__(self, builder, piper):
        self._builder = builder
        self._piper = piper

    def schema(self):
        return self._builder.schema()

    def setup(self, **kwargs):
        return self._builder.setup(**kwargs)

    def new_reader(self, **kwargs):
        # Pass all arguments down, since a PipedReaderBuilder may wrap
        # another PipedReaderBuilder.
        output = self._piper(
            reader=self._builder.new_reader(**kwargs),
            **kwargs
        )
        return output if isinstance(output, Reader) else output.reader()


class Pipe(object):
    def __init__(self, schema=None, obj_key=None):
        self._num_writers = 0
        self._num_readers = 0
        self._schema = schema
        self._obj_key = obj_key

    def schema(self):
        return self._schema

    def setup(self, global_init_net):
        pass

    def reader(self):
        raise NotImplementedError()

    def writer(self):
        raise NotImplementedError()

    def num_readers(self):
        return self._num_readers

    def num_writers(self):
        return self._num_writers

    def _new_writer(self, writer_schema, writer_init_net):
        if writer_schema is not None and self._schema is None:
            self._schema = writer_schema
        self._num_writers += 1
        if self._obj_key is not None:
            writer_init_net.add_attribute(self._obj_key, self)

    def _new_reader(self, reader_init_net):
        self._num_readers += 1
        if self._obj_key is not None:
            reader_init_net.add_attribute(self._obj_key, self)
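
# Pipe is an abstract base: concrete pipes (e.g. queue-backed implementations)
# provide reader() and writer(). A typical usage pattern, with `some_pipe`
# standing in for a concrete implementation (sketch only):
#
#     writer = some_pipe.writer()
#     writer.write_record(write_net, record)
#     reader = some_pipe.reader()
#     should_stop, fields = reader.read(read_net)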


class CounterReader(Reader):
    """Reader that produces increasing integers."""
    def __init__(self):
        Reader.__init__(self, schema=Struct(('iter', np.int64)))
        self.counter = None
        self.should_stop = None

    def setup_ex(self, global_init_net, global_finish_net):
        if self.counter is None:
            self.counter = global_init_net.CreateCounter([], init_count=0)
            self.should_stop = global_init_net.ConstantFill(
                [], shape=[], dtype=core.DataType.BOOL, value=False)

    def read_ex(self, local_init_net, local_finish_net):
        count_net = core.Net('limited_reader_counter')
        value = count_net.CountUp([self.counter], 1)
        return [count_net], self.should_stop, [value]
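
# Usage sketch (illustrative; assumes a caffe2 workspace is available and the
# nets are driven by hand rather than by a Task):
#
#     from caffe2.python import workspace
#     reader = CounterReader()
#     init_net, finish_net = core.Net('init'), core.Net('finish')
#     reader.setup_ex(init_net, finish_net)
#     nets, should_stop, fields = reader.read_ex(core.Net('li'), core.Net('lf'))
#     workspace.RunNetOnce(init_net)
#     for _ in range(3):
#         workspace.RunNetOnce(nets[0])  # fields[0] holds an increasing count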


class ReaderWithLimitBase(Reader):
    """Abstract Reader constrained by certain conditions.

    Base class for Reader classes which check for certain conditions to stop
    further processing (e.g. max number of iterations or time limit).
    Also produces a boolean blob (data_finished) that can be used to see if
    the reader exhausted all input data (True) or stopped for another reason
    (False).
    """

    def __init__(self, reader):
        Reader.__init__(self, schema=reader._schema)
        self.reader = reader
        self.net = core.Net('reader_with_limit')
        self._data_finished = self.net.AddExternalInput(
            self.net.NextName('data_finished'))

    def setup_ex(self, global_init_net, global_finish_net):
        global_init_net.ConstantFill(
            [], [self._data_finished],
            shape=[], value=False, dtype=core.DataType.BOOL)
        self.reader.setup_ex(global_init_net, global_finish_net)
        self.setup_limiter(global_init_net, global_finish_net)

    def read_ex(self, local_init_net, local_finish_net):
        """Reads from an underlying Reader class, but may stop due to
        additional constraints.

        Build and return network(s) to read data from a Reader with
        additional constraints, depending on which derived class is used.
        Derived classes implement setup_limiter and check_limiter_condition,
        which determine the nature of the constraint imposed on the reader,
        e.g. iteration limits or time limit.

        Args:
            local_init_net: A net invoked at task instance init time (once per
                parallel thread).
            local_finish_net: A net invoked at task instance cleanup time (once
                per parallel thread).
        """
        # Check if the limiting condition is met.
        stop_condition_net = core.Net('limited_reader_condition')
        should_stop = self.check_limiter_condition(stop_condition_net)

        # Call the underlying reader.
        nets, local_data_finished, fields = self.reader.read_ex(
            local_init_net, local_finish_net)

        # Check if the underlying reader is done, and propagate its stop
        # signal onto the blob used by the limiter, so that either condition
        # stops the loop.
        check_done_net = core.Net('limited_reader_post')
        check_done_net.Copy(local_data_finished, should_stop)
        # Update the externally-visible flag indicating whether the
        # underlying data was exhausted.
        check_done_net.Or([self._data_finished, local_data_finished],
                          [self._data_finished])

        return [stop_condition_net] + nets + [check_done_net], should_stop, fields

    def setup_limiter(self, global_init_net, global_finish_net):
        """Configure task level init/cleanup nets required to implement the
        limit condition. Must be implemented by subclass.

        Args:
            global_init_net: A net invoked at task init time.
            global_finish_net: A net invoked at task cleanup time.
        """
        raise NotImplementedError("Subclass must implement `setup_limiter`")

    def check_limiter_condition(self, stop_condition_net):
        """Configure a net that is invoked between reading batches to see if
        the limit condition is met. Must be implemented by subclass.

        Args:
            stop_condition_net: A net invoked to evaluate an early termination
                condition.
        """
        raise NotImplementedError(
            "Subclass must implement `check_limiter_condition`")

    def data_finished(self):
        """
        Return a blob that can be checked after the end of the reading task,
        which will contain a boolean scalar indicating whether the underlying
        reader has been exhausted (True) or whether we stopped because the
        limit of iterations was reached (False).
        """
        return self._data_finished


class ReaderWithLimit(ReaderWithLimitBase):
    """Reader that stops after `num_iter` batches.

    If `num_iter` <= 0 or is None, reverts to an unconstrained reader that
    exports a boolean blob indicating that the reader has exhausted
    the data stream.
    """
    def __init__(self, reader, num_iter=1):
        """Class initializer.

        Args:
            reader: The underlying reader object doing the actual read.
            num_iter: Number of batches to read. If `None`,
                the class reverts to a normal reader, except that it also
                produces a data_finished blob as a side effect to indicate
                whether the input stream is exhausted.
        """
        super(ReaderWithLimit, self).__init__(reader)
        self.counter = None
        self.num_iter = num_iter
        if self.num_iter is not None:
            self.counter = self.net.AddExternalInput(
                self.net.NextName('counter'))

    def setup_limiter(self, global_init_net, global_finish_net):
        if self.counter:
            global_init_net.CreateCounter(
                [], [self.counter], init_count=int(self.num_iter))

    def check_limiter_condition(self, stop_condition_net):
        if self.counter:
            return stop_condition_net.CountDown([self.counter], 1)
        else:
            return stop_condition_net.ConstantFill(
                [], 1,
                shape=[], value=False, dtype=core.DataType.BOOL)
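
# Usage sketch (illustrative): cap each task instance at 10 batches;
# `base_reader` stands in for any Reader. The wrapped reader is typically
# driven by caffe2.python.pipeline, which calls setup_ex/read_ex:
#
#     limited_reader = ReaderWithLimit(base_reader, num_iter=10)
#     # after the task runs, limited_reader.data_finished() tells whether
#     # the data was exhausted before the iteration cap was hit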


def CountUntil(num_iter):
    return ReaderWithLimit(CounterReader(), num_iter)


class ReaderWithTimeLimit(ReaderWithLimitBase):
    """Reader that stops after `duration` seconds.

    If `duration` <= 0 or is None, reverts to an unconstrained reader that
    exports a boolean blob indicating that the reader has exhausted
    the data stream.
    """
    def __init__(self, reader, duration=0):
        """Class initializer.

        Args:
            reader: The underlying reader object doing the actual read.
            duration: Number of seconds to read. If unspecified, None, or <= 0,
                the class reverts to a normal reader, except that it also
                produces a data_finished blob as a side effect to indicate
                whether the input stream is exhausted.
        """
        super(ReaderWithTimeLimit, self).__init__(reader)
        self.timer = None
        self.duration = duration
        self.duration_ns_blob = None

    def setup_limiter(self, global_init_net, global_finish_net):
        if self.duration is not None and self.duration > 0:
            duration_ns = int(self.duration * (10**9))

            self.timer = global_init_net.TimerBegin(
                [], counter_name='epoch_timer')
            start_time = global_init_net.TimerGet(self.timer)
            self.duration_ns_blob = global_init_net.ConstantFill(
                [start_time], value=duration_ns)

            global_finish_net.TimerEnd([self.timer], [])

    def check_limiter_condition(self, stop_condition_net):
        if self.duration:
            time_elapsed = stop_condition_net.TimerGet(self.timer)
            return stop_condition_net.GE(
                [time_elapsed, self.duration_ns_blob])
        else:
            return stop_condition_net.ConstantFill(
                [], 1, shape=[], value=False, dtype=core.DataType.BOOL
            )
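
# Usage sketch (illustrative): stop reading after one hour of wall-clock time,
# even if `base_reader` (hypothetical) still has data; data_finished() tells
# the two cases apart afterwards:
#
#     timed_reader = ReaderWithTimeLimit(base_reader, duration=3600)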


class ReaderWithDelay(Reader):
    """Test reader class that inserts a delay between reading batches."""
    def __init__(self, reader, delay):
        Reader.__init__(self, schema=reader._schema)
        self.reader = reader
        self.delay = delay

    def setup_ex(self, global_init_net, global_finish_net):
        self.reader.setup_ex(global_init_net, global_finish_net)

    def read_ex(self, local_init_net, local_finish_net):
        read_net = core.Net('reader_body')

        def sleep_op(*args, **argd):
            time.sleep(self.delay)

        read_net.Python(sleep_op)([], [])
        return ([read_net],) + self.reader.read(read_net)
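
# Usage sketch (illustrative, for tests): add a 100 ms pause per batch, e.g.
# to exercise ReaderWithTimeLimit deterministically:
#
#     slow_reader = ReaderWithDelay(base_reader, delay=0.1)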


class CompositeReader(Reader):
    """
    Base class for a reader that wraps multiple readers, e.g., for reading
    from multiple sources simultaneously.
    """
    def __init__(self, names, readers):
        """
        Args:
            names: list[str] names of readers; used as schema keys
            readers: list[Reader] Reader instances; must have a schema
        """
        assert len(names) == len(readers)
        super(CompositeReader, self).__init__(schema=Struct(*[
            (name, reader.schema()) for name, reader in zip(names, readers)
        ]))
        self._names = names
        self._readers = readers

    def setup_ex(self, init_net, finish_net):
        for reader in self._readers:
            reader.setup_ex(init_net, finish_net)

    def read_ex(self, local_init_net, local_finish_net):
        """
        Stops when one of the readers finishes.
        """
        local_should_stop = local_init_net.ConstantFill(
            [], shape=[], dtype=core.DataType.BOOL, value=False)
        read_nets = []
        fields = []
        for name, reader in zip(self._names, self._readers):
            sub_read_nets, should_stop, record = reader.read_record_ex(
                local_init_net, local_finish_net)
            stop_net = core.Net("{}_stop".format(name))
            stop_net.Copy(should_stop, local_should_stop)
            sub_read_nets.append(stop_net)
            read_nets.extend(sub_read_nets)
            fields.extend(record.field_blobs())
        return read_nets, local_should_stop, fields

    def reset(self, net):
        for reader in self._readers:
            reader.reset(net)


class CompositeReaderBuilder(ReaderBuilder):
    """
    A ReaderBuilder for CompositeReader.
    """
    def __init__(self, names, reader_builders):
        """
        Args:
            names: list[str] names of readers; used as schema keys
            reader_builders: list[ReaderBuilder] ReaderBuilder instances;
                must have a schema
        """
        super(CompositeReaderBuilder, self).__init__()
        self._names = names
        self._reader_builders = reader_builders
        self._schema = Struct(*[
            (name, reader_builder.schema())
            for name, reader_builder in zip(names, reader_builders)
        ])

    def schema(self):
        return self._schema

    def setup(self, **kwargs):
        for reader_builder in self._reader_builders:
            reader_builder.setup(**kwargs)
        # A limiter is stateful, so it cannot be shared across sub-builders.
        # Since CompositeReader stops as soon as any sub-reader stops, drop
        # it here rather than passing it down.
        if "limiter" in kwargs:
            kwargs.pop("limiter")

    def new_reader(self, **kwargs):
        readers = []
        for reader_builder in self._reader_builders:
            reader = reader_builder.new_reader(**kwargs)
            if isinstance(reader, Reader):
                pass
            elif hasattr(reader, 'reader'):
                reader = reader.reader()
            else:
                raise ValueError('reader must be an instance of Reader or Pipe')
            readers.append(reader)

        multi_reader = CompositeReader(self._names, readers)
        assert multi_reader.schema() == self._schema
        return multi_reader
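
# Usage sketch (illustrative): combine two hypothetical readers into one.
# The composite schema becomes Struct(('images', ...), ('labels', ...)), and
# reading stops as soon as either source runs out:
#
#     multi_reader = CompositeReader(['images', 'labels'],
#                                    [image_reader, label_reader])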