Caffe2 - Python API
A deep learning, cross platform ML framework
executor_test_util.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function


from caffe2.python import (
    brew, cnn, core, workspace, data_parallel_model,
    timeout_guard, model_helper, optimizer)
from caffe2.python.test_util import TestCase
import caffe2.python.models.resnet as resnet
from caffe2.python.modeling.initializers import Initializer
from caffe2.python import convnet_benchmarks as cb
from caffe2.python import hypothesis_test_util as hu

import time
import numpy as np
from hypothesis import settings


CI_MAX_EXAMPLES = 2
CI_TIMEOUT = 600


def executor_test_settings(func):
    # On CI (Sandcastle or Travis), run only a few hypothesis examples but
    # allow a generous timeout; locally, leave the test unmodified.
    if hu.is_sandcastle() or hu.is_travis():
        return settings(
            max_examples=CI_MAX_EXAMPLES,
            timeout=CI_TIMEOUT
        )(func)
    else:
        return func


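# A minimal usage sketch (hypothetical test code, not part of this module):
# the decorator is stacked under hypothesis's @given so that CI runs fewer,
# longer-budgeted examples.
#
#     from hypothesis import given, strategies as st
#
#     @given(model_name=st.sampled_from(executor_test_model_names()))
#     @executor_test_settings
#     def test_model(self, model_name):
#         ...

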
def gen_test_resnet50(_order, _cudnn_ws):
    model = cnn.CNNModelHelper(
        order="NCHW",
        name="resnet_50_test",
        cudnn_exhaustive_search=True,
    )
    data = model.net.AddExternalInput("data")
    label = model.net.AddExternalInput("label")
    (_softmax, loss) = resnet.create_resnet50(
        model,
        data,
        num_input_channels=3,
        num_labels=1000,
        label=label,
        is_test=False,
    )
    return model, 227


def conv_model_generators():
    return {
        'AlexNet': cb.AlexNet,
        'OverFeat': cb.OverFeat,
        'VGGA': cb.VGGA,
        'Inception': cb.Inception,
        'MLP': cb.MLP,
        'Resnet50': gen_test_resnet50,
    }


def executor_test_model_names():
    # On CI only the cheap MLP model is exercised; locally, all of them.
    if hu.is_sandcastle() or hu.is_travis():
        return ["MLP"]
    else:
        return conv_model_generators().keys()


def build_conv_model(model_name, batch_size):
    model_gen_map = conv_model_generators()
    assert model_name in model_gen_map, "Model " + model_name + " not found"
    model, input_size = model_gen_map[model_name]("NCHW", None)

    input_shape = [batch_size, 3, input_size, input_size]
    if model_name == "MLP":
        input_shape = [batch_size, input_size]

    # Fill random data and labels so the net can run without a data layer.
    model.param_init_net.GaussianFill(
        [],
        "data",
        shape=input_shape,
        mean=0.0,
        std=1.0
    )
    model.param_init_net.UniformIntFill(
        [],
        "label",
        shape=[batch_size, ],
        min=0,
        max=999
    )

    model.AddGradientOperators(["loss"])

    # Manual SGD update: WeightedSum computes param = param * ONE + grad * LR,
    # so the negative base_lr makes this a (tiny) gradient-descent step.
    ITER = brew.iter(model, "iter")
    LR = model.net.LearningRate(
        ITER, "LR", base_lr=-1e-8, policy="step", stepsize=10000, gamma=0.999)
    ONE = model.param_init_net.ConstantFill([], "ONE", shape=[1], value=1.0)
    for param in model.params:
        param_grad = model.param_to_grad[param]
        model.net.WeightedSum([param, ONE, param_grad, LR], param)

    return model


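# A minimal usage sketch (assumes the required operators are present in the
# Caffe2 build; the executor name "simple" is Caffe2's default net type):
#
#     model = build_conv_model("MLP", batch_size=4)
#     model.Proto().type = "simple"
#     workspace.RunNetOnce(model.param_init_net)
#     workspace.CreateNet(model.net)
#     workspace.RunNet(model.net.Proto().name)

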
def build_resnet50_dataparallel_model(
        num_gpus,
        batch_size,
        epoch_size,
        cudnn_workspace_limit_mb=64,
        num_channels=3,
        num_labels=1000,
        weight_decay=1e-4,
        base_learning_rate=0.1,
        image_size=227,
        use_cpu=False):

    batch_per_device = batch_size // num_gpus

    train_arg_scope = {
        'order': 'NCHW',
        'use_cudnn': True,
        'cudnn_exhaustive_search': False,
        'ws_nbytes_limit': (cudnn_workspace_limit_mb * 1024 * 1024),
        'deterministic': True,
    }
    train_model = model_helper.ModelHelper(
        name="test_resnet50", arg_scope=train_arg_scope
    )

    def create_resnet50_model_ops(model, loss_scale):
        with brew.arg_scope([brew.conv, brew.fc],
                            WeightInitializer=Initializer,
                            BiasInitializer=Initializer,
                            enable_tensor_core=0):
            pred = resnet.create_resnet50(
                model,
                "data",
                num_input_channels=num_channels,
                num_labels=num_labels,
                no_bias=True,
                no_loss=True,
            )

        softmax, loss = model.SoftmaxWithLoss([pred, 'label'],
                                              ['softmax', 'loss'])
        loss = model.Scale(loss, scale=loss_scale)
        brew.accuracy(model, [softmax, "label"], "accuracy")
        return [loss]

    def add_optimizer(model):
        # Decay the learning rate 10x every 30 epochs' worth of iterations.
        stepsz = int(30 * epoch_size / batch_size)
        optimizer.add_weight_decay(model, weight_decay)
        opt = optimizer.build_multi_precision_sgd(
            model,
            base_learning_rate,
            momentum=0.9,
            nesterov=1,
            policy="step",
            stepsize=stepsz,
            gamma=0.1
        )
        return opt

    def add_image_input(model):
        # Synthetic input: random images and constant labels per device.
        model.param_init_net.GaussianFill(
            [],
            ["data"],
            shape=[batch_per_device, 3, image_size, image_size],
            dtype='float',
        )
        model.param_init_net.ConstantFill(
            [],
            ["label"],
            shape=[batch_per_device],
            value=1,
            dtype=core.DataType.INT32,
        )

    def add_post_sync_ops(model):
        # After the initial parameter sync, regenerate the fp32 copies of any
        # fp16 parameters used by the multi-precision optimizer.
        for param_info in model.GetOptimizationParamInfo(model.GetParams()):
            if param_info.blob_copy is not None:
                model.param_init_net.HalfToFloat(
                    param_info.blob,
                    param_info.blob_copy[core.DataType.FLOAT])

    # Create parallelized model
    data_parallel_model.Parallelize(
        train_model,
        input_builder_fun=add_image_input,
        forward_pass_builder_fun=create_resnet50_model_ops,
        optimizer_builder_fun=add_optimizer,
        post_sync_builder_fun=add_post_sync_ops,
        devices=list(range(num_gpus)),
        rendezvous=None,
        optimize_gradient_memory=True,
        cpu_device=use_cpu,
        shared_model=use_cpu,
    )

    return train_model


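# A minimal usage sketch (assumes a CUDA build with at least two GPUs; the
# batch and epoch sizes are illustrative only):
#
#     train_model = build_resnet50_dataparallel_model(
#         num_gpus=2, batch_size=64, epoch_size=6400)
#     workspace.RunNetOnce(train_model.param_init_net)
#     workspace.CreateNet(train_model.net)

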
def run_resnet50_epoch(train_model, batch_size, epoch_size, skip_first_n_iter=0):
    epoch_iters = int(epoch_size / batch_size)
    prefix = "{}_{}".format(
        train_model._device_prefix,
        train_model._devices[0])
    train_time = 0.0
    train_examples = 0
    for i in range(epoch_iters):
        # The first iteration pays one-time initialization costs, so it gets
        # a much larger timeout budget.
        timeout = 600.0 if i == 0 else 60.0
        with timeout_guard.CompleteInTimeOrDie(timeout):
            t1 = time.time()
            workspace.RunNet(train_model.net.Proto().name)
            t2 = time.time()
            dt = t2 - t1
        if i >= skip_first_n_iter:
            train_time += dt
            train_examples += batch_size

        fmt = "Finished iteration {}/{} ({:.2f} images/sec)"
        print(fmt.format(i + 1, epoch_iters, batch_size / dt))

    accuracy = workspace.FetchBlob(prefix + '/accuracy')
    loss = workspace.FetchBlob(prefix + '/loss')

    assert loss < 40, "Exploded gradients"

    return (
        train_examples,
        train_time,
        accuracy, loss)


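# A minimal usage sketch, continuing the data-parallel example above; the
# skip_first_n_iter=1 keeps warm-up out of the timing:
#
#     examples, elapsed, accuracy, loss = run_resnet50_epoch(
#         train_model, batch_size=64, epoch_size=6400, skip_first_n_iter=1)
#     print("{:.1f} examples/sec".format(examples / elapsed))

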
class ExecutorTestBase(TestCase):
    def compare_executors(self, model, ref_executor, test_executor, model_run_func):
        # Run the model under the reference executor with fixed rand seeds...
        model.Proto().type = ref_executor
        model.param_init_net.set_rand_seed(seed=0xCAFFE2)
        model.net.set_rand_seed(seed=0xCAFFE2)

        workspace.ResetWorkspace()
        workspace.RunNetOnce(model.param_init_net)

        workspace.CreateNet(model.net)
        model_run_func()
        ref_ws = {str(k): workspace.FetchBlob(k) for k in workspace.Blobs()}
        ref_ws = {k: v for k, v in ref_ws.items() if type(v) is np.ndarray}

        # ...then re-run it from scratch under the executor being tested...
        workspace.ResetWorkspace()
        workspace.RunNetOnce(model.param_init_net)

        model.Proto().type = test_executor
        workspace.CreateNet(model.net, overwrite=True)
        model_run_func()
        test_ws = {str(k): workspace.FetchBlob(k) for k in workspace.Blobs()}
        test_ws = {k: v for k, v in test_ws.items() if type(v) is np.ndarray}

        # ...and require every blob from the reference run to match exactly.
        for blob_name, ref_val in ref_ws.items():
            self.assertTrue(
                blob_name in test_ws,
                "Blob {} not found in {} run".format(blob_name, test_executor))
            val = test_ws[blob_name]
            np.testing.assert_array_equal(
                val, ref_val,
                "Blob {} differs in {} run".format(blob_name, test_executor))
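

# A minimal usage sketch (hypothetical subclass; "simple" and
# "async_scheduling" are two of Caffe2's net executor types):
#
#     class MyExecutorTest(ExecutorTestBase):
#         def test_mlp(self):
#             model = build_conv_model("MLP", batch_size=4)
#             self.compare_executors(
#                 model,
#                 ref_executor="simple",
#                 test_executor="async_scheduling",
#                 model_run_func=lambda: workspace.RunNet(model.net, 10))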