Caffe2 - Python API
A deep-learning, cross-platform ML framework
task.py
1 # Copyright (c) 2016-present, Facebook, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 ##############################################################################
15 
16 ## @package task
17 # Module caffe2.python.task
18 from __future__ import absolute_import
19 from __future__ import division
20 from __future__ import print_function
21 from __future__ import unicode_literals
22 
23 from caffe2.python import core, context
24 from caffe2.python.schema import Field, from_blob_list
25 from collections import defaultdict
26 from copy import copy
27 from future.utils import viewitems
28 
29 
30 def _merge_node_kwargs(a, b):
31  # TODO(azzolini): consistency checks
32  if a is None:
33  return b
34  if b is None:
35  return a
36  c = copy(a)
37  c.update(b)
38  return c
39 
40 
@context.define_context(allow_default=True)
class Cluster(object):
    """
    Context tracking every node name used during its lifetime.

    Users shouldn't have to use it directly: a Cluster is generated
    automatically at the first usage of 'Node'.
    """

    def __init__(self):
        # a list (not a set) so that node registration order is preserved
        self._nodes = []
        self._node_kwargs = {}

    def add_node(self, node):
        """Register `node`, merging its kwargs with any previously seen."""
        name = str(node)
        if name not in self._nodes:
            self._nodes.append(name)
        self._node_kwargs[name] = _merge_node_kwargs(
            node.kwargs(), self._node_kwargs.get(name))

    def nodes(self):
        """Return the list of unique node names used within this context."""
        return self._nodes

    def node_kwargs(self):
        """Return the mapping from node name to its merged kwargs."""
        return self._node_kwargs
69 
70 
@context.define_context(allow_default=True)
class Node(object):
    """
    Context indicating that every Task instantiated within it will run on
    the node with the given name. (Only the name of the node actually
    counts.)
    Example:

        with TaskGroup() as tg:
            with Node('node1'):
                s1 = execution_step(...)
                Task(step=s1)
            with Node('node2'):
                s2 = execution_step(...)
            with Node('node1'):
                s3 = execution_step(...)

    In this example, all three execution steps will run in parallel.
    Moreover, s1 and s3 will run on the same node, and can see each
    others blobs.

    Additionally, a Node can be passed implementation-specific kwargs,
    in order to specify properties of the node.
    """

    def __init__(self, node='local', **kwargs):
        self._kwargs = kwargs
        self._name = str(node)
        # register this node with the active (or default) Cluster context
        Cluster.current().add_node(self)

    def __str__(self):
        return self._name

    def kwargs(self):
        """Return the implementation-specific kwargs given at construction."""
        return self._kwargs
105 
106 
class WorkspaceType(object):
    """
    Determines whether tasks of a TaskGroup will run directly at the global
    workspace, which is kept alive across runs, or whether a new child
    workspace will be created for the run and destroyed afterwards.
    """
    # each run gets a fresh child workspace, destroyed at the end of the run
    PRIVATE = 'private'
    # run directly in the global workspace, which survives across runs
    GLOBAL = 'global'
