Commit 88bb197e authored by Peter Bell, committed by Martin Reinecke

Threadpool: Assign work preferentially to threads in creation order

parent 3a416632
/*
This file is part of pocketfft.

-Copyright (C) 2010-2019 Max-Planck-Society
-Copyright (C) 2019 Peter Bell
+Copyright (C) 2010-2020 Max-Planck-Society
+Copyright (C) 2019-2020 Peter Bell

For the odd-sized DCT-IV transforms:
Copyright (C) 2003, 2007-14 Matteo Frigo
@@ -70,6 +70,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <queue>
#include <atomic>
#include <functional>
+#include <new>
#ifdef POCKETFFT_PTHREADS
# include <pthread.h>
@@ -148,6 +149,30 @@ template<> struct VLEN<double> { static constexpr size_t val=2; };
#endif
#endif
+#if __cplusplus >= 201703L
+inline void *aligned_alloc(size_t align, size_t size)
+  {
+  void *ptr = ::aligned_alloc(align,size);
+  if (!ptr) throw std::bad_alloc();
+  return ptr;
+  }
+inline void aligned_dealloc(void *ptr)
+  { free(ptr); }
+#else // portable emulation
+inline void *aligned_alloc(size_t align, size_t size)
+  {
+  align = std::max(align, alignof(max_align_t));
+  void *ptr = malloc(size+align);
+  if (!ptr) throw std::bad_alloc();
+  void *res = reinterpret_cast<void *>
+    ((reinterpret_cast<uintptr_t>(ptr) & ~(uintptr_t(align-1))) + uintptr_t(align));
+  (reinterpret_cast<void**>(res))[-1] = ptr;
+  return res;
+  }
+inline void aligned_dealloc(void *ptr)
+  { if (ptr) free((reinterpret_cast<void**>(ptr))[-1]); }
+#endif
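
A caveat on the two branches: under the C11 wording that C++17 inherits, ::aligned_alloc expects size to be an integral multiple of align (many C libraries are more permissive), whereas the portable emulation accepts any size and simply over-allocates by align bytes, stashing the original malloc pointer just below the returned address. A minimal smoke test of that contract, assuming the two helpers above are in scope (the test itself is illustrative and not part of the commit):

#include <cassert>
#include <cstddef>
#include <cstdint>

void alignment_smoke_test()
  {
  constexpr std::size_t align = 256;        // stricter than alignof(max_align_t)
  void *p = aligned_alloc(align, 4*align);  // size kept a multiple of align
  assert(reinterpret_cast<std::uintptr_t>(p) % align == 0);
  aligned_dealloc(p);                       // releases the original block
  }
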
template<typename T> class arr
{
private:
@@ -164,29 +189,15 @@ template<typename T> class arr
      }
    static void dealloc(T *ptr)
      { free(ptr); }
-#elif __cplusplus >= 201703L
-    static T *ralloc(size_t num)
-      {
-      if (num==0) return nullptr;
-      void *res = aligned_alloc(64,num*sizeof(T));
-      if (!res) throw std::bad_alloc();
-      return reinterpret_cast<T *>(res);
-      }
-    static void dealloc(T *ptr)
-      { free(ptr); }
-#else // portable emulation
+#else
    static T *ralloc(size_t num)
      {
      if (num==0) return nullptr;
-      void *ptr = malloc(num*sizeof(T)+64);
-      if (!ptr) throw std::bad_alloc();
-      T *res = reinterpret_cast<T *>
-        ((reinterpret_cast<size_t>(ptr) & ~(size_t(63))) + 64);
-      (reinterpret_cast<void**>(res))[-1] = ptr;
-      return res;
+      void *ptr = aligned_alloc(64, num*sizeof(T));
+      return static_cast<T*>(ptr);
      }
    static void dealloc(T *ptr)
-      { if (ptr) free((reinterpret_cast<void**>(ptr))[-1]); }
+      { aligned_dealloc(ptr); }
#endif
public:
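
Both remaining branches of arr<T>::ralloc now funnel through the shared helpers above, so the class no longer carries its own alignment shim. A hypothetical caller-side illustration of the guarantee this preserves (assuming pocketfft::detail::arr as defined in this file, with its data() accessor):

#include <cassert>
#include <cstdint>

void simd_buffer_example()
  {
  pocketfft::detail::arr<double> buf(1024);  // storage is 64-byte aligned
  assert(reinterpret_cast<std::uintptr_t>(buf.data()) % 64 == 0);
  // ...safe for aligned SIMD loads/stores of any supported vector width...
  }
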
@@ -554,77 +565,128 @@ template <typename T> class concurrent_queue
  {
    std::queue<T> q_;
    std::mutex mut_;
-    std::condition_variable item_added_;
-    bool shutdown_;
-    using lock_t = std::unique_lock<std::mutex>;
+    std::atomic<size_t> size_;
+    using lock_t = std::lock_guard<std::mutex>;

  public:
-    concurrent_queue(): shutdown_(false) {}

    void push(T val)
      {
-      {
+      ++size_;
      lock_t lock(mut_);
-      if (shutdown_)
-        throw std::runtime_error("Item added to queue after shutdown");
-      q_.push(move(val));
-      }
-      item_added_.notify_one();
+      q_.push(std::move(val));
      }

-    bool pop(T & val)
+    bool try_pop(T &val)
      {
+      if (size_ == 0) return false;
      lock_t lock(mut_);
-      item_added_.wait(lock, [this] { return (!q_.empty() || shutdown_); });
-      if (q_.empty())
-        return false; // We are shutting down
+      // Queue might have been emptied while we acquired the lock
+      if (q_.empty()) return false;

+      --size_;
      val = std::move(q_.front());
      q_.pop();
      return true;
      }
+  };

-    void shutdown()
-      {
-      {
-      lock_t lock(mut_);
-      shutdown_ = true;
-      }
-      item_added_.notify_all();
-      }

-    void restart() { shutdown_ = false; }

+// C++ allocator with support for over-aligned types
+template <typename T> struct aligned_allocator
+  {
+  using value_type = T;
+  template <class U>
+  aligned_allocator(const aligned_allocator<U>&) {}
+  aligned_allocator() = default;
+
+  T *allocate(size_t n)
+    {
+    void* mem = aligned_alloc(alignof(T), n*sizeof(T));
+    return static_cast<T*>(mem);
+    }
+
+  void deallocate(T *p, size_t n)
+    { aligned_dealloc(p); }
  };
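
The allocator matters because std::vector's default allocator is not required to honor extended alignment before C++17, and thread_pool below stores cache-line-aligned worker objects in a vector. A minimal sketch of the intended pattern, with padded as a hypothetical stand-in element type (strictly speaking, a complete allocator would also provide equality operators, which this minimal one omits):

#include <vector>

struct alignas(64) padded { int x; };

// Each element lands on its own 64-byte boundary even where the default
// allocator would ignore the extended alignment request.
std::vector<padded, aligned_allocator<padded>> v(4);
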
class thread_pool
  {
-    concurrent_queue<std::function<void()>> work_queue_;
-    std::vector<std::thread> threads_;

-    void worker_main()
+#if __cplusplus >= 201703L
+    struct alignas(std::hardware_destructive_interference_size) worker
+#else
+    struct alignas(64) worker
+#endif
      {
+      std::thread thread;
+      std::condition_variable work_ready;
+      std::mutex mut;
+      std::atomic_flag busy_flag = ATOMIC_FLAG_INIT;
      std::function<void()> work;
-      while (work_queue_.pop(work))
-        work();
-      }

+      void worker_main(std::atomic<bool> & shutdown_flag,
+        concurrent_queue<std::function<void()>> &overflow_work)
+        {
+        using lock_t = std::unique_lock<std::mutex>;
+        lock_t lock(mut);
+        while (!shutdown_flag)
+          {
+          // Wait to be woken by the thread pool with a piece of work
+          work_ready.wait(lock, [&]{ return (work || shutdown_flag); });
+          if (!work) continue;
+          work();
+
+          // Execute any work which queued up while we were busy
+          while (overflow_work.try_pop(work)) work();
+
+          // Mark ourself as available before going back to sleep
+          work = nullptr;
+          busy_flag.clear();
+          }
+        }
+      };
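
The alignas on worker is the reason for the new #include <new> at the top of this commit: std::hardware_destructive_interference_size lives in that header. Padding each worker to its own cache line keeps submit()'s probing of busy_flag from falsely sharing a line with a neighbouring worker's hot fields. A compile-time check one could add under the same guard (illustrative, not part of the commit):

#if __cplusplus >= 201703L
static_assert(alignof(worker) >= std::hardware_destructive_interference_size,
              "each worker should occupy its own cache line(s)");
#endif
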
+    concurrent_queue<std::function<void()>> overflow_work_;
+    std::mutex mut_;
+    std::vector<worker, aligned_allocator<worker>> workers_;
+    std::atomic<bool> shutdown_;
+    using lock_t = std::lock_guard<std::mutex>;

    void create_threads()
      {
-      size_t nthreads = threads_.size();
+      lock_t lock(mut_);
+      size_t nthreads=workers_.size();
      for (size_t i=0; i<nthreads; ++i)
        {
-        try { threads_[i] = std::thread([this]{ worker_main(); }); }
+        try
+          {
+          auto *worker = &workers_[i];
+          worker->busy_flag.clear();
+          worker->work = nullptr;
+          worker->thread = std::thread(
+            [worker, this]{ worker->worker_main(shutdown_, overflow_work_); });
+          }
        catch (...)
          {
-          shutdown();
+          shutdown_locked();
          throw;
          }
        }
      }

+    void shutdown_locked()
+      {
+      shutdown_ = true;
+      for (auto &worker : workers_)
+        worker.work_ready.notify_all();
+
+      for (auto &worker : workers_)
+        if (worker.thread.joinable())
+          worker.thread.join();
+      }

  public:
    explicit thread_pool(size_t nthreads):
-      threads_(nthreads)
+      workers_(nthreads)
      { create_threads(); }

    thread_pool(): thread_pool(max_threads) {}
@@ -633,20 +695,47 @@ class thread_pool
    void submit(std::function<void()> work)
      {
-      work_queue_.push(move(work));
+      lock_t lock(mut_);
+      if (shutdown_)
+        throw std::runtime_error("Work item submitted after shutdown");
+
+      auto submit_to_idle = [&](std::function<void()> &work) -> bool
+        {
+        for (auto &worker : workers_)
+          if (!worker.busy_flag.test_and_set())
+            {
+            {
+            lock_t lock(worker.mut);
+            worker.work = std::move(work);
+            }
+            worker.work_ready.notify_one();
+            return true;
+            }
+        return false;
+        };
+
+      // First check for any idle workers and wake those
+      if (submit_to_idle(work)) return;
+      // If no workers were idle, push onto the overflow queue for later
+      overflow_work_.push(std::move(work));
+
+      // Possible race: All workers might have gone idle between the first
+      // submit attempt and pushing the work item into the queue. So, there
+      // could be no active workers to check the queue.
+      // Resolve with another check for idle workers.
+      std::function<void()> dummy_work = []{};
+      submit_to_idle(dummy_work);
      }
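
The commit title comes from submit_to_idle: it scans workers_ from index 0 upward, so work lands preferentially on the earliest-created idle thread. One interleaving that the final dummy submission rescues, sketched as a timeline (illustrative, not code from the diff):

// submitter                              worker
// submit_to_idle(work) -> all busy
//                                        finishes work()
//                                        overflow_work.try_pop() -> empty
//                                        busy_flag.clear(); waits on work_ready
// overflow_work_.push(work)
// submit_to_idle(dummy) -> wakes worker
//                                        runs dummy, drains the overflow queue,
//                                        and picks up the stranded item

Without that second pass, the pushed item could sit unnoticed until the next submit() call, since a sleeping worker never re-checks the overflow queue on its own.
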
    void shutdown()
      {
-      work_queue_.shutdown();
-      for (auto &thread : threads_)
-        if (thread.joinable())
-          thread.join();
+      lock_t lock(mut_);
+      shutdown_locked();
      }

    void restart()
      {
-      work_queue_.restart();
+      shutdown_ = false;
      create_threads();
      }
  };
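
A hypothetical end-to-end use of the reworked pool (inside pocketfft the pool is driven through the library's internal threading helpers, so the completion counter here is purely illustrative):

#include <atomic>
#include <thread>

int run_pool_demo()
  {
  thread_pool pool(4);
  std::atomic<int> done{0};
  for (int i=0; i<16; ++i)
    pool.submit([&done]{ done.fetch_add(1); });
  while (done.load() < 16)      // crude completion wait; real callers
    std::this_thread::yield();  // use a proper latch/condition variable
  pool.shutdown();              // joins all worker threads
  return done.load();           // 16
  }
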