Bridge++  Ver. 2.0.2
threadManager.cpp
Go to the documentation of this file.
1 
15 
16 #include <omp.h>
17 
19 #include "IO/bridgeIO.h"
20 using Bridge::vout;
21 
22 // fix bug in global reduction/maximization for large size of data
23 // [2023.04.14 I.Kanamori]
24 
25 //====================================================================
27  template<typename REALTYPE>
28  void sum_global(REALTYPE *a,
29  const int num,
30  std::vector<REALTYPE>& array_reduction,
31  const int each_buf_size,
32  const int ith, const int nth)
33  {
34  typedef REALTYPE real_t;
35  int remaining = num;
36  std::vector<real_t> sum;
37  real_t *psum = nullptr;
38 
39 #pragma omp master
40  {
41  sum.resize(num);
42  for (int i = 0; i < num; i++) {
43  sum[i] = 0;
44  }
45  psum = &sum[0];
46  }
47 
48  real_t *pa = a;
49  while (remaining > 0) // sum over threads; shared buffer size is each_buf_Size
50  {
51  const int n = (remaining < each_buf_size) ? remaining : each_buf_size;
52  for (int i = 0; i < n; ++i) {
53  array_reduction[ith * each_buf_size + i] = pa[i];
54  }
55 #pragma omp barrier
56 #pragma omp master
57  {
58  for (int i = 0; i < nth; ++i) {
59  for (int j = 0; j < n; ++j) {
60  psum[j] += array_reduction[i * each_buf_size + j];
61  }
62  }
63  psum += each_buf_size;
64  } // master
65  pa += each_buf_size;
66  remaining -= each_buf_size;
67 #pragma omp barrier
68  } // sum over threads, done
69 
70 #pragma omp master
71  {
72  Communicator::reduce_sum(num, a, &sum[0], 0);
73  } // a in the master threads knows the global sum
74 
75  remaining = num;
76  pa = a;
77  const int total_buf_size = each_buf_size * nth;
78  while (remaining > 0) // distributes the sum to each thread
79  {
80  const int n = (remaining < total_buf_size) ? remaining : total_buf_size;
81 #pragma omp master
82  {
83  for (int i = 0; i < n; ++i) { // copy to the common buffer
84  array_reduction[i] = pa[i];
85  }
86  } // master
87 
88  // ensures to read updated m_darray_reduction
89 #pragma omp barrier
90  //#ifdef NECSX
91  //#pragma omp flush
92  //#else
93  //if(sizeof(real_t)==4){
94  //#pragma omp flush (ThreadManager::m_darray_reductionF)
95  // } else {
96  //#pragma omp flush (ThreadManager::m_darray_reduction)
97  // }
98  //#endif
99  for (int i = 0; i < n; ++i) { // copy from the common buffer
100  pa[i] = array_reduction[i];
101  }
102  pa += total_buf_size;
103  remaining -= total_buf_size;
104 #pragma omp barrier
105  } // distributes the sum to each thread, done
106  }
107 
108 
109  // global maximization is added [2021.12.25 H.Matsufuru]
110  template<typename REALTYPE>
111  void max_global(REALTYPE *a,
112  const int num,
113  std::vector<REALTYPE>& array_reduction,
114  const int each_buf_size,
115  const int ith, const int nth)
116  {
117  typedef REALTYPE real_t;
118  int remaining = num;
119  std::vector<real_t> vmax;
120  real_t *pmax = nullptr;
121 
122 #pragma omp master
123  {
124  vmax.resize(num);
125  for (int i = 0; i < num; ++i) {
126  vmax[i] = 0.0;
127  }
128  pmax = &vmax[0];
129  }
130 
131  real_t *pa = a;
132  while (remaining > 0) // max over threads; shared buffer size is each_buf_size
133  {
134  const int n = (remaining < each_buf_size) ? remaining : each_buf_size;
135  for (int i = 0; i < n; ++i) {
136  array_reduction[ith * each_buf_size + i] = pa[i];
137  }
138 #pragma omp barrier
139 #pragma omp master
140  {
141  for (int i = 0; i < nth; ++i) {
142  for (int j = 0; j < n; ++j) {
143 #ifdef NECSX
144  if (array_reduction[i * each_buf_size + j] > pmax[j]) {
145  pmax[j] = array_reduction[i * each_buf_size + j];
146  }
147 #else
148  pmax[j] = std::max(pmax[j], array_reduction[i * each_buf_size + j]);
149 #endif
150  }
151  }
152  pmax += each_buf_size;
153  } // master
154  pa += each_buf_size;
155  remaining -= each_buf_size;
156 #pragma omp barrier
157  } // maximize over threads, done
158 
159 #pragma omp master
160  {
161  Communicator::reduce_max(num, a, &vmax[0], 0);
162  } // a in the master threads knows the global max
163 
164  remaining = num;
165  pa = a;
166  const int total_buf_size = each_buf_size * nth;
167  while (remaining > 0) // distributes the max to each thread
168  {
169  const int n = (remaining < total_buf_size) ? remaining : total_buf_size;
170 #pragma omp master
171  {
172  for (int i = 0; i < n; ++i) { // copy to the common buffer
173  array_reduction[i] = pa[i];
174  }
175  } // master
176 
177  // ensures to read updated m_darray_reduction
178 #pragma omp barrier
179 
180  for (int i = 0; i < n; ++i) { // copy from the common buffer
181  pa[i] = array_reduction[i];
182  }
183  pa += total_buf_size;
184  remaining -= total_buf_size;
185 #pragma omp barrier
186  } // distributes the sum to each thread, done
187  }
188 } // namespace ThreadManager_Reduce
189 
190 //====================================================================
191 // initialization of static member variables.
192 
195 std::vector<dcomplex> ThreadManager::m_darray_reductionDC(0);
196 std::vector<double> ThreadManager::m_darray_reduction(0);
197 std::vector<float> ThreadManager::m_darray_reductionF(0);
198 
199 const std::string ThreadManager::class_name = "ThreadManager";
200 
201 //====================================================================
202 void ThreadManager::init(int Nthread)
203 {
205 
206  vout.general(m_vl, "%s: initialization\n", class_name.c_str());
207 
208  int Nthread_env = 0;
209 
210 #pragma omp parallel
211  {
212  if (omp_get_thread_num() == 0) {
213  Nthread_env = omp_get_num_threads();
214  }
215  }
216 
217 
218  if ((Nthread == Nthread_env) || (Nthread == 0)) {
219  m_Nthread = Nthread_env;
220  } else {
221  vout.general(m_vl, "Warning at %s: Nthread(env) != Nthread(input)\n", class_name.c_str());
222  vout.general(m_vl, " Number of threads(env) = %d\n", Nthread_env);
223  vout.general(m_vl, " Number of threads(input) = %d\n", Nthread);
224  vout.general(m_vl, " reset Nthread = Nthread(input).\n");
225 
226  omp_set_num_threads(Nthread);
227  m_Nthread = Nthread;
228  }
229 
230  vout.general(m_vl, " Number of threads = %d\n", m_Nthread);
231 
235 }
236 
237 
238 //====================================================================
240 {
241  vout.paranoiac(m_vl, "%s: finalize.\n", class_name.c_str());
242 }
243 
244 
245 //====================================================================
247 {
248  return omp_get_num_threads();
249 }
250 
251 
252 //====================================================================
254 {
255  return omp_get_thread_num();
256 }
257 
258 
259 //====================================================================
261 {
262  int nth = get_num_threads();
263 
264  barrier(nth);
265 }
266 
267 
268 //====================================================================
270 {
271 #pragma omp barrier
272 }
273 
274 
275 //====================================================================
277 {
278 #pragma omp barrier
279 #pragma omp master
280  {
282  }
283 #pragma omp barrier
284 }
285 
286 
287 //====================================================================
289  const int ith, const int nth)
290 {
293  ith, nth);
294 }
295 
296 
297 //====================================================================
299  const int num,
300  const int ith, const int nth)
301 {
304  ith, nth);
305 }
306 
307 
308 //====================================================================
310  const int ith, const int nth)
311 {
314  ith, nth);
315 }
316 
317 
318 //====================================================================
320  const int num,
321  const int ith, const int nth)
322 {
325  ith, nth);
326 }
327 
328 
329 //====================================================================
331  const int ith, const int nth)
332 {
335  ith, nth);
336 }
337 
338 
339 //====================================================================
341  const int num,
342  const int ith, const int nth)
343 {
346  ith, nth);
347 }
348 
349 
350 //====================================================================
352  const int ith, const int nth)
353 {
356  ith, nth);
357 }
358 
359 
360 //====================================================================
362  const int num,
363  const int ith, const int nth)
364 {
367  ith, nth);
368 }
369 
370 
371 //====================================================================
372 void ThreadManager::assert_single_thread(const std::string& name)
373 {
374  int nth = get_num_threads();
375 
376  if (nth != 1) {
377  vout.crucial(m_vl, "\n");
378  vout.crucial(m_vl, "##### Caution #####\n");
379  vout.crucial(m_vl, "Single-thread %s is called in parallel region.\n",
380  name.c_str());
381  vout.crucial(m_vl, "Current number of thread = %d.\n", nth);
382 
383  exit(EXIT_FAILURE);
384  }
385 }
386 
387 
388 //============================================================END=====
Communicator::sync
static int sync()
synchronize within small world.
Definition: communicator.cpp:140
bridgeIO.h
ThreadManager::each_buf_size
static const int each_buf_size
reduction buffer size for each thread (double)
Definition: threadManager.h:62
ThreadManager::m_Nthread
static int m_Nthread
number of threads.
Definition: threadManager.h:46
ThreadManager::get_num_threads
static int get_num_threads()
returns available number of threads.
Definition: threadManager.cpp:246
Communicator::reduce_max
static int reduce_max(int count, double *recv_buf, double *send_buf, int pattern=0)
find a global maximum of an array of double over the communicator. pattern specifies the dimensions t...
Definition: communicator.cpp:290
ThreadManager::m_darray_reduction
static std::vector< double > m_darray_reduction
Definition: threadManager.h:49
ThreadManager::barrier
static void barrier(const int Nthread)
barrier among threads inside a node.
Definition: threadManager.cpp:269
real_t
double real_t
Definition: bridgeQXS_Clover_coarse_double.cpp:16
Bridge::BridgeIO::paranoiac
void paranoiac(const char *format,...)
Definition: bridgeIO.cpp:238
ThreadManager_Reduce::max_global
void max_global(REALTYPE *a, const int num, std::vector< REALTYPE > &array_reduction, const int each_buf_size, const int ith, const int nth)
Definition: threadManager.cpp:111
ThreadManager::each_buf_sizeF
static const int each_buf_sizeF
reduction buffer size for each thread (float)
Definition: threadManager.h:68
ThreadManager_Reduce::sum_global
void sum_global(REALTYPE *a, const int num, std::vector< REALTYPE > &array_reduction, const int each_buf_size, const int ith, const int nth)
Definition: threadManager.cpp:28
Communicator::reduce_sum
static int reduce_sum(int count, dcomplex *recv_buf, dcomplex *send_buf, int pattern=0)
make a global sum of an array of dcomplex over the communicator. pattern specifies the dimensions to ...
Definition: communicator.cpp:263
ThreadManager::reduce_max_global
static void reduce_max_global(double *value, const int num, const int i_thread, const int Nthread)
global reduction with max for an array: double values are assumed thread local.
Definition: threadManager.cpp:361
ThreadManager::sync_barrier_all
static void sync_barrier_all()
barrier among all the threads and nodes.
Definition: threadManager.cpp:276
ThreadManager::init
static void init(int Nthread)
setup: called in main only once.
Definition: threadManager.cpp:202
ThreadManager::m_vl
static Bridge::VerboseLevel m_vl
verbose level.
Definition: threadManager.h:47
ThreadManager::reduce_sum_global
static void reduce_sum_global(dcomplex &value, const int i_thread, const int Nthread)
global reduction with summation: dcomplex values are assumed thread local.
Definition: threadManager.cpp:288
threadManager.h
ThreadManager::class_name
static const std::string class_name
Definition: threadManager.h:77
ThreadManager::m_darray_reductionDC
static std::vector< dcomplex > m_darray_reductionDC
Definition: threadManager.h:48
ThreadManager::wait
static void wait()
barrier among threads inside a node.
Definition: threadManager.cpp:260
ThreadManager::m_darray_reductionF
static std::vector< float > m_darray_reductionF
Definition: threadManager.h:50
CommonParameters::Vlevel
static Bridge::VerboseLevel Vlevel()
Definition: commonParameters.h:122
ThreadManager::finalize
static void finalize()
finalization.
Definition: threadManager.cpp:239
Bridge::BridgeIO::crucial
void crucial(const char *format,...)
Definition: bridgeIO.cpp:180
communicator.h
ThreadManager_Reduce
Definition: threadManager.cpp:26
ThreadManager::get_thread_id
static int get_thread_id()
returns thread id.
Definition: threadManager.cpp:253
Bridge::VerboseLevel
VerboseLevel
Definition: bridgeIO.h:42
Bridge::CRUCIAL
@ CRUCIAL
Definition: bridgeIO.h:44
Bridge::BridgeIO::general
void general(const char *format,...)
Definition: bridgeIO.cpp:200
ThreadManager::assert_single_thread
static void assert_single_thread(const std::string &class_name)
assert currently running on single thread.
Definition: threadManager.cpp:372
Bridge::vout
BridgeIO vout
Definition: bridgeIO.cpp:512
ThreadManager::each_buf_sizeDC
static const int each_buf_sizeDC
reduction buffer size for each thread (dcomplex)
Definition: threadManager.h:56