115 
116 
def get_setup_nets(key, steps_or_nets, target):
    """
    Collect init and exit nets/steps attached under attribute `key` to any
    of `steps_or_nets` (e.g. SetupNets objects added via `add_attribute`).

    Each attached object may implement `setup(init_net)` and/or
    `exit(exit_net)`; a hook can either add ops directly to the provided
    shared net or return its own net(s)/step(s). The shared
    `<key>/init` and `<key>/exit` nets are prepended to the results only
    if any hook added ops to them.

    Args:
        key: attribute name under which setup objects were registered.
        steps_or_nets: steps/nets to scan for attached setup objects.
        target: objects carrying a `_setup_target` are only collected when
            it equals this target; objects without one always apply.

    Returns:
        (init_nets, exit_nets) pair of lists of nets/steps.

    Raises:
        TypeError: if a hook returns something other than a net, step,
            list/tuple, or None.
    """
    init_net = core.Net(key + '/init')
    exit_net = core.Net(key + '/exit')
    init_nets = []
    exit_nets = []
    objs = []
    for step_or_net in steps_or_nets:
        if hasattr(step_or_net, 'get_all_attributes'):
            objs += step_or_net.get_all_attributes(key)
        elif hasattr(step_or_net, 'get_attributes'):
            objs += step_or_net.get_attributes(key)

    def _run_setup_hook(obj, method_name, net, accum):
        # Invoke obj.<method_name>(net) if present, accumulate any returned
        # nets/steps into `accum`, and mark the object as consumed.
        method = getattr(obj, method_name, None)
        if method is None:
            return
        nets = method(net)
        if isinstance(nets, (list, tuple)):
            accum += nets
        elif isinstance(nets, (core.Net, core.ExecutionStep)):
            accum.append(nets)
        elif nets is not None:
            raise TypeError('Unsupported type for setup: %s' % type(nets))
        obj._setup_used = True

    for obj in objs:
        # the _setup_used/_setup_target guards are needed in order to allow
        # nesting of TaskGroup, which is a feature not yet implemented.
        if getattr(obj, '_setup_used', False):
            continue
        if hasattr(obj, '_setup_target') and obj._setup_target != target:
            continue
        _run_setup_hook(obj, 'setup', init_net, init_nets)
        _run_setup_hook(obj, 'exit', exit_net, exit_nets)

    # the shared nets only participate when hooks actually added ops to them
    if len(init_net.Proto().op) > 0:
        init_nets.insert(0, init_net)
    if len(exit_net.Proto().op) > 0:
        exit_nets.insert(0, exit_net)
    return init_nets, exit_nets
159 
160 
def add_setup_steps(step, init_nets, exit_nets, name):
    """Wrap `step` with '<name>:init' and '<name>:exit' steps when given.

    If both net lists are empty/None, `step` is returned unchanged.
    """
    if not init_nets and not exit_nets:
        return step
    wrapped = []
    if init_nets:
        wrapped.append(core.execution_step('%s:init' % name, init_nets))
    wrapped.append(step)
    if exit_nets:
        wrapped.append(core.execution_step('%s:exit' % name, exit_nets))
    return core.execution_step(name, wrapped)
