Caffe2 - Python API
A deep learning, cross-platform ML framework
task.py
1 ## @package task
2 # Module caffe2.python.task
3 from __future__ import absolute_import
4 from __future__ import division
5 from __future__ import print_function
6 from __future__ import unicode_literals
7 
8 from caffe2.python import core, context
9 from caffe2.python.schema import Field, from_blob_list
10 from collections import defaultdict
11 from copy import copy
12 from future.utils import viewitems
13 
14 
15 def _merge_node_kwargs(a, b):
16  # TODO(azzolini): consistency checks
17  if a is None:
18  return b
19  if b is None:
20  return a
21  c = copy(a)
22  c.update(b)
23  return c
24 
25 
@context.define_context(allow_default=True)
class Cluster(object):
    """
    Context that keeps track of all the node names used.
    Users shouldn't have to use them directly, since a Cluster is automatically
    generated at the first usage of 'Node'.
    """

    def __init__(self):
        # A list rather than a set so that registration order is preserved.
        self._nodes = []
        self._node_kwargs = {}

    def add_node(self, node):
        """Register `node`, merging its kwargs into any previously seen."""
        node_name = str(node)
        if node_name not in self._nodes:
            self._nodes.append(node_name)
        self._node_kwargs[node_name] = _merge_node_kwargs(
            node.kwargs(), self._node_kwargs.get(node_name))

    def nodes(self):
        """
        Returns the list of unique node names used within this context.
        """
        return self._nodes

    def node_kwargs(self):
        """Returns the dict mapping node name to its merged kwargs."""
        return self._node_kwargs

    def __repr__(self):
        return "Cluster(nodes={}, node_kwargs={})".format(
            self.nodes(), self.node_kwargs())
58 
59 
@context.define_context(allow_default=True)
class Node(object):
    """
    A Node context indicates that all Tasks instantiated within it will run
    on the node with the given name. (Only the node's name actually matters.)
    Example:

        with TaskGroup() as tg:
            with Node('node1'):
                s1 = execution_step(...)
                Task(step=s1)
            with Node('node2'):
                s2 = execution_step(...)
            with Node('node1'):
                s3 = execution_step(...)

    Here all three execution steps run in parallel; moreover, s1 and s3 run
    on the same node and can therefore see each other's blobs.

    A Node may also be given implementation-specific kwargs describing
    properties of the node.
    """

    def __init__(self, node='local', **kwargs):
        self._name = str(node)
        self._kwargs = kwargs
        # Register this node with the active (or default) Cluster context.
        Cluster.current().add_node(self)

    def kwargs(self):
        """Returns the implementation-specific kwargs given at construction."""
        return self._kwargs

    def __str__(self):
        return self._name

    def __repr__(self):
        return "Node(name={}, kwargs={})".format(self._name, self._kwargs)
97 
98 
class WorkspaceType(object):
    """
    Determines whether tasks of a TaskGroup will run directly at the global
    workspace, which is kept alive across runs, or whether a new child
    workspace will be created for the run and destroyed afterwards.
    """
    # Each run gets a fresh child workspace, destroyed after the run.
    PRIVATE = 'private'
    # Tasks run directly in the global workspace, kept alive across runs.
    GLOBAL = 'global'
107 
108 
def _append_setup_nets(dest, nets, phase):
    """
    Append `nets` -- a single Net/ExecutionStep, a list/tuple of them, or
    None -- to the list `dest`.

    Raises:
        TypeError: if `nets` is not None and not one of the supported types.
    """
    if isinstance(nets, (list, tuple)):
        dest.extend(nets)
    elif isinstance(nets, (core.Net, core.ExecutionStep)):
        dest.append(nets)
    elif nets is not None:
        raise TypeError('Unsupported type for %s: %s' % (phase, type(nets)))


