Commit 3578717d authored by Martin Reinecke's avatar Martin Reinecke

Merge remote-tracking branch 'origin/master' into sincospi

parents b96ae36a 553e4f1c
...@@ -62,8 +62,16 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ...@@ -62,8 +62,16 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <array> #include <array>
#include <mutex> #include <mutex>
#endif #endif
#ifdef POCKETFFT_OPENMP
#include <omp.h> #include <mutex>
#include <condition_variable>
#include <thread>
#include <queue>
#include <atomic>
#include <functional>
#ifdef POCKETFFT_PTHREADS
# include <pthread.h>
#endif #endif
...@@ -684,22 +692,206 @@ struct util // hack to avoid duplicate symbols ...@@ -684,22 +692,206 @@ struct util // hack to avoid duplicate symbols
if (axis>=shape.size()) throw invalid_argument("bad axis number"); if (axis>=shape.size()) throw invalid_argument("bad axis number");
} }
#ifdef POCKETFFT_OPENMP
static size_t nthreads() { return size_t(omp_get_num_threads()); }
static size_t thread_num() { return size_t(omp_get_thread_num()); }
static size_t thread_count (size_t nthreads, const shape_t &shape, static size_t thread_count (size_t nthreads, const shape_t &shape,
size_t axis) size_t axis, size_t vlen)
{ {
if (nthreads==1) return 1; if (nthreads==1) return 1;
if (prod(shape) < 20*shape[axis]) return 1; size_t size = prod(shape);
return (nthreads==0) ? size_t(omp_get_max_threads()) : nthreads; size_t parallel = size / (shape[axis] * vlen);
if (shape[axis] < 1000)
parallel /= 4;
size_t max_threads = nthreads == 0 ?
thread::hardware_concurrency() : nthreads;
return max(size_t(1), min(parallel, max_threads));
} }
#else
static constexpr size_t nthreads() { return 1; }
static constexpr size_t thread_num() { return 0; }
#endif
}; };
namespace threading {
// Per-thread work-sharing coordinates, assigned by thread_map() before each
// task runs: thread_id is this worker's index in [0, num_threads), and
// num_threads is the number of workers cooperating on the current transform.
// Defaults (0 of 1) describe ordinary single-threaded execution.
thread_local size_t thread_id = 0;
thread_local size_t num_threads = 1;
// Single-use counting latch: count_down() is invoked once per task, and
// wait() blocks until the counter reaches zero. Same contract as C++20
// std::latch, which is not available at this language level.
class latch
  {
    atomic<size_t> num_left_;
    mutex mut_;
    condition_variable completed_;
    using lock_t = unique_lock<mutex>;

  public:
    // n: number of count_down() calls required before waiters are released.
    latch(size_t n): num_left_(n) {}

    // Decrement the counter; the caller performing the final decrement
    // wakes all waiting threads.
    void count_down()
      {
      {
      // The lock is required even though num_left_ is atomic: it stops the
      // decrement from slipping between a waiter's predicate check and its
      // actual block inside wait(), which would lose the notification.
      lock_t lock(mut_);
      if (--num_left_)
        return;
      }
      // Notify after releasing the lock so woken threads don't immediately
      // block on the mutex again.
      completed_.notify_all();
      }

    // Block until all expected count_down() calls have happened.
    void wait()
      {
      lock_t lock(mut_);
      completed_.wait(lock, [this]{ return is_ready(); });
      }

