openGPMP
Open Source Mathematics Package
threads.hpp
Go to the documentation of this file.
1 /*************************************************************************
2  *
3  * Project
4  * _____ _____ __ __ _____
5  * / ____| __ \| \/ | __ \
6  * ___ _ __ ___ _ __ | | __| |__) | \ / | |__) |
7  * / _ \| '_ \ / _ \ '_ \| | |_ | ___/| |\/| | ___/
8  *| (_) | |_) | __/ | | | |__| | | | | | | |
9  * \___/| .__/ \___|_| |_|\_____|_| |_| |_|_|
10  * | |
11  * |_|
12  *
13  * Copyright (C) Akiel Aries, <akiel@akiel.org>, et al.
14  *
15  * This software is licensed as described in the file LICENSE, which
16  * you should have received as part of this distribution. The terms
17  * among other details are referenced in the official documentation
18  * seen here : https://akielaries.github.io/openGPMP/ along with
19  * important files seen in this project.
20  *
21  * You may opt to use, copy, modify, merge, publish, distribute
22  * and/or sell copies of the Software, and permit persons to whom
23  * the Software is furnished to do so, under the terms of the
24  * LICENSE file. As this is an Open Source effort, all implementations
25  * must be of the same methodology.
26  *
27  *
28  *
29  * This software is distributed on an AS IS basis, WITHOUT
30  * WARRANTY OF ANY KIND, either express or implied.
31  *
32  ************************************************************************/
33 
38 #ifndef THREADS_HPP
39 #define THREADS_HPP
40 
#include <algorithm>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
48 
/**
 * Convenience wrappers around gpmp::core::parallel_for.
 *
 * Usage:
 *   PARALLEL_FOR_BEGIN(n) {
 *       ... body that uses the loop index i ...
 *   } PARALLEL_FOR_END();
 *
 * The body runs once for every i in [0, n), inside a lambda that captures
 * by reference; chunks of the range may run concurrently on several threads.
 * The names `start`, `end` and `i` are introduced by the macro.
 */