def get_setup_nets(key, steps_or_nets, target):
    """
    Collect init and exit nets from all setup objects attached (under
    attribute name `key`) to the given steps or nets.

    Each setup object may implement `setup(init_net)` and/or
    `exit(exit_net)`, returning extra nets/steps to run; ops added directly
    to the passed-in net are also collected. Objects are consumed at most
    once (via `_setup_used`) and may be restricted to a specific `target`
    (via `_setup_target`).

    Returns:
        (init_nets, exit_nets): lists of nets/steps to run before and after.
    """
    init_net = core.Net(key + '/init')
    exit_net = core.Net(key + '/exit')
    init_nets = []
    exit_nets = []
    objs = []
    for step_or_net in steps_or_nets:
        if hasattr(step_or_net, 'get_all_attributes'):
            objs += step_or_net.get_all_attributes(key)
        elif hasattr(step_or_net, 'get_attributes'):
            objs += step_or_net.get_attributes(key)
    for obj in objs:
        # these are needed in order to allow nesting of TaskGroup, which
        # is a feature not yet implemented.
        if hasattr(obj, '_setup_used') and obj._setup_used:
            continue
        if hasattr(obj, '_setup_target') and obj._setup_target != target:
            continue
        if hasattr(obj, 'setup'):
            _append_setup_nets(init_nets, obj.setup(init_net), 'setup')
            obj._setup_used = True
        if hasattr(obj, 'exit'):
            # Fix: the error message here used to say 'setup' even for the
            # exit phase (copy-paste); it now correctly reports 'exit'.
            _append_setup_nets(exit_nets, obj.exit(exit_net), 'exit')
            obj._setup_used = True

    # If any ops were added directly to the shared init/exit nets, run
    # those nets first.
    if len(init_net.Proto().op) > 0:
        init_nets.insert(0, init_net)
    if len(exit_net.Proto().op) > 0:
        exit_nets.insert(0, exit_net)
    return init_nets, exit_nets
151 
152 
def add_setup_steps(step, init_nets, exit_nets, name):
    """
    Wrap `step` with init and exit steps built from the given nets.

    Returns `step` unchanged when there is nothing to add; otherwise returns
    a new execution step named `name` running '<name>:init' (if any),
    then `step`, then '<name>:exit' (if any), in order.
    """
    if not init_nets and not exit_nets:
        return step
    steps = []
    if init_nets:
        steps.append(core.execution_step('%s:init' % name, init_nets))
    steps.append(step)
    # Consistency fix: use the same truthiness test as for init_nets
    # (was `len(exit_nets) > 0`).
    if exit_nets:
        steps.append(core.execution_step('%s:exit' % name, exit_nets))
    return core.execution_step(name, steps)
