"""
Implementation of an in-memory dataset with structured schema.

Use this to store and iterate through datasets with complex schema that
fit in memory.

Iterating through entries of this dataset is very fast since the dataset
is stored as a set of native Caffe2 tensors, thus no type conversion or
deserialization is necessary.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from caffe2.python import core, workspace
from caffe2.python.dataio import Reader, Writer
from caffe2.python.schema import (
    Struct, from_blob_list, from_column_list, InitEmptyRecord)
import numpy as np


class _DatasetReader(Reader):
    def __init__(self, dataset, name, batch_size=1, enforce_batch_size=False):
        """Don't call this directly. Instead, use dataset.reader()"""
        Reader.__init__(self, dataset.content())
        self.dataset = dataset
        self.name = name or (dataset.name + '_cursor')
        self.batch_size = batch_size
        self.enforce_batch_size = enforce_batch_size
        self.cursor = None

    def setup_ex(self, init_net, exit_net):
        if self.cursor is None:
            self.cursor = init_net.CreateTreeCursor(
                [],
                init_net.NextScopedBlob(self.name),
                fields=self.dataset.fields)

    def read(self, read_net):
        assert self.cursor, 'setup not called.'
        content = self.dataset.content()
        with core.NameScope(read_net.NextName(self.name)):
            fields = read_net.ReadNextBatch(
                [self.cursor] + content.field_blobs(),
                content.field_names(),
                batch_size=self.batch_size,
                enforce_batch_size=self.enforce_batch_size)
            fields = core.output_to_list(fields)
            return (read_net.IsEmpty([fields[0]]), fields)

    def reset(self, net):
        net.ResetCursor([self.cursor], [])


class _DatasetRandomReader(Reader):
    def __init__(self, dataset, name, indices, batch_size=1, loop_over=False,
                 enforce_batch_size=False):
        """Don't call this directly. Instead, use dataset.random_reader()"""
        Reader.__init__(self, dataset.content())
        self.dataset = dataset
        self.cursor = None
        self.name = name or (dataset.name + '_cursor')
        self.indices = indices
        self.batch_size = batch_size
        self.loop_over = loop_over
        self.enforce_batch_size = enforce_batch_size

    def setup_ex(self, init_net, exit_net):
        if self.cursor is None:
            self.cursor = init_net.CreateTreeCursor(
                [],
                init_net.NextScopedBlob(self.name),
                fields=self.dataset.fields)

    def reset(self, net):
        net.ResetCursor([self.cursor], [])

    def computeoffset(self, net):
        self.reset(net)
        offsets = net.ComputeOffset(
            [self.cursor] + self.dataset.content().field_blobs(),
            'offsets')
        self.offsets = offsets

    def sort_and_shuffle(self, net, sort_by_field=None,
                         shuffle_size=1, batch_size=1):
        # sort_by_field_idx of -1 means no sorting
        content = self.dataset.content()
        sort_by_field_idx = -1
        if sort_by_field:
            assert sort_by_field in content.field_names(), (
                'Must be valid field.')
            sort_by_field_idx = content.field_names().index(sort_by_field)
        self.reset(net)

        indices = net.SortAndShuffle(
            [self.cursor] + content.field_blobs(),
            'indices',
            sort_by_field_idx=sort_by_field_idx,
            shuffle_size=shuffle_size,
            batch_size=batch_size)
        self.indices = indices

    def read(self, read_net):
        assert self.cursor, 'setup_ex not called'
        assert self.indices, 'sort_and_shuffle not called'
        assert self.offsets, 'computeoffset not called'
        content = self.dataset.content()
        with core.NameScope(read_net.NextName(self.name)):
            fields = read_net.ReadRandomBatch(
                [self.cursor, self.indices, self.offsets] + (
                    content.field_blobs()),
                content.field_names(),
                batch_size=self.batch_size,
                enforce_batch_size=self.enforce_batch_size,
                loop_over=self.loop_over)
            fields = core.output_to_list(fields)
            return (read_net.IsEmpty([fields[0]]), fields)
126 """Don't call this directly. Use dataset.writer() instead.""" 130 def setup_ex(self, init_net, exit_net):
131 if self.
mutex is None:
132 self.
mutex = init_net.CreateMutex([])
134 def write(self, writer_net, fields):
136 Add operations to `net` that append the blobs in `fields` to the end 137 of the dataset. An additional operator will also be added that checks 138 the consistency of the data in `fields` against the dataset schema. 141 writer_net: The net that will contain the Append operators. 142 fields: A list of BlobReference to be appeneded to this dataset. 144 assert self.
mutex is not None,
'setup not called.' 145 field_blobs = self._content.field_blobs()
146 assert len(fields) == len(field_blobs), (
147 'Expected %s fields, got %s.' % (len(field_blobs), len(fields)))
148 writer_net.CheckDatasetConsistency(
149 fields, [], fields=self._content.field_names())
150 writer_net.AtomicAppend(
151 [self.
mutex] + field_blobs + list(fields),
155 """Commit is a no-op for an in-memory dataset.""" 159 def Const(net, value, dtype=None, name=None):
161 Create a 'constant' by first creating an external input in the given 162 net, and then feeding the corresponding blob with its provided value 163 in the current workspace. The name is automatically generated in order 164 to avoid clashes with existing blob names. 166 assert isinstance(net, core.Net),
'net must be a core.Net instance.' 167 value = np.array(value, dtype=dtype)
168 blob = net.AddExternalInput(net.NextName(prefix=name))
169 workspace.FeedBlob(str(blob), value)
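
# Usage sketch (illustrative, not part of the original source): Const feeds
# the value into the current workspace right away and registers the blob as
# an external input of the net. The names below are made up for the example.
#
#     init_net = core.Net('init')
#     labels = Const(init_net, [0, 1, 1, 0], dtype=np.int32, name='labels')
#     print(workspace.FetchBlob(str(labels)))  # -> array([0, 1, 1, 0])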


def execution_step_with_progress(name, init_net, substeps, rows_read):
    # Build a report net that periodically prints how many rows were read.
    report_net = core.Net('report_net')
    report_net.Print([rows_read], [])
    return core.execution_step(
        name,
        substeps,
        report_net=report_net,
        concurrent_substeps=True,
        report_interval=5)
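
# Sketch of intended use (assumed, not from the original source): wrap the
# substeps that consume the dataset in an execution step whose report net
# periodically prints the number of rows read. `init_net`, `read_net` and
# `rows_read` are hypothetical nets/blobs built elsewhere.
#
#     step = execution_step_with_progress(
#         'read_with_progress', init_net,
#         [core.execution_step('read', read_net)], rows_read)
#     plan = core.Plan('read_plan')
#     plan.AddStep(step)
#     workspace.RunPlan(plan)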
186 """Represents an in-memory dataset with fixed schema. 188 Use this to store and iterate through datasets with complex schema that 191 Iterating through entries of this dataset is very fast since the dataset 192 is stored as a set of native Caffe2 tensors, thus no type conversion or 193 deserialization is necessary. 197 """Create an un-initialized dataset with schema provided by `fields`. 199 Before this dataset can be used, it must be initialized, either by 200 `init_empty` or `init_from_dataframe`. 203 fields: either a schema.Struct or a list of field names in a format 204 compatible with the one described in schema.py. 205 name: optional name to prepend to blobs that will store the data. 207 assert isinstance(fields, list)
or isinstance(fields, Struct), (
208 'fields must be either a Struct or a list of raw field names.')
209 if isinstance(fields, list):
210 fields = from_column_list(fields)
212 self.
fields = fields.field_names()
214 self.
name = name
or 'dataset' 215 self.
field_blobs = fields.field_blobs()
if fields.has_blobs()
else None 217 def trim(self, net, multiple_of):
219 Trims the contents of this dataset so that the number of records is 220 multiple of the given argument. 226 multiple_of=multiple_of)
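
    # Example (illustrative): trim the dataset so its length is divisible by
    # the global batch size before splitting it across parallel readers.
    # `ds`, `num_shards` and `batch_size` are assumed to exist elsewhere.
    #
    #     trim_net = core.Net('trim')
    #     ds.trim(trim_net, multiple_of=num_shards * batch_size)
    #     workspace.RunNetOnce(trim_net)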
229 """Initialize the blobs for this dataset with empty values. 231 Empty arrays will be immediately fed into the current workspace, 232 and `init_net` will take those blobs as external inputs. 235 init_net, self.schema.clone_schema()).field_blobs()
238 """Initialize the blobs for this dataset from a Pandas dataframe. 240 Each column of the dataframe will be immediately fed into the current 241 workspace, and the `net` will take this blobs as external inputs. 243 assert len(self.
fields) == len(dataframe.columns)
245 Const(net, dataframe.as_matrix([col]).flatten(), name=field)
246 for col, field
in enumerate(self.
fields)]
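
    # Example (illustrative; `my_dataframe` is a hypothetical pandas frame
    # whose number of columns matches the dataset's fields):
    #
    #     ds = Dataset(['label', 'weight'])
    #     ds.init_from_dataframe(core.Net('init'), my_dataframe)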

    def get_blobs(self):
        """
        Return the list of BlobReference pointing to the blobs that contain
        the data for this dataset.
        """
        return self.field_blobs

    def content(self):
        """
        Return a Record of BlobReferences pointing to the full content of
        this dataset.
        """
        return from_blob_list(self.schema, self.field_blobs)

    def field_names(self):
        """Return the list of field names for this dataset."""
        return self.fields

    def field_types(self):
        """
        Return the list of field dtypes for this dataset.

        If a list of strings, not a schema.Struct, was passed to the
        constructor, this will return a list of dtype(np.void).
        """
        return self.field_types

    def reader(self, init_net=None, cursor_name=None, batch_size=1,
               enforce_batch_size=False):
        """Create a Reader object that is used to iterate through the dataset.

        This will append operations to `init_net` that create a TreeCursor,
        used to iterate through the data.

        NOTE: Currently, it is not safe to append to a dataset while reading.

        Args:
            init_net: net that will be run once to create the cursor.
            cursor_name: optional name for the blob containing a pointer
                to the cursor.
            batch_size: how many samples to read per iteration.

        Returns:
            A _DatasetReader that can be used to create operators that will
            iterate through the dataset.
        """
        assert self.field_blobs, 'Dataset not initialized.'
        reader = _DatasetReader(self, cursor_name, batch_size,
                                enforce_batch_size)
        if init_net is not None:
            reader.setup_ex(init_net, None)
        return reader
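
    # Example (illustrative): drive the reader until the 'stop' blob returned
    # by read() becomes true. Net names are made up for the example.
    #
    #     read_init = core.Net('read_init')
    #     read_next = core.Net('read_next')
    #     reader = ds.reader(read_init, batch_size=32)
    #     should_stop, fields = reader.read(read_next)
    #     workspace.RunNetOnce(read_init)
    #     workspace.CreateNet(read_next)
    #     while True:
    #         workspace.RunNet(str(read_next))
    #         if workspace.FetchBlob(should_stop):
    #             break
    #         batch = [workspace.FetchBlob(f) for f in fields]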

    def random_reader(self, init_net=None, indices=None, cursor_name=None,
                      batch_size=1, loop_over=False,
                      enforce_batch_size=False):
        """Create a Reader object that is used to iterate through the dataset.

        NOTE: The reader order depends on the order in `indices`.

        Args:
            init_net: net that will be run once to create the cursor.
            indices: blob of reading order.
            cursor_name: optional name for the blob containing a pointer
                to the cursor.
            batch_size: how many samples to read per iteration.
            loop_over: repeat the dataset indefinitely (in the same order).

        Returns:
            A _DatasetRandomReader that can be used to create operators that
            will iterate through the dataset according to `indices`.
        """
        assert self.field_blobs, 'Dataset not initialized.'
        reader = _DatasetRandomReader(
            self, cursor_name, indices, batch_size, loop_over,
            enforce_batch_size)
        if init_net is not None:
            reader.setup_ex(init_net, None)
        return reader
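
    # Example (illustrative): shuffled reading requires preparing the indices
    # and offsets on the init net before reading. Sizes are arbitrary.
    #
    #     shuffle_init = core.Net('shuffle_init')
    #     reader = ds.random_reader(shuffle_init, batch_size=16)
    #     reader.sort_and_shuffle(shuffle_init, shuffle_size=4, batch_size=16)
    #     reader.computeoffset(shuffle_init)
    #     read_net = core.Net('read')
    #     should_stop, fields = reader.read(read_net)
    #     workspace.RunNetOnce(shuffle_init)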
329 """Create a Writer that can be used to append entries into the dataset. 331 NOTE: Currently, it is not safe to append to a dataset 332 while reading from it. 333 NOTE: Currently implementation of writer is not thread safe. 337 init_net: net that will be run once in order to create the writer. 340 assert self.
field_blobs,
'Dataset not initialized.' 342 if init_net
is not None:
343 writer.setup_ex(init_net,
None)
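

# End-to-end sketch (illustrative, not part of the original source): create an
# empty dataset with a one-field schema, append a single batch through a
# writer, then read it back. Field names and values are made up.
#
#     from caffe2.python import core, workspace, schema
#     from caffe2.python.dataset import Dataset, Const
#     import numpy as np
#
#     ds = Dataset(schema.Struct(('x', schema.Scalar(np.float32))))
#     init_net = core.Net('init')
#     ds.init_empty(init_net)
#     x = Const(init_net, np.array([1., 2., 3.], dtype=np.float32), name='x')
#     writer = ds.writer(init_net)
#     write_net = core.Net('write')
#     writer.write(write_net, [x])
#     workspace.RunNetOnce(init_net)
#     workspace.RunNetOnce(write_net)
#
#     read_init = core.Net('read_init')
#     read_net = core.Net('read')
#     reader = ds.reader(read_init, batch_size=2)
#     should_stop, fields = reader.read(read_net)
#     workspace.RunNetOnce(read_init)
#     workspace.RunNetOnce(read_net)
#     print(workspace.FetchBlob(fields[0]))  # first batch of 'x'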