Caffe2 - C++ API
A deep learning, cross platform ML framework
pthreadpool.cc
1 
17 /* Standard C headers */
18 #include <stdint.h>
19 #include <stdbool.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <assert.h>
23 
24 /* POSIX headers */
25 #include <pthread.h>
26 #include <unistd.h>
27 
28 /* Library header */
29 #include "caffe2/core/logging.h"
30 #include "caffe2/utils/fixed_divisor.h"
31 #include "caffe2/utils/threadpool/pthreadpool.h"
32 
33 
/*
 * Integer division that rounds the quotient up instead of down.
 *
 * Returns the smallest q for which q * divisor >= dividend.
 * divisor must be non-zero: it is used as the right-hand operand of / and %.
 */
static inline size_t divide_round_up(size_t dividend, size_t divisor) {
  const size_t quotient = dividend / divisor;
  const size_t remainder = dividend % divisor;
  /* A non-zero remainder means one extra, partially filled, unit is needed */
  return remainder != 0 ? quotient + 1 : quotient;
}
41 
/* Returns the smaller of the two sizes (either one when they are equal). */
static inline size_t min(size_t a, size_t b) {
  if (b <= a) {
    return b;
  }
  return a;
}
45 
47  pthreadpool_function_1d_tiled_t function;
48  void* argument;
49  size_t range;
50  size_t tile;
51 };
52 
53 static void compute_1d_tiled(const struct compute_1d_tiled_context* context, size_t linear_index) {
54  const size_t tile_index = linear_index;
55  const size_t index = tile_index * context->tile;
56  const size_t tile = min(context->tile, context->range - index);
57  context->function(context->argument, index, tile);
58 }
59 
60 void pthreadpool_compute_1d_tiled(
61  pthreadpool_t threadpool,
62  pthreadpool_function_1d_tiled_t function,
63  void* argument,
64  size_t range,
65  size_t tile)
66 {
67  if (threadpool == NULL) {
68  /* No thread pool provided: execute function sequentially on the calling thread */
69  for (size_t i = 0; i < range; i += tile) {
70  function(argument, i, min(range - i, tile));
71  }
72  } else {
73  /* Execute in parallel on the thread pool using linearized index */
74  const size_t tile_range = divide_round_up(range, tile);
75  struct compute_1d_tiled_context context = {
76  .function = function,
77  .argument = argument,
78  .range = range,
79  .tile = tile
80  };
81  pthreadpool_compute_1d(threadpool, (pthreadpool_function_1d_t) compute_1d_tiled, &context, tile_range);
82  }
83 }
84 
86  pthreadpool_function_2d_t function;
87  void* argument;
89 };
90 
91 static void compute_2d(const struct compute_2d_context* context, size_t linear_index) {
92  DCHECK_LE(linear_index, std::numeric_limits<int>::max());
93 
94  int q;
95  int r;
96  context->range_j.divMod((int) linear_index, q, r);
97  context->function(context->argument, q, r);
98 }
99 
100 void pthreadpool_compute_2d(
101  struct pthreadpool* threadpool,
102  pthreadpool_function_2d_t function,
103  void* argument,
104  size_t range_i,
105  size_t range_j)
106 {
107  if (threadpool == NULL) {
108  /* No thread pool provided: execute function sequentially on the calling thread */
109  for (size_t i = 0; i < range_i; i++) {
110  for (size_t j = 0; j < range_j; j++) {
111  function(argument, i, j);
112  }
113  }
114  } else {
115  DCHECK_LE(range_i * range_j, (size_t) std::numeric_limits<int>::max());
116  /* Execute in parallel on the thread pool using linearized index */
117  struct compute_2d_context context = {
118  .function = function,
119  .argument = argument,
120  .range_j = caffe2::FixedDivisor<int>(range_j)
121  };
122  pthreadpool_compute_1d(threadpool, (pthreadpool_function_1d_t) compute_2d, &context, range_i * range_j);
123  }
124 }
125 
127  pthreadpool_function_2d_tiled_t function;
128  void* argument;
129  caffe2::FixedDivisor<int> tile_range_j;
130  size_t range_i;
131  size_t range_j;
132  size_t tile_i;
133  size_t tile_j;
134 };
135 
136 static void compute_2d_tiled(const struct compute_2d_tiled_context* context, size_t linear_index) {
137  int q;
138  int r;
139 
140  context->tile_range_j.divMod(linear_index, q, r);
141  const size_t max_tile_i = context->tile_i;
142  const size_t max_tile_j = context->tile_j;
143  const size_t index_i = q * max_tile_i;
144  const size_t index_j = r * max_tile_j;
145  const size_t tile_i = min(max_tile_i, context->range_i - index_i);
146  const size_t tile_j = min(max_tile_j, context->range_j - index_j);
147  context->function(context->argument, index_i, index_j, tile_i, tile_j);
148 }
149 
150 void pthreadpool_compute_2d_tiled(
151  pthreadpool_t threadpool,
152  pthreadpool_function_2d_tiled_t function,
153  void* argument,
154  size_t range_i,
155  size_t range_j,
156  size_t tile_i,
157  size_t tile_j)
158 {
159  if (threadpool == NULL) {
160  /* No thread pool provided: execute function sequentially on the calling thread */
161  for (size_t i = 0; i < range_i; i += tile_i) {
162  for (size_t j = 0; j < range_j; j += tile_j) {
163  function(argument, i, j, min(range_i - i, tile_i), min(range_j - j, tile_j));
164  }
165  }
166  } else {
167  /* Execute in parallel on the thread pool using linearized index */
168  const size_t tile_range_i = divide_round_up(range_i, tile_i);
169  const size_t tile_range_j = divide_round_up(range_j, tile_j);
170  DCHECK_LE(tile_range_i * tile_range_j, (size_t) std::numeric_limits<int>::max());
171  struct compute_2d_tiled_context context = {
172  .function = function,
173  .argument = argument,
174  .tile_range_j = caffe2::FixedDivisor<int>(tile_range_j),
175  .range_i = range_i,
176  .range_j = range_j,
177  .tile_i = tile_i,
178  .tile_j = tile_j
179  };
180  pthreadpool_compute_1d(threadpool, (pthreadpool_function_1d_t) compute_2d_tiled, &context, tile_range_i * tile_range_j);
181  }
182 }