diff --git a/pocketfft_hdronly.h b/pocketfft_hdronly.h
index e7519d4a6e6cbd0628c99434739d9c7715089767..8a58957719042d67bf5222a58e8f6d3e65b5fdfb 100644
--- a/pocketfft_hdronly.h
+++ b/pocketfft_hdronly.h
@@ -1,8 +1,8 @@
 /*
 This file is part of pocketfft.
 
-Copyright (C) 2010-2019 Max-Planck-Society
-Copyright (C) 2019 Peter Bell
+Copyright (C) 2010-2020 Max-Planck-Society
+Copyright (C) 2019-2020 Peter Bell
 
 For the odd-sized DCT-IV transforms:
   Copyright (C) 2003, 2007-14 Matteo Frigo
@@ -70,6 +70,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 #include <queue>
 #include <atomic>
 #include <functional>
+#include <new>
 
 #ifdef POCKETFFT_PTHREADS
 #  include <pthread.h>
@@ -148,6 +149,30 @@ template<> struct VLEN<double> { static constexpr size_t val=2; };
 #endif
 #endif
 
+#if __cplusplus >= 201703L
+inline void *aligned_alloc(size_t align, size_t size)
+  {
+  void *ptr = ::aligned_alloc(align,size);
+  if (!ptr) throw std::bad_alloc();
+  return ptr;
+  }
+inline void aligned_dealloc(void *ptr)
+  { free(ptr); }
+#else // portable emulation
+inline void *aligned_alloc(size_t align, size_t size)
+  {
+  align = std::max(align, alignof(max_align_t));
+  void *ptr = malloc(size+align);
+  if (!ptr) throw std::bad_alloc();
+  void *res = reinterpret_cast<void *>
+    ((reinterpret_cast<uintptr_t>(ptr) & ~(uintptr_t(align-1))) + uintptr_t(align));
+  (reinterpret_cast<void**>(res))[-1] = ptr;
+  return res;
+  }
+inline void aligned_dealloc(void *ptr)
+  { if (ptr) free((reinterpret_cast<void**>(ptr))[-1]); }
+#endif
+
 template<typename T> class arr
   {
   private:
@@ -164,29 +189,15 @@ template<typename T> class arr
       }
     static void dealloc(T *ptr)
       { free(ptr); }
-#elif __cplusplus >= 201703L
-    static T *ralloc(size_t num)
-      {
-      if (num==0) return nullptr;
-      void *res = aligned_alloc(64,num*sizeof(T));
-      if (!res) throw std::bad_alloc();
-      return reinterpret_cast<T *>(res);
-      }
-    static void dealloc(T *ptr)
-      { free(ptr); }
-#else // portable emulation
+#else
     static T *ralloc(size_t num)
      {
      if (num==0) return nullptr;
-      void *ptr = malloc(num*sizeof(T)+64);
-      if (!ptr) throw std::bad_alloc();
-      T *res = reinterpret_cast<T *>
-        ((reinterpret_cast<size_t>(ptr) & ~(size_t(63))) + 64);
-      (reinterpret_cast<void**>(res))[-1] = ptr;
-      return res;
+      void *ptr = aligned_alloc(64, num*sizeof(T));
+      return static_cast<T*>(ptr);
      }
    static void dealloc(T *ptr)
-      { if (ptr) free((reinterpret_cast<void**>(ptr))[-1]); }
+      { aligned_dealloc(ptr); }
 #endif
 
   public:
@@ -554,77 +565,128 @@ template <typename T> class concurrent_queue
   {
     std::queue<T> q_;
     std::mutex mut_;
-    std::condition_variable item_added_;
-    bool shutdown_;
-    using lock_t = std::unique_lock<std::mutex>;
+    std::atomic<size_t> size_;
+    using lock_t = std::lock_guard<std::mutex>;
 
   public:
-    concurrent_queue(): shutdown_(false) {}
 
     void push(T val)
       {
-      {
+      ++size_;
       lock_t lock(mut_);
-      if (shutdown_)
-        throw std::runtime_error("Item added to queue after shutdown");
-      q_.push(move(val));
-      }
-      item_added_.notify_one();
+      q_.push(std::move(val));
       }
 
-    bool pop(T & val)
+    bool try_pop(T &val)
      {
+      if (size_ == 0) return false;
      lock_t lock(mut_);
-      item_added_.wait(lock, [this] { return (!q_.empty() || shutdown_); });
-      if (q_.empty())
-        return false; // We are shutting down
+      // Queue might have been emptied while we acquired the lock
+      if (q_.empty()) return false;
 
+      --size_;
      val = std::move(q_.front());
      q_.pop();
      return true;
      }
+  };
 
-    void shutdown()
-      {
-      {
-      lock_t lock(mut_);
-      shutdown_ = true;
-      }
-      item_added_.notify_all();
-      }
+// C++ allocator with support for over-aligned types
+template <typename T> struct aligned_allocator
+  {
+  using value_type = T;
+  template <class U>
+  aligned_allocator(const aligned_allocator<U>&) {}
+  aligned_allocator() = default;
 
-    void restart() { shutdown_ = false; }
+  T *allocate(size_t n)
+    {
+    void* mem = aligned_alloc(alignof(T), n*sizeof(T));
+    return static_cast<T*>(mem);
+    }
+
+  void deallocate(T *p, size_t n)
+    { aligned_dealloc(p); }
   };
 
 class thread_pool
   {
-    concurrent_queue<std::function<void()>> work_queue_;
-    std::vector<std::thread> threads_;
-
-    void worker_main()
+#if __cplusplus >= 201703L
+    struct alignas(std::hardware_destructive_interference_size) worker
+#else
+    struct alignas(64) worker
+#endif
      {
+      std::thread thread;
+      std::condition_variable work_ready;
+      std::mutex mut;
+      std::atomic_flag busy_flag = ATOMIC_FLAG_INIT;
      std::function<void()> work;
-      while (work_queue_.pop(work))
-        work();
-      }
+
+      void worker_main(std::atomic<bool> & shutdown_flag,
+        concurrent_queue<std::function<void()>> &overflow_work)
+        {
+        using lock_t = std::unique_lock<std::mutex>;
+        lock_t lock(mut);
+        while (!shutdown_flag)
+          {
+          // Wait to be woken by the thread pool with a piece of work
+          work_ready.wait(lock, [&]{ return (work || shutdown_flag); });
+          if (!work) continue;
+          work();
+
+          // Execute any work which queued up while we were busy
+          while (overflow_work.try_pop(work)) work();
+
+          // Mark ourself as available before going back to sleep
+          work = nullptr;
+          busy_flag.clear();
+          }
+        }
+      };
+
+    concurrent_queue<std::function<void()>> overflow_work_;
+    std::mutex mut_;
+    std::vector<worker, aligned_allocator<worker>> workers_;
+    std::atomic<bool> shutdown_;
+    using lock_t = std::lock_guard<std::mutex>;
 
     void create_threads()
      {
-      size_t nthreads = threads_.size();
+      lock_t lock(mut_);
+      size_t nthreads=workers_.size();
      for (size_t i=0; i<nthreads; ++i)
        {
-        try { threads_[i] = std::thread([this]{ worker_main(); }); }
+        try
+          {
+          auto *worker = &workers_[i];
+          worker->busy_flag.clear();
+          worker->work = nullptr;
+          worker->thread = std::thread(
+            [worker, this]{ worker->worker_main(shutdown_, overflow_work_); });
+          }
        catch (...)
          {
-          shutdown();
+          shutdown_locked();
          throw;
          }
        }
      }
 
+    void shutdown_locked()
+      {
+      shutdown_ = true;
+      for (auto &worker : workers_)
+        worker.work_ready.notify_all();
+
+      for (auto &worker : workers_)
+        if (worker.thread.joinable())
+          worker.thread.join();
+      }
+
   public:
     explicit thread_pool(size_t nthreads):
-      threads_(nthreads)
+      workers_(nthreads)
      { create_threads(); }
 
     thread_pool(): thread_pool(max_threads) {}
@@ -633,20 +695,47 @@ class thread_pool
 
     void submit(std::function<void()> work)
      {
-      work_queue_.push(move(work));
+      lock_t lock(mut_);
+      if (shutdown_)
+        throw std::runtime_error("Work item submitted after shutdown");
+
+      auto submit_to_idle = [&](std::function<void()> &work) -> bool
+        {
+        for (auto &worker : workers_)
+          if (!worker.busy_flag.test_and_set())
+            {
+            {
+            lock_t lock(worker.mut);
+            worker.work = std::move(work);
+            }
+            worker.work_ready.notify_one();
+            return true;
+            }
+        return false;
+        };
+
+      // First check for any idle workers and wake those
+      if (submit_to_idle(work)) return;
+      // If no workers were idle, push onto the overflow queue for later
+      overflow_work_.push(std::move(work));
+
+      // Possible race: All workers might have gone idle between the first
+      // submit attempt and pushing the work item into the queue. So, there
+      // could be no active workers to check the queue.
+      // Resolve with another check for idle workers.
+      std::function<void()> dummy_work = []{};
+      submit_to_idle(dummy_work);
      }
 
     void shutdown()
      {
-      work_queue_.shutdown();
-      for (auto &thread : threads_)
-        if (thread.joinable())
-          thread.join();
+      lock_t lock(mut_);
+      shutdown_locked();
      }
 
     void restart()
      {
-      work_queue_.restart();
+      shutdown_ = false;
      create_threads();
      }
   };