Caffe2 - Python API
A deep learning, cross-platform ML framework
feature_sparse_to_dense.py
# @package sparse_to_dense
# Module caffe2.python.layers.sparse_to_dense
from __future__ import absolute_import, division, print_function, unicode_literals

import numpy as np
from caffe2.python import schema
from caffe2.python.layers.layers import ModelLayer


class FeatureSparseToDense(ModelLayer):
    def __init__(
        self, model, input_record, input_specs, name="feature_sparse_to_dense", **kwargs
    ):
        """
        `input_specs` follows the format of FeatureSpec from schema. To be more
        precise, it's a namedtuple that should have:
            'feature_type', 'feature_names', 'feature_ids'
        """
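        # Illustration (not in the original source): one `input_specs` entry
        # could look like
        #     ("floats", FeatureSpec(feature_type="FLOAT",
        #                            feature_names=["f1", "f2"],
        #                            feature_ids=[11, 12]))
        # where the field name "floats" and the ids 11, 12 are hypothetical.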
        super(FeatureSparseToDense, self).__init__(model, name, input_record, **kwargs)

        self.input_specs = input_specs

        outputs = []
        for field, feature_specs in self.input_specs:
            assert len(feature_specs.feature_names) == len(feature_specs.feature_ids)
            if feature_specs.feature_type == "FLOAT":
                outputs.append(
                    (
                        field,
                        schema.Scalar(
                            (np.float32, (len(feature_specs.feature_ids),)),
                            self.get_next_blob_reference(field + "_output"),
                        ),
                    )
                )
            elif feature_specs.feature_type == "ID_LIST":
                outputs.append(
                    (
                        field,
                        schema.Struct(
                            (
                                "ranges",
                                schema.Scalar(
                                    (np.int32, (len(feature_specs.feature_ids), 2)),
                                    self.get_next_blob_reference(field + "_ranges"),
                                ),
                            ),
                            (
                                "values",
                                schema.Scalar(
                                    np.int64,
                                    self.get_next_blob_reference(field + "_values"),
                                ),
                            ),
                        ),
                    )
                )
            elif feature_specs.feature_type == "ID_SCORE_LIST":
                outputs.append(
                    (
                        field,
                        schema.Struct(
                            (
                                "ranges",
                                schema.Scalar(
                                    (np.int32, (len(feature_specs.feature_ids), 2)),
                                    self.get_next_blob_reference(field + "_ranges"),
                                ),
                            ),
                            (
                                "ids",
                                schema.Scalar(
                                    np.int64,
                                    self.get_next_blob_reference(field + "_ids"),
                                ),
                            ),
                            (
                                "scores",
                                schema.Scalar(
                                    np.float32,
                                    self.get_next_blob_reference(field + "_scores"),
                                ),
                            ),
                        ),
                    )
                )
            elif feature_specs.feature_type == "EMBEDDING":
                # We don't know the dimensions of embeddings in the input data.
                # Even though they should match dimensions from the feature
                # config, we keep a ranges blob to check the input data later.
                outputs.append(
                    (
                        field,
                        schema.Struct(
                            (
                                "ranges",
                                schema.Scalar(
                                    (np.int32, (len(feature_specs.feature_ids), 2)),
                                    self.get_next_blob_reference(field + "_ranges"),
                                ),
                            ),
                            (
                                "values",
                                schema.Scalar(
                                    np.float32,
                                    self.get_next_blob_reference(field + "_values"),
                                ),
                            ),
                        ),
                    )
                )
            elif feature_specs.feature_type == "GENERIC_FEATURE":
                # We don't know the dimensions of the features in the input
                # data. Even though they should match dimensions from the
                # feature config, we keep a ranges blob to check the input
                # data later.
                # Currently this schema with ranges and values is only for
                # generic type enum 1. If new types are implemented, we need
                # to modify the ParseGeneric operator, and this part
                # accordingly.
                outputs.append(
                    (
                        field,
                        schema.Struct(
                            (
                                "ranges",
                                schema.Scalar(
                                    (np.int32, (len(feature_specs.feature_ids), 2)),
                                    self.get_next_blob_reference(field + "_ranges"),
                                ),
                            ),
                            (
                                "values",
                                schema.Scalar(
                                    np.float32,
                                    self.get_next_blob_reference(field + "_values"),
                                ),
                            ),
                        ),
                    )
                )
            else:
                raise TypeError(
                    "Unsupported input type: {0}".format(feature_specs.feature_type)
                )

        # TODO(amalevich): This schema is producing ranges. And thus if there
        # is something using it, it should support ranges as well. It might be
        # confusing if we don't add better support for ranges / have it as a
        # first layer.
        self.output_schema = schema.Struct(*outputs)

        # TODO(amalevich): Consider moving this data to the schema instead.
        # Structs don't support attaching metadata to them, and cloning will
        # break things badly, but this is the most elegant way to pass this
        # info around. Should we change it, or would that be too much work and
        # not worth it?
        for field, feature_specs in input_specs:
            schema.attach_metadata_to_scalars(
                self.output_schema[field], schema.Metadata(feature_specs=feature_specs)
            )
        self.zero = model.global_constants["ZERO"]
        self.zero_range = model.global_constants["ZERO_RANGE"]

    # Add operators to all types that need to be densified
    def add_ops(self, net):
        record = self.input_record
        for field, feature_specs in self.input_specs:
            if feature_specs.feature_type == "FLOAT":
                net.SparseToDenseMask(
                    [
                        record[field].keys(),
                        record[field].values(),
                        self.zero,
                        record[field].lengths(),
                    ],
                    [self.output_schema[field]()],
                    mask=feature_specs.feature_ids,
                )
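                # Illustration (not in the original source): with
                # mask=[11, 12, 13], an example whose sparse input is
                # keys=[12], values=[0.5], lengths=[1] densifies to the row
                # [zero, 0.5, zero]: each value lands in the column of its
                # matching mask id, and absent ids get the default value
                # (the third input, here self.zero).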
            elif feature_specs.feature_type == "ID_LIST":
                id_list_ranges = net.LengthsToRanges(
                    record[field].values.lengths(), net.NextScopedBlob("id_list_ranges")
                )
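                # Illustration (not in the original source): LengthsToRanges
                # converts a lengths vector into (offset, length) pairs, e.g.
                # lengths [2, 0, 3] -> ranges [[0, 2], [2, 0], [2, 3]].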
                net.SparseToDenseMask(
                    [
                        record[field].keys(),
                        id_list_ranges,
                        self.zero_range,
                        record[field].lengths(),
                    ],
                    self.output_schema[field].ranges(),
                    mask=feature_specs.feature_ids,
                )
                # Alias helps to enforce the fact that all SparseToDense calls
                # produce new blobs.
                # Reusing blob names might result in some weird consequences
                # at delivery time, when the content of the blobs is
                # generated based on the inputSpecs.
                net.Alias(
                    record[field].values.items(), self.output_schema[field].values()
                )
            elif feature_specs.feature_type == "ID_SCORE_LIST":
                # TODO: merge this with the case above?
                id_list_ranges = net.LengthsToRanges(
                    record[field].values.lengths(),
                    net.NextScopedBlob("id_score_list_ranges"),
                )
                net.SparseToDenseMask(
                    [
                        record[field].keys(),
                        id_list_ranges,
                        self.zero_range,
                        record[field].lengths(),
                    ],
                    self.output_schema[field].ranges(),
                    mask=feature_specs.feature_ids,
                )
                # Alias helps to enforce the fact that all SparseToDense calls
                # produce new blobs.
                # Reusing blob names might result in some weird consequences
                # at delivery time, when the content of the blobs is
                # generated based on the inputSpecs.
                net.Alias(record[field].values.keys(), self.output_schema[field].ids())
                net.Alias(
                    record[field].values.values(), self.output_schema[field].scores()
                )
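                # Illustration (not in the original source, assuming the usual
                # map-of-maps layout of ID_SCORE_LIST records): one example
                # such as {42: {7: 0.5, 9: 1.0}} would arrive as
                #     keys() = [42], lengths() = [1],
                #     values.keys() = [7, 9], values.values() = [0.5, 1.0],
                #     values.lengths() = [2]
                # i.e. parallel blobs flattened across examples.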
            elif feature_specs.feature_type == "EMBEDDING":
                ranges = net.LengthsToRanges(
                    record[field].values.lengths(),
                    net.NextScopedBlob("embeddings_ranges"),
                )
                net.SparseToDenseMask(
                    [
                        record[field].keys(),
                        ranges,
                        self.zero_range,
                        record[field].lengths(),
                    ],
                    self.output_schema[field].ranges(),
                    mask=feature_specs.feature_ids,
                )
                # Alias helps to enforce the fact that all SparseToDense calls
                # produce new blobs.
                # Reusing blob names might result in some weird consequences
                # at delivery time, when the content of the blobs is
                # generated based on the inputSpecs.
                net.Alias(
                    record[field].values.items(), self.output_schema[field].values()
                )
            elif feature_specs.feature_type == "GENERIC_FEATURE":
                (
                    feature_lengths_blob,
                    feature_ids_blob,
                    value_lengths_blob,
                    value_values_blob,
                ) = net.ParseGeneric(
                    [record[field]()],
                    ["feature_lengths", "feature_ids", "value_lengths", "value_values"],
                    feature_type_enum=1,
                )
                # Currently our implementation only supports generic type
                # enum 1. If new types are implemented, we need to modify the
                # ParseGeneric operator, the schema above, and this part
                # accordingly to parse the generic feature strings into
                # input_record.

                ranges = net.LengthsToRanges(
                    value_lengths_blob, net.NextScopedBlob("generics_ranges")
                )
                net.SparseToDenseMask(
                    [feature_ids_blob, ranges, self.zero_range, feature_lengths_blob],
                    self.output_schema[field].ranges(),
                    mask=feature_specs.feature_ids,
                )
                # Alias helps to enforce the fact that all SparseToDense calls
                # produce new blobs.
                # Reusing blob names might result in some weird consequences
                # at delivery time, when the content of the blobs is
                # generated based on the inputSpecs.
                net.Alias(value_values_blob, self.output_schema[field].values())

    def get_metadata(self):
        metadata = []
        for field, feature_specs in self.input_specs:
            metadata.append(
                (
                    {
                        "type": feature_specs.feature_type,
                        "names": feature_specs.feature_names,
                        "ids": feature_specs.feature_ids,
                    },
                    self.output_schema[field].field_blobs(),
                    self.output_schema[field].field_types(),
                )
            )
            if feature_specs.feature_type == "FLOAT":
                metadata[-1][0]["cardinality"] = 1
        return metadata
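
As a rough usage sketch (not part of the original source): the docstring above only requires three fields on each spec, so a stand-in namedtuple is enough to illustrate what output_schema the constructor would build; all field names and feature ids here are hypothetical.

from collections import namedtuple

# Stand-in for the FeatureSpec namedtuple described in the docstring;
# purely illustrative.
FeatureSpec = namedtuple(
    "FeatureSpec", ["feature_type", "feature_names", "feature_ids"]
)

input_specs = [
    ("floats", FeatureSpec("FLOAT", ["f1", "f2"], [11, 12])),
    ("id_lists", FeatureSpec("ID_LIST", ["g1"], [21])),
]

# Given a model and an input_record with matching fields,
# FeatureSparseToDense(model, input_record, input_specs) would expose:
#   floats   -> schema.Scalar((np.float32, (2,)))
#   id_lists -> schema.Struct with "ranges" (np.int32, (1, 2)) and
#               "values" (np.int64)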