3 from __future__ 
import absolute_import
     4 from __future__ 
import division
     5 from __future__ 
import print_function
     6 from __future__ 
import unicode_literals
    10 from collections 
import defaultdict
    12 from future.utils 
import viewitems
    15 def _merge_node_kwargs(a, b):
    26 @context.define_context(allow_default=
True)
    29     Context that keeps track of all the node names used.    30     Users shouldn't have to use them directly, since a Cluster is automatically    31     generated at the first usage of 'Node'.    39     def add_node(self, node):
    40         if str(node) 
not in self.
_nodes:
    41             self._nodes.append(str(node))
    44             self._node_kwargs.get(str(node)))
    48         Returns the list of unique node names used within this context.    52     def node_kwargs(self):
    56         return "Cluster(nodes={}, node_kwargs={})".format(
    63     A Node context is used to indicate that all Tasks instantiated within will    64     run on the given node name. (Only the name of the node actually counts.)    67         with TaskGroup() as tg:    69                 s1 = execution_step(...)    72                 s2 = execution_step(...)    74                 s3 = execution_step(...)    76         In this example, all three execution steps will run in parallel.    77         Moreover, s1 and s3 will run on the same node, and can see each    80         Additionally, a Node can be passed implementation-specific kwargs,    81         in order to specify properties of the node.    84     def __init__(self, node='local', **kwargs):
    85         self.
_name = str(node)
    87         Cluster.current().add_node(self)
    def __repr__(self):
        # NOTE(review): the method header (original line 92/93 area) was lost
        # in the extraction; 'def __repr__(self):' reconstructed from the
        # repr-style format string — confirm against upstream.
        return "Node(name={}, kwargs={})".format(self._name, self._kwargs)
   101     Determines whether tasks of a TaskGroup will run directly at the global   102     workspace, which is kept alive across runs, or whether a new child   103     workspace will be created for the run and destroyed afterwards.   109 def get_setup_nets(key, steps_or_nets, target):
   115     for step_or_net 
in steps_or_nets:
   116         if hasattr(step_or_net, 
'get_all_attributes'):
   117             objs += step_or_net.get_all_attributes(key)
   118         elif hasattr(step_or_net, 
'get_attributes'):
   119             objs += step_or_net.get_attributes(key)
   123         if hasattr(obj, 
'_setup_used') 
and obj._setup_used:
   125         if hasattr(obj, 
'_setup_target') 
and obj._setup_target != target:
   127         if hasattr(obj, 
'setup'):
   128             nets = obj.setup(init_net)
   129             if isinstance(nets, (list, tuple)):
   132                 init_nets.append(nets)
   133             elif nets 
is not None:
   134                 raise TypeError(
'Unsupported type for setup: %s' % type(nets))
   135             obj._setup_used = 
True   136         if hasattr(obj, 
'exit'):
   137             nets = obj.exit(exit_net)
   138             if isinstance(nets, (list, tuple)):
   141                 exit_nets.append(nets)
   142             elif nets 
is not None:
   143                 raise TypeError(
'Unsupported type for setup: %s' % type(nets))
   144             obj._setup_used = 
True   146     if len(init_net.Proto().op) > 0:
   147         init_nets.insert(0, init_net)
   148     if len(exit_net.Proto().op) > 0:
   149         exit_nets.insert(0, exit_net)
   150     return init_nets, exit_nets
   153 def add_setup_steps(step, init_nets, exit_nets, name):
   154     if not init_nets 
and not exit_nets:
   158         steps.append(core.execution_step(
'%s:init' % name, init_nets))
   160     if len(exit_nets) > 0:
   161         steps.append(core.execution_step(
'%s:exit' % name, exit_nets))
   162     return core.execution_step(name, steps)
   168     Context that gathers tasks which will run concurrently, potentially on   169     multiple nodes. All tasks in the same node will share the same workspace   170     and thus can share blobs, while tasks running in different nodes won't   171     be able to directly share data.   173     All tasks of the task group will start concurrently, and the task group   174     will finish execution when the last task of the group finishes.   177         # supose that s1 ... s5 are execution steps or nets.   178         with TaskGroup() as tg:   179             # these tasks go to default node 'local'   190         # this will run all steps in parallel.   191         # s1 and s2 will run at default node 'local'   192         # s3 and s5 will run at node 'n2'   193         # s4 will run at node 'n1'   196     LOCAL_SETUP = 
'local_setup'   198     def __init__(self, workspace_type=None):
   210     def add_remote_net(self, net):
   211         self._remote_nets.append(net)
   213     def remote_nets(self):
   218             'Cannot add Task to an already used TaskGroup.')
   221             task._workspace_type 
