41 #include <condition_variable>
52 #define PARALLEL_FOR_BEGIN(nb_elements) gpmp::core::parallel_for(nb_elements, [&](int start, int end){ for(int i = start; i < end; ++i)
53 #define PARALLEL_FOR_END() \
65 std::queue<std::function<void()>>
tasks;
91 for (
int i = 0; i < numThreads; ++i) {
98 std::function<void()> task_obj;
104 std::unique_lock<std::mutex> lock(this->queue_mutex);
108 this->condition.wait(lock, [
this] {
109 return this->stop || !this->tasks.empty();
113 if (this->stop && this->tasks.empty()) {
119 task_obj = std::move(this->tasks.front());
147 template <
class F,
class... Args>
150 -> std::future<
typename std::invoke_result<F, Args...>::type> {
154 using return_type =
typename std::invoke_result<F, Args...>::type;
162 auto task = std::make_shared<std::packaged_task<return_type()>>(
163 std::bind(std::forward<F>(f), std::forward<Args>(args)...));
166 std::future<return_type>
res = task->get_future();
172 throw std::runtime_error(
"enqueue on stopped ThreadPool");
176 tasks.emplace([task]() { (*task)(); });
194 for (std::thread &worker :
workers) {
219 template <
typename Function,
typename... Args>
222 -> std::future<
typename std::invoke_result<Function, Args...>::type> {
225 auto result = pool.enqueue(std::forward<Function>(func),
226 std::forward<Args>(args)...);
247 std::function<
void(
int start,
int end)> functor,
248 bool use_threads =
true) {
250 unsigned nb_threads_hint = 10;
251 unsigned nb_threads = nb_threads_hint == 0 ? 8 : (nb_threads_hint);
253 unsigned batch_size = nb_elements / nb_threads;
254 unsigned batch_remainder = nb_elements % nb_threads;
256 std::vector<std::thread> my_threads(nb_threads);
260 for (
unsigned i = 0; i < nb_threads; ++i) {
261 int start = i * batch_size;
262 my_threads[i] = std::thread(functor, start, start + batch_size);
266 for (
unsigned i = 0; i < nb_threads; ++i) {
267 int start = i * batch_size;
268 functor(start, start + batch_size);
273 int start = nb_threads * batch_size;
274 functor(start, start + batch_remainder);
278 std::for_each(my_threads.begin(),
280 std::mem_fn(&std::thread::join));
A class that provides a function to dispatch a function call to a thread pool and return a future object for obtaining the result.
auto dispatch(ThreadPool &pool, Function &&func, Args &&...args) -> std::future< typename std::invoke_result< Function, Args... >::type >
Dispatches a function call to a ThreadPool and returns a future object for obtaining the result.
std::condition_variable condition
ThreadPool(int numThreads)
Constructs a ThreadPool with a given number of worker threads to dispatch functions.
std::vector< std::thread > workers
auto enqueue(F &&f, Args &&...args) -> std::future< typename std::invoke_result< F, Args... >::type >
Enqueues a task to the thread pool.
ThreadPool()
Default constructor that creates a ThreadPool with a default number of worker threads.
std::queue< std::function< void()> > tasks
static void parallel_for(unsigned nb_elements, std::function< void(int start, int end)> functor, bool use_threads=true)
Rudimentary thread-based parallel for loops.
The core C++ openGPMP namespace (gpmp::core).