Caffe2 - Python API
A deep learning, cross-platform ML framework
muji.py
# Copyright (c) 2016-present, Facebook, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##############################################################################

## @package muji
# Module caffe2.python.muji
"""muji.py does multi-gpu training for caffe2 with no need to change the C++
side code. Everything is defined on the computation graph level.

Currently, we only support the following use cases:
  - 2 gpus, where peer access is enabled between them.
  - 4 gpus, where peer access is enabled between all of them.
  - 8 gpus, where peer access is enabled in two groups,
    between {1, 2, 3, 4} and {5, 6, 7, 8}.
"""

from caffe2.proto import caffe2_pb2


def OnGPU(gpu_id):
    """A utility function that returns a device option protobuf for the
    specified gpu id.
    """
    device_option = caffe2_pb2.DeviceOption()
    device_option.device_type = caffe2_pb2.CUDA
    device_option.cuda_gpu_id = gpu_id
    return device_option


def OnCPU():
    """A utility function that returns a device option protobuf for the CPU."""
    device_option = caffe2_pb2.DeviceOption()
    device_option.device_type = caffe2_pb2.CPU
    return device_option


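# Illustrative sketch (not part of the original module): the two helpers
# above only build a small protobuf, so the result can be inspected
# without any GPU present.
def _example_device_options():
    gpu_opt = OnGPU(1)
    print(gpu_opt)  # -> device_type: 1 (CUDA), cuda_gpu_id: 1
    cpu_opt = OnCPU()
    print(cpu_opt)  # -> device_type: 0 (CPU)

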
def Allreduce(net, blobs, reduced_affix="_reduced", gpu_indices=None):
    """The general Allreduce interface that dispatches to a specialized
    implementation based on the number of blobs.
    """
    if gpu_indices is None:
        gpu_indices = list(range(len(blobs)))
    if len(gpu_indices) != len(blobs):
        raise RuntimeError(
            "gpu_indices length and blobs length mismatch: %d vs %d" %
            (len(gpu_indices), len(blobs))
        )
    if len(blobs) == 2:
        return Allreduce2(net, blobs, reduced_affix, gpu_indices)
    elif len(blobs) == 4:
        return Allreduce4(net, blobs, reduced_affix, gpu_indices)
    elif len(blobs) == 8:
        return Allreduce8(net, blobs, reduced_affix, gpu_indices)
    else:
        return AllreduceFallback(net, blobs, reduced_affix, gpu_indices)


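# Usage sketch (not part of the original module; blob names are
# hypothetical). Building the graph needs no GPU, but actually running
# the net requires a CUDA build of Caffe2.
def _example_allreduce_usage():
    from caffe2.python import core
    net = core.Net("allreduce_example")
    grads = ["grad_gpu0", "grad_gpu1"]  # one gradient blob per GPU
    # Two blobs, so Allreduce dispatches to Allreduce2.
    reduced = Allreduce(net, grads, gpu_indices=[0, 1])
    print(net.Proto())  # inspect the generated Add/Copy operators
    return reduced

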
def Allreduce2(net, blobs, reduced_affix, gpu_indices):
    """Allreduce for 2 gpus.

    Algorithm: 0r <- 0 + 1, 1r <- 0r, where r means "reduced"
    """
    a, b = blobs
    gpu_a, gpu_b = gpu_indices
    a_reduced = net.Add([a, b], a + reduced_affix, device_option=OnGPU(gpu_a))
    b_reduced = a_reduced.Copy(
        [],
        b + reduced_affix,
        device_option=OnGPU(gpu_b)
    )
    return a_reduced, b_reduced


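# The contract in plain numpy (illustrative only, no Caffe2 involved):
# every returned blob holds the same value, the elementwise sum of the
# inputs.
def _example_allreduce2_semantics():
    import numpy as np
    a = np.array([1.0, 2.0])
    b = np.array([3.0, 4.0])
    a_reduced = a + b             # net.Add on gpu_a
    b_reduced = a_reduced.copy()  # Copy onto gpu_b
    assert np.array_equal(a_reduced, b_reduced)

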
def Allreduce4(net, blobs, reduced_affix, gpu_indices):
    """Allreduce for 4 gpus.

    Algorithm: 2 level reduction.
        0r <- 0 + 1, 2r <- 2 + 3
        0r <- 0r + 2r
        2r <- 0r,
        1r <- 0r, 3r <- 2r
    """
    a, b, c, d = blobs
    gpu_a, gpu_b, gpu_c, gpu_d = gpu_indices
    # a_reduced <- a + b, c_reduced <- c + d
    a_reduced = net.Add(
        [a, b],
        str(a) + reduced_affix,
        device_option=OnGPU(gpu_a)
    )
    c_reduced = net.Add(
        [c, d],
        str(c) + reduced_affix,
        device_option=OnGPU(gpu_c)
    )
    # a_reduced <- a_reduced + c_reduced
    a_reduced = a_reduced.Add(c_reduced, a_reduced, device_option=OnGPU(gpu_a))
    # broadcast a_reduced to c_reduced
    c_reduced = a_reduced.Copy([], c_reduced, device_option=OnGPU(gpu_c))
    # broadcast to b and d
    b_reduced = a_reduced.Copy(
        [],
        str(b) + reduced_affix,
        device_option=OnGPU(gpu_b)
    )
    d_reduced = c_reduced.Copy(
        [],
        str(d) + reduced_affix,
        device_option=OnGPU(gpu_d)
    )
    return a_reduced, b_reduced, c_reduced, d_reduced


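# The same two-level tree, simulated over plain numpy arrays (a sketch of
# the dataflow, not Caffe2 code): two local sums, one cross-pair sum, then
# broadcasts in the reverse pattern.
def _example_allreduce4_dataflow():
    import numpy as np
    vals = [np.full(2, float(i)) for i in range(4)]  # stand-ins for blobs 0..3
    vals[0] = vals[0] + vals[1]   # 0r <- 0 + 1
    vals[2] = vals[2] + vals[3]   # 2r <- 2 + 3
    vals[0] = vals[0] + vals[2]   # 0r <- 0r + 2r
    vals[2] = vals[0].copy()      # 2r <- 0r
    vals[1] = vals[0].copy()      # 1r <- 0r
    vals[3] = vals[2].copy()      # 3r <- 2r
    assert all(np.array_equal(v, vals[0]) for v in vals)

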
def Allreduce8(net, blobs, reduced_affix, gpu_indices):
    """Allreduce for 8 gpus.

    Algorithm: 3 level reduction.
        0r <- 0 + 1, 2r <- 2 + 3, 4r <- 4 + 5, 6r <- 6 + 7
        0r <- 0r + 2r, 4r <- 4r + 6r
        0r <- 0r + 4r
        4r <- 0r
        2r <- 0r, 6r <- 4r
        1r <- 0r, 3r <- 2r, 5r <- 4r, 7r <- 6r
    """
    reduced = [None] * 8
    # Reduction level 1
    for i in [0, 2, 4, 6]:
        reduced[i] = net.Add(
            [blobs[i], blobs[i + 1]],
            blobs[i] + reduced_affix,
            device_option=OnGPU(gpu_indices[i])
        )
    # Reduction level 2
    for i in [0, 4]:
        reduced[i] = net.Add(
            [reduced[i], reduced[i + 2]],
            str(blobs[i]) + reduced_affix,
            device_option=OnGPU(gpu_indices[i])
        )
    # Reduction level 3: this involves a copy.
    reduced_4_copy = reduced[4].Copy(
        [],
        str(reduced[4]) + '_copy',
        device_option=OnGPU(gpu_indices[0])
    )
    reduced[0] = reduced[0].Add(
        reduced_4_copy,
        reduced[0],
        device_option=OnGPU(gpu_indices[0])
    )
    # Broadcast level 1
    reduced[4] = reduced[0].Copy(
        [],
        reduced[4],
        device_option=OnGPU(gpu_indices[4])
    )
    # Broadcast level 2
    for i in [2, 6]:
        reduced[i] = reduced[i - 2].Copy(
            [],
            reduced[i],
            device_option=OnGPU(gpu_indices[i])
        )
    # Broadcast level 3
    for i in [1, 3, 5, 7]:
        reduced[i] = reduced[i - 1].Copy(
            [],
            blobs[i] + reduced_affix,
            device_option=OnGPU(gpu_indices[i])
        )
    return reduced


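# Sketch (hypothetical blob names): build the 8-GPU graph and inspect
# which GPU each generated operator is pinned to. Only two Copies cross
# the peer-access group boundary ({0..3} vs {4..7} in 0-indexed terms):
# one for the level-3 reduce, one for the level-1 broadcast.
def _example_allreduce8_placement():
    from caffe2.python import core
    net = core.Net("allreduce8_example")
    blobs = ["grad_gpu%d" % i for i in range(8)]
    Allreduce(net, blobs)  # eight blobs, so this routes to Allreduce8
    for op in net.Proto().op:
        print(op.type, op.device_option.cuda_gpu_id)

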
def AllreduceFallback(net, blobs, reduced_affix, gpu_indices):
    """A fallback option for Allreduce with no assumption on p2p.

    Algorithm: a flat operation on gpu 0
        0r <- 0
        0r <- 0r + i for i in gpu_indices[1:]
        ir <- 0r for i in gpu_indices[1:]
    """
    reduced = [None] * len(gpu_indices)
    # copy first
    reduced[0] = net.Copy(
        blobs[0],
        blobs[0] + reduced_affix,
        device_option=OnGPU(gpu_indices[0])
    )
    # do temp copy and add
    temp_name = reduced[0] + '_temp_copy'
    for i in range(1, len(gpu_indices)):
        temp = net.Copy(
            blobs[i],
            temp_name,
            device_option=OnGPU(gpu_indices[0])
        )
        reduced[0] = reduced[0].Add(
            temp,
            reduced[0],
            device_option=OnGPU(gpu_indices[0])
        )
    # Broadcast to everyone else
    for i in range(1, len(gpu_indices)):
        reduced[i] = net.Copy(
            reduced[0],
            blobs[i] + reduced_affix,
            device_option=OnGPU(gpu_indices[i])
        )
    return reduced
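

# Sketch (hypothetical blob names): any blob count other than 2, 4, or 8
# falls through to this routine, e.g. three GPUs. All Adds run on the
# first GPU, so this trades bandwidth for generality: it assumes no peer
# access at all.
def _example_allreduce_fallback():
    from caffe2.python import core
    net = core.Net("allreduce_fallback_example")
    blobs = ["grad_gpu0", "grad_gpu1", "grad_gpu2"]
    return Allreduce(net, blobs, gpu_indices=[0, 1, 2])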