is None or   223         if task._workspace_type 
is None:
   224             task._workspace_type = (
   229         self._tasks.append(task)
   238     def num_registered_tasks(self):
   241     def used_nodes(self):
   245             if task.node 
not in used:
   246                 used.append(task.node)
   251         Add a "report step" to this TaskGroup. This step will run repeatedly   252         every `interval_ms` milliseconds for the duration of the TaskGroup   253         execution on each of the nodes. It is guaranteed that this step   254         will be run at least once after every Task in the node has finished.   256         step = core.to_execution_step(step)
   257         step.RunEveryMillis(interval_ms)
   258         self._report_steps.append((str(node 
or Node.current(node)), step))
   260     def report_net(self, net=None, node=None, report_interval=5):
   262         DEPRECATED. Use report_step instead.   264         node = str(node 
or Node.current(node))
   268                 net 
if net 
else core.Net(
'%s/reporter' % node),
   272     def tasks_by_node(self, node_remap=None):
   276         for task 
in self.
tasks():
   277             node_map[task.node] =\
   278                 node_remap(task.node) 
if node_remap 
else task.node
   281             assert prev_node_map == node_map, (
   282                 'Cannot call tasks_by_node multiple times.')
   286         for node, (net, interval) 
in viewitems(self.
_report_nets):
   287             self.
report_step(net, node=node, interval_ms=interval * 1000)
   290         tasks_by_node = defaultdict(list)
   291         for task 
in self.
tasks():
   292             mapped_node = node_map[task.node]
   293             tasks_by_node[mapped_node].append(task)
   295         report_steps_by_node = defaultdict(list)
   297             report_steps_by_node[node_map[original_node]].append(step)
   300         for node, tasks 
in viewitems(tasks_by_node):
   301             report_steps = report_steps_by_node[node]
   302             node_inits, node_exits = get_setup_nets(
   303                 TaskGroup.LOCAL_SETUP,
   304                 [t.get_step() 
for t 
in tasks] + report_steps,
   309             grouped_workspace_type = WorkspaceType.PRIVATE
   311                 step = task.get_step()
   312                 step.SetCreateWorkspace(
   313                     task.workspace_type() == WorkspaceType.PRIVATE)
   316                 outputs += task.outputs()
   319                 if task.workspace_type() == WorkspaceType.GLOBAL:
   320                     grouped_workspace_type = WorkspaceType.GLOBAL
   322                 steps.append(core.execution_step(
'empty', []))
   326                 step = core.execution_step(
   327                     '%s:body' % node, steps, concurrent_substeps=
True)
   328             if len(node_inits) > 0 
or len(node_exits) > 0:
   330                 if len(node_inits) > 0:
   332                         core.execution_step(
'%s:init' % node, node_inits))
   334                 if len(node_exits) > 0:
   336                         core.execution_step(
'%s:exit' % node, node_exits))
   337                 step = core.execution_step(node, steps)
   339                 node=node, step=step, outputs=outputs,
   340                 name=
'grouped_by_node',
   341                 group=grouped_by_node, workspace_type=grouped_workspace_type)
   343         return grouped_by_node
   345     def to_task(self, node=None):
   346         node = str(Node.current(node))
   352     def workspace_type(self):
   356         return "TaskGroup(tasks={}, workspace_type={}, remote_nets={})".format(
   362     Represents the output of a task. An output can be a blob,   363     a list of blob, or a record.   366     def __init__(self, names):
   369         if isinstance(names, Field):
   371             names = self._schema.field_blobs()
   372         self.
_is_scalar = type(names) 
not in (tuple, list)
   378     def set(self, values, _fetch_func=None):
   379         assert len(values) == len(self.
names)
   384         assert self.
_values is not None, 
'Output value not set yet.'   394             'Cannot fetch value for this output.')
   397             return fetched_vals[0]
   399             return from_blob_list(self.
_schema, fetched_vals)
   404         return "TaskOutput(names={}, values={})".format(self.
names, self.
_values)
   407 def final_output(blob_or_record):
   409     Adds an output to the current Task, or if no task is active,   410     create a dummy task that returns the given blob or record   411     to the client. This will return the value of the blob or record when   412     the last task of the TaskGroup for a given node finishes.   414     cur_task = Task.current(required=
False) 
or Task()
   415     return cur_task.add_output(blob_or_record)
   419     """ Keeps a list of outputs for a task """   420     def __init__(self, outputs=None):
   425         Retrive the output names.   426         TODO(azzolini): make this schema-based.   433     def set_values(self, values, _fetch_func=None):
   437             o.set(values[offset:offset + num], _fetch_func)
   439         assert offset == len(values), 
'Wrong number of output values.'   442         return "TaskOutputList(outputs={})".format(self.
outputs)
   448     A Task is composed of an execution step and zero or more outputs.   449     Tasks are executed in the context of a TaskGroup, which, in turn, can   452     Task outputs are fetched by the session at the end of the run.   454     The recommended way of creating a task is by using `net_builder.ops`.   457         from net_builder import ops   458         with Node('trainer'), Task(name='my_task', num_instances=2):   459             with ops.task_init():   461             with ops.task_instance_init():   464                 ops.Copy(globl, local)   465             with ops.task_instance_exit():   466                 ops.Add([globl, local], [globl])   467             with ops.task_exit():   468                 ops.Mul([globl, globl], [globl])   470     The task above will create 2 instances that will run in parallel.   471     Each instance will copy `local` to `globl` 100 times, Then Add `local`   472     to `globl` once. The `Mul` will only execute once, after all the instances   473     of the task have finished.   478     TASK_SETUP = 
'task_setup'   480     TASK_INSTANCE_SETUP = 
'task_instance_setup'   481     REPORT_STEP = 
'report_step'   482     _global_names_used = set()
   485     def _get_next_name(node, group, name):
   486         basename = str(node) + 
'/' + str(name)
   488             Task._global_names_used
   489             if group 
is None else   490             set(t.name 
for t 
in group._tasks_to_add))
   493         while cur_name 
in names_used:
   495             cur_name = 
'%s:%d' % (basename, i)
   499             self, step=
None, outputs=
None,
   500             workspace_type=
None, group=
None, node=
None, name=
None,
   503         Instantiate a Task and add it to the current TaskGroup and Node.   506            step:    If provided, this task will run this ExecutionStep.   507            outputs: If provided, the task will return the provided outputs   508                     to the client at completion time.   509            node:    If provided, force task execution on the given node.   510            name:    Name of the Task.   511            num_instances: If provided, this task will be cloned num_instances   512                           times at runtime, and all instances will run   516             name = step.Proto().name
   520         self.
node = str(Node.current(
None if node 
is None else Node(node)))
   521         self.
group = TaskGroup.current(group, required=
False)
   526         if self.
group is not None:
   527             self.group._tasks_to_add.append(self)
   535         if outputs 
is not None:
   546         if self.
group is not None:
   547             self.group._tasks_to_add.remove(self)
   549         assert self.
