KratosMultiphysics
KRATOS Multiphysics (Kratos) is a framework for building parallel, multi-disciplinary simulation software, aiming at modularity, extensibility, and high performance. Kratos is written in C++, and counts with an extensive Python interface.
parallel_utilities.h
Go to the documentation of this file.
1 // | / |
2 // ' / __| _` | __| _ \ __|
3 // . \ | ( | | ( |\__ `
4 // _|\_\_| \__,_|\__|\___/ ____/
5 // Multi-Physics
6 //
7 // License: BSD License
8 // Kratos default license: kratos/license.txt
9 //
10 // Main authors: Riccardo Rossi
11 // Denis Demidov
12 // Philipp Bucher (https://github.com/philbucher)
13 //
14 
15 #pragma once
16 
17 // System includes
18 #include <iostream>
19 #include <array>
20 #include <iterator>
21 #include <vector>
22 #include <tuple>
23 #include <cmath>
24 #include <limits>
25 #include <future>
26 #include <thread>
27 #include <mutex>
28 
29 // External includes
30 #ifdef KRATOS_SMP_OPENMP
31 #include <omp.h>
32 #endif
33 
34 // Project includes
35 #include "includes/define.h"
37 #include "includes/lock_object.h"
38 
39 #define KRATOS_CRITICAL_SECTION const std::lock_guard scope_lock(ParallelUtilities::GetGlobalLock());
40 
41 #define KRATOS_PREPARE_CATCH_THREAD_EXCEPTION std::stringstream err_stream;
42 
43 #define KRATOS_CATCH_THREAD_EXCEPTION \
44 } catch(Exception& e) { \
45  KRATOS_CRITICAL_SECTION \
46  err_stream << "Thread #" << i << " caught exception: " << e.what(); \
47 } catch(std::exception& e) { \
48  KRATOS_CRITICAL_SECTION \
49  err_stream << "Thread #" << i << " caught exception: " << e.what(); \
50 } catch(...) { \
51  KRATOS_CRITICAL_SECTION \
52  err_stream << "Thread #" << i << " caught unknown exception:"; \
53 }
54 
55 #define KRATOS_CHECK_AND_THROW_THREAD_EXCEPTION \
56 const std::string& err_msg = err_stream.str(); \
57 KRATOS_ERROR_IF_NOT(err_msg.empty()) << "The following errors occured in a parallel region!\n" << err_msg << std::endl;
58 
59 namespace Kratos
60 {
62 
64 
67 class KRATOS_API(KRATOS_CORE) ParallelUtilities
68 {
69 public:
72 
76 
80  [[nodiscard]] static int GetNumThreads();
81 
85  static void SetNumThreads(const int NumThreads);
86 
91  [[nodiscard]] static int GetNumProcs();
92 
94 
99  [[nodiscard]] static LockObject& GetGlobalLock();
100 
102 
103 private:
106 
107  static LockObject* mspGlobalLock;
108 
109  static int* mspNumThreads;
110 
114 
116  ParallelUtilities() = delete;
117 
121  static int InitializeNumberOfThreads();
123 
126 
127  static int& GetNumberOfThreads();
128 
130 }; // Class ParallelUtilities
131 
132 
133 //***********************************************************************************
134 //***********************************************************************************
135 //***********************************************************************************
140 template<class TIterator, int MaxThreads=Globals::MaxAllowedThreads>
142 {
143 public:
148  BlockPartition(TIterator it_begin,
149  TIterator it_end,
150  int Nchunks = ParallelUtilities::GetNumThreads())
151  {
152  static_assert(
153  std::is_same_v<typename std::iterator_traits<TIterator>::iterator_category, std::random_access_iterator_tag>,
154  "BlockPartition requires random access iterators to divide the input range into partitions"
155  );
156  KRATOS_ERROR_IF(Nchunks < 1) << "Number of chunks must be > 0 (and not " << Nchunks << ")" << std::endl;
157 
158  const std::ptrdiff_t size_container = it_end-it_begin;
159 
160  if (size_container == 0) {
161  mNchunks = Nchunks;
162  } else {
163  // in case the container is smaller than the number of chunks
164  mNchunks = std::min(static_cast<int>(size_container), Nchunks);
165  }
166  const std::ptrdiff_t block_partition_size = size_container / mNchunks;
167  mBlockPartition[0] = it_begin;
168  mBlockPartition[mNchunks] = it_end;
169  for (int i=1; i<mNchunks; i++) {
170  mBlockPartition[i] = mBlockPartition[i-1] + block_partition_size;
171  }
172  }
173 
177  template <class TUnaryFunction>
178  inline void for_each(TUnaryFunction&& f)
179  {
181 
182  #pragma omp parallel for
183  for (int i=0; i<mNchunks; ++i) {
184  KRATOS_TRY
185  for (auto it = mBlockPartition[i]; it != mBlockPartition[i+1]; ++it) {
186  f(*it); //note that we pass the value to the function, not the iterator
187  }
189  }
190 
192  }
193 
199  template <class TReducer, class TUnaryFunction>
200  [[nodiscard]] inline typename TReducer::return_type for_each(TUnaryFunction &&f)
201  {
203 
204  TReducer global_reducer;
205  #pragma omp parallel for
206  for (int i=0; i<mNchunks; ++i) {
207  KRATOS_TRY
208  TReducer local_reducer;
209  for (auto it = mBlockPartition[i]; it != mBlockPartition[i+1]; ++it) {
210  local_reducer.LocalReduce(f(*it));
211  }
212  global_reducer.ThreadSafeReduce(local_reducer);
214  }
215 
217 
218  return global_reducer.GetValue();
219  }
220 
225  template <class TThreadLocalStorage, class TFunction>
226  inline void for_each(const TThreadLocalStorage& rThreadLocalStoragePrototype, TFunction &&f)
227  {
228  static_assert(std::is_copy_constructible<TThreadLocalStorage>::value, "TThreadLocalStorage must be copy constructible!");
229 
231 
232  #pragma omp parallel
233  {
234  // copy the prototype to create the thread local storage
235  TThreadLocalStorage thread_local_storage(rThreadLocalStoragePrototype);
236 
237  #pragma omp for
238  for(int i=0; i<mNchunks; ++i){
239  KRATOS_TRY
240  for (auto it = mBlockPartition[i]; it != mBlockPartition[i+1]; ++it){
241  f(*it, thread_local_storage); // note that we pass the value to the function, not the iterator
242  }
244  }
245  }
247  }
248 
255  template <class TReducer, class TThreadLocalStorage, class TFunction>
256  [[nodiscard]] inline typename TReducer::return_type for_each(const TThreadLocalStorage& rThreadLocalStoragePrototype, TFunction &&f)
257  {
258  static_assert(std::is_copy_constructible<TThreadLocalStorage>::value, "TThreadLocalStorage must be copy constructible!");
259 
261 
262  TReducer global_reducer;
263 
264  #pragma omp parallel
265  {
266  // copy the prototype to create the thread local storage
267  TThreadLocalStorage thread_local_storage(rThreadLocalStoragePrototype);
268 
269  #pragma omp for
270  for (int i=0; i<mNchunks; ++i) {
271  KRATOS_TRY
272  TReducer local_reducer;
273  for (auto it = mBlockPartition[i]; it != mBlockPartition[i+1]; ++it) {
274  local_reducer.LocalReduce(f(*it, thread_local_storage));
275  }
276  global_reducer.ThreadSafeReduce(local_reducer);
278  }
279  }
281  return global_reducer.GetValue();
282  }
283 
284 private:
285  int mNchunks;
286  std::array<TIterator, MaxThreads> mBlockPartition;
287 };
288 
296 template <class TIterator,
297  class TFunction,
298  std::enable_if_t<std::is_base_of_v<std::input_iterator_tag, typename std::iterator_traits<TIterator>::iterator_category>,bool> = true>
299 void block_for_each(TIterator itBegin, TIterator itEnd, TFunction&& rFunction)
300 {
301  BlockPartition<TIterator>(itBegin, itEnd).for_each(std::forward<TFunction>(rFunction));
302 }
303 
312 template <class TReduction,
313  class TIterator,
314  class TFunction,
315  std::enable_if_t<std::is_base_of_v<std::input_iterator_tag, typename std::iterator_traits<TIterator>::iterator_category>,bool> = true>
316 [[nodiscard]] typename TReduction::return_type block_for_each(TIterator itBegin, TIterator itEnd, TFunction&& rFunction)
317 {
318  return BlockPartition<TIterator>(itBegin, itEnd).template for_each<TReduction>(std::forward<TFunction>(std::forward<TFunction>(rFunction)));
319 }
320 
330 template <class TIterator,
331  class TTLS,
332  class TFunction,
333  std::enable_if_t<std::is_base_of_v<std::input_iterator_tag, typename std::iterator_traits<TIterator>::iterator_category>,bool> = true>
334 void block_for_each(TIterator itBegin, TIterator itEnd, const TTLS& rTLS, TFunction &&rFunction)
335 {
336  BlockPartition<TIterator>(itBegin, itEnd).for_each(rTLS, std::forward<TFunction>(rFunction));
337 }
338 
349 template <class TReduction,
350  class TIterator,
351  class TTLS,
352  class TFunction,
353  std::enable_if_t<std::is_base_of_v<std::input_iterator_tag, typename std::iterator_traits<TIterator>::iterator_category>,bool> = true>
354 [[nodiscard]] typename TReduction::return_type block_for_each(TIterator itBegin, TIterator itEnd, const TTLS& tls, TFunction&& rFunction)
355 {
356  return BlockPartition<TIterator>(itBegin, itEnd).template for_each<TReduction>(tls, std::forward<TFunction>(std::forward<TFunction>(rFunction)));
357 }
358 
365 template <class TContainerType,
366  class TFunctionType,
367  std::enable_if_t<!std::is_same_v<
368  std::iterator_traits<typename decltype(std::declval<std::remove_cv_t<TContainerType>>().begin())::value_type>,
369  void
370  >, bool> = true
371  >
372 void block_for_each(TContainerType &&v, TFunctionType &&func)
373 {
374  block_for_each(v.begin(), v.end(), std::forward<TFunctionType>(func));
375 }
376 
384 template <class TReducer,
385  class TContainerType,
386  class TFunctionType,
387  std::enable_if_t<!std::is_same_v<
388  std::iterator_traits<typename decltype(std::declval<std::remove_cv_t<TContainerType>>().begin())::value_type>,
389  void
390  >, bool> = true
391  >
392 [[nodiscard]] typename TReducer::return_type block_for_each(TContainerType &&v, TFunctionType &&func)
393 {
394  return block_for_each<TReducer>(v.begin(), v.end(), std::forward<TFunctionType>(func));
395 }
396 
405 template <class TContainerType,
406  class TThreadLocalStorage,
407  class TFunctionType,
408  std::enable_if_t<!std::is_same_v<
409  std::iterator_traits<typename decltype(std::declval<std::remove_cv_t<TContainerType>>().begin())::value_type>,
410  void
411  >, bool> = true
412  >
413 void block_for_each(TContainerType &&v, const TThreadLocalStorage& tls, TFunctionType &&func)
414 {
415  block_for_each(v.begin(), v.end(), tls, std::forward<TFunctionType>(func));
416 }
417 
427 template <class TReducer,
428  class TContainerType,
429  class TThreadLocalStorage,
430  class TFunctionType,
431  std::enable_if_t<!std::is_same_v<
432  std::iterator_traits<typename decltype(std::declval<std::remove_cv_t<TContainerType>>().begin())::value_type>,
433  void
434  >, bool> = true
435  >
436 [[nodiscard]] typename TReducer::return_type block_for_each(TContainerType &&v, const TThreadLocalStorage& tls, TFunctionType &&func)
437 {
438  return block_for_each<TReducer>(v.begin(), v.end(), tls, std::forward<TFunctionType>(func));
439 }
440 
441 //***********************************************************************************
442 //***********************************************************************************
443 //***********************************************************************************
449 template<class TIndexType=std::size_t, int TMaxThreads=Globals::MaxAllowedThreads>
451 {
452 public:
453 
458  IndexPartition(TIndexType Size,
459  int Nchunks = ParallelUtilities::GetNumThreads())
460  {
461  KRATOS_ERROR_IF(Nchunks < 1) << "Number of chunks must be > 0 (and not " << Nchunks << ")" << std::endl;
462 
463  if (Size == 0) {
464  mNchunks = Nchunks;
465  } else {
466  // in case the container is smaller than the number of chunks
467  mNchunks = std::min(static_cast<int>(Size), Nchunks);
468  }
469 
470  const int block_partition_size = Size / mNchunks;
471  mBlockPartition[0] = 0;
472  mBlockPartition[mNchunks] = Size;
473  for (int i=1; i<mNchunks; i++) {
474  mBlockPartition[i] = mBlockPartition[i-1] + block_partition_size;
475  }
476 
477  }
478 
479  //NOT COMMENTING IN DOXYGEN - THIS SHOULD BE SORT OF HIDDEN UNTIL GIVEN PRIME TIME
480  //pure c++11 version (can handle exceptions)
481  template <class TUnaryFunction>
482  inline void for_pure_c11(TUnaryFunction &&f)
483  {
484  std::vector< std::future<void> > runners(mNchunks);
485  const auto& partition = mBlockPartition;
486  for (int i=0; i<mNchunks; ++i) {
487  runners[i] = std::async(std::launch::async, [&partition, i, &f]()
488  {
489  for (auto k = partition[i]; k < partition[i+1]; ++k) {
490  f(k);
491  }
492  });
493  }
494 
495  //here we impose a syncronization and we check the exceptions
496  for(int i=0; i<mNchunks; ++i) {
497  try {
498  runners[i].get();
499  }
500  catch(Exception& e) {
501  KRATOS_ERROR << std::endl << "THREAD number: " << i << " caught exception " << e.what() << std::endl;
502  } catch(std::exception& e) {
503  KRATOS_ERROR << std::endl << "THREAD number: " << i << " caught exception " << e.what() << std::endl;
504  } catch(...) {
505  KRATOS_ERROR << std::endl << "unknown error" << std::endl;
506  }
507  }
508  }
509 
513  template <class TUnaryFunction>
514  inline void for_each(TUnaryFunction &&f)
515  {
517 
518  #pragma omp parallel for
519  for (int i=0; i<mNchunks; ++i) {
520  KRATOS_TRY
521  for (auto k = mBlockPartition[i]; k < mBlockPartition[i+1]; ++k) {
522  f(k); //note that we pass a reference to the value, not the iterator
523  }
525  }
527  }
528 
534  template <class TReducer, class TUnaryFunction>
535  [[nodiscard]] inline typename TReducer::return_type for_each(TUnaryFunction &&f)
536  {
538 
539  TReducer global_reducer;
540  #pragma omp parallel for
541  for (int i=0; i<mNchunks; ++i) {
542  KRATOS_TRY
543  TReducer local_reducer;
544  for (auto k = mBlockPartition[i]; k < mBlockPartition[i+1]; ++k) {
545  local_reducer.LocalReduce(f(k));
546  }
547  global_reducer.ThreadSafeReduce(local_reducer);
549  }
551  return global_reducer.GetValue();
552  }
553 
554 
559  template <class TThreadLocalStorage, class TFunction>
560  inline void for_each(const TThreadLocalStorage& rThreadLocalStoragePrototype, TFunction &&f)
561  {
562  static_assert(std::is_copy_constructible<TThreadLocalStorage>::value, "TThreadLocalStorage must be copy constructible!");
563 
565 
566  #pragma omp parallel
567  {
568  // copy the prototype to create the thread local storage
569  TThreadLocalStorage thread_local_storage(rThreadLocalStoragePrototype);
570 
571  #pragma omp for
572  for (int i=0; i<mNchunks; ++i) {
573  KRATOS_TRY
574  for (auto k = mBlockPartition[i]; k < mBlockPartition[i+1]; ++k) {
575  f(k, thread_local_storage); //note that we pass a reference to the value, not the iterator
576  }
578  }
579  }
581  }
582 
589  template <class TReducer, class TThreadLocalStorage, class TFunction>
590  [[nodiscard]] inline typename TReducer::return_type for_each(const TThreadLocalStorage& rThreadLocalStoragePrototype, TFunction &&f)
591  {
592  static_assert(std::is_copy_constructible<TThreadLocalStorage>::value, "TThreadLocalStorage must be copy constructible!");
593 
595 
596  TReducer global_reducer;
597 
598  #pragma omp parallel
599  {
600  // copy the prototype to create the thread local storage
601  TThreadLocalStorage thread_local_storage(rThreadLocalStoragePrototype);
602 
603  #pragma omp for
604  for (int i=0; i<mNchunks; ++i) {
605  KRATOS_TRY
606  TReducer local_reducer;
607  for (auto k = mBlockPartition[i]; k < mBlockPartition[i+1]; ++k) {
608  local_reducer.LocalReduce(f(k, thread_local_storage));
609  }
610  global_reducer.ThreadSafeReduce(local_reducer);
612  }
613  }
615 
616  return global_reducer.GetValue();
617  }
618 
619 private:
620  int mNchunks;
621  std::array<TIndexType, TMaxThreads> mBlockPartition;
622 };
623 
624 } // namespace Kratos.
625 
626 #undef KRATOS_PREPARE_CATCH_THREAD_EXCEPTION
627 #undef KRATOS_CATCH_THREAD_EXCEPTION
628 #undef KRATOS_CHECK_AND_THROW_THREAD_EXCEPTION
Definition: parallel_utilities.h:142
TReducer::return_type for_each(TUnaryFunction &&f)
loop allowing reductions. f called on every entry in rData the function f needs to return the values ...
Definition: parallel_utilities.h:200
TReducer::return_type for_each(const TThreadLocalStorage &rThreadLocalStoragePrototype, TFunction &&f)
loop with thread local storage (TLS) allowing reductions. f called on every entry in rData the functi...
Definition: parallel_utilities.h:256
BlockPartition(TIterator it_begin, TIterator it_end, int Nchunks=ParallelUtilities::GetNumThreads())
Definition: parallel_utilities.h:148
void for_each(const TThreadLocalStorage &rThreadLocalStoragePrototype, TFunction &&f)
loop with thread local storage (TLS). f called on every entry in rData
Definition: parallel_utilities.h:226
void for_each(TUnaryFunction &&f)
simple iteration loop. f called on every entry in rData
Definition: parallel_utilities.h:178
Extends the std::exception class with more information about error location.
Definition: exception.h:49
This class is useful for index iteration over containers.
Definition: parallel_utilities.h:451
TReducer::return_type for_each(TUnaryFunction &&f)
Definition: parallel_utilities.h:535
TReducer::return_type for_each(const TThreadLocalStorage &rThreadLocalStoragePrototype, TFunction &&f)
Definition: parallel_utilities.h:590
IndexPartition(TIndexType Size, int Nchunks=ParallelUtilities::GetNumThreads())
constructor using the size of the partition to be used
Definition: parallel_utilities.h:458
void for_each(const TThreadLocalStorage &rThreadLocalStoragePrototype, TFunction &&f)
loop with thread local storage (TLS). f called on every entry in rData
Definition: parallel_utilities.h:560
void for_pure_c11(TUnaryFunction &&f)
Definition: parallel_utilities.h:482
void for_each(TUnaryFunction &&f)
Definition: parallel_utilities.h:514
This class defines and stores a lock and gives an interface to it.
Definition: lock_object.h:42
Shared memory parallelism related helper class.
Definition: parallel_utilities.h:68
static int GetNumThreads()
Returns the current number of threads.
Definition: parallel_utilities.cpp:34
#define KRATOS_TRY
Definition: define.h:109
#define KRATOS_ERROR
Definition: exception.h:161
#define KRATOS_ERROR_IF(conditional)
Definition: exception.h:162
static double min(double a, double b)
Definition: GeometryFunctions.h:71
TSpaceType::IndexType Size(TSpaceType &dummy, typename TSpaceType::VectorType const &rV)
Definition: add_strategies_to_python.cpp:111
REF: G. R. Cowper, GAUSSIAN QUADRATURE FORMULAS FOR TRIANGLES.
Definition: mesh_condition.cpp:21
REACTION_CHECK_STIFFNESS_FACTOR INNER_LOOP_ITERATION DISTANCE_THRESHOLD ACTIVE_CHECK_FACTOR AUXILIAR_COORDINATES NORMAL_GAP WEIGHTED_GAP WEIGHTED_SCALAR_RESIDUAL bool
Definition: contact_structural_mechanics_application_variables.h:93
void block_for_each(TIterator itBegin, TIterator itEnd, TFunction &&rFunction)
Execute a functor on all items of a range in parallel.
Definition: parallel_utilities.h:299
def func(args)
Definition: fde_solve.py:101
v
Definition: generate_convection_diffusion_explicit_element.py:114
f
Definition: generate_convection_diffusion_explicit_element.py:112
tuple const
Definition: ode_solve.py:403
int k
Definition: quadrature.py:595
integer i
Definition: TensorModule.f:17
e
Definition: run_cpp_mpi_tests.py:31
#define KRATOS_CATCH_THREAD_EXCEPTION
Definition: parallel_utilities.h:43
#define KRATOS_CHECK_AND_THROW_THREAD_EXCEPTION
Definition: parallel_utilities.h:55
#define KRATOS_PREPARE_CATCH_THREAD_EXCEPTION
Definition: parallel_utilities.h:41