    // True once the counter has reached zero.
    bool is_ready() { return num_left_ == 0; }
  };
/** Minimal thread-safe FIFO work queue.
 *  push() enqueues an item, pop() blocks until an item is available (or the
 *  queue has been shut down), shutdown() releases all blocked consumers, and
 *  restart() re-enables the queue afterwards. All shared state is guarded by
 *  a single mutex. */
template <typename T> class concurrent_queue
  {
    std::queue<T> q_;
    std::mutex mut_;
    std::condition_variable item_added_;
    bool shutdown_;  // guarded by mut_
    using lock_t = std::unique_lock<std::mutex>;

  public:
    concurrent_queue(): shutdown_(false) {}

    /// Enqueue a work item; throws if the queue was already shut down.
    void push(T val)
      {
      {
      lock_t lock(mut_);
      if (shutdown_)
        throw std::runtime_error("Item added to queue after shutdown");
      q_.push(std::move(val));
      }
      // notify outside the lock so the consumer can proceed immediately
      item_added_.notify_one();
      }

    /// Block until an item is available (moved into val, returns true) or
    /// the queue is shut down and drained (returns false).
    bool pop(T & val)
      {
      lock_t lock(mut_);
      item_added_.wait(lock, [this] { return (!q_.empty() || shutdown_); });
      if (q_.empty())
        return false; // We are shutting down
      val = std::move(q_.front());
      q_.pop();
      return true;
      }

    /// Mark the queue as shut down and wake every blocked consumer; pending
    /// items are still delivered before pop() starts returning false.
    void shutdown()
      {
      {
      lock_t lock(mut_);
      shutdown_ = true;
      }
      item_added_.notify_all();
      }

    /// Re-enable the queue after shutdown().
    void restart()
      {
      // Fix: the flag must be written under the mutex — an unguarded store
      // races with concurrent push()/pop() readers (undefined behaviour).
      lock_t lock(mut_);
      shutdown_ = false;
      }
  };
class thread_pool
{
concurrent_queue<function<void()>> work_queue_;
vector<thread> threads_;
void worker_main()
{
function<void()> work;
while (work_queue_.pop(work))
work();
}
void create_threads()
{
size_t nthreads = threads_.size();
for (size_t i=0; i<nthreads; ++i)
{
try { threads_[i] = thread([this]{ worker_main(); }); }
catch (...)
{
shutdown();
throw;
}
}
}
public:
explicit thread_pool(size_t nthreads):
threads_(nthreads)
{ create_threads(); }
thread_pool(): thread_pool(thread::hardware_concurrency()) {}
~thread_pool() { shutdown(); }
void submit(function<void()> work)
{
work_queue_.push(move(work));
}
void shutdown()
{
work_queue_.shutdown();
for (auto &thread : threads_)
if (thread.joinable())
thread.join();
}
void restart()
{
work_queue_.restart();
create_threads();
}
};
// Return the process-wide worker pool, constructed lazily on first use.
thread_pool & get_pool()
  {
  static thread_pool pool;
#ifdef POCKETFFT_PTHREADS
  static once_flag f;
  call_once(f,
    []{
    // Worker threads do not survive fork(): shut the pool down in the
    // parent just before forking and rebuild it on both sides afterwards.
    // NOTE(review): in the child only the forking thread exists, so
    // restart() re-creates the workers from scratch there.
    pthread_atfork(
      +[]{ get_pool().shutdown(); },  // prepare: join workers pre-fork
      +[]{ get_pool().restart(); },   // parent: re-spawn workers
      +[]{ get_pool().restart(); }    // child: spawn fresh workers
      );
    });
#endif
  return pool;
  }
/** Run f() concurrently on nthreads workers of the shared pool and wait for
 *  all of them to finish. Inside f, threading::thread_id and
 *  threading::num_threads identify each worker's share of the work.
 *  nthreads==0 means "use the hardware thread count"; nthreads==1 runs f
 *  inline on the calling thread. If any invocation throws, the first
 *  captured exception is rethrown here after all workers have finished. */
template <typename Func>
void thread_map(size_t nthreads, Func f)
  {
  if (nthreads == 0)
    nthreads = thread::hardware_concurrency();
  if (nthreads == 1)
    { f(); return; }  // cheap path: no pool involvement
  auto & pool = get_pool();
  latch counter(nthreads);
  exception_ptr ex;
  mutex ex_mut;
  for (size_t i=0; i<nthreads; ++i)
    {
    pool.submit(
      [&f, &counter, &ex, &ex_mut, i, nthreads] {
      thread_id = i;
      num_threads = nthreads;
      try { f(); }
      catch (...)
        {
        lock_guard<mutex> lock(ex_mut);
        // Fix: keep the first exception instead of letting later failures
        // overwrite it nondeterministically.
        if (!ex)
          ex = current_exception();
        }
      // count_down() must run even on failure, or wait() would deadlock
      counter.count_down();
      });
    }
  counter.wait();
  if (ex)
    rethrow_exception(ex);
  }
}
// //
// complex FFTPACK transforms // complex FFTPACK transforms
// //
...@@ -2789,10 +2981,10 @@ template<size_t N> class multi_iter ...@@ -2789,10 +2981,10 @@ template<size_t N> class multi_iter
str_i(iarr.stride(idim_)), p_oi(0), str_o(oarr.stride(idim_)), str_i(iarr.stride(idim_)), p_oi(0), str_o(oarr.stride(idim_)),
idim(idim_), rem(iarr.size()/iarr.shape(idim)) idim(idim_), rem(iarr.size()/iarr.shape(idim))
{ {
auto nshares = util::nthreads(); auto nshares = threading::num_threads;
if (nshares==1) return; if (nshares==1) return;
if (nshares==0) throw runtime_error("can't run with zero threads"); if (nshares==0) throw runtime_error("can't run with zero threads");
auto myshare = util::thread_num(); auto myshare = threading::thread_id;
if (myshare>=nshares) throw runtime_error("impossible share requested"); if (myshare>=nshares) throw runtime_error("impossible share requested");
size_t nbase = rem/nshares; size_t nbase = rem/nshares;
size_t additional = rem%nshares; size_t additional = rem%nshares;
...@@ -2926,8 +3118,10 @@ class rev_iter ...@@ -2926,8 +3118,10 @@ class rev_iter
size_t remaining() const { return rem; } size_t remaining() const { return rem; }
}; };
#ifndef POCKETFFT_NO_VECTORS
template<typename T> struct VTYPE {}; template<typename T> struct VTYPE {};
template <typename T> using vtype_t = typename VTYPE<T>::type;
#ifndef POCKETFFT_NO_VECTORS
template<> struct VTYPE<float> template<> struct VTYPE<float>
{ {
using type = float __attribute__ ((vector_size (VLEN<float>::val*sizeof(float)))); using type = float __attribute__ ((vector_size (VLEN<float>::val*sizeof(float))));
...@@ -2940,8 +3134,6 @@ template<> struct VTYPE<long double> ...@@ -2940,8 +3134,6 @@ template<> struct VTYPE<long double>
{ {
using type = long double __attribute__ ((vector_size (VLEN<long double>::val*sizeof(long double)))); using type = long double __attribute__ ((vector_size (VLEN<long double>::val*sizeof(long double))));
}; };
template <typename T> using vtype_t = typename VTYPE<T>::type;
#endif #endif
template<typename T> arr<char> alloc_tmp(const shape_t &shape, template<typename T> arr<char> alloc_tmp(const shape_t &shape,
...@@ -2966,12 +3158,6 @@ template<typename T> arr<char> alloc_tmp(const shape_t &shape, ...@@ -2966,12 +3158,6 @@ template<typename T> arr<char> alloc_tmp(const shape_t &shape,
return arr<char>(tmpsize*elemsize); return arr<char>(tmpsize*elemsize);
} }
#ifdef POCKETFFT_OPENMP
#define POCKETFFT_NTHREADS nthreads
#else
#define POCKETFFT_NTHREADS
#endif
template <typename T, size_t vlen> void copy_input(const multi_iter<vlen> &it, template <typename T, size_t vlen> void copy_input(const multi_iter<vlen> &it,
const cndarr<cmplx<T>> &src, cmplx<vtype_t<T>> *POCKETFFT_RESTRICT dst) const cndarr<cmplx<T>> &src, cmplx<vtype_t<T>> *POCKETFFT_RESTRICT dst)
{ {
...@@ -3030,42 +3216,41 @@ template <typename T> using add_vec_t = typename add_vec<T>::type; ...@@ -3030,42 +3216,41 @@ template <typename T> using add_vec_t = typename add_vec<T>::type;
template<typename Tplan, typename T, typename T0, typename Exec> template<typename Tplan, typename T, typename T0, typename Exec>
POCKETFFT_NOINLINE void general_nd(const cndarr<T> &in, ndarr<T> &out, POCKETFFT_NOINLINE void general_nd(const cndarr<T> &in, ndarr<T> &out,
const shape_t &axes, T0 fct, size_t POCKETFFT_NTHREADS, const Exec & exec, const shape_t &axes, T0 fct, size_t nthreads, const Exec & exec,
const bool allow_inplace=true) const bool allow_inplace=true)
{ {
shared_ptr<Tplan> plan; shared_ptr<Tplan> plan;
for (size_t iax=0; iax<axes.size(); ++iax) for (size_t iax=0; iax<axes.size(); ++iax)
{ {
constexpr auto vlen = VLEN<T0>::val;
size_t len=in.shape(axes[iax]); size_t len=in.shape(axes[iax]);
if ((!plan) || (len!=plan->length())) if ((!plan) || (len!=plan->length()))
plan = get_plan<Tplan>(len); plan = get_plan<Tplan>(len);
#ifdef POCKETFFT_OPENMP threading::thread_map(
#pragma omp parallel num_threads(util::thread_count(nthreads, in.shape(), axes[iax])) util::thread_count(nthreads, in.shape(), axes[iax], VLEN<T>::val),
#endif [&] {
{ constexpr auto vlen = VLEN<T0>::val;
auto storage = alloc_tmp<T0>(in.shape(), len, sizeof(T)); auto storage = alloc_tmp<T0>(in.shape(), len, sizeof(T));
const auto &tin(iax==0? in : out); const auto &tin(iax==0? in : out);
multi_iter<vlen> it(tin, out, axes[iax]); multi_iter<vlen> it(tin, out, axes[iax]);
#ifndef POCKETFFT_NO_VECTORS #ifndef POCKETFFT_NO_VECTORS
if (vlen>1) if (vlen>1)
while (it.remaining()>=vlen) while (it.remaining()>=vlen)
{ {
it.advance(vlen); it.advance(vlen);
auto tdatav = reinterpret_cast<add_vec_t<T> *>(storage.data()); auto tdatav = reinterpret_cast<add_vec_t<T> *>(storage.data());
exec(it, tin, out, tdatav, *plan, fct); exec(it, tin, out, tdatav, *plan, fct);
} }
#endif #endif
while (it.remaining()>0) while (it.remaining()>0)
{ {
it.advance(1); it.advance(1);
auto buf = allow_inplace && it.stride_out() == sizeof(T) ? auto buf = allow_inplace && it.stride_out() == sizeof(T) ?
&out[it.oofs(0)] : reinterpret_cast<T *>(storage.data()); &out[it.oofs(0)] : reinterpret_cast<T *>(storage.data());
exec(it, tin, out, buf, *plan, fct); exec(it, tin, out, buf, *plan, fct);
} }
} // end of parallel region }); // end of parallel region
fct = T0(1); // factor has been applied, use 1 for remaining axes fct = T0(1); // factor has been applied, use 1 for remaining axes
} }
} }
...@@ -3145,119 +3330,117 @@ struct ExecDcst ...@@ -3145,119 +3330,117 @@ struct ExecDcst
template<typename T> POCKETFFT_NOINLINE void general_r2c( template<typename T> POCKETFFT_NOINLINE void general_r2c(
const cndarr<T> &in, ndarr<cmplx<T>> &out, size_t axis, bool forward, T fct, const cndarr<T> &in, ndarr<cmplx<T>> &out, size_t axis, bool forward, T fct,
size_t POCKETFFT_NTHREADS) size_t nthreads)
{ {
auto plan = get_plan<pocketfft_r<T>>(in.shape(axis)); auto plan = get_plan<pocketfft_r<T>>(in.shape(axis));
constexpr auto vlen = VLEN<T>::val;
size_t len=in.shape(axis); size_t len=in.shape(axis);
#ifdef POCKETFFT_OPENMP threading::thread_map(
#pragma omp parallel num_threads(util::thread_count(nthreads, in.shape(), axis)) util::thread_count(nthreads, in.shape(), axis, VLEN<T>::val),
#endif [&] {
{ constexpr auto vlen = VLEN<T>::val;
auto storage = alloc_tmp<T>(in.shape(), len, sizeof(T)); auto storage = alloc_tmp<T>(in.shape(), len, sizeof(T));
multi_iter<vlen> it(in, out, axis); multi_iter<vlen> it(in, out, axis);
#ifndef POCKETFFT_NO_VECTORS #ifndef POCKETFFT_NO_VECTORS
if (vlen>1) if (vlen>1)
while (it.remaining()>=vlen) while (it.remaining()>=vlen)
{ {
it.advance(vlen); it.advance(vlen);
auto tdatav = reinterpret_cast<vtype_t<T> *>(storage.data()); auto tdatav = reinterpret_cast<vtype_t<T> *>(storage.data());
copy_input(it, in, tdatav); copy_input(it, in, tdatav);
plan->exec(tdatav, fct, true); plan->exec(tdatav, fct, true);
for (size_t j=0; j<vlen; ++j) for (size_t j=0; j<vlen; ++j)
out[it.oofs(j,0)].Set(tdatav[0][j]); out[it.oofs(j,0)].Set(tdatav[0][j]);
size_t i=1, ii=1;
if (forward)
for (; i<len-1; i+=2, ++ii)
for (size_t j=0; j<vlen; ++j)
out[it.oofs(j,ii)].Set(tdatav[i][j], tdatav[i+1][j]);
else
for (; i<len-1; i+=2, ++ii)
for (size_t j=0; j<vlen; ++j)
out[it.oofs(j,ii)].Set(tdatav[i][j], -tdatav[i+1][j]);
if (i<len)
for (size_t j=0; j<vlen; ++j)
out[it.oofs(j,ii)].Set(tdatav[i][j]);
}
#endif
while (it.remaining()>0)
{
it.advance(1);
auto tdata = reinterpret_cast<T *>(storage.data());
copy_input(it, in, tdata);
plan->exec(tdata, fct, true);
out[it.oofs(0)].Set(tdata[0]);
size_t i=1, ii=1; size_t i=1, ii=1;
if (forward) if (forward)
for (; i<len-1; i+=2, ++ii) for (; i<len-1; i+=2, ++ii)
for (size_t j=0; j<vlen; ++j) out[it.oofs(ii)].Set(tdata[i], tdata[i+1]);
out[it.oofs(j,ii)].Set(tdatav[i][j], tdatav[i+1][j]);
else else
for (; i<len-1; i+=2, ++ii) for (; i<len-1; i+=2, ++ii)
for (size_t j=0; j<vlen; ++j) out[it.oofs(ii)].Set(tdata[i], -tdata[i+1]);
out[it.oofs(j,ii)].Set(tdatav[i][j], -tdatav[i+1][j]);
if (i<len) if (i<len)
for (size_t j=0; j<vlen; ++j) out[it.oofs(ii)].Set(tdata[i]);
out[it.oofs(j,ii)].Set(tdatav[i][j]);
} }
#endif }); // end of parallel region
while (it.remaining()>0)
{
it.advance(1);
auto tdata = reinterpret_cast<T *>(storage.data());
copy_input(it, in, tdata);
plan->exec(tdata, fct, true);
out[it.oofs(0)].Set(tdata[0]);
size_t i=1, ii=1;
if (forward)
for (; i<len-1; i+=2, ++ii)
out[it.oofs(ii)].Set(tdata[i], tdata[i+1]);
else
for (; i<len-1; i+=2, ++ii)
out[it.oofs(ii)].Set(tdata[i], -tdata[i+1]);
if (i<len)
out[it.oofs(ii)].Set(tdata[i]);
}
} // end of parallel region
} }
template<typename T> POCKETFFT_NOINLINE void general_c2r( template<typename T> POCKETFFT_NOINLINE void general_c2r(
const cndarr<cmplx<T>> &in, ndarr<T> &out, size_t axis, bool forward, T fct, const cndarr<cmplx<T>> &in, ndarr<T> &out, size_t axis, bool forward, T fct,
size_t POCKETFFT_NTHREADS) size_t nthreads)
{ {
auto plan = get_plan<pocketfft_r<T>>(out.shape(axis)); auto plan = get_plan<pocketfft_r<T>>(out.shape(axis));
constexpr auto vlen = VLEN<T>::val;
size_t len=out.shape(axis); size_t len=out.shape(axis);
#ifdef POCKETFFT_OPENMP threading::thread_map(
#pragma omp parallel num_threads(util::thread_count(nthreads, in.shape(), axis)) util::thread_count(nthreads, in.shape(), axis, VLEN<T>::val),
#endif [&] {
{ constexpr auto vlen = VLEN<T>::val;
auto storage = alloc_tmp<T>(out.shape(), len, sizeof(T)); auto storage = alloc_tmp<T>(out.shape(), len, sizeof(T));
multi_iter<vlen> it(in, out, axis); multi_iter<vlen> it(in, out, axis);
#ifndef POCKETFFT_NO_VECTORS #ifndef POCKETFFT_NO_VECTORS
if (vlen>1) if (vlen>1)
while (it.remaining()>=vlen) while (it.remaining()>=vlen)
{ {
it.advance(vlen); it.advance(vlen);
auto tdatav = reinterpret_cast<vtype_t<T> *>(storage.data()); auto tdatav = reinterpret_cast<vtype_t<T> *>(storage.data());
for (size_t j=0; j<vlen; ++j)
tdatav[0][j]=in[it.iofs(j,0)].r;
{
size_t i=1, ii=1;
if (forward)
for (; i<len-1; i+=2, ++ii)
for (size_t j=0; j<vlen; ++j)
in[it.iofs(j,ii)].SplitConj(tdatav[i][j], tdatav[i+1][j]);
else
for (; i<len-1; i+=2, ++ii)
for (size_t j=0; j<vlen; ++j) for (size_t j=0; j<vlen; ++j)
in[it.iofs(j,ii)].Split(tdatav[i][j], tdatav[i+1][j]); tdatav[0][j]=in[it.iofs(j,0)].r;
if (i<len) {
for (size_t j=0; j<vlen; ++j) size_t i=1, ii=1;
tdatav[i][j] = in[it.iofs(j,ii)].r; if (forward)
} for (; i<len-1; i+=2, ++ii)
plan->exec(tdatav, fct, false); for (size_t j=0; j<vlen; ++j)
copy_output(it, tdatav, out); in[it.iofs(j,ii)].SplitConj(tdatav[i][j], tdatav[i+1][j]);
} else
for (; i<len-1; i+=2, ++ii)
for (size_t j=0; j<vlen; ++j)
in[it.iofs(j,ii)].Split(tdatav[i][j], tdatav[i+1][j]);
if (i<len)
for (size_t j=0; j<vlen; ++j)
tdatav[i][j] = in[it.iofs(j,ii)].r;
}
plan->exec(tdatav, fct, false);
copy_output(it, tdatav, out);
}
#endif #endif
while (it.remaining()>0) while (it.remaining()>0)
{ {
it.advance(1); it.advance(1);
auto tdata = reinterpret_cast<T *>(storage.data()); auto tdata = reinterpret_cast<T *>(storage.data());
tdata[0]=in[it.iofs(0)].r; tdata[0]=in[it.iofs(0)].r;
{ {
size_t i=1, ii=1; size_t i=1, ii=1;
if (forward) if (forward)
for (; i<len-1; i+=2, ++ii) for (; i<len-1; i+=2, ++ii)
in[it.iofs(ii)].SplitConj(tdata[i], tdata[i+1]); in[it.iofs(ii)].SplitConj(tdata[i], tdata[i+1]);
else else
for (; i<len-1; i+=2, ++ii) for (; i<len-1; i+=2, ++ii)
in[it.iofs(ii)].Split(tdata[i], tdata[i+1]); in[it.iofs(ii)].Split(tdata[i], tdata[i+1]);
if (i<len) if (i<len)
tdata[i] = in[it.iofs(ii)].r; tdata[i] = in[it.iofs(ii)].r;
} }
plan->exec(tdata, fct, false); plan->exec(tdata, fct, false);
copy_output(it, tdata, out);