# Caffe2 - Python API
# A deep learning, cross platform ML framework
# pipeline.py
1 # Copyright (c) 2016-present, Facebook, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 ##############################################################################
15 
16 ## @package pipeline
17 # Module caffe2.python.pipeline
18 from __future__ import absolute_import
19 from __future__ import division
20 from __future__ import print_function
21 from __future__ import unicode_literals
22 
23 from caffe2.python import core, queue_util
24 from caffe2.python.dataio import Reader, Writer
25 from caffe2.python.net_builder import NetBuilder, ops
26 from caffe2.python.schema import as_record, Field
27 from caffe2.python.task import Node, Task, TaskGroup
28 
29 
class Output(object):
    """
    Result of a processor function.

    A processor may return an Output directly, or return a plain record,
    in which case an Output is built for it afterwards (see
    normalize_processor_output).
    """
    def __init__(self, nets=None, record=None, should_stop=None):
        # Ops created via the `ops` syntax accumulate as children of the
        # current NetBuilder; mixing that with an explicit net list is
        # ambiguous, so it is disallowed.
        builder_children = NetBuilder.current().get()
        assert nets is None or len(builder_children) == 0, (
            'Cannot both use `ops` syntax and return a list of nets.')
        if nets is None:
            nets = builder_children
        if isinstance(nets, core.Net):
            nets = [nets]
        self.nets = list(nets) if nets is not None else []
        self.record = as_record(record) if record is not None else None
        self.should_stop = should_stop
47 
48 
# Capacity of the queue that `pipe` creates when the caller provides
# neither an explicit `output` nor a `capacity`.
DEFAULT_QUEUE_CAPACITY = 100
50 
51 
def _init_output(output, capacity, global_init_net, global_exit_net):
    """
    Resolve `output` into an (out_queue, writer) pair and register the
    writer's setup with the global init/exit nets.

    `output` may be None (a fresh Queue of `capacity` is created), a
    Writer, or any object exposing a writer() method (queue or stream).
    """
    if output is None:
        queue_capacity = DEFAULT_QUEUE_CAPACITY if capacity is None else capacity
        out_queue = queue_util.Queue(capacity=queue_capacity)
        writer = out_queue.writer()
    elif isinstance(output, Writer):
        # A raw Writer carries no queue of its own.
        assert capacity is None, 'capacity would not be used.'
        out_queue, writer = None, output
    elif hasattr(output, 'writer'):
        # Queue- or stream-like: keep the object and take its writer.
        assert capacity is None, 'capacity would not be used.'
        out_queue, writer = output, output.writer()
    else:
        raise ValueError('output must be a reader, queue or stream.')
    writer.setup_ex(global_init_net, global_exit_net)
    return out_queue, writer
71 
72 
def make_processor(processor):
    """
    Normalize `processor` into a callable taking and returning a record:
    None becomes the identity, a core.Net is wrapped in a NetProcessor,
    and anything else is assumed to already be callable.
    """
    if processor is None:
        return lambda rec: rec
    if isinstance(processor, core.Net):
        return NetProcessor(processor)
    return processor
80 
81 
def normalize_processor_output(output):
    """
    Allow for processors to return results in several formats.
    TODO(azzolini): simplify once all processors use NetBuilder API.
    """
    if isinstance(output, Output):
        # Processor returned an Output already.
        return output
    if isinstance(output, Field):
        # Processor returned a bare record.
        return Output(record=output)
    if isinstance(output, tuple):
        if (len(output) == 2 and
                isinstance(output[0], Field) and
                isinstance(output[1], core.BlobReference)):
            # Processor returned (record, stop_blob).
            return Output(None, *output)
        # Processor returned (nets, record, stop_blob).
        return Output(*output)
    # Processor returned nets only, no output record.
    return Output(output)
