import os

try:
    from urllib.parse import urlparse
except ImportError:
    from urlparse import urlparse

from . import FileStore, TCPStore
    10 _rendezvous_handlers = {}
    13 def register_rendezvous_handler(scheme, handler):
    14     """Registers a new rendezvous handler.    16     Before we can run collective algorithms, participating processes    17     need to find each other and exchange information to be able to    18     communicate. We call this process rendezvous.    20     The outcome of the rendezvous process is a triplet containing a    21     shared key/value store, the rank of the process, and the total    22     number of participating processes.    24     If none of the bundled rendezvous methods apply to your execution    25     environment you can opt to register your own rendezvous handler.    26     Pick a unique name and use the URL scheme to identify it when    27     calling the `rendezvous()` function.    30         scheme (str): URL scheme to identify your rendezvous handler.    31         handler (function): Handler that is invoked when the    32             `rendezvous()` function is called with a URL that uses    33             the corresponding scheme. It must be a generator function    34             that yields the triplet.    36     global _rendezvous_handlers
    37     if scheme 
in _rendezvous_handlers:
    39             "Rendezvous handler for {}:// already registered".format(scheme)
    41     _rendezvous_handlers[scheme] = handler
    44 def rendezvous(url, **kwargs):
    45     global _rendezvous_handlers
    46     result = urlparse(url)
    47     if result.scheme 
not in _rendezvous_handlers:
    48         raise RuntimeError(
"No rendezvous handler for {}://".format(result.scheme))
    49     return _rendezvous_handlers[result.scheme](url, **kwargs)
    52 def _rendezvous_error(msg):
    53     return ValueError(
"Error initializing torch.distributed using " + msg)
    56 def _file_rendezvous_handler(url):
    58         return _rendezvous_error(
"file:// rendezvous: " + msg)
    60     result = urlparse(url)
    63         raise _error(
"path missing")
    64     query = dict(pair.split(
"=") 
for pair 
in filter(
None, result.query.split(
"&")))
    65     if "rank" not in query:
    66         raise _error(
"rank parameter missing")
    67     if "world_size" not in query:
    68         raise _error(
"world size parameter missing")
    70     rank = int(query[
"rank"])
    71     world_size = int(query[
"world_size"])
    72     store = FileStore(path, world_size)
    73     yield (store, rank, world_size)
    76     raise RuntimeError(
"Unable to perform rerendezvous using file:// method")
    79 def _tcp_rendezvous_handler(url):
    81         return _rendezvous_error(
"tcp:// rendezvous: " + msg)
    83     result = urlparse(url)
    85         raise _error(
"port number missing")
    86     query = dict(pair.split(
"=") 
for pair 
in filter(
None, result.query.split(
"&")))
    87     if "rank" not in query:
    88         raise _error(
"rank parameter missing")
    89     if "world_size" not in query:
    90         raise _error(
"world size parameter missing")
    92     rank = int(query[
"rank"])
    93     world_size = int(query[
"world_size"])
    94     start_daemon = rank == 0
    95     store = TCPStore(result.hostname, result.port, world_size, start_daemon)
    96     yield (store, rank, world_size)
    99     raise RuntimeError(
"Unable to perform rerendezvous using tcp:// method")
   102 def _env_rendezvous_handler(url):
   104         return _rendezvous_error(
"env:// rendezvous: " + msg)
   107         return _error(
"environment variable %s expected, but not set" % var)
   109     if not url.startswith(
"env://"):
   110         raise _error(
"url must be equal to `env://`")
   111     result = urlparse(url)
   112     query = dict(pair.split(
"=") 
for pair 
in filter(
None, result.query.split(
"&")))
   115         rank = int(query[
"rank"])
   117         rank = os.environ.get(
"RANK", 
None)
   119             raise _env_error(
"RANK")
   121     if "world_size" in query:
   122         world_size = int(query[
"world_size"])
   124         world_size = os.environ.get(
"WORLD_SIZE", 
None)
   125         if world_size 
is None:
   126             raise _env_error(
"WORLD_SIZE")
   128     master_addr = os.environ.get(
"MASTER_ADDR", 
None)
   129     if master_addr 
is None:
   130         raise _env_error(
"MASTER_ADDR")
   132     master_port = os.environ.get(
"MASTER_PORT", 
None)
   133     if master_port 
is None:
   134         raise _env_error(
"MASTER_PORT")
   138     world_size = int(world_size)
   139     master_port = int(master_port)
   142     start_daemon = rank == 0
   143     store = TCPStore(master_addr, master_port, world_size, start_daemon)
   144     yield (store, rank, world_size)
   147     raise RuntimeError(
"Unable to perform rerendezvous using env:// method")
   150 register_rendezvous_handler(
"file", _file_rendezvous_handler)
   151 register_rendezvous_handler(
"tcp", _tcp_rendezvous_handler)
   152 register_rendezvous_handler(
"env", _env_rendezvous_handler)