#define PARALLEL_FOR_BEGIN(nb_elements)                                        \
    gpmp::core::parallel_for(nb_elements, [&](int start, int end) {            \
        for (int i = start; i < end; ++i)
#define PARALLEL_FOR_END() })
55 
56 namespace gpmp {
57 
58 namespace core {
59 
/**
 * @brief Fixed-size pool of worker threads that execute queued tasks.
 *
 * Tasks are submitted with enqueue() and are dequeued in FIFO order by
 * whichever idle worker wakes first. The destructor stops accepting new
 * work, lets the workers finish every task that is already queued, and
 * joins all workers.
 */
class ThreadPool {
  private:
    // VECTOR of threads to execute tasks
    std::vector<std::thread> workers;
    // QUEUE of tasks to be executed
    std::queue<std::function<void()>> tasks;
    // MUTEX synchronizing access to the QUEUE of tasks
    std::mutex queue_mutex;
    // CONDITIONAL to notify waiting threads when queue gets populated
    std::condition_variable condition;
    // BOOL indicating if ThreadPool should stop execution
    bool stop;

    /**
     * @brief Number of workers used by the default constructor.
     *
     * std::thread::hardware_concurrency() returns 0 when the value is not
     * computable. A pool with zero workers would never run any task (every
     * future.get() would block forever), so fall back to a single worker.
     */
    static int default_thread_count() {
        const unsigned hw = std::thread::hardware_concurrency();
        return hw == 0 ? 1 : static_cast<int>(hw);
    }

    /**
     * @brief Body executed by every worker thread.
     *
     * Blocks until a task is queued or the pool is stopping. Tasks that are
     * still queued when stop is requested are executed; the worker only
     * returns once stop is set AND the queue is empty.
     */
    void worker_loop() {
        for (;;) {
            std::function<void()> task_obj;
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                // wait() releases the lock while sleeping and re-acquires it
                // before re-checking the predicate
                condition.wait(lock, [this] { return stop || !tasks.empty(); });
                if (stop && tasks.empty()) {
                    return;
                }
                // take ownership of the front task so it can run unlocked
                task_obj = std::move(tasks.front());
                tasks.pop();
            }
            // run outside the critical section so other workers can dequeue
            task_obj();
        }
    }

    /**
     * @brief Sets the stop flag, wakes every worker and joins them all.
     */
    void stop_and_join() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &worker : workers) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }

  public:
    /**
     * @brief Default constructor: one worker per hardware thread reported by
     * std::thread::hardware_concurrency(), or a single worker when that
     * value is unavailable (reported as 0).
     */
    ThreadPool() : ThreadPool(default_thread_count()) {
    }

    /**
     * @brief Constructs a ThreadPool with a given number of worker threads
     * to dispatch functions.
     *
     * @param numThreads number of workers to start. A value <= 0 starts no
     *        workers, in which case queued tasks are never executed.
     * @throws std::system_error if a worker thread cannot be started; any
     *         workers that were already started are stopped and joined
     *         before the exception propagates.
     */
    explicit ThreadPool(int numThreads) : stop(false) {
        if (numThreads > 0) {
            workers.reserve(static_cast<std::size_t>(numThreads));
        }
        try {
            for (int i = 0; i < numThreads; ++i) {
                workers.emplace_back([this] { worker_loop(); });
            }
        } catch (...) {
            // The destructor does not run for a partially constructed
            // object, and destroying a joinable std::thread calls
            // std::terminate, so shut down the started workers here.
            stop_and_join();
            throw;
        }
    }

    // the pool owns running threads that point back at `this`
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool &operator=(const ThreadPool &) = delete;

    /**
     * @brief Enqueues a task to the thread pool.
     *
     * Decayed copies of @p f and @p args are stored and, when a worker runs
     * the task, passed to the callable as rvalues. This matches the
     * std::invoke_result<F, Args...> return type and, unlike std::bind
     * (which passes bound arguments as lvalues), accepts move-only arguments
     * such as std::unique_ptr.
     *
     * @param f callable to execute
     * @param args arguments for @p f
     * @return future holding the callable's result, or the exception it
     *         threw (captured by std::packaged_task)
     * @throws std::runtime_error if the pool is already stopping
     */
    template <class F, class... Args>
    auto enqueue(F &&f, Args &&...args)
        -> std::future<typename std::invoke_result<F, Args...>::type> {
        using return_type = typename std::invoke_result<F, Args...>::type;

        // shared_ptr because std::function (the queue element type) requires
        // a copyable target, while std::packaged_task is move-only
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            [func = std::forward<F>(f),
             bound = std::make_tuple(std::forward<Args>(args)...)]() mutable
            -> return_type {
                return std::apply(std::move(func), std::move(bound));
            });

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks.emplace([task]() { (*task)(); });
        } // lock released here, before waking a worker
        condition.notify_one();
        return res;
    }

    /**
     * @brief Stops the pool: tasks already queued are still executed, then
     * every worker thread is joined.
     */
    ~ThreadPool() {
        stop_and_join();
    }
};
199 
205  public:
219  template <typename Function, typename... Args>
220  auto dispatch(ThreadPool &pool, Function &&func, Args &&...args)
221  //-> std::future<typename std::result_of<Function(Args...)>::type> {
222  -> std::future<typename std::invoke_result<Function, Args...>::type> {
223 
224  // enqueue the function call to the thread pool
225  auto result = pool.enqueue(std::forward<Function>(func),
226  std::forward<Args>(args)...);
227 
228  // return the future object to get the result later
229  return result;
230  }
231 };
232 
/**
 * @brief Runs a rudimentary for loop over [0, nb_elements) in parallel.
 *
 * The index range is cut into 10 contiguous chunks of nb_elements / 10
 * indices each; chunk k covers [k * batch, (k + 1) * batch). The
 * nb_elements % 10 leftover indices at the end are always processed on the
 * calling thread. functor may therefore be called with an empty range.
 *
 * @param nb_elements number of indices to process
 * @param functor called as functor(start, end) for each half-open chunk; it
 *        must be safe to call concurrently when @p use_threads is true
 * @param use_threads when true, each chunk runs on its own thread; when
 *        false, all chunks run sequentially on the caller (easy debugging)
 * @throws any exception thrown by @p functor (a worker's exception is
 *         rethrown on the calling thread), or std::system_error if a thread
 *         cannot be started. All started threads finish before the exception
 *         leaves this function.
 */