_step is None, 
'This Task already has an execution step.'   552         self._net_builder.__enter__()
   555     def __exit__(self, type, value, traceback):
   556         self._net_builder.__exit__(type, value, traceback)
   559         if self.
group is not None:
   560             self.group._tasks_to_add.append(self)
   563     def workspace_type(self):
   566     def _assert_not_used(self):
   568             'Cannot modify task since it is already been used.')
   570     def add_output(self, output):
   573             output 
if isinstance(output, TaskOutput) 
else TaskOutput(output))
   574         self._outputs.append(output)
   577     def add_outputs(self, outputs):
   579         if type(outputs) 
not in (list, tuple):
   582             return [self.
add_output(output) 
for output 
in outputs]
   584     def set_step(self, step):
   586         self.
_step = core.to_execution_step(step)
   592         if self.
_step is None:
   598             for s 
in self._step.get_all_attributes(Task.REPORT_STEP)
   599             if not hasattr(s, 
'_report_step_used')
   601         for step 
in report_steps:
   602             step._report_step_used = 
True   603             if not step.Proto().run_every_ms:
   604                 step.RunEveryMillis(1000)
   605         task_init_nets, task_exit_nets = get_setup_nets(
   606             Task.TASK_SETUP, [self.
_step] + report_steps, self)
   607         instance_init_nets, instance_exit_nets = get_setup_nets(
   608             Task.TASK_INSTANCE_SETUP, [self.
_step] + report_steps, self)
   612                 [], 1, dtype=core.DataType.INT32, value=0))
   613             task_exit_nets.append(output_net)
   616         body = self.
_step if not report_steps 
else core.execution_step(
   617             '%s:body' % self.
name, report_steps + [self.
_step])
   619         step_with_instance_setup = add_setup_steps(
   620             body, instance_init_nets, instance_exit_nets,
   621             self.
name + 
':instance')
   624             step_with_instance_setup.SetCreateWorkspace(
True)
   625             step_with_instance_setup = core.execution_step(
   627                 [step_with_instance_setup],
   631             step_with_instance_setup, task_init_nets, task_exit_nets, self.
name)
   635     def output_list(self):
   641     def _notify_used(self):
   646         return "Task(name={}, node={}, outputs={})".format(
   652     Allow to register a list of nets to be run at initialization   653     and finalization of Tasks or TaskGroups.   654     For example, let's say you have the following:   656         init_net = core.Net('init')   657         my_val = init_net.ConstantFill([], 'my_val', value=0)   659         net = core.Net('counter')   660         net.Add([my_val, net.Const(1),], [my_val])   662         with TaskGroup() as task_group:   663             with Node('trainer'):   664                 my_task = Task(step=[net])   666     In order to have `init_net` run once before `net` runs for the   667     first time, you can do one of the following:   669         net.add_attribute(Task.TASK_SETUP, SetupNets([init_net]))   673         net.add_attribute(TaskGroup.LOCAL_SETUP, SetupNets([init_net]))   675     - With Task.TASK_SETUP, init_net will run once at my_task startup.   676     - With TaskGroup.LOCAL_SETUP, init_net will run once on node 'trainer',   677       before any task of the task group is run on that node.   679     The same SetupNets object can be added to multiple nets. It will only   680     run once per Task/TaskGroup run.   683     def __init__(self, init_nets=None, exit_nets=None):
   687     def setup(self, init_net):
   690     def exit(self, exit_net):
   694         return "SetupNets(init_nets={}, exit_nets={})".format(
 
def __init__(self, step=None, outputs=None, workspace_type=None, group=None, node=None, name=None, num_instances=None)
 
def add_output(self, output)
 
def tasks_by_node(self, node_remap=None)
 
def report_step(self, step=None, node=None, interval_ms=1000)
 
def add_outputs(self, outputs)
 
def report_net(self, net=None, node=None, report_interval=5)
 
def _assert_not_used(self)