Caffe2 - Python API
A deep learning, cross-platform ML framework
muji.py
## @package muji
# Module caffe2.python.muji
"""muji.py does multi-gpu training for caffe2 with no need to change the c++
side code. Everything is defined on the computation graph level.

We support the following use cases:
  - 2 gpus, where peer access is enabled between them.
  - 4 gpus, where peer access is enabled between all of them.
  - 4 gpus, where peer access is enabled in two groups,
    between {0, 1} and {2, 3}.
  - 8 gpus, where peer access is enabled in two groups,
    between {0, 1, 2, 3} and {4, 5, 6, 7}.
If none of the above cases is satisfied, a fallback function which does not
rely on peer access will be called.
"""

import numpy as np

from caffe2.proto import caffe2_pb2
from caffe2.python import workspace

def OnGPU(gpu_id):
    """A utility function that returns a device option protobuf of the
    specified gpu id.
    """
    device_option = caffe2_pb2.DeviceOption()
    device_option.device_type = workspace.GpuDeviceType
    device_option.device_id = gpu_id
    return device_option


def OnCPU():
    """A utility function that returns a device option protobuf for the CPU."""
    device_option = caffe2_pb2.DeviceOption()
    device_option.device_type = caffe2_pb2.CPU
    return device_option

def Allreduce(net, blobs, reduced_affix="_reduced", gpu_indices=None):
    """The general Allreduce interface that reroutes the function calls.
    CPUs and AMD GPUs are not supported because
    GetGpuPeerAccessPattern is called to get the gpu peer access pattern.
    """
    if gpu_indices is None:
        gpu_indices = list(range(len(blobs)))
    if len(gpu_indices) != len(blobs):
        raise RuntimeError(
            "gpu_indices length and blobs length mismatch: %d vs %d" %
            (len(gpu_indices), len(blobs))
        )
    pattern = workspace.GetGpuPeerAccessPattern()
    if len(blobs) == 2 and pattern.shape[0] >= 2 and np.all(pattern[:2, :2]):
        return Allreduce2(net, blobs, reduced_affix, gpu_indices)
    elif len(blobs) == 4 and pattern.shape[0] >= 4 and np.all(pattern[:4, :4]):
        return Allreduce4(net, blobs, reduced_affix, gpu_indices)
    elif (len(blobs) == 4 and pattern.shape[0] >= 4 and
          np.all(pattern[:2, :2]) and np.all(pattern[2:4, 2:4])):
        return Allreduce4Group2(net, blobs, reduced_affix, gpu_indices)
    elif len(blobs) == 8 and pattern.shape[0] >= 8 and np.all(pattern[:8, :8]):
        return Allreduce8(net, blobs, reduced_affix, gpu_indices)
    else:
        return AllreduceFallback(net, blobs, reduced_affix, gpu_indices)

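# For illustration only (hypothetical values): workspace.GetGpuPeerAccessPattern()
# returns a boolean numpy matrix whose entry [i, j] indicates whether GPU i can
# directly access GPU j's memory. On a 4-gpu machine where peer access is only
# enabled within {0, 1} and {2, 3}, the pattern would be expected to look like
#
#     [[ True,  True, False, False],
#      [ True,  True, False, False],
#      [False, False,  True,  True],
#      [False, False,  True,  True]]
#
# in which case the dispatch above selects Allreduce4Group2.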

def Allreduce2(net, blobs, reduced_affix, gpu_indices):
    """Allreduce for 2 gpus.

    Algorithm: 0r <- 0 + 1, 1r <- 0r, where r means "reduced"
    """
    a, b = blobs
    gpu_a, gpu_b = gpu_indices
    a_reduced = net.Add([a, b], a + reduced_affix, device_option=OnGPU(gpu_a))
    b_reduced = a_reduced.Copy(
        [],
        b + reduced_affix,
        device_option=OnGPU(gpu_b)
    )
    return a_reduced, b_reduced


def Allreduce4(net, blobs, reduced_affix, gpu_indices):
    """Allreduce for 4 gpus.

    Algorithm: 2 level reduction.
        0r <- 0 + 1, 2r <- 2 + 3
        0r <- 0r + 2r
        2r <- 0r,
        1r <- 0r, 3r <- 2r
    """
    a, b, c, d = blobs
    gpu_a, gpu_b, gpu_c, gpu_d = gpu_indices
    # a_reduced <- a + b, c_reduced <- c + d
    a_reduced = net.Add(
        [a, b],
        str(a) + reduced_affix,
        device_option=OnGPU(gpu_a)
    )
    c_reduced = net.Add(
        [c, d],
        str(c) + reduced_affix,
        device_option=OnGPU(gpu_c)
    )
    # a_reduced <- a_reduced + c_reduced
    a_reduced = a_reduced.Add(c_reduced, a_reduced, device_option=OnGPU(gpu_a))
    # broadcast a_reduced to c_reduced
    c_reduced = a_reduced.Copy([], c_reduced, device_option=OnGPU(gpu_c))
    # broadcast to b and d
    b_reduced = a_reduced.Copy(
        [],
        str(b) + reduced_affix,
        device_option=OnGPU(gpu_b)
    )
    d_reduced = c_reduced.Copy(
        [],
        str(d) + reduced_affix,
        device_option=OnGPU(gpu_d)
    )
    return a_reduced, b_reduced, c_reduced, d_reduced

def Allreduce4Group2(net, blobs, reduced_affix, gpu_indices):
    """Allreduce for 4 gpus where peer access is enabled in {0, 1} and {2, 3}.

    Algorithm: 2 level reduction.
        0r <- 0 + 1, 2r <- 2 + 3
        0r <- 0r + 2r
        2r <- 0r,
        1r <- 0r, 3r <- 2r
    """
    a, b, c, d = blobs
    gpu_a, gpu_b, gpu_c, gpu_d = gpu_indices
    # a_reduced <- a + b, c_reduced <- c + d
    a_reduced = net.Add(
        [a, b],
        str(a) + reduced_affix,
        device_option=OnGPU(gpu_a)
    )
    c_reduced = net.Add(
        [c, d],
        str(c) + reduced_affix,
        device_option=OnGPU(gpu_c)
    )
    # copy from c_reduced (gpu_c) to c_reduced_copy (gpu_a)
    c_reduced_copy = c_reduced.Copy(
        [],
        str(c_reduced) + '_copy',
        device_option=OnGPU(gpu_a)
    )
    # a_reduced <- a_reduced + c_reduced_copy
    a_reduced = a_reduced.Add(
        c_reduced_copy,
        a_reduced,
        device_option=OnGPU(gpu_a)
    )
    # broadcast a_reduced to c_reduced
    c_reduced = a_reduced.Copy([], c_reduced, device_option=OnGPU(gpu_c))
    # broadcast to b and d
    b_reduced = a_reduced.Copy(
        [],
        str(b) + reduced_affix,
        device_option=OnGPU(gpu_b)
    )
    d_reduced = c_reduced.Copy(
        [],
        str(d) + reduced_affix,
        device_option=OnGPU(gpu_d)
    )
    return a_reduced, b_reduced, c_reduced, d_reduced

def Allreduce8(net, blobs, reduced_affix, gpu_indices):
    """Allreduce for 8 gpus.

    Algorithm: 3 level reduction.
        0r <- 0 + 1, 2r <- 2 + 3, 4r <- 4 + 5, 6r <- 6 + 7
        0r <- 0r + 2r, 4r <- 4r + 6r
        0r <- 0r + 4r
        4r <- 0r
        2r <- 0r, 6r <- 4r
        1r <- 0r, 3r <- 2r, 5r <- 4r, 7r <- 6r
    """
    reduced = [None] * 8
    # Reduction level 1
    for i in [0, 2, 4, 6]:
        reduced[i] = net.Add(
            [blobs[i], blobs[i + 1]],
            blobs[i] + reduced_affix,
            device_option=OnGPU(gpu_indices[i])
        )
    # Reduction level 2
    for i in [0, 4]:
        reduced[i] = net.Add(
            [reduced[i], reduced[i + 2]],
            str(blobs[i]) + reduced_affix,
            device_option=OnGPU(gpu_indices[i])
        )
    # Reduction level 3: this involves a copy.
    reduced_4_copy = reduced[4].Copy(
        [],
        str(reduced[4]) + '_copy',
        device_option=OnGPU(gpu_indices[0])
    )
    reduced[0] = reduced[0].Add(
        reduced_4_copy,
        reduced[0],
        device_option=OnGPU(gpu_indices[0])
    )
    # Broadcast level 1
    reduced[4] = reduced[0].Copy(
        [],
        reduced[4],
        device_option=OnGPU(gpu_indices[4])
    )
    # Broadcast level 2
    for i in [2, 6]:
        reduced[i] = reduced[i - 2].Copy(
            [],
            reduced[i],
            device_option=OnGPU(gpu_indices[i])
        )
    # Broadcast level 3
    for i in [1, 3, 5, 7]:
        reduced[i] = reduced[i - 1].Copy(
            [],
            blobs[i] + reduced_affix,
            device_option=OnGPU(gpu_indices[i])
        )
    return reduced

def AllreduceFallback(net, blobs, reduced_affix, gpu_indices):
    """A fallback option for Allreduce with no assumption on p2p.

    Algorithm: a flat operation on gpu 0
        0r <- 0
        0r <- 0r + i for i in gpu_indices[1:]
        ir <- 0r for i in gpu_indices[1:]
    """
    reduced = [None] * len(gpu_indices)
    if reduced_affix != '':
        # copy first
        reduced[0] = net.Copy(
            blobs[0],
            blobs[0] + reduced_affix,
            device_option=OnGPU(gpu_indices[0])
        )
    else:
        reduced[0] = blobs[0]
    # do temp copy and add
    temp_name = reduced[0] + '_temp_copy'
    for i in range(1, len(gpu_indices)):
        temp = net.Copy(
            blobs[i],
            temp_name,
            device_option=OnGPU(gpu_indices[0])
        )
        reduced[0] = net.Add(
            [temp, reduced[0]],
            reduced[0],
            device_option=OnGPU(gpu_indices[0])
        )
    # Broadcast to everyone else
    for i in range(1, len(gpu_indices)):
        reduced[i] = net.Copy(
            reduced[0],
            blobs[i] + reduced_affix,
            device_option=OnGPU(gpu_indices[i])
        )
    return reduced
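
A minimal usage sketch, assuming a machine with two CUDA GPUs visible to caffe2; the net name, blob names, and shapes below are illustrative only, not part of the module.

from caffe2.python import core, muji, workspace

net = core.Net("allreduce_example")  # hypothetical net name
blobs = []
for gpu_id in (0, 1):
    # Each GPU holds its own copy of a gradient-like blob; values are placeholders.
    blob = net.ConstantFill(
        [],
        "grad_gpu_%d" % gpu_id,
        shape=[16],
        value=float(gpu_id + 1),
        device_option=muji.OnGPU(gpu_id)
    )
    blobs.append(blob)

# Sum the per-gpu copies and leave a "<name>_reduced" blob on every GPU.
reduced = muji.Allreduce(net, blobs, reduced_affix="_reduced", gpu_indices=[0, 1])

workspace.RunNetOnce(net)
print(workspace.FetchBlob("grad_gpu_0_reduced"))  # expect an array of 3.0

If the two GPUs do not have peer access enabled, the same call still works: Allreduce dispatches to AllreduceFallback, which stages the additions on gpu 0 and copies the result back out.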