107 
108 
def pipe(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, num_runtime_threads=1):
    """
    Given a Reader, Queue or DataStream in `input`, and optionally, a Writer,
    Queue or DataStream in `output`, creates a Task that, when run, will
    pipe the input into the output, using multiple parallel threads.
    Additionally, if a processor is given, it will be called between reading
    and writing steps, allowing it to transform the record.

    Args:
        input:       either a Reader, Queue or DataStream that will be read
                     until a stop is signaled either by the reader or the
                     writer.
        output:      either a Writer, a Queue or a DataStream that will be
                     written to as long as neither reader nor writer signal
                     a stop condition. If output is not provided or is None,
                     a Queue is created with given `capacity` and written to.
        num_threads: number of concurrent threads used for processing and
                     piping. If set to 0, no Task is created, and a
                     reader is returned instead -- the reader returned will
                     read from the reader passed in and process it.
                     ** DEPRECATED **. Use `num_runtime_threads` instead.
                     This option will be removed once all readers/processors
                     support `num_runtime_threads`.
        processor:   (optional) function that takes an input record and
                     optionally returns a record; this will be called
                     between read and write steps. If the processor does
                     not return a record, a writer will not be instantiated.
                     Processor can also be a core.Net with input and output
                     records properly set. In that case, a NetProcessor is
                     instantiated, cloning the net for each of the threads.
        name:        (optional) name of the task to be created.
        capacity:    when output is not passed, a queue of given `capacity`
                     is created and written to.
        group:       (optional) explicitly add the created Task to this
                     TaskGroup, instead of using the currently active one.
        num_runtime_threads: Similar to `num_threads`, but instead of expanding
                     the tasks with a `for` loop in python, does that at
                     runtime. This is preferable to `num_threads`, but some
                     processors/readers still require to be called multiple
                     times in python.

    Returns:
        Output Queue, DataStream, Reader, or None, depending on the parameters
        passed.
    """
    result, _ = _pipe_step(
        input, output, num_threads, processor, name, capacity, group,
        num_runtime_threads)
    return result
160 
161 
def pipe_and_output(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, num_runtime_threads=1, final_outputs=None):
    """
    Similar to `pipe`, with the additional ability for the pipe Task to
    return output values to the `Session` once done.

    Args: same as `pipe`, plus:
        final_outputs: (optional) blob or list/tuple of blobs to be
                       returned by the created Task once it finishes.

    Returns:
        Tuple (out_queue, task_outputs)
            out_queue:    same as return value of `pipe`.
            task_outputs: TaskOutput object, fetchable from the client after
                          session.run() returns.
    """
    assert num_threads > 0
    result, task = _pipe_step(
        input, output, num_threads, processor, name, capacity, group,
        num_runtime_threads, final_outputs)
    # Use a distinct local instead of clobbering the `output` argument.
    task_outputs = None
    if final_outputs is not None:
        task_outputs = task.outputs()
        # isinstance (not exact type comparison) so subclasses of
        # list/tuple are also treated as collections of outputs.
        if not isinstance(final_outputs, (list, tuple)):
            task_outputs = task_outputs[0]
    return result, task_outputs
185 
186 
def processor_name(processor):
    """
    Return a human-readable name for a processor, used for task and
    profiler naming.

    Precedence: an explicit `name` attribute; the function name (module
    name for lambdas, `Class.method` for Python 2 bound methods); else
    the object's class name.
    """
    if hasattr(processor, 'name'):
        return processor.name
    # `func_name`/`im_class` only exist on Python 2; Python 3 functions
    # expose `__name__` instead, so consult both. The original code
    # checked only `func_name` and therefore returned 'function' for
    # every plain function on Python 3.
    func_name = getattr(
        processor, 'func_name', getattr(processor, '__name__', None))
    if func_name is not None:
        if func_name == '<lambda>':
            # Lambdas have no useful name; use their module instead.
            return processor.__module__
        im_class = getattr(processor, 'im_class', None)
        if im_class is not None:
            return '%s.%s' % (im_class.__name__, func_name)
        return func_name
    return processor.__class__.__name__