163 
164 
@context.define_context(allow_default=False)
class TaskGroup(object):
    """
    Context that gathers tasks which will run concurrently, potentially on
    multiple nodes. All tasks in the same node will share the same workspace
    and thus can share blobs, while tasks running in different nodes won't
    be able to directly share data.

    All tasks of the task group will start concurrently, and the task group
    will finish execution when the last task of the group finishes.

    Example:
        # suppose that s1 ... s5 are execution steps or nets.
        with TaskGroup() as tg:
            # these tasks go to default node 'local'
            Task(step=s1)
            Task(step=s2)

            with Node('n2'):
                Task(step=s3)
            with Node('n1'):
                Task(step=s4)
            with Node('n2'):
                Task(step=s5)

        # this will run all steps in parallel.
        # s1 and s2 will run at default node 'local'
        # s3 and s5 will run at node 'n2'
        # s4 will run at node 'n1'
        session.run(tg)
    """
    # Attribute key under which node-local setup objects are attached.
    LOCAL_SETUP = 'local_setup'

    def __init__(self, workspace_type=None):
        self._plan_cache = None
        self._tasks = []
        self._already_used = False
        self._prev_active = None
        # Tasks registered but not yet committed via add(); drained by tasks().
        self._tasks_to_add = []
        # Deprecated report_net state: node name -> (net, interval_seconds).
        self._report_nets = {}
        # List of (node_name, execution_step) pairs added via report_step().
        self._report_steps = []
        self._workspace_type = workspace_type
        # One-shot cache for tasks_by_node(): (grouped TaskGroup, node_map).
        self._tasks_by_node = None
        self._remote_nets = []

    def add_remote_net(self, net):
        """Register a net that runs remotely (outside this group's plan)."""
        self._remote_nets.append(net)

    def remote_nets(self):
        return self._remote_nets

    def add(self, task):
        """
        Commit `task` to this group, unifying workspace types: the group and
        all of its tasks must agree on a single workspace type, defaulting
        to PRIVATE.
        """
        assert not self._already_used, (
            'Cannot add Task to an already used TaskGroup.')
        assert (
            self._workspace_type is None or
            task._workspace_type is None or
            self._workspace_type == task._workspace_type)
        if task._workspace_type is None:
            task._workspace_type = (
                self._workspace_type or WorkspaceType.PRIVATE)
        if self._workspace_type is None:
            self._workspace_type = task._workspace_type
        task._notify_used()
        self._tasks.append(task)

    def tasks(self):
        """
        Return all tasks of the group, first committing any pending ones.
        Marks the group as used: no more tasks can be added afterwards.
        """
        for task in self._tasks_to_add:
            self.add(task)
        self._tasks_to_add = []
        self._already_used = True
        return self._tasks

    def num_registered_tasks(self):
        """Number of tasks registered so far (committed or pending)."""
        return len(self._tasks_to_add) + len(self._tasks)

    def used_nodes(self):
        # use list to keep order
        used = []
        for task in self._tasks + self._tasks_to_add:
            if task.node not in used:
                used.append(task.node)
        return used

    def report_step(self, step=None, node=None, interval_ms=1000):
        """
        Add a "report step" to this TaskGroup. This step will run repeatedly
        every `interval_ms` milliseconds for the duration of the TaskGroup
        execution on each of the nodes. It is guaranteed that this step
        will be run at least once after every Task in the node has finished.
        """
        step = core.to_execution_step(step)
        step.RunEveryMillis(interval_ms)
        self._report_steps.append((str(node or Node.current(node)), step))

    def report_net(self, net=None, node=None, report_interval=5):
        """
        DEPRECATED. Use report_step instead.
        """
        node = str(node or Node.current(node))
        # Only one report net per node; passing a net twice is an error.
        assert net is None or node not in self._report_nets
        if node not in self._report_nets:
            self._report_nets[node] = (
                net if net else core.Net('%s/reporter' % node),
                report_interval)
        return self._report_nets[node][0]

    def tasks_by_node(self, node_remap=None):
        """
        Group the tasks of this TaskGroup by node (after applying the
        optional `node_remap` function to each node name), returning a new
        TaskGroup with one merged Task per node.
        """
        # tasks_by_node can't be called twice because the setup won't
        # work properly a second time.
        node_map = {}
        for task in self.tasks():
            node_map[task.node] =\
                node_remap(task.node) if node_remap else task.node
        if self._tasks_by_node is not None:
            tasks_by_node, prev_node_map = self._tasks_by_node
            # Re-calling is only allowed with an identical node mapping.
            assert prev_node_map == node_map, (
                'Cannot call tasks_by_node multiple times.')
            return tasks_by_node

        # now we have report_steps. report_net is deprecated
        for node, (net, interval) in viewitems(self._report_nets):
            self.report_step(net, node=node, interval_ms=interval * 1000)
        self._report_nets = {}

        tasks_by_node = defaultdict(list)
        for task in self.tasks():
            mapped_node = node_map[task.node]
            tasks_by_node[mapped_node].append(task)

        report_steps_by_node = defaultdict(list)
        for original_node, step in self._report_steps:
            report_steps_by_node[node_map[original_node]].append(step)

        grouped_by_node = TaskGroup()
        for node, tasks in viewitems(tasks_by_node):
            report_steps = report_steps_by_node[node]
            node_inits, node_exits = get_setup_nets(
                TaskGroup.LOCAL_SETUP,
                [t.get_step() for t in tasks] + report_steps,
                self)
            # shortcut for single task with no queue
            # NOTE: `steps` aliases the list held by report_steps_by_node;
            # appends below mutate that list as well.
            steps = report_steps
            outputs = []
            grouped_workspace_type = WorkspaceType.PRIVATE
            for task in tasks:
                step = task.get_step()
                # NOTE(review): get_step() appears to never return None
                # (see Task.get_step), so the None check below looks
                # vestigial -- confirm before relying on it.
                step.SetCreateWorkspace(
                    task.workspace_type() == WorkspaceType.PRIVATE)
                if step is not None:
                    steps.append(step)
                outputs += task.outputs()
                # If any of the tasks in the node uses the global workspace,
                # then set the grouped task to use the global workspace as well
                if task.workspace_type() == WorkspaceType.GLOBAL:
                    grouped_workspace_type = WorkspaceType.GLOBAL
            if len(steps) == 0:
                steps.append(core.execution_step('empty', []))
            if len(steps) == 1:
                step = steps[0]
            else:
                step = core.execution_step(
                    '%s:body' % node, steps, concurrent_substeps=True)
            # Wrap the node body with node-level init/exit steps, if any.
            if len(node_inits) > 0 or len(node_exits) > 0:
                steps = []
                if len(node_inits) > 0:
                    steps.append(
                        core.execution_step('%s:init' % node, node_inits))
                steps.append(step)
                if len(node_exits) > 0:
                    steps.append(
                        core.execution_step('%s:exit' % node, node_exits))
                step = core.execution_step(node, steps)
            Task(
                node=node, step=step, outputs=outputs,
                name='grouped_by_node',
                group=grouped_by_node, workspace_type=grouped_workspace_type)
        self._tasks_by_node = (grouped_by_node, node_map)
        return grouped_by_node

    def to_task(self, node=None):
        """Collapse the whole group into a single Task on the given node."""
        node = str(Node.current(node))
        tasks = self.tasks_by_node(lambda x: node).tasks()
        if len(tasks) == 0:
            return Task()
        return tasks[0]

    def workspace_type(self):
        return self._workspace_type

    def __repr__(self):
        return "TaskGroup(tasks={}, workspace_type={}, remote_nets={})".format(
            self.tasks(), self.workspace_type(), self.remote_nets())
