Caffe2 - C++ API
A deep learning, cross platform ML framework
blob_serialization.h
1 
17 #ifndef CAFFE2_CORE_BLOB_SERIALIZATION_H_
18 #define CAFFE2_CORE_BLOB_SERIALIZATION_H_
19 
20 #include <limits>
21 #include <future>
22 
23 #include <google/protobuf/repeated_field.h>
24 
25 #include "caffe2/core/blob.h"
26 #include "caffe2/core/blob_serializer_base.h"
27 #include "caffe2/core/tensor.h"
28 #include "caffe2/core/typeid.h"
29 #include "caffe2/core/types.h"
30 #include "caffe2/utils/simple_queue.h"
31 
32 CAFFE2_DECLARE_int(caffe2_tensor_chunk_size);
33 CAFFE2_DECLARE_int(caffe2_max_tensor_serializer_threads);
34 CAFFE2_DECLARE_bool(caffe2_serialize_fp16_as_bytes);
35 
36 namespace caffe2 {
37 
// Blob type name used in BlobProto for tensors.
constexpr auto kTensorBlobType = "Tensor";
// String used to separate chunk id from the blob name when storing in DB
constexpr auto kChunkIdSeparator = "#%";
41 
42 // The Blob serialization registry and serializer creator functions.
43 CAFFE_DECLARE_TYPED_REGISTRY(
44  BlobSerializerRegistry,
45  CaffeTypeId,
46  BlobSerializerBase,
47  std::unique_ptr);
48 #define REGISTER_BLOB_SERIALIZER(id, ...) \
49  CAFFE_REGISTER_TYPED_CLASS(BlobSerializerRegistry, id, __VA_ARGS__)
50 // Creates an operator with the given operator definition.
51 inline unique_ptr<BlobSerializerBase> CreateSerializer(CaffeTypeId id) {
52  return BlobSerializerRegistry()->Create(id);
53 }
54 
61 template <class Context>
63  public:
64  TensorSerializer() : context_() {}
65  ~TensorSerializer() override {}
70  void Serialize(
71  const Blob& blob,
72  const string& name,
73  SerializationAcceptor acceptor) override;
74  void SerializeWithChunkSize(
75  const Blob& blob,
76  const string& name,
77  SerializationAcceptor acceptor,
78  int chunk_size) override;
79 
80  void Serialize(const Tensor<Context>& tensor, const string& name,
81  TensorProto* proto, size_t chunkBegin, int32_t chunkSize);
82 
83  private:
84  // A utility function to store the device context detauls.
85  void StoreDeviceDetail(const Tensor<Context>& input, TensorProto* proto);
86  Context context_;
87 };
88 
94  public:
95  virtual ~BlobDeserializerBase() {}
96 
97  // Deserializes from a BlobProto object.
98  virtual void Deserialize(const BlobProto& proto, Blob* blob) = 0;
99 };
100 
101 CAFFE_DECLARE_REGISTRY(BlobDeserializerRegistry, BlobDeserializerBase);
102 #define REGISTER_BLOB_DESERIALIZER(name, ...) \
103  CAFFE_REGISTER_CLASS(BlobDeserializerRegistry, name, __VA_ARGS__)
104 // Creates an operator with the given operator definition.
105 inline unique_ptr<BlobDeserializerBase> CreateDeserializer(const string& type) {
106  return BlobDeserializerRegistry()->Create(type);
107 }
108 
117 template <class Context>
119  public:
120  void Deserialize(const BlobProto& proto, Blob* blob) override;
121  void Deserialize(const TensorProto& proto, Tensor<Context>* tensor);
122 };
123 
// Implementations
127 
128 namespace detail {
129 template <typename SrcType, typename DstType, class Context>
130 inline void CopyToProtoAsIs(
131  const size_t size,
132  const SrcType* src,
133  google::protobuf::RepeatedField<DstType>* field,
134  Context* context) {
135  static_assert(
136  sizeof(SrcType) == sizeof(DstType),
137  "The source type and dest type cannot be copied as-is. Did "
138  "you mean CopyToProtoWithCast?");
139  field->Reserve(size);
140  for (int i = 0; i < size; ++i) {
141  field->Add(0);
142  }
143  context->template Copy<SrcType, Context, CPUContext>(
144  size, src, reinterpret_cast<SrcType*>(field->mutable_data()));
145  // Make sure that we finish the copy into the protobuf.
146  context->FinishDeviceComputation();
147 }
148 
149 template <typename SrcType, typename DstType, class Context>
150 inline void CopyToProtoWithCast(
151  const size_t size,
152  const SrcType* src,
153  google::protobuf::RepeatedField<DstType>* field,
154  Context* context) {
155  // TODO: we are having one unnecessary copy here if the context is already
156  // CPUContext. Remove it if it is performance critical.
157  unique_ptr<SrcType[]> buffer(new SrcType[size]);
158  context->template Copy<SrcType, Context, CPUContext>(
159  size, src, buffer.get());
160  context->FinishDeviceComputation();
161  field->Reserve(size);
162  for (int i = 0; i < size; ++i) {
163  field->Add(static_cast<DstType>(buffer[i]));
164  }
165 }
166 
167 template <typename SrcType, typename DstType, class Context>
168 inline void CopyFromProtoAsIs(
169  const size_t size,
170  const google::protobuf::RepeatedField<SrcType>& field,
171  DstType* dst,
172  Context* context) {
173  static_assert(
174  sizeof(SrcType) == sizeof(DstType),
175  "The source type and dest type cannot be copied as-is. Did "
176  "you mean CopyFromProtoWithCast?");
177  CAFFE_ENFORCE_EQ(size, field.size(), "Incorrect proto field size.");
178  context->template Copy<DstType, CPUContext, Context>(
179  size, reinterpret_cast<const DstType*>(field.data()), dst);
180 }
181 
182 template <typename SrcType, typename DstType, class Context>
183 inline void CopyFromProtoWithCast(
184  const size_t size,
185  const google::protobuf::RepeatedField<SrcType>& field,
186  DstType* dst,
187  Context* context) {
188  CAFFE_ENFORCE_EQ(size, field.size(), "Incorrect proto field size.");
189  // TODO: we are having one unnecessary copy here if the context is already
190  // CPUContext. Remove it if it is performance critical.
191  unique_ptr<DstType[]> buffer(new DstType[size]);
192  const SrcType* src = field.data();
193  for (int i = 0; i < size; ++i) {
194  buffer[i] = static_cast<DstType>(src[i]);
195  }
196  context->template Copy<DstType, CPUContext, Context>(size, buffer.get(), dst);
197 }
198 
199 } // namespace detail
200 
201 template <class Context>
203  const Blob& blob,
204  const string& name,
205  BlobSerializerBase::SerializationAcceptor acceptor) {
206  this->SerializeWithChunkSize(blob, name, acceptor, kDefaultChunkSize);
207 }
208 
209 template <class Context>
211  const Blob& blob,
212  const string& name,
213  BlobSerializerBase::SerializationAcceptor acceptor,
214  int chunk_size) {
215  CAFFE_ENFORCE(blob.IsType<Tensor<Context>>());
216  const auto& tensor = blob.template Get<Tensor<Context>>();
217  if (chunk_size == kNoChunking) {
218  chunk_size = tensor.size() + 1; // to account for empty tensors
219  } else if (chunk_size == kDefaultChunkSize) {
220  chunk_size = FLAGS_caffe2_tensor_chunk_size;
221  }
222 
223  auto processChunk = [&](int64_t chunkStart) {
224  BlobProto blob_proto;
225  blob_proto.set_name(name);
226  blob_proto.set_type(kTensorBlobType);
227  TensorProto& proto = *blob_proto.mutable_tensor();
228  proto.set_name(name);
229  this->Serialize(
230  tensor, name, blob_proto.mutable_tensor(), chunkStart, chunk_size);
231  acceptor(
232  MakeString(name, kChunkIdSeparator, chunkStart / chunk_size),
233  blob_proto.SerializeAsString());
234  };
235 
236 #ifndef __ANDROID__
237  std::vector<std::future<void>> futures;
238  // Poorman's IOBound ThreadPool
239  SimpleQueue<size_t> chunkQueue;
240  auto task = [&]() {
241  size_t chunkStart;
242  while (chunkQueue.Pop(&chunkStart)) {
243  processChunk(chunkStart);
244  }
245  };
246  if (tensor.size() > chunk_size) {
247  for (int i = 0; i < FLAGS_caffe2_max_tensor_serializer_threads; ++i) {
248  futures.emplace_back(std::async(std::launch::async, task));
249  }
250  }
251 #endif
252 
253  VLOG(1) << "Serializing blob " << name;
254  // Serialize whole vector. If vector is empty, it's shape still needs to be
255  // serialized in empty proto
256  for (size_t chunkBegin = 0;
257  chunkBegin < std::max(tensor.size(), static_cast<TIndex>(1));
258  chunkBegin += chunk_size) {
259  VLOG(2) << "Starting a chunk at " << chunkBegin;
260 #ifndef __ANDROID__
261  if (tensor.size() > chunk_size) {
262  chunkQueue.Push(chunkBegin);
263  } else {
264  // Sync mode for small tensors
265  processChunk(chunkBegin);
266  }
267 #else
268  // Since Android does not have std::future, we will always do sync mode
269  processChunk(chunkBegin);
270 #endif
271  }
272 
273 #ifndef __ANDROID__
274  chunkQueue.NoMoreJobs();
275  for (auto& fut : futures) {
276  fut.get();
277  }
278 #endif
279 }
280 
281 template <class Context>
283  const Tensor<Context>& input,
284  const string& /*name*/,
285  TensorProto* proto_ptr,
286  size_t chunkBegin,
287  int32_t chunkSize) {
288  CAFFE_ENFORCE(
289  chunkBegin <= input.size(),
290  "Chunk begin is out of tensor: ",
291  chunkBegin,
292  ' ',
293  input.size());
294  if (chunkBegin + chunkSize > input.size()) {
295  chunkSize = input.size() - chunkBegin;
296  }
297 
298  CAFFE_ENFORCE(
299  input.raw_data() || chunkSize == 0,
300  "The input does not have data input yet. This is probably because you "
301  "created a tensor of non-zero shape but never filled its data via "
302  "mutable_data() calls. This means that it makes no sense to serialize "
303  "the tensor content.");
304 
305  TensorProto& proto = *proto_ptr;
306  proto.mutable_segment()->set_begin(chunkBegin);
307  proto.mutable_segment()->set_end(chunkBegin + chunkSize);
308 
309  for (int i = 0; i < input.ndim(); ++i) {
310  proto.add_dims(input.dim(i));
311  }
312  const TensorProto::DataType data_type = TypeMetaToDataType(input.meta());
313  proto.set_data_type(data_type);
314  StoreDeviceDetail(input, &proto);
315 
316  // A lot of copypaste is error prone. Should we create a macro for this?
317  switch (data_type) {
318  case TensorProto_DataType_FLOAT:
319  detail::CopyToProtoAsIs(
320  chunkSize,
321  input.template data<float>() + chunkBegin,
322  proto.mutable_float_data(),
323  &this->context_);
324  break;
325  case TensorProto_DataType_INT32:
326  detail::CopyToProtoAsIs(
327  chunkSize,
328  input.template data<int>() + chunkBegin,
329  proto.mutable_int32_data(),
330  &this->context_);
331  break;
332  case TensorProto_DataType_BYTE:
333  LOG(FATAL) << "This should not happen. When serializing, "
334  "BYTE is deprecated and moved to UINT8.";
335  break;
336  case TensorProto_DataType_STRING:
337  {
338  proto.mutable_string_data()->Reserve(chunkSize);
339  const string* content = input.template data<string>();
340  for (int i = chunkBegin; i < chunkBegin + chunkSize; ++i) {
341  proto.add_string_data(content[i]);
342  }
343  break;
344  }
345  case TensorProto_DataType_BOOL:
346  detail::CopyToProtoWithCast(
347  chunkSize,
348  input.template data<bool>() + chunkBegin,
349  proto.mutable_int32_data(),
350  &this->context_);
351  break;
352  case TensorProto_DataType_UINT8:
353  detail::CopyToProtoWithCast(
354  chunkSize,
355  input.template data<uint8_t>() + chunkBegin,
356  proto.mutable_int32_data(),
357  &this->context_);
358  break;
359  case TensorProto_DataType_INT8:
360  detail::CopyToProtoWithCast(
361  chunkSize,
362  input.template data<int8_t>() + chunkBegin,
363  proto.mutable_int32_data(),
364  &this->context_);
365  break;
366  case TensorProto_DataType_UINT16:
367  detail::CopyToProtoWithCast(
368  chunkSize,
369  input.template data<uint16_t>() + chunkBegin,
370  proto.mutable_int32_data(),
371  &this->context_);
372  break;
373  case TensorProto_DataType_INT16:
374  detail::CopyToProtoWithCast(
375  chunkSize,
376  input.template data<int16_t>() + chunkBegin,
377  proto.mutable_int32_data(),
378  &this->context_);
379  break;
380  case TensorProto_DataType_INT64:
381  detail::CopyToProtoAsIs(
382  chunkSize,
383  input.template data<int64_t>() + chunkBegin,
384  proto.mutable_int64_data(),
385  &this->context_);
386  break;
387  case TensorProto_DataType_FLOAT16: {
388  if (FLAGS_caffe2_serialize_fp16_as_bytes) {
389  const int kValue = 1;
390  CAFFE_ENFORCE_EQ(
391  reinterpret_cast<const char*>(&kValue)[0],
392  1,
393  "Serialization of FLOAT16 on big endian platform "
394  "is not written yet.");
395  unique_ptr<char[]> buffer(new char[2 * chunkSize]);
396  this->context_.template Copy<char, Context, CPUContext>(
397  2 * chunkSize,
398  reinterpret_cast<const char*>(
399  input.template data<float16>() + chunkBegin),
400  buffer.get());
401  this->context_.FinishDeviceComputation();
402  proto.set_byte_data(buffer.release(), 2 * chunkSize);
403  } else {
404  detail::CopyToProtoWithCast(
405  chunkSize,
406  reinterpret_cast<const uint16_t*>(input.template data<float16>()) +
407  chunkBegin,
408  proto.mutable_int32_data(),
409  &this->context_);
410  }
411  } break;
412  case TensorProto_DataType_DOUBLE:
413  detail::CopyToProtoAsIs(
414  chunkSize,
415  input.template data<double>() + chunkBegin,
416  proto.mutable_double_data(),
417  &this->context_);
418  break;
419  case TensorProto_DataType_UNDEFINED: {
420  proto.mutable_string_data()->Reserve(chunkSize);
421  Blob temp_blob;
422  const char* raw_data = static_cast<const char*>(input.raw_data());
423  for (int i = chunkBegin; i < chunkBegin + chunkSize; ++i) {
424  temp_blob.ShareExternal(
425  const_cast<char*>(raw_data + i * input.itemsize()), input.meta());
426  proto.add_string_data(temp_blob.Serialize(""));
427  }
428  } break;
429  // Note: we intentially do not provide "default:" so if any new data types
430  // are added, the compiler should warn the user to add the case here.
431  }
432 }
433 
434 template <class Context>
436  const BlobProto& blob_proto,
437  Blob* blob) {
438  Deserialize(blob_proto.tensor(), blob->GetMutable<Tensor<Context>>());
439 }
440 
441 template <class Context>
443  const TensorProto& proto,
444  Tensor<Context>* tensor) {
445  // We create a local context for deserializing. Since Caffe2 contexts are
446  // usually lightweighted, this should not involve too much overhead.
447  Context context(proto.device_detail());
448  context.SwitchToDevice(0);
449  vector<TIndex> dims;
450  for (const TIndex d : proto.dims()) {
451  dims.push_back(d);
452  }
453  tensor->Resize(dims);
454 
455  int64_t chunkBegin = 0;
456  auto chunkEnd = tensor->size();
457  if (proto.has_segment()) {
458  chunkBegin = proto.segment().begin();
459  chunkEnd = proto.segment().end();
460  }
461  CAFFE_ENFORCE(
462  0 <= chunkBegin && chunkBegin <= chunkEnd && chunkEnd <= tensor->size(),
463  "Invalid chunk ",
464  chunkBegin,
465  ' ',
466  chunkEnd,
467  " with total tensor size ",
468  tensor->size());
469  auto chunkSize = chunkEnd - chunkBegin;
470 
471  switch (proto.data_type()) {
472  case TensorProto_DataType_FLOAT:
473  detail::CopyFromProtoAsIs(
474  chunkSize,
475  proto.float_data(),
476  tensor->template mutable_data<float>() + chunkBegin,
477  &context);
478  break;
479  case TensorProto_DataType_INT32:
480  detail::CopyFromProtoAsIs(
481  chunkSize,
482  proto.int32_data(),
483  tensor->template mutable_data<int>() + chunkBegin,
484  &context);
485  break;
486  case TensorProto_DataType_BYTE:
487  // Since BYTE stores the data in a string field instead of a repreated
488  // field we will have it special cased.
489  CAFFE_ENFORCE_EQ(
490  chunkSize, proto.byte_data().size(), "Incorrect proto field size.");
491  context.template Copy<uint8_t, Context, CPUContext>(
492  chunkSize,
493  reinterpret_cast<const uint8_t*>(proto.byte_data().data()),
494  tensor->template mutable_data<uint8_t>() + chunkBegin);
495  break;
496  case TensorProto_DataType_STRING:
497  // Special handing of string because it is a non-fundamental type.
498  {
499  string* content = tensor->template mutable_data<string>();
500  for (int i = 0; i < chunkSize; ++i) {
501  content[i + chunkBegin] = proto.string_data(i);
502  }
503  }
504  break;
505  case TensorProto_DataType_BOOL:
506  detail::CopyFromProtoWithCast(
507  chunkSize,
508  proto.int32_data(),
509  tensor->template mutable_data<bool>() + chunkBegin,
510  &context);
511  break;
512  case TensorProto_DataType_UINT8:
513  detail::CopyFromProtoWithCast(
514  chunkSize,
515  proto.int32_data(),
516  tensor->template mutable_data<uint8_t>() + chunkBegin,
517  &context);
518  break;
519  case TensorProto_DataType_INT8:
520  detail::CopyFromProtoWithCast(
521  chunkSize,
522  proto.int32_data(),
523  tensor->template mutable_data<int8_t>() + chunkBegin,
524  &context);
525  break;
526  case TensorProto_DataType_UINT16:
527  detail::CopyFromProtoWithCast(
528  chunkSize,
529  proto.int32_data(),
530  tensor->template mutable_data<uint16_t>() + chunkBegin,
531  &context);
532  break;
533  case TensorProto_DataType_INT16:
534  detail::CopyFromProtoWithCast(
535  chunkSize,
536  proto.int32_data(),
537  tensor->template mutable_data<int16_t>() + chunkBegin,
538  &context);
539  break;
540  case TensorProto_DataType_INT64:
541  detail::CopyFromProtoAsIs(
542  chunkSize,
543  proto.int64_data(),
544  tensor->template mutable_data<int64_t>() + chunkBegin,
545  &context);
546  break;
547  case TensorProto_DataType_FLOAT16:
548  if (proto.has_byte_data()) {
549  const int kValue = 1;
550  CAFFE_ENFORCE_EQ(
551  reinterpret_cast<const char*>(&kValue)[0],
552  1,
553  "Serialization of FLOAT16 on big endian platform "
554  "is not written yet.");
555  CAFFE_ENFORCE_EQ(
556  2 * chunkSize,
557  proto.byte_data().size(),
558  "Incorrect proto field size.");
559  context.template Copy<float16, Context, CPUContext>(
560  chunkSize,
561  reinterpret_cast<const float16*>(proto.byte_data().data()),
562  tensor->template mutable_data<float16>() + chunkBegin);
563  } else {
564  // Backward compatibility with models which used int32_data field
565  detail::CopyFromProtoWithCast(
566  chunkSize,
567  proto.int32_data(),
568  reinterpret_cast<uint16_t*>(
569  tensor->template mutable_data<float16>()) +
570  chunkBegin,
571  &context);
572  }
573  break;
574  case TensorProto_DataType_DOUBLE:
575  detail::CopyFromProtoAsIs(
576  chunkSize,
577  proto.double_data(),
578  tensor->template mutable_data<double>() + chunkBegin,
579  &context);
580  break;
581  case TensorProto_DataType_UNDEFINED: {
582  Blob temp_blob;
583  void* raw_ptr = nullptr;
584  for (int i = 0; i < chunkSize; ++i) {
585  temp_blob.Deserialize(proto.string_data(i));
586  if (i == 0) {
587  raw_ptr = tensor->raw_mutable_data(temp_blob.meta());
588  }
589  temp_blob.meta().copy()(
590  temp_blob.GetRaw(),
591  static_cast<char*>(raw_ptr) +
592  (i + chunkBegin) * temp_blob.meta().itemsize(),
593  1);
594  }
595  }
596  }
597  context.FinishDeviceComputation();
598 }
599 
600 } // namespace caffe2
601 
602 #endif // CAFFE2_CORE_BLOB_SERIALIZATION_H_
Blob is a general container that hosts a typed pointer.
Definition: blob.h:41
size_t itemsize() const
Return the number of bytes each item takes in the tensor.
Definition: tensor.h:613
std::remove_const< T >::type * ShareExternal(typename std::remove_const< T >::type *allocated)
Sets the underlying object to the allocated one, but does not take over the ownership of the passed i...
Definition: blob.h:179
const TypeMeta & meta() const
Returns the TypeMeta object associated with the current data type.
Definition: tensor.h:664
TIndex dim(const int i) const
Returns the i-th dimension of the tensor.
Definition: tensor.h:687
TensorSerializer is the serializer for Tensors.
BlobDeserializerBase is an abstract class that deserializes a blob from a BlobProto or a TensorProto...
Tensor is the basic class in Caffe2 that stores a contiguous memory with its shape information...
Definition: tensor.h:109
TIndex size() const
Returns the size (i.e., the total number of items) of the tensor.
Definition: tensor.h:609
void Serialize(const Blob &blob, const string &name, SerializationAcceptor acceptor) override
Serializes a Blob.
void Serialize(const string &name, BlobSerializerBase::SerializationAcceptor acceptor, int chunk_size=kDefaultChunkSize) const
Serializes the current blob, if possible.
void Resize(Ts...dim_source)
Resizes a tensor.
Definition: tensor.h:304
TensorDeserializer is the deserializer for Tensors.
Copyright (c) 2016-present, Facebook, Inc.
const void * raw_data() const
Returns a const raw void* pointer of the underlying storage.
Definition: tensor.h:488
TypedCopy copy() const
Returns the typed copy function pointer for individual items.
Definition: typeid.h:171
T * GetMutable(bool *is_new_object=nullptr)
Gets a mutable pointer to the stored object.
Definition: blob.h:117
const TypeMeta & meta() const
Returns the meta info of the blob.
Definition: blob.h:79
void Deserialize(const string &content)
Deserializes from a string containing either BlobProto or TensorProto.
bool IsType() const
Checks if the content stored in the blob is of type T.
Definition: blob.h:74
int ndim() const
Returns the number of dimensions of the data.
Definition: tensor.h:605
void * raw_mutable_data(const TypeMeta &meta)
Returns a mutable raw pointer of the underlying storage.
Definition: tensor.h:526
const size_t & itemsize() const
Returns the size of the item.
Definition: typeid.h:159
BlobSerializerBase is an abstract class that serializes a blob to a string.