197 
198 
def _runtime_threads_task(name, group, final_outputs, reader, num_threads,
                          output, capacity):
    """
    Build a Task that pipes `reader` into `output`, expanding to
    `num_threads` instances at runtime (Task num_instances) rather than
    unrolling the step in Python.

    Returns:
        Tuple (out_queue, task): the queue backing `output` (None when the
        reader produces no record) and the created Task.
    """
    node_name = str(Node.current())
    # BUGFIX: this previously called processor_name(input), referencing the
    # `input` *builtin* (this function has no `input` argument), which
    # produced a meaningless profiler name. Use the reader actually piped.
    profiler_name = "{0}/{1}/{2}/{3}/{4}".format(
        node_name,
        "pipe",
        name,
        processor_name(reader) if reader else "NoInput",
        processor_name(output) if output else "NoOutput")

    with Task(name=name, group=group, outputs=final_outputs,
              num_instances=num_threads) as task:
        global_exit_net = core.Net('pipe:exit')
        global_init_net = core.Net('pipe:init')
        reader.setup_ex(global_init_net, global_exit_net)

        init_net = core.Net('pipe:instance:init')
        exit_net = core.Net('pipe:instance:exit')
        read_nets, status, rec = reader.read_record_ex(init_net, exit_net)
        # Reset the stop signal before the read/write loop starts.
        init_net.ConstantFill(
            [], [status],
            shape=[],
            value=False,
            dtype=core.DataType.BOOL
        )

        if rec is not None:
            out_queue, writer = _init_output(
                output, capacity, global_init_net, global_exit_net)
            write_nets, _ = writer.write_record_ex(
                rec, init_net, exit_net, status)
        else:
            # No record produced: nothing to write, no output queue.
            out_queue = None
            write_nets = []

        with ops.task_init():
            ops.net(global_init_net)
        with ops.task_instance_init():
            ops.net(init_net)

        timer_start_net = core.Net('timer_start')
        timer = timer_start_net.TimerBegin([], counter_name=profiler_name)
        timer_end_net = core.Net('timer_end')
        timer_end_net.TimerEnd(timer, [])

        # Loop read -> write (timed) until `status` signals a stop.
        ops.net(core.execution_step(
            'body',
            [timer_start_net] + list(read_nets) + list(write_nets) +
            [timer_end_net],
            should_stop_blob=status))
        ops.net(timer_end_net)

        with ops.task_instance_exit():
            ops.net(exit_net)
        with ops.task_exit():
            ops.net(global_exit_net)

    return out_queue, task
257 
258 
def _static_threads_task(name, group, final_outputs, reader, num_threads,
                         output, capacity):
    """
    Build a Task that pipes `reader` into `output` by statically expanding
    `num_threads` copies of the read/write step in Python (one NetBuilder
    sub-step per thread, executed concurrently).

    Returns:
        Tuple (out_queue, task): the queue backing `output` (None when the
        reader produces no record) and the created Task.
    """
    node_name = str(Node.current())
    # BUGFIX: this previously called processor_name(input), referencing the
    # `input` *builtin* (this function has no `input` argument), which
    # produced a meaningless profiler name. Use the reader actually piped.
    profiler_name = "{0}/{1}/{2}/{3}/{4}".format(
        node_name,
        "pipe",
        name,
        processor_name(reader) if reader else "NoInput",
        processor_name(output) if output else "NoOutput")

    with Task(name=name, group=group, outputs=final_outputs) as task:
        global_exit_net = core.Net('exit')
        global_init_net = core.Net('init')
        reader.setup_ex(global_init_net, global_exit_net)

        out_queue = None
        writer = None

        steps = []
        for thread_id in range(num_threads):
            with NetBuilder(name='t:%d' % thread_id) as nb:
                init_net = core.Net('init')
                exit_net = core.Net('exit')
                read_nets, status, rec = reader.read_record_ex(
                    init_net, exit_net)
                # Reset the stop signal before the read/write loop starts.
                init_net.ConstantFill(
                    [], [status],
                    shape=[],
                    value=False,
                    dtype=core.DataType.BOOL
                )

                if rec is not None:
                    if writer is None:
                        # hack so that the out queue gets the right name prefix
                        # (otherwise they would be prefixed with the thread id)
                        with NetBuilder(_fullname=task.name):
                            out_queue, writer = _init_output(
                                output, capacity, global_init_net,
                                global_exit_net)
                    write_nets, _ = writer.write_record_ex(
                        rec, init_net, exit_net, status)
                else:
                    write_nets = []

                timer_start_net = core.Net('timer_start')
                timer = timer_start_net.TimerBegin(
                    [], counter_name=profiler_name)
                timer_end_net = core.Net('timer_end')
                timer_end_net.TimerEnd(timer, [])

                ops.net(init_net)
                # Loop read -> write (timed) until `status` signals a stop.
                ops.net(core.execution_step(
                    'body',
                    [timer_start_net] + list(read_nets) + list(write_nets) +
                    [timer_end_net],
                    should_stop_blob=status))
                ops.net(timer_end_net)
                ops.net(exit_net)
            steps.append(core.to_execution_step(nb))
        ops.net(global_init_net)
        # All per-thread steps run concurrently within the task.
        ops.net(core.execution_step('body', steps, concurrent_substeps=True))
        ops.net(global_exit_net)
    return out_queue, task
322 
323 
def _pipe_step(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, num_runtime_threads=None, final_outputs=None):
    """
    Shared implementation behind `pipe` and `pipe_and_output`.

    Resolves `input` into a Reader (optionally wrapped in a
    ProcessingReader), picks a task name, and dispatches to either the
    statically-expanded or the runtime-expanded piping task.

    Returns:
        Tuple (result, task). When both thread counts are 0, `result` is
        the (possibly wrapped) reader itself and `task` is None.
    """
    # Guard the None default before comparing with an int: on Python 3,
    # `None <= 1` raises TypeError, which used to break the assert whenever
    # num_threads > 1 and num_runtime_threads was left unset.
    assert num_threads <= 1 or num_runtime_threads is None or \
        num_runtime_threads <= 1, (
            'Only one of num_threads or num_runtime_threads must be set.')

    if isinstance(input, Reader):
        reader = input
    elif hasattr(input, 'reader'):
        reader = input.reader()
    else:
        # Fixed message: the parameter is `input`, not `in`.
        raise ValueError('input must be a reader, queue or stream.')

    if processor is not None:
        reader = ProcessingReader(reader, processor)

    if num_threads == 0 or num_runtime_threads == 0:
        # No task: caller gets a reader that processes on the fly.
        assert output is None
        return reader, None

    if name is None and processor is not None:
        name = processor_name(processor)
    if name is None and output is not None:
        name = 'pipe_into:%s' % processor_name(output)
    if name is None:
        name = 'pipe_from:%s' % processor_name(input)

    if num_threads > 1:
        return _static_threads_task(
            name, group, final_outputs, reader, num_threads, output, capacity)
    else:
        return _runtime_threads_task(
            name, group, final_outputs, reader, num_runtime_threads, output,
            capacity)
360 
361 
363  """
364  Reader that reads from an upstream reader, calls the processor, and returns
365  the processed record.
366  """
    def __init__(self, reader, processor):
        """Wrap `reader` so each record read is passed through `processor`."""
        Reader.__init__(self)
        self.reader = reader
        # Normalized: None -> identity, core.Net -> NetProcessor, else as-is.
        self.processor = make_processor(processor)
371 
    def setup_ex(self, init_net, finish_net):
        """Delegate setup to the wrapped reader."""
        self.reader.setup_ex(init_net, finish_net)
374 
    def read_ex(self, init_net, exit_net):
        """
        Read a record from the wrapped reader, run the processor on it, and
        return (read_nets, status, field_blobs) for the processed record.
        """
        read_nets, status, rec = self.reader.read_record_ex(init_net, exit_net)
        # Run the processor under a NetBuilder bound to the reader's stop
        # blob, so ops created via the `ops` syntax inherit the stop signal.
        with NetBuilder(_stop_blob=status):
            # Current NetBuilder is optionally used inside the processor,
            # then its children are retrieved inside of
            # normalize_processor_output.
            # Once readers and writers also use NetBuilder,
            # this logic will be more natural.
            result = normalize_processor_output(self.processor(rec))
        read_nets += result.nets
        if result.should_stop is not None:
            # Propagate the processor's own stop signal into the reader's
            # status blob so the piping loop terminates.
            stop_net = core.Net('stop_net')
            stop_net.Copy([result.should_stop], [status])
            read_nets.append(stop_net)
        if hasattr(self.processor, 'setup'):
            # Processors exposing setup() (e.g. NetProcessor) are registered
            # so their cloned init nets run at task setup time.
            init_net.add_attribute(TaskGroup.LOCAL_SETUP, self.processor)
        self._set_schema(result.record)
        fields = result.record.field_blobs() if result.record else None
        return read_nets, status, fields
394 
395 
class NetProcessor(object):
    """
    Processor that clones a core.Net each time it's called, executing
    the cloned net as the processor. It requires the Net to have input
    and (optionally) output records set, with net.set_input_record() and
    net.set_output_record().
    """
    def __init__(self, net, stop_signal=None, thread_init_nets=None, name=None):
        assert isinstance(net, core.Net)
        assert stop_signal is None or isinstance(
            stop_signal, core.BlobReference)
        # Display name; defaults to the wrapped net's name.
        self.name = name or str(net)
        # Nets cloned alongside the main net for per-thread initialization.
        self.thread_init_nets = thread_init_nets or []
        self.net = net
        self._stop_signal = stop_signal
        # One blob-remapping dict per __call__ (i.e. per cloned instance).
        self._blob_maps = []
        # Once frozen (setup() or blob_maps() called), no more clones.
        self._frozen = False
        self._cloned_init_nets = []

    def setup(self, init_net):
        # Called at task setup time (registered via TaskGroup.LOCAL_SETUP in
        # ProcessingReader.read_ex). Returns the cloned init nets accumulated
        # so far and clears them.
        # NOTE(review): the `init_net` argument is unused here — presumably
        # kept for interface compatibility; confirm against callers.
        self._frozen = True
        cloned_init_nets = self._cloned_init_nets
        self._cloned_init_nets = []
        return cloned_init_nets

    def __call__(self, rec):
        # Clone the wrapped net (and its thread init nets), binding the
        # clone's input record to `rec`, and return an Output for it.
        assert not self._frozen
        # Prefix taken from the current NetBuilder so each thread's clone
        # gets distinct blob names.
        prefix = NetBuilder.current().name + '/'
        blob_remap = {}
        for net in self.thread_init_nets:
            new_net, _ = core.clone_and_bind_net(
                net, str(net) + prefix, prefix, blob_remap)
            self._cloned_init_nets.append(new_net)

        new_net, remappings = core.clone_and_bind_net(
            self.net, str(self.net) + prefix, prefix, blob_remap, rec)

        if self._stop_signal is None:
            stop_signal = None
        elif str(self._stop_signal) in remappings:
            # The stop blob was cloned too; refer to the remapped blob.
            stop_signal = core.BlobReference(
                remappings[str(self._stop_signal)],
                net=new_net)
        else:
            stop_signal = self._stop_signal

        self._blob_maps.append(remappings)
        return Output([new_net], new_net.output_record(), stop_signal)

    def blob_maps(self):
        # Freeze and expose the per-clone blob remapping dicts.
        self._frozen = True
        return self._blob_maps
# Cross-reference notes (from documentation tooling, not executable code):
#   _set_schema(self, schema) is defined in caffe2/python/dataio.py