358 
359 
class TaskOutput(object):
    """
    Represents the output of a task. An output can be a blob,
    a list of blobs, or a record.
    """

    def __init__(self, names):
        self._schema = None
        if isinstance(names, Field):
            # A record: remember the schema and work on its blob list.
            self._schema = names
            names = self._schema.field_blobs()
        self._is_scalar = type(names) not in (tuple, list)
        if self._is_scalar:
            names = [names]
        self.names = names
        self._values = None

    def set(self, values, _fetch_func=None):
        """Store the output values (one per name) and an optional fetcher."""
        assert len(values) == len(self.names)
        self._values = values
        self._fetch_func = _fetch_func

    def get(self):
        """Return the stored value(s), unwrapped per scalar/record-ness."""
        assert self._values is not None, 'Output value not set yet.'
        if self._is_scalar:
            return self._values[0]
        if self._schema:
            return from_blob_list(self._schema, self._values)
        return self._values

    def fetch(self):
        """Fetch each value through the fetch function and unwrap the result."""
        assert self._fetch_func is not None, (
            'Cannot fetch value for this output.')
        fetched = [self._fetch_func(value) for value in self._values]
        if self._is_scalar:
            return fetched[0]
        if self._schema:
            return from_blob_list(self._schema, fetched)
        return fetched

    def __repr__(self):
        return "TaskOutput(names={}, values={})".format(self.names, self._values)
405 
406 
def final_output(blob_or_record):
    """
    Adds an output to the current Task, or if no task is active,
    create a dummy task that returns the given blob or record
    to the client. This will return the value of the blob or record when
    the last task of the TaskGroup for a given node finishes.
    """
    active_task = Task.current(required=False)
    if active_task is None:
        active_task = Task()
    return active_task.add_output(blob_or_record)
416 
417 
class TaskOutputList(object):
    """ Keeps a list of outputs for a task """

    def __init__(self, outputs=None):
        self.outputs = outputs if outputs else []

    def names(self):
        """
        Retrieve the output names.
        TODO(azzolini): make this schema-based.
        """
        return [name for output in self.outputs for name in output.names]

    def set_values(self, values, _fetch_func=None):
        """Distribute `values` across the outputs, in order, by name count."""
        offset = 0
        for output in self.outputs:
            count = len(output.names)
            output.set(values[offset:offset + count], _fetch_func)
            offset += count
        assert offset == len(values), 'Wrong number of output values.'

    def __repr__(self):
        return "TaskOutputList(outputs={})".format(self.outputs)
