Commit c1e7bdf9 authored by Peter Bell's avatar Peter Bell
Browse files

Better fix for possible race condition.

Avoid putting threads to sleep if there are any unscheduled tasks in flight.
parent 0eb0e272
......@@ -572,8 +572,8 @@ template <typename T> class concurrent_queue
void push(T val)
{
++size_;
lock_t lock(mut_);
++size_;
q_.push(std::move(val));
}
......@@ -584,11 +584,13 @@ template <typename T> class concurrent_queue
// Queue might have been emptied while we acquired the lock
if (q_.empty()) return false;
--size_;
val = std::move(q_.front());
--size_;
q_.pop();
return true;
}
bool empty() const { return size_==0; }
};
// C++ allocator with support for over-aligned types
......@@ -605,7 +607,7 @@ template <typename T> struct aligned_allocator
return static_cast<T*>(mem);
}
void deallocate(T *p, size_t n)
void deallocate(T *p, size_t /*n*/)
{ aligned_dealloc(p); }
};
......@@ -621,24 +623,49 @@ class thread_pool
std::atomic_flag busy_flag = ATOMIC_FLAG_INIT;
std::function<void()> work;
void worker_main(std::atomic<bool> & shutdown_flag,
concurrent_queue<std::function<void()>> &overflow_work)
void worker_main(
std::atomic<bool> &shutdown_flag,
std::atomic<size_t> &unscheduled_tasks,
concurrent_queue<std::function<void()>> &overflow_work)
{
using lock_t = std::unique_lock<std::mutex>;
lock_t lock(mut);
while (!shutdown_flag)
bool expect_work = true;
while (!shutdown_flag || expect_work)
{
// Wait to be woken by the thread pool with a piece of work
work_ready.wait(lock, [&]{ return (work || shutdown_flag); });
if (!work) continue;
work();
std::function<void()> local_work;
if (expect_work || unscheduled_tasks == 0)
{
lock_t lock(mut);
// Wait until there is work to be executed
work_ready.wait(lock, [&]{ return (work || shutdown_flag); });
local_work.swap(work);
expect_work = false;
}
// Execute any work which queued up while we were busy
while (overflow_work.try_pop(work)) work();
bool marked_busy = false;
if (local_work)
{
marked_busy = true;
local_work();
}
// Mark ourself as available before going back to sleep
work = nullptr;
busy_flag.clear();
if (!overflow_work.empty())
{
if (!marked_busy && busy_flag.test_and_set())
{
expect_work = true;
continue;
}
marked_busy = true;
while (overflow_work.try_pop(local_work))
{
--unscheduled_tasks;
local_work();
}
}
if (marked_busy) busy_flag.clear();
}
}
};
......@@ -647,6 +674,7 @@ class thread_pool
std::mutex mut_;
std::vector<worker, aligned_allocator<worker>> workers_;
std::atomic<bool> shutdown_;
std::atomic<size_t> unscheduled_tasks_;
using lock_t = std::lock_guard<std::mutex>;
void create_threads()
......@@ -660,8 +688,10 @@ class thread_pool
auto *worker = &workers_[i];
worker->busy_flag.clear();
worker->work = nullptr;
worker->thread = std::thread(
[worker, this]{ worker->worker_main(shutdown_, overflow_work_); });
worker->thread = std::thread([worker, this]
{
worker->worker_main(shutdown_, unscheduled_tasks_, overflow_work_);
});
}
catch (...)
{
......@@ -695,34 +725,25 @@ class thread_pool
{
lock_t lock(mut_);
if (shutdown_)
throw std::runtime_error("Work item submitted after shutdown");
throw std::runtime_error("Work item submitted after shutdown");
auto submit_to_idle = [&](std::function<void()> &work) -> bool
{
for (auto &worker : workers_)
if (!worker.busy_flag.test_and_set())
{
{
lock_t lock(worker.mut);
worker.work = std::move(work);
}
worker.work_ready.notify_one();
return true;
}
return false;
};
++unscheduled_tasks_;
// First check for any idle workers and wake those
if (submit_to_idle(work)) return;
for (auto &worker : workers_)
if (!worker.busy_flag.test_and_set())
{
--unscheduled_tasks_;
{
lock_t lock(worker.mut);
worker.work = std::move(work);
}
worker.work_ready.notify_one();
return;
}
// If no workers were idle, push onto the overflow queue for later
overflow_work_.push(std::move(work));
// Possible race: All workers might have gone idle between the first
// submit attempt and pushing the work item into the queue. So, there
// could be no active workers to check the queue.
// Resolve with another check for idle workers.
std::function<void()> dummy_work = []{};
submit_to_idle(dummy_work);
}
void shutdown()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment