13 #include <condition_variable> 48 void resize(std::size_t num_threads);
53 template <
class F,
class... Args>
54 auto enqueue(F&& f, Args&&... args)
55 -> std::future<
typename std::result_of<F(Args...)>::type>;
66 std::queue<std::function<void()>>
tasks_;
70 template <
class F,
class... Args>
72 -> std::future<
typename std::result_of<F(Args...)>::type>
74 using return_type =
typename std::result_of<F(Args...)>::type;
76 auto task = std::make_shared<std::packaged_task<return_type()>>(
77 std::bind(std::forward<F>(f), std::forward<Args>(args)...));
79 std::future<return_type> res = task->get_future();
81 std::unique_lock<std::mutex> lock(queue_mutex_);
84 if (do_stop_)
throw std::runtime_error(
"enqueue on stopped ThreadPool");
87 if (policy_ == POLICY_DROP_OLD)
89 while (tasks_.size() >= threads_.size())
95 tasks_.emplace([task]() { (*task)(); });
97 condition_.notify_one();
If a task arrives and there are more pending tasks than worker threads, the oldest pending tasks are dropped until the queue fits the number of workers.
auto enqueue(F &&f, Args &&... args) -> std::future< typename std::result_of< F(Args...)>::type >
Enqueues one new work item, to be executed by a worker thread as soon as one becomes available.
WorkerThreadsPool()=default
std::atomic_bool do_stop_
WorkerThreadsPool(std::size_t num_threads, queue_policy_t p=POLICY_FIFO)
std::condition_variable condition_
std::vector< std::thread > threads_
std::queue< std::function< void()> > tasks_
void resize(std::size_t num_threads)
void clear()
Stops all working jobs.
std::size_t pendingTasks() const noexcept
Returns the number of enqueued tasks that are currently waiting for a free worker thread to process them.
Default policy: all tasks are executed in FIFO order.