inline void parallel_for(unsigned nb_elements,
                         std::function<void(int start, int end)> functor,
                         bool use_threads = true) {
    // fixed chunk count (std::thread::hardware_concurrency() was the
    // original intent, kept at 10 to preserve the existing partitioning)
    constexpr unsigned nb_threads = 10;

    const unsigned batch_size = nb_elements / nb_threads;
    const unsigned batch_remainder = nb_elements % nb_threads;

    // std::async futures instead of raw std::thread: an exception escaping a
    // std::thread calls std::terminate, whereas a future stores it and
    // rethrows it from get(). A future returned by
    // std::async(std::launch::async, ...) also blocks in its destructor until
    // the task has finished, so no thread outlives this call even when an
    // exception unwinds the stack.
    std::vector<std::future<void>> pending;

    if (use_threads) {
        // Multithread execution
        pending.reserve(nb_threads);
        for (unsigned i = 0; i < nb_threads; ++i) {
            const int start = static_cast<int>(i * batch_size);
            pending.push_back(std::async(std::launch::async, functor, start,
                                         start + static_cast<int>(batch_size)));
        }
    } else {
        // Single thread execution (for easy debugging)
        for (unsigned i = 0; i < nb_threads; ++i) {
            const int start = static_cast<int>(i * batch_size);
            functor(start, start + static_cast<int>(batch_size));
        }
    }

    // process the leftover elements on the calling thread
    const int tail_start = static_cast<int>(nb_threads * batch_size);
    functor(tail_start, tail_start + static_cast<int>(batch_remainder));

    // wait for the workers; get() rethrows an exception raised in a worker
    for (std::future<void> &chunk : pending) {
        chunk.get();
    }
}
283 
284 } // namespace core
285 
286 } // namespace gpmp
287 
288 #endif
A class that provides a function to dispatch a function call to a thread pool and return a future object for retrieving its result.
Definition: threads.hpp:204
auto dispatch(ThreadPool &pool, Function &&func, Args &&...args) -> std::future< typename std::invoke_result< Function, Args... >::type >
Dispatches a function call to a ThreadPool and returns a future object for obtaining the result.
Definition: threads.hpp:220
std::condition_variable condition
Definition: threads.hpp:69
ThreadPool(int numThreads)
Constructs a ThreadPool with a given number of worker threads to dispatch functions.
Definition: threads.hpp:88
std::vector< std::thread > workers
Definition: threads.hpp:63
auto enqueue(F &&f, Args &&...args) -> std::future< typename std::invoke_result< F, Args... >::type >
Enqueues a task to the thread pool.
Definition: threads.hpp:148
std::mutex queue_mutex
Definition: threads.hpp:67
ThreadPool()
Default constructor that creates a ThreadPool with as many worker threads as std::thread::hardware_concurrency() reports.
Definition: threads.hpp:78
std::queue< std::function< void()> > tasks
Definition: threads.hpp:65
static void parallel_for(unsigned nb_elements, std::function< void(int start, int end)> functor, bool use_threads=true)
Parallelizes rudimentary for loops across threads.
Definition: threads.hpp:246
The source C++ openGPMP namespace.