443 
444 
class Task(object):
    """
    A Task is composed of an execution step and zero or more outputs.
    Tasks are executed in the context of a TaskGroup, which, in turn, can
    be run by a Session.

    Task outputs are fetched by the session at the end of the run.

    The recommended way of creating a task is by using `net_builder.ops`.
    Example:

        from net_builder import ops
        with Node('trainer'), Task(name='my_task', num_instances=2):
            with ops.task_init():
                globl = ops.Const(0)
            with ops.task_instance_init():
                local = ops.Const(0)
            with ops.loop(100):
                ops.Copy(globl, local)
            with ops.task_instance_exit():
                ops.Add([globl, local], [globl])
            with ops.task_exit():
                ops.Mul([globl, globl], [globl])

    The task above will create 2 instances that will run in parallel.
    Each instance will copy `local` to `globl` 100 times, Then Add `local`
    to `globl` once. The `Mul` will only execute once, after all the instances
    of the task have finished.
    """

    # TASK_SETUP runs once per task, before/after all
    # concurrent task instances start/finish.
    TASK_SETUP = 'task_setup'
    # Setup will run once for each instance of the task.
    TASK_INSTANCE_SETUP = 'task_instance_setup'
    REPORT_STEP = 'report_step'
    # Names of tasks created outside of any TaskGroup; used for de-duping.
    _global_names_used = set()

    @staticmethod
    def _get_next_name(node, group, name):
        """Return a unique task name of the form '<node>/<name>[:<i>]'."""
        basename = str(node) + '/' + str(name)
        names_used = (
            Task._global_names_used
            if group is None else
            set(t.name for t in group._tasks_to_add))
        cur_name = basename
        i = 0
        # Append an increasing ':<i>' suffix until the name is unused.
        while cur_name in names_used:
            i += 1
            cur_name = '%s:%d' % (basename, i)
        return cur_name

    def __init__(
            self, step=None, outputs=None,
            workspace_type=None, group=None, node=None, name=None,
            num_instances=None):
        """
        Instantiate a Task and add it to the current TaskGroup and Node.

        Args:
            step: If provided, this task will run this ExecutionStep.
            outputs: If provided, the task will return the provided outputs
                to the client at completion time.
            node: If provided, force task execution on the given node.
            name: Name of the Task.
            num_instances: If provided, this task will be cloned num_instances
                times at runtime, and all instances will run
                concurrently.
        """
        if not name and isinstance(step, core.ExecutionStep):
            name = step.Proto().name
        if not name:
            name = 'task'
        # register this node name with active context
        self.node = str(Node.current(None if node is None else Node(node)))
        self.group = TaskGroup.current(group, required=False)

        self.name = Task._get_next_name(self.node, self.group, name)

        # may need to be temporarily removed later if Task used as a context
        if self.group is not None:
            self.group._tasks_to_add.append(self)

        self._already_used = False
        self._step = None
        self._step_with_setup = None
        self._outputs = []
        if step is not None:
            self.set_step(step)
        if outputs is not None:
            self.add_outputs(outputs)

        self._pipeline = None
        self._is_pipeline_context = False
        self._workspace_type = workspace_type
        self._report_net = None
        self._num_instances = num_instances

    def __enter__(self):
        # temporarily remove from _tasks_to_add to ensure correct order
        if self.group is not None:
            self.group._tasks_to_add.remove(self)
        self._assert_not_used()
        assert self._step is None, 'This Task already has an execution step.'
        from caffe2.python import net_builder
        self._net_builder = net_builder.NetBuilder(_fullname=self.name)
        self._net_builder.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # Renamed params (was `type`, shadowing the builtin); __exit__ is
        # only ever called positionally, so this is backward-compatible.
        self._net_builder.__exit__(exc_type, exc_value, exc_traceback)
        # Only adopt the built step on successful exit.
        if exc_type is None:
            self.set_step(self._net_builder)
        # Re-register with the group (undo the removal done in __enter__).
        if self.group is not None:
            self.group._tasks_to_add.append(self)
        self._net_builder = None

    def workspace_type(self):
        return self._workspace_type

    def _assert_not_used(self):
        assert not self._already_used, (
            'Cannot modify task since it is already been used.')

    def add_output(self, output):
        """Wrap `output` in a TaskOutput (if needed) and register it."""
        self._assert_not_used()
        output = (
            output if isinstance(output, TaskOutput) else TaskOutput(output))
        self._outputs.append(output)
        return output

    def add_outputs(self, outputs):
        """Register one output or a list/tuple of outputs."""
        self._assert_not_used()
        if type(outputs) not in (list, tuple):
            return self.add_output(outputs)
        else:
            return [self.add_output(output) for output in outputs]

    def set_step(self, step):
        self._assert_not_used()
        self._step = core.to_execution_step(step)

    def get_step(self):
        """
        Build (once) and return the task's full execution step, wrapping the
        raw step with report steps, instance-level setup/exit nets,
        concurrent-instance cloning, and task-level setup/exit nets.
        """
        if self._step_with_setup is not None:
            return self._step_with_setup

        if self._step is None:
            self._step_with_setup = core.execution_step(self.name, [])
            return self._step_with_setup

        # Report steps attached to the task's step; each is consumed once.
        report_steps = [
            s
            for s in self._step.get_all_attributes(Task.REPORT_STEP)
            if not hasattr(s, '_report_step_used')
        ]
        for step in report_steps:
            step._report_step_used = True
            if not step.Proto().run_every_ms:
                step.RunEveryMillis(1000)
        task_init_nets, task_exit_nets = get_setup_nets(
            Task.TASK_SETUP, [self._step] + report_steps, self)
        instance_init_nets, instance_exit_nets = get_setup_nets(
            Task.TASK_INSTANCE_SETUP, [self._step] + report_steps, self)
        if len(self._outputs) == 0:
            # Ensure every task has at least one (dummy) output to fetch.
            output_net = core.Net('%s:output' % self.name)
            self.add_output(output_net.ConstantFill(
                [], 1, dtype=core.DataType.INT32, value=0))
            task_exit_nets.append(output_net)

        # Add instance-level report steps
        body = self._step if not report_steps else core.execution_step(
            '%s:body' % self.name, report_steps + [self._step])
        # Enclose with instance-level (thread-local) setup nets
        step_with_instance_setup = add_setup_steps(
            body, instance_init_nets, instance_exit_nets,
            self.name + ':instance')
        # Set up runtime concurrent instances
        if self._num_instances and self._num_instances > 1:
            step_with_instance_setup.SetCreateWorkspace(True)
            step_with_instance_setup = core.execution_step(
                # Fix: the name was the literal '%s:parallel' -- the task
                # name was never substituted into the format string.
                '%s:parallel' % self.name,
                [step_with_instance_setup],
                num_concurrent_instances=self._num_instances)
        # Enclose with task-level setup nets
        self._step_with_setup = add_setup_steps(
            step_with_instance_setup, task_init_nets, task_exit_nets, self.name)

        return self._step_with_setup

    def output_list(self):
        return TaskOutputList(self._outputs)

    def outputs(self):
        return self._outputs

    def _notify_used(self):
        # Finalize the step and freeze the task against further modification.
        self.get_step()
        self._already_used = True

    def __repr__(self):
        return "Task(name={}, node={}, outputs={})".format(
            self.name, self.node, self.outputs())
648 
649 
class SetupNets(object):
    """
    Registers a list of nets to be run at initialization and/or finalization
    of Tasks or TaskGroups.

    For example, given:

        init_net = core.Net('init')
        my_val = init_net.ConstantFill([], 'my_val', value=0)

        net = core.Net('counter')
        net.Add([my_val, net.Const(1),], [my_val])

        with TaskGroup() as task_group:
            with Node('trainer'):
                my_task = Task(step=[net])

    to have `init_net` run once before `net` runs for the first time, do
    one of the following:

        net.add_attribute(Task.TASK_SETUP, SetupNets([init_net]))

    or

        net.add_attribute(TaskGroup.LOCAL_SETUP, SetupNets([init_net]))

    - With Task.TASK_SETUP, init_net runs once at my_task startup.
    - With TaskGroup.LOCAL_SETUP, init_net runs once on node 'trainer',
      before any task of the task group is run on that node.

    The same SetupNets object can be added to multiple nets; it will only
    run once per Task/TaskGroup run.
    """

    def __init__(self, init_nets=None, exit_nets=None):
        self.init_nets = init_nets
        self.exit_nets = exit_nets

    def setup(self, init_net):
        # The provided `init_net` is ignored; the registered nets are returned.
        return self.init_nets

    def exit(self, exit_net):
        # Likewise, `exit_net` is ignored.
        return self.exit_nets

    def __repr__(self):
        return "SetupNets(init_nets={}, exit_nets={})".format(
            self.init_nets, self.exit_nets)
def __init__(self, step=None, outputs=None, workspace_type=None, group=None, node=None, name=None, num_instances=None)
Definition: task.py:501
Definition: setup.py:1
def add(self, task)
Definition: task.py:216
def get_step(self)
Definition: task.py:588
def set_step(self, step)
Definition: task.py:584
def add_output(self, output)
Definition: task.py:570
def tasks_by_node(self, node_remap=None)
Definition: task.py:272
def outputs(self)
Definition: task.py:638
def report_step(self, step=None, node=None, interval_ms=1000)
Definition: task.py:249
def add_outputs(self, outputs)
Definition: task.py:577
def node_kwargs(self)
Definition: task.py:52
def report_net(self, net=None, node=None, report_interval=5)
Definition: task.py:260
def workspace_type(self)
Definition: task.py:352
def _assert_not_used(self)
Definition: task.py:566