4 Implementation of a queue wrapper. 6 from __future__
import absolute_import
7 from __future__
import division
8 from __future__
import print_function
9 from __future__
import unicode_literals
14 Struct, Field, from_column_list)
18 def __init__(self, blobs_queue, schema, name=None):
19 """Don't call this directly. Instead, use dataset.reader()""" 20 super(_QueueReader, self).
__init__(schema)
24 def read(self, read_net):
25 with core.NameScope(read_net.NextName(self.
name)):
26 status = read_net.NextName()
27 fields = read_net.SafeDequeueBlobs(
28 self.
blobs_queue, self._schema.field_names() + [status])
29 return (fields[-1], fields[:-1])
33 def __init__(self, blobs_queue, schema):
37 def write(self, writer_net, fields):
38 if isinstance(fields, Field):
39 fields = fields.field_blobs()
40 writer_net.CheckDatasetConsistency(
41 fields, [], fields=self.schema.field_names())
42 status = writer_net.NextName()
43 writer_net.SafeEnqueueBlobs(
49 """ The class is used to feed data with some process from a reader into a 50 queue and provider a reader interface for data fetching from the queue. 52 def __init__(self, fields, name=None, capacity=1,
53 enforce_unique_name=
False, num_threads=1):
54 assert isinstance(fields, list)
or isinstance(fields, Struct), (
55 'fields must be either a Struct or a list of raw field names.')
56 if isinstance(fields, list):
57 fields = from_column_list(fields)
59 self.
name = name
or 'queue' 61 num_blobs = len(self.schema.field_names())
67 enforce_unique_name=enforce_unique_name)
68 core.workspace.RunNetOnce(init_net)
71 reader_name = self.
name +
'_reader' 77 '{}_close_step'.format(str(exit_net)),
80 def build(self, reader, process=None):
82 Build the producer_step to feed data from reader into the queue, and 83 return the reader interface. 85 reader: read data which will be stored in the queue. 86 process: preprocess data before enqueue. 88 reader: reader to fetch the data from the queue. 89 producer_step: the step insert the data into the queue. Should be 90 run with comsume_step together. 91 exit_step: the step to close queue 92 schema: the schema for the reader. 96 name =
'reader_' + str(i)
98 should_stop, fields = reader.read_record(net_reader)
99 step_read = core.execution_step(name, net_reader)
101 name =
'queue_writer' + str(i)
103 field_blobs = fields.field_blobs()
105 field_blobs = process(net_prod, fields).field_blobs()
107 self.writer.write(net_prod, field_blobs)
108 step_prod = core.execution_step(name, net_prod)
109 step = core.execution_step(
110 'producer_' + str(i),
111 [step_read, step_prod],
112 should_stop_blob=should_stop)
113 producer_steps.append(step)
114 producer_step = core.execution_step(
117 concurrent_substeps=
True)
def build(self, reader, process=None)
def __init__(self, blobs_queue, schema, name=None)