171 
172 
@context.define_context(allow_default=False)
class TaskGroup(object):
    """
    Context that gathers tasks which will run concurrently, potentially on
    multiple nodes. All tasks in the same node will share the same workspace
    and thus can share blobs, while tasks running in different nodes won't
    be able to directly share data.

    All tasks of the task group will start concurrently, and the task group
    will finish execution when the last task of the group finishes.

    Example:
        # suppose that s1 ... s5 are execution steps or nets.
        with TaskGroup() as tg:
            # these tasks go to default node 'local'
            Task(step=s1)
            Task(step=s2)
            with Node('n2'):
                Task(step=s3)
            with Node('n1'):
                Task(step=s4)
            with Node('n2'):
                Task(step=s5)

        # this will run all steps in parallel.
        # s1 and s2 will run at default node 'local'
        # s3 and s5 will run at node 'n2'
        # s4 will run at node 'n1'
        session.run(tg)
    """
    # attribute key for setup nets that run once per node, before any task
    # of the group runs on that node (see SetupNets)
    LOCAL_SETUP = 'local_setup'

    def __init__(self, workspace_type=None):
        """
        Args:
            workspace_type: one of the WorkspaceType constants, or None to
                adopt the workspace type of the first task that declares one.
        """
        self._plan_cache = None
        self._tasks = []
        self._already_used = False
        self._prev_active = None
        # tasks created inside this context are parked here first and only
        # committed to _tasks when tasks() is called (see Task.__enter__)
        self._tasks_to_add = []
        self._report_nets = {}
        self._report_steps = []
        self._workspace_type = workspace_type
        self._tasks_by_node = None

    def add(self, task):
        """Commit `task` to the group, reconciling workspace types."""
        assert not self._already_used, (
            'Cannot add Task to an already used TaskGroup.')
        # a task and its group must not declare conflicting workspace types
        assert (
            self._workspace_type is None or
            task._workspace_type is None or
            self._workspace_type == task._workspace_type)
        if task._workspace_type is None:
            task._workspace_type = (
                self._workspace_type or WorkspaceType.PRIVATE)
        if self._workspace_type is None:
            self._workspace_type = task._workspace_type
        task._notify_used()
        self._tasks.append(task)

    def tasks(self):
        """Flush pending tasks into the group and return all of its tasks.

        After this call the group is sealed: no more tasks can be added.
        """
        for task in self._tasks_to_add:
            self.add(task)
        self._tasks_to_add = []
        self._already_used = True
        return self._tasks

    def num_registered_tasks(self):
        """Return the count of committed plus pending tasks."""
        return len(self._tasks_to_add) + len(self._tasks)

    def used_nodes(self):
        """Return the node names referenced by this group's tasks."""
        # use list to keep order
        used = []
        for task in self._tasks + self._tasks_to_add:
            if task.node not in used:
                used.append(task.node)
        return used

    def report_step(self, step=None, node=None, interval_ms=1000):
        """
        Add a "report step" to this TaskGroup. This step will run repeatedly
        every `interval_ms` milliseconds for the duration of the TaskGroup
        execution on each of the nodes. It is guaranteed that this step
        will be run at least once after every Task in the node has finished.
        """
        step = core.to_execution_step(step)
        step.RunEveryMillis(interval_ms)
        self._report_steps.append((str(node or Node.current(node)), step))

    def report_net(self, net=None, node=None, report_interval=5):
        """
        DEPRECATED. Use report_step instead.
        """
        node = str(node or Node.current(node))
        assert net is None or node not in self._report_nets
        if node not in self._report_nets:
            self._report_nets[node] = (
                net if net else core.Net('%s/reporter' % node),
                report_interval)
        return self._report_nets[node][0]

    def tasks_by_node(self, node_remap=None):
        """Collapse this group into one grouped Task per (remapped) node.

        Args:
            node_remap: optional callable mapping an original node name to
                the node the tasks should actually run on.

        Returns:
            A new TaskGroup with a single Task per mapped node.
        """
        # tasks_by_node can't be called twice because the setup won't
        # work properly a second time.
        node_map = {}
        for task in self.tasks():
            node_map[task.node] =\
                node_remap(task.node) if node_remap else task.node
        if self._tasks_by_node is not None:
            # repeated call is only allowed with an identical node mapping;
            # return the cached grouping in that case
            tasks_by_node, prev_node_map = self._tasks_by_node
            assert prev_node_map == node_map, (
                'Cannot call tasks_by_node multiple times.')
            return tasks_by_node

        # now we have report_steps. report_net is deprecated
        for node, (net, interval) in viewitems(self._report_nets):
            self.report_step(net, node=node, interval_ms=interval * 1000)
        self._report_nets = {}

        tasks_by_node = defaultdict(list)
        for task in self.tasks():
            mapped_node = node_map[task.node]
            tasks_by_node[mapped_node].append(task)

        report_steps_by_node = defaultdict(list)
        for original_node, step in self._report_steps:
            report_steps_by_node[node_map[original_node]].append(step)

        grouped_by_node = TaskGroup()
        for node, tasks in viewitems(tasks_by_node):
            report_steps = report_steps_by_node[node]
            node_inits, node_exits = get_setup_nets(
                TaskGroup.LOCAL_SETUP,
                [t.get_step() for t in tasks] + report_steps,
                self)
            # shortcut for single task with no queue
            steps = report_steps
            outputs = []
            grouped_workspace_type = WorkspaceType.PRIVATE
            for task in tasks:
                step = task.get_step()
                step.SetCreateWorkspace(
                    task.workspace_type() == WorkspaceType.PRIVATE)
                if step is not None:
                    steps.append(step)
                outputs += task.outputs()
                # If any of the tasks in the node uses the global workspace,
                # then set the grouped task to use the global workspace as well
                if task.workspace_type() == WorkspaceType.GLOBAL:
                    grouped_workspace_type = WorkspaceType.GLOBAL
            if len(steps) == 0:
                steps.append(core.execution_step('empty', []))
            if len(steps) == 1:
                step = steps[0]
            else:
                # run all of the node's task steps concurrently
                step = core.execution_step(
                    '%s:body' % node, steps, concurrent_substeps=True)
            if len(node_inits) > 0 or len(node_exits) > 0:
                # wrap the body with the node-level init/exit steps
                steps = []
                if len(node_inits) > 0:
                    steps.append(
                        core.execution_step('%s:init' % node, node_inits))
                steps.append(step)
                if len(node_exits) > 0:
                    steps.append(
                        core.execution_step('%s:exit' % node, node_exits))
                step = core.execution_step(node, steps)
            Task(
                node=node, step=step, outputs=outputs,
                name='grouped_by_node',
                group=grouped_by_node, workspace_type=grouped_workspace_type)
        self._tasks_by_node = (grouped_by_node, node_map)
        return grouped_by_node

    def to_task(self, node=None):
        """Collapse the whole group into a single Task running on `node`."""
        node = str(Node.current(node))
        tasks = self.tasks_by_node(lambda x: node).tasks()
        if len(tasks) == 0:
            return Task()
        return tasks[0]

    def workspace_type(self):
        """Return this group's WorkspaceType (may be None until a task is added)."""
        return self._workspace_type
