Unverified Commit 712a860c authored by Test2, Test1 (martin.reinecke1@gmx.de)'s avatar Test2, Test1 (martin.reinecke1@gmx.de) Committed by GitHub
Browse files

Merge pull request #5 from peterbell10/threading-issues

Threading issues
parents 82659582 5c9ae868
Pipeline #81670 passed with stages
in 21 minutes and 11 seconds
...@@ -89,8 +89,8 @@ template <typename T> class concurrent_queue ...@@ -89,8 +89,8 @@ template <typename T> class concurrent_queue
public: public:
void push(T val) void push(T val)
{ {
++size_;
lock_t lock(mut_); lock_t lock(mut_);
++size_;
q_.push(std::move(val)); q_.push(std::move(val));
} }
...@@ -101,22 +101,21 @@ template <typename T> class concurrent_queue ...@@ -101,22 +101,21 @@ template <typename T> class concurrent_queue
// Queue might have been emptied while we acquired the lock // Queue might have been emptied while we acquired the lock
if (q_.empty()) return false; if (q_.empty()) return false;
--size_;
val = std::move(q_.front()); val = std::move(q_.front());
--size_;
q_.pop(); q_.pop();
return true; return true;
} }
bool empty() const { return size_==0; }
}; };
class thread_pool class thread_pool
{ {
private: private:
//FIXME: temporary ... OSX seems to set the macro, but not to have the variable // A reasonable guess, probably close enough for most hardware
//#if __cpp_lib_hardware_interference_size >= 201603 static constexpr size_t cache_line_size = 64;
// struct alignas(std::hardware_destructive_interference_size) worker struct alignas(cache_line_size) worker
//#else
struct alignas(64) worker
//#endif
{ {
std::thread thread; std::thread thread;
std::condition_variable work_ready; std::condition_variable work_ready;
...@@ -124,24 +123,49 @@ class thread_pool ...@@ -124,24 +123,49 @@ class thread_pool
std::atomic_flag busy_flag = ATOMIC_FLAG_INIT; std::atomic_flag busy_flag = ATOMIC_FLAG_INIT;
std::function<void()> work; std::function<void()> work;
void worker_main(std::atomic<bool> & shutdown_flag, void worker_main(
std::atomic<bool> &shutdown_flag,
std::atomic<size_t> &unscheduled_tasks,
concurrent_queue<std::function<void()>> &overflow_work) concurrent_queue<std::function<void()>> &overflow_work)
{ {
using lock_t = std::unique_lock<std::mutex>; using lock_t = std::unique_lock<std::mutex>;
lock_t lock(mut); bool expect_work = true;
while (!shutdown_flag) while (!shutdown_flag || expect_work)
{
std::function<void()> local_work;
if (expect_work || unscheduled_tasks == 0)
{ {
// Wait to be woken by the thread pool with a piece of work lock_t lock(mut);
// Wait until there is work to be executed
work_ready.wait(lock, [&]{ return (work || shutdown_flag); }); work_ready.wait(lock, [&]{ return (work || shutdown_flag); });
if (!work) continue; local_work.swap(work);
work(); expect_work = false;
}
// Execute any work which queued up while we were busy bool marked_busy = false;
while (overflow_work.try_pop(work)) work(); if (local_work)
{
marked_busy = true;
local_work();
}
// Mark ourself as available before going back to sleep if (!overflow_work.empty())
work = nullptr; {
busy_flag.clear(); if (!marked_busy && busy_flag.test_and_set())
{
expect_work = true;
continue;
}
marked_busy = true;
while (overflow_work.try_pop(local_work))
{
--unscheduled_tasks;
local_work();
}
}
if (marked_busy) busy_flag.clear();
} }
} }
}; };
...@@ -150,6 +174,7 @@ class thread_pool ...@@ -150,6 +174,7 @@ class thread_pool
std::mutex mut_; std::mutex mut_;
std::vector<worker> workers_; std::vector<worker> workers_;
std::atomic<bool> shutdown_; std::atomic<bool> shutdown_;
std::atomic<size_t> unscheduled_tasks_;
using lock_t = std::lock_guard<std::mutex>; using lock_t = std::lock_guard<std::mutex>;
void create_threads() void create_threads()
...@@ -164,7 +189,7 @@ class thread_pool ...@@ -164,7 +189,7 @@ class thread_pool
worker->busy_flag.clear(); worker->busy_flag.clear();
worker->work = nullptr; worker->work = nullptr;
worker->thread = std::thread( worker->thread = std::thread(
[worker, this]{ worker->worker_main(shutdown_, overflow_work_); }); [worker, this]{ worker->worker_main(shutdown_, unscheduled_tasks_, overflow_work_); });
} }
catch (...) catch (...)
{ {
...@@ -200,32 +225,23 @@ class thread_pool ...@@ -200,32 +225,23 @@ class thread_pool
if (shutdown_) if (shutdown_)
throw std::runtime_error("Work item submitted after shutdown"); throw std::runtime_error("Work item submitted after shutdown");
auto submit_to_idle = [&](std::function<void()> &work) -> bool ++unscheduled_tasks_;
{
// First check for any idle workers and wake those
for (auto &worker : workers_) for (auto &worker : workers_)
if (!worker.busy_flag.test_and_set()) if (!worker.busy_flag.test_and_set())
{ {
--unscheduled_tasks_;
{ {
lock_t lock(worker.mut); lock_t lock(worker.mut);
worker.work = std::move(work); worker.work = std::move(work);
} }
worker.work_ready.notify_one(); worker.work_ready.notify_one();
return true; return;
} }
return false;
};
// First check for any idle workers and wake those
if (submit_to_idle(work)) return;
// If no workers were idle, push onto the overflow queue for later // If no workers were idle, push onto the overflow queue for later
overflow_work_.push(std::move(work)); overflow_work_.push(std::move(work));
// Possible race: All workers might have gone idle between the first
// submit attempt and pushing the work item into the queue. So, there
// could be no active workers to check the queue.
// Resolve with another check for idle workers.
std::function<void()> dummy_work = []{};
submit_to_idle(dummy_work);
} }
void shutdown() void shutdown()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment