3 from __future__
import absolute_import
4 from __future__
import division
5 from __future__
import print_function
6 from __future__
import unicode_literals
10 from collections
import defaultdict
12 from future.utils
import viewitems
15 def _merge_node_kwargs(a, b):
26 @context.define_context(allow_default=
True)
29 Context that keeps track of all the node names used. 30 Users shouldn't have to use them directly, since a Cluster is automatically 31 generated at the first usage of 'Node'. 39 def add_node(self, node):
40 if str(node)
not in self.
_nodes:
41 self._nodes.append(str(node))
44 self._node_kwargs.get(str(node)))
48 Returns the list of unique node names used within this context. 52 def node_kwargs(self):
56 return "Cluster(nodes={}, node_kwargs={})".format(
63 A Node context is used to indicate that all Tasks instantiated within will 64 run on the given node name. (Only the name of the node actually counts.) 67 with TaskGroup() as tg: 69 s1 = execution_step(...) 72 s2 = execution_step(...) 74 s3 = execution_step(...) 76 In this example, all three execution steps will run in parallel. 77 Moreover, s1 and s3 will run on the same node, and can see each 80 Additionally, a Node can be passed implementation-specific kwargs, 81 in order to specify properties of the node. 84 def __init__(self, node='local', **kwargs):
85 self.
_name = str(node)
87 Cluster.current().add_node(self)
93 return "Node(name={}, kwargs={})".format(self.
_name, self.
_kwargs)
101 Determines whether tasks of a TaskGroup will run directly at the global 102 workspace, which is kept alive across runs, or whether a new child 103 workspace will be created for the run and destroyed afterwards. 109 def get_setup_nets(key, steps_or_nets, target):
115 for step_or_net
in steps_or_nets:
116 if hasattr(step_or_net,
'get_all_attributes'):
117 objs += step_or_net.get_all_attributes(key)
118 elif hasattr(step_or_net,
'get_attributes'):
119 objs += step_or_net.get_attributes(key)
123 if hasattr(obj,
'_setup_used')
and obj._setup_used:
125 if hasattr(obj,
'_setup_target')
and obj._setup_target != target:
127 if hasattr(obj,
'setup'):
128 nets = obj.setup(init_net)
129 if isinstance(nets, (list, tuple)):
132 init_nets.append(nets)
133 elif nets
is not None:
134 raise TypeError(
'Unsupported type for setup: %s' % type(nets))
135 obj._setup_used =
True 136 if hasattr(obj,
'exit'):
137 nets = obj.exit(exit_net)
138 if isinstance(nets, (list, tuple)):
141 exit_nets.append(nets)
142 elif nets
is not None:
143 raise TypeError(
'Unsupported type for setup: %s' % type(nets))
144 obj._setup_used =
True 146 if len(init_net.Proto().op) > 0:
147 init_nets.insert(0, init_net)
148 if len(exit_net.Proto().op) > 0:
149 exit_nets.insert(0, exit_net)
150 return init_nets, exit_nets
153 def add_setup_steps(step, init_nets, exit_nets, name):
154 if not init_nets
and not exit_nets:
158 steps.append(core.execution_step(
'%s:init' % name, init_nets))
160 if len(exit_nets) > 0:
161 steps.append(core.execution_step(
'%s:exit' % name, exit_nets))
162 return core.execution_step(name, steps)
168 Context that gathers tasks which will run concurrently, potentially on 169 multiple nodes. All tasks in the same node will share the same workspace 170 and thus can share blobs, while tasks running in different nodes won't 171 be able to directly share data. 173 All tasks of the task group will start concurrently, and the task group 174 will finish execution when the last task of the group finishes. 177 # supose that s1 ... s5 are execution steps or nets. 178 with TaskGroup() as tg: 179 # these tasks go to default node 'local' 190 # this will run all steps in parallel. 191 # s1 and s2 will run at default node 'local' 192 # s3 and s5 will run at node 'n2' 193 # s4 will run at node 'n1' 196 LOCAL_SETUP =
'local_setup' 198 def __init__(self, workspace_type=None):
210 def add_remote_net(self, net):
211 self._remote_nets.append(net)
213 def remote_nets(self):
218 'Cannot add Task to an already used TaskGroup.')
221 task._workspace_type
is None or 223 if task._workspace_type
is None:
224 task._workspace_type = (
229 self._tasks.append(task)
238 def num_registered_tasks(self):
241 def used_nodes(self):
245 if task.node
not in used:
246 used.append(task.node)
251 Add a "report step" to this TaskGroup. This step will run repeatedly 252 every `interval_ms` milliseconds for the duration of the TaskGroup 253 execution on each of the nodes. It is guaranteed that this step 254 will be run at least once after every Task in the node has finished. 256 step = core.to_execution_step(step)
257 step.RunEveryMillis(interval_ms)
258 self._report_steps.append((str(node
or Node.current(node)), step))
260 def report_net(self, net=None, node=None, report_interval=5):
262 DEPRECATED. Use report_step instead. 264 node = str(node
or Node.current(node))
268 net
if net
else core.Net(
'%s/reporter' % node),
272 def tasks_by_node(self, node_remap=None):
276 for task
in self.
tasks():
277 node_map[task.node] =\
278 node_remap(task.node)
if node_remap
else task.node
281 assert prev_node_map == node_map, (
282 'Cannot call tasks_by_node multiple times.')
286 for node, (net, interval)
in viewitems(self.
_report_nets):
287 self.
report_step(net, node=node, interval_ms=interval * 1000)
290 tasks_by_node = defaultdict(list)
291 for task
in self.
tasks():
292 mapped_node = node_map[task.node]
293 tasks_by_node[mapped_node].append(task)
295 report_steps_by_node = defaultdict(list)
297 report_steps_by_node[node_map[original_node]].append(step)
300 for node, tasks
in viewitems(tasks_by_node):
301 report_steps = report_steps_by_node[node]
302 node_inits, node_exits = get_setup_nets(
303 TaskGroup.LOCAL_SETUP,
304 [t.get_step()
for t
in tasks] + report_steps,
309 grouped_workspace_type = WorkspaceType.PRIVATE
311 step = task.get_step()
312 step.SetCreateWorkspace(
313 task.workspace_type() == WorkspaceType.PRIVATE)
316 outputs += task.outputs()
319 if task.workspace_type() == WorkspaceType.GLOBAL:
320 grouped_workspace_type = WorkspaceType.GLOBAL
322 steps.append(core.execution_step(
'empty', []))
326 step = core.execution_step(
327 '%s:body' % node, steps, concurrent_substeps=
True)
328 if len(node_inits) > 0
or len(node_exits) > 0:
330 if len(node_inits) > 0:
332 core.execution_step(
'%s:init' % node, node_inits))
334 if len(node_exits) > 0:
336 core.execution_step(
'%s:exit' % node, node_exits))
337 step = core.execution_step(node, steps)
339 node=node, step=step, outputs=outputs,
340 name=
'grouped_by_node',
341 group=grouped_by_node, workspace_type=grouped_workspace_type)
343 return grouped_by_node
345 def to_task(self, node=None):
346 node = str(Node.current(node))
352 def workspace_type(self):
356 return "TaskGroup(tasks={}, workspace_type={}, remote_nets={})".format(
362 Represents the output of a task. An output can be a blob, 363 a list of blob, or a record. 366 def __init__(self, names):
369 if isinstance(names, Field):
371 names = self._schema.field_blobs()
372 self.
_is_scalar = type(names)
not in (tuple, list)
378 def set(self, values, _fetch_func=None):
379 assert len(values) == len(self.
names)
384 assert self.
_values is not None,
'Output value not set yet.' 394 'Cannot fetch value for this output.')
397 return fetched_vals[0]
399 return from_blob_list(self.
_schema, fetched_vals)
404 return "TaskOutput(names={}, values={})".format(self.
names, self.
_values)
407 def final_output(blob_or_record):
409 Adds an output to the current Task, or if no task is active, 410 create a dummy task that returns the given blob or record 411 to the client. This will return the value of the blob or record when 412 the last task of the TaskGroup for a given node finishes. 414 cur_task = Task.current(required=
False)
or Task()
415 return cur_task.add_output(blob_or_record)
419 """ Keeps a list of outputs for a task """ 420 def __init__(self, outputs=None):
425 Retrive the output names. 426 TODO(azzolini): make this schema-based. 433 def set_values(self, values, _fetch_func=None):
437 o.set(values[offset:offset + num], _fetch_func)
439 assert offset == len(values),
'Wrong number of output values.' 442 return "TaskOutputList(outputs={})".format(self.
outputs)
448 A Task is composed of an execution step and zero or more outputs. 449 Tasks are executed in the context of a TaskGroup, which, in turn, can 452 Task outputs are fetched by the session at the end of the run. 454 The recommended way of creating a task is by using `net_builder.ops`. 457 from net_builder import ops 458 with Node('trainer'), Task(name='my_task', num_instances=2): 459 with ops.task_init(): 461 with ops.task_instance_init(): 464 ops.Copy(globl, local) 465 with ops.task_instance_exit(): 466 ops.Add([globl, local], [globl]) 467 with ops.task_exit(): 468 ops.Mul([globl, globl], [globl]) 470 The task above will create 2 instances that will run in parallel. 471 Each instance will copy `local` to `globl` 100 times, Then Add `local` 472 to `globl` once. The `Mul` will only execute once, after all the instances 473 of the task have finished. 478 TASK_SETUP =
'task_setup' 480 TASK_INSTANCE_SETUP =
'task_instance_setup' 481 REPORT_STEP =
'report_step' 482 _global_names_used = set()
485 def _get_next_name(node, group, name):
486 basename = str(node) +
'/' + str(name)
488 Task._global_names_used
489 if group
is None else 490 set(t.name
for t
in group._tasks_to_add))
493 while cur_name
in names_used:
495 cur_name =
'%s:%d' % (basename, i)
499 self, step=
None, outputs=
None,
500 workspace_type=
None, group=
None, node=
None, name=
None,
503 Instantiate a Task and add it to the current TaskGroup and Node. 506 step: If provided, this task will run this ExecutionStep. 507 outputs: If provided, the task will return the provided outputs 508 to the client at completion time. 509 node: If provided, force task execution on the given node. 510 name: Name of the Task. 511 num_instances: If provided, this task will be cloned num_instances 512 times at runtime, and all instances will run 516 name = step.Proto().name
520 self.
node = str(Node.current(
None if node
is None else Node(node)))
521 self.
group = TaskGroup.current(group, required=
False)
526 if self.
group is not None:
527 self.group._tasks_to_add.append(self)
535 if outputs
is not None:
546 if self.
group is not None:
547 self.group._tasks_to_add.remove(self)
549 assert self.
_step is None,
'This Task already has an execution step.' 552 self._net_builder.__enter__()
555 def __exit__(self, type, value, traceback):
556 self._net_builder.__exit__(type, value, traceback)
559 if self.
group is not None:
560 self.group._tasks_to_add.append(self)
563 def workspace_type(self):
566 def _assert_not_used(self):
568 'Cannot modify task since it is already been used.')
570 def add_output(self, output):
573 output
if isinstance(output, TaskOutput)
else TaskOutput(output))
574 self._outputs.append(output)
577 def add_outputs(self, outputs):
579 if type(outputs)
not in (list, tuple):
582 return [self.
add_output(output)
for output
in outputs]
584 def set_step(self, step):
586 self.
_step = core.to_execution_step(step)
592 if self.
_step is None:
598 for s
in self._step.get_all_attributes(Task.REPORT_STEP)
599 if not hasattr(s,
'_report_step_used')
601 for step
in report_steps:
602 step._report_step_used =
True 603 if not step.Proto().run_every_ms:
604 step.RunEveryMillis(1000)
605 task_init_nets, task_exit_nets = get_setup_nets(
606 Task.TASK_SETUP, [self.
_step] + report_steps, self)
607 instance_init_nets, instance_exit_nets = get_setup_nets(
608 Task.TASK_INSTANCE_SETUP, [self.
_step] + report_steps, self)
612 [], 1, dtype=core.DataType.INT32, value=0))
613 task_exit_nets.append(output_net)
616 body = self.
_step if not report_steps
else core.execution_step(
617 '%s:body' % self.
name, report_steps + [self.
_step])
619 step_with_instance_setup = add_setup_steps(
620 body, instance_init_nets, instance_exit_nets,
621 self.
name +
':instance')
624 step_with_instance_setup.SetCreateWorkspace(
True)
625 step_with_instance_setup = core.execution_step(
627 [step_with_instance_setup],
631 step_with_instance_setup, task_init_nets, task_exit_nets, self.
name)
635 def output_list(self):
641 def _notify_used(self):
646 return "Task(name={}, node={}, outputs={})".format(
652 Allow to register a list of nets to be run at initialization 653 and finalization of Tasks or TaskGroups. 654 For example, let's say you have the following: 656 init_net = core.Net('init') 657 my_val = init_net.ConstantFill([], 'my_val', value=0) 659 net = core.Net('counter') 660 net.Add([my_val, net.Const(1),], [my_val]) 662 with TaskGroup() as task_group: 663 with Node('trainer'): 664 my_task = Task(step=[net]) 666 In order to have `init_net` run once before `net` runs for the 667 first time, you can do one of the following: 669 net.add_attribute(Task.TASK_SETUP, SetupNets([init_net])) 673 net.add_attribute(TaskGroup.LOCAL_SETUP, SetupNets([init_net])) 675 - With Task.TASK_SETUP, init_net will run once at my_task startup. 676 - With TaskGroup.LOCAL_SETUP, init_net will run once on node 'trainer', 677 before any task of the task group is run on that node. 679 The same SetupNets object can be added to multiple nets. It will only 680 run once per Task/TaskGroup run. 683 def __init__(self, init_nets=None, exit_nets=None):
687 def setup(self, init_net):
690 def exit(self, exit_net):
694 return "SetupNets(init_nets={}, exit_nets={})".format(
def __init__(self, step=None, outputs=None, workspace_type=None, group=None, node=None, name=None, num_instances=None)
def add_output(self, output)
def tasks_by_node(self, node_remap=None)
def report_step(self, step=None, node=None, interval_ms=1000)
def add_outputs(self, outputs)
def report_net(self, net=None, node=None, report_interval=5)
def _assert_not_used(self)