355 
356 
class TaskOutput(object):
    """
    Represents the output of a task. An output can be a blob,
    a list of blobs, or a record.
    """

    def __init__(self, names):
        self._schema = None
        self._is_scalar = False
        if isinstance(names, Field):
            # a record was passed: remember its schema, operate on its blobs
            self._schema = names
            names = self._schema.field_blobs()
        self._is_scalar = type(names) not in (tuple, list)
        if self._is_scalar:
            names = [names]
        self.names = names
        self._values = None

    def set(self, values, _fetch_func=None):
        """Store the concrete values (and an optional fetch function)."""
        assert len(values) == len(self.names)
        self._values = values
        self._fetch_func = _fetch_func

    def get(self):
        """Return the stored value, unwrapped to match the original shape."""
        assert self._values is not None, 'Output value not set yet.'
        if self._is_scalar:
            return self._values[0]
        if self._schema:
            return from_blob_list(self._schema, self._values)
        return self._values

    def fetch(self):
        """Fetch every value through the fetch function, then unwrap."""
        assert self._fetch_func is not None, (
            'Cannot fetch value for this output.')
        fetched = [self._fetch_func(value) for value in self._values]
        if self._is_scalar:
            return fetched[0]
        if self._schema:
            return from_blob_list(self._schema, fetched)
        return fetched
399 
400 
def final_output(blob_or_record):
    """
    Adds an output to the current Task, or if no task is active,
    create a dummy task that returns the given blob or record
    to the client. This will return the value of the blob or record when
    the last task of the TaskGroup for a given node finishes.
    """
    active_task = Task.current(required=False)
    if active_task is None:
        active_task = Task()
    return active_task.add_output(blob_or_record)
410 
411 
class TaskOutputList(object):
    """Keeps a list of outputs for a task."""

    def __init__(self, outputs=None):
        self.outputs = outputs or []

    def names(self):
        """
        Retrieve the output names.
        TODO(azzolini): make this schema-based.
        """
        all_names = []
        for output in self.outputs:
            all_names.extend(output.names)
        return all_names

    def set_values(self, values, _fetch_func=None):
        """Distribute `values` across the outputs, in order, by name count."""
        pos = 0
        for output in self.outputs:
            count = len(output.names)
            output.set(values[pos:pos + count], _fetch_func)
            pos += count
        assert pos == len(values), 'Wrong number of output values.'
435 
class Task(object):
    """
    A Task is composed of an execution step and zero or more outputs.
    Tasks are executed in the context of a TaskGroup, which, in turn, can
    be run by a Session.

    Task outputs are fetched by the session at the end of the run.

    The recommended way of creating a task is by using `net_builder.ops`.
    Example:

        from net_builder import ops
        with Node('trainer'), Task(name='my_task', num_instances=2):
            with ops.task_init():
                globl = ops.Const(0)
            with ops.task_instance_init():
                local = ops.Const(0)
            with ops.loop(100):
                ops.Copy(globl, local)
            with ops.task_instance_exit():
                ops.Add([globl, local], [globl])
            with ops.task_exit():
                ops.Mul([globl, globl], [globl])

    The task above will create 2 instances that will run in parallel.
    Each instance will copy `local` to `globl` 100 times, Then Add `local`
    to `globl` once. The `Mul` will only execute once, after all the instances
    of the task have finished.
    """

    # TASK_SETUP attributes run once per task, before/after all
    # concurrent task instances start/finish.
    TASK_SETUP = 'task_setup'
    # TASK_INSTANCE_SETUP attributes run once for each instance of the task.
    TASK_INSTANCE_SETUP = 'task_instance_setup'
    # attribute key under which report steps are attached to a step
    REPORT_STEP = 'report_step'
    # Registry of names for tasks created outside of any TaskGroup.
    # NOTE(review): nothing in this file ever adds to this set, so name
    # de-duplication for group-less tasks appears to be a no-op -- confirm
    # before relying on unique names outside a TaskGroup.
    _global_names_used = set()

    @staticmethod
    def _get_next_name(node, group, name):
        """
        Return a task name of the form '<node>/<name>', made unique within
        `group`'s pending tasks (or against the global registry when group
        is None) by appending ':<i>' as needed.
        """
        basename = str(node) + '/' + str(name)
        names_used = (
            Task._global_names_used
            if group is None else
            set(t.name for t in group._tasks_to_add))
        cur_name = basename
        i = 0
        while cur_name in names_used:
            i += 1
            cur_name = '%s:%d' % (basename, i)
        return cur_name

    def __init__(
            self, step=None, outputs=None,
            workspace_type=None, group=None, node=None, name=None,
            num_instances=None):
        """
        Instantiate a Task and add it to the current TaskGroup and Node.

        Args:
            step: If provided, this task will run this ExecutionStep.
            outputs: If provided, the task will return the provided outputs
                to the client at completion time.
            workspace_type: If provided, the WorkspaceType this task runs
                under (see WorkspaceType).
            group: If provided, add this task to the given TaskGroup instead
                of the currently active one.
            node: If provided, force task execution on the given node.
            name: Name of the Task.
            num_instances: If provided, this task will be cloned num_instances
                times at runtime, and all instances will run
                concurrently.
        """
        if not name and isinstance(step, core.ExecutionStep):
            name = step.Proto().name
        if not name:
            name = 'task'
        # register this node name with active context
        self.node = str(Node.current(None if node is None else Node(node)))
        self.group = TaskGroup.current(group, required=False)

        self.name = Task._get_next_name(self.node, self.group, name)

        # may need to be temporarily removed later if Task used as a context
        if self.group is not None:
            self.group._tasks_to_add.append(self)

        self._already_used = False
        self._step = None
        self._step_with_setup = None
        self._outputs = []
        if step is not None:
            self.set_step(step)
        if outputs is not None:
            self.add_outputs(outputs)

        self._pipeline = None
        self._is_pipeline_context = False
        self._workspace_type = workspace_type
        self._report_net = None
        self._num_instances = num_instances

    def __enter__(self):
        """Start building this task's step through a NetBuilder context."""
        # temporarily remove from _tasks_to_add to ensure correct order
        if self.group is not None:
            self.group._tasks_to_add.remove(self)
        self._assert_not_used()
        assert self._step is None, 'This Task already has an execution step.'
        from caffe2.python import net_builder
        self._net_builder = net_builder.NetBuilder(_fullname=self.name)
        self._net_builder.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the NetBuilder; on success, adopt it as this task's step."""
        self._net_builder.__exit__(exc_type, exc_value, traceback)
        if exc_type is None:
            self.set_step(self._net_builder)
            if self.group is not None:
                self.group._tasks_to_add.append(self)
        self._net_builder = None

    def workspace_type(self):
        return self._workspace_type

    def _assert_not_used(self):
        assert not self._already_used, (
            'Cannot modify task since it is already been used.')

    def add_output(self, output):
        """Wrap `output` in a TaskOutput (if needed) and register it."""
        self._assert_not_used()
        output = (
            output if isinstance(output, TaskOutput) else TaskOutput(output))
        self._outputs.append(output)
        return output

    def add_outputs(self, outputs):
        """Register a single output or a list/tuple of outputs."""
        self._assert_not_used()
        if type(outputs) not in (list, tuple):
            return self.add_output(outputs)
        else:
            return [self.add_output(output) for output in outputs]

    def set_step(self, step):
        self._assert_not_used()
        self._step = core.to_execution_step(step)

    def get_step(self):
        """
        Return this task's execution step, wrapped with its report steps
        and instance-level and task-level setup/exit nets. Built once and
        cached; subsequent calls return the cached step.
        """
        if self._step_with_setup is not None:
            return self._step_with_setup

        if self._step is None:
            self._step_with_setup = core.execution_step(self.name, [])
            return self._step_with_setup

        report_steps = [
            s
            for s in self._step.get_all_attributes(Task.REPORT_STEP)
            if not hasattr(s, '_report_step_used')
        ]
        for step in report_steps:
            step._report_step_used = True
            if not step.Proto().run_every_ms:
                step.RunEveryMillis(1000)
        task_init_nets, task_exit_nets = get_setup_nets(
            Task.TASK_SETUP, [self._step] + report_steps, self)
        instance_init_nets, instance_exit_nets = get_setup_nets(
            Task.TASK_INSTANCE_SETUP, [self._step] + report_steps, self)
        if len(self._outputs) == 0:
            # guarantee at least one output for the session to fetch
            output_net = core.Net('%s:output' % self.name)
            self.add_output(output_net.ConstantFill(
                [], 1, dtype=core.DataType.INT32, value=0))
            task_exit_nets.append(output_net)

        # Add instance-level report steps
        body = self._step if not report_steps else core.execution_step(
            '%s:body' % self.name, report_steps + [self._step])
        # Enclose with instance-level (thread-local) setup nets
        step_with_instance_setup = add_setup_steps(
            body, instance_init_nets, instance_exit_nets,
            self.name + ':instance')
        # Set up runtime concurrent instances
        if self._num_instances and self._num_instances > 1:
            step_with_instance_setup.SetCreateWorkspace(True)
            step_with_instance_setup = core.execution_step(
                # bug fix: the name used to be the literal '%s:parallel',
                # leaving the placeholder uninterpolated for every task
                '%s:parallel' % self.name,
                [step_with_instance_setup],
                num_concurrent_instances=self._num_instances)
        # Enclose with task-level setup nets
        self._step_with_setup = add_setup_steps(
            step_with_instance_setup, task_init_nets, task_exit_nets, self.name)

        return self._step_with_setup

    def output_list(self):
        """Return all outputs wrapped in a TaskOutputList."""
        return TaskOutputList(self._outputs)

    def outputs(self):
        return self._outputs

    def _notify_used(self):
        # freeze the task: build its final step and disallow modification
        self.get_step()
        self._already_used = True
635 
636 
class SetupNets(object):
    """
    Registers lists of nets to be run at initialization and finalization
    of Tasks or TaskGroups.
    For example, given:

        init_net = core.Net('init')
        my_val = init_net.ConstantFill([], 'my_val', value=0)

        net = core.Net('counter')
        net.Add([my_val, net.Const(1),], [my_val])

        with TaskGroup() as task_group:
            with Node('trainer'):
                my_task = Task(step=[net])

    to have `init_net` run once before `net` runs for the first time,
    do one of the following:

        net.add_attribute(Task.TASK_SETUP, SetupNets([init_net]))

    or

        net.add_attribute(TaskGroup.LOCAL_SETUP, SetupNets([init_net]))

    - With Task.TASK_SETUP, init_net will run once at my_task startup.
    - With TaskGroup.LOCAL_SETUP, init_net will run once on node 'trainer',
      before any task of the task group is run on that node.

    The same SetupNets object can be added to multiple nets. It will only
    run once per Task/TaskGroup run.
    """

    def __init__(self, init_nets=None, exit_nets=None):
        self.init_nets = init_nets
        self.exit_nets = exit_nets

    def setup(self, init_net):
        # invoked by get_setup_nets during task/group initialization
        return self.init_nets

    def exit(self, exit_net):
        # invoked by get_setup_nets during task/group finalization
        return self.exit_nets
def __init__(self, step=None, outputs=None, workspace_type=None, group=None, node=None, name=None, num_instances=None)
Definition: task.py:492
Definition: setup.py:1
def add(self, task)
Definition: task.py:217
def get_step(self)
Definition: task.py:579
def set_step(self, step)
Definition: task.py:575
def add_output(self, output)
Definition: task.py:561
def tasks_by_node(self, node_remap=None)
Definition: task.py:273
def report_step(self, step=None, node=None, interval_ms=1000)
Definition: task.py:250
def add_outputs(self, outputs)
Definition: task.py:568
def report_net(self, net=None, node=None, report_interval=5)
Definition: task.py:261
def _assert_not_used(self)
Definition: task.py:557