Commit fe143a03 authored by Martin Reinecke

prepare for more flexible multithreading

parent 9b8d7eee
Pipeline #79938 passed with stages in 14 minutes
@@ -203,6 +203,8 @@ py::array py_transpose(const py::array &in, py::array &out)
MR_fail("unsupported datatype");
}
void py_set_thread_pool_size(size_t new_pool_size)
{ set_pool_size(new_pool_size); }
const char *misc_DS = R"""(
Various unsorted utilities
@@ -225,6 +227,8 @@ void add_misc(py::module &msup)
m.def("ascontiguousarray",&py_ascontiguousarray, "in"_a);
m.def("transpose",&py_transpose, "in"_a, "out"_a);
m.def("set_thread_pool_size",&py_set_thread_pool_size, "new_pool_size"_a);
}
}
......
@@ -41,9 +41,7 @@ namespace detail_threading {
#ifndef DUCC0_NO_THREADING
static const size_t max_threads_ = std::max<size_t>(1, std::thread::hardware_concurrency());
std::atomic<size_t> default_nthreads_(max_threads_);
std::atomic<size_t> default_nthreads_(std::max<size_t>(1, std::thread::hardware_concurrency()));
size_t get_default_nthreads()
{ return default_nthreads_; }
@@ -51,8 +49,6 @@ size_t get_default_nthreads()
void set_default_nthreads(size_t new_default_nthreads)
{ default_nthreads_ = std::max<size_t>(1, new_default_nthreads); }
size_t max_threads() { return max_threads_; }
class latch
{
std::atomic<size_t> num_left_;
@@ -156,8 +152,6 @@ class thread_pool
threads_(nthreads)
{ create_threads(); }
thread_pool(): thread_pool(max_threads_) {}
~thread_pool() { shutdown(); }
size_t size() const { return threads_.size(); }
@@ -182,9 +176,12 @@ class thread_pool
}
};
inline thread_pool &get_pool()
thread_pool &get_pool(size_t nthreads=0)
{
static thread_pool pool;
static std::unique_ptr<thread_pool> pool;
if ((!pool) && (nthreads==0)) nthreads=default_nthreads_;
if ((!pool) || ((nthreads!=0) && (nthreads!=pool->size()))) // resize
pool = std::make_unique<thread_pool>(nthreads);
#if __has_include(<pthread.h>)
static std::once_flag f;
call_once(f,
@@ -197,30 +194,12 @@ inline thread_pool &get_pool()
});
#endif
return pool;
}
inline thread_pool &get_pool2(size_t nthreads=0)
{
static std::unique_ptr<thread_pool> pool(std::make_unique<thread_pool>(1));
if ((!pool) || ((nthreads!=0) && (nthreads!=pool->size()))) // resize
{
pool = std::make_unique<thread_pool>(nthreads);
}
#if __has_include(<pthread.h>)
static std::once_flag f;
call_once(f,
[]{
pthread_atfork(
+[]{ get_pool2().shutdown(); }, // prepare
+[]{ get_pool2().restart(); }, // parent
+[]{ get_pool2().restart(); } // child
);
});
#endif
return *pool;
}
void set_pool_size(size_t new_pool_size)
{ get_pool(new_pool_size); }
class Distribution
{
private:
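For orientation, a minimal standalone sketch (not part of this commit) of the pool-handling pattern introduced above: the pool is now held in a static unique_ptr, built lazily from the current default thread count, and rebuilt whenever a different nonzero size is requested. ToyPool, toy_get_pool and toy_set_pool_size are invented stand-ins for ducc0's thread_pool, get_pool and set_pool_size.

// Illustrative sketch only; simplified stand-in for the logic above.
#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <memory>
#include <thread>

// hypothetical placeholder for ducc0's thread_pool
class ToyPool
  {
  private:
    size_t nthreads_;
  public:
    explicit ToyPool(size_t nthreads) : nthreads_(nthreads)
      { std::printf("pool (re)created with %zu threads\n", nthreads_); }
    size_t size() const { return nthreads_; }
  };

static size_t default_nthreads
  = std::max<size_t>(1, std::thread::hardware_concurrency());

// same idea as the new get_pool(): build lazily, rebuild on size change
ToyPool &toy_get_pool(size_t nthreads=0)
  {
  static std::unique_ptr<ToyPool> pool;
  if ((!pool) && (nthreads==0)) nthreads = default_nthreads;
  if ((!pool) || ((nthreads!=0) && (nthreads!=pool->size())))  // resize
    pool = std::make_unique<ToyPool>(nthreads);
  return *pool;
  }

void toy_set_pool_size(size_t new_pool_size) { toy_get_pool(new_pool_size); }

int main()
  {
  toy_get_pool();         // first use: sized from the current default
  toy_set_pool_size(2);   // explicit resize: pool is recreated with 2 threads
  toy_get_pool();         // nthreads==0 reuses the existing 2-thread pool
  }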
@@ -341,8 +320,7 @@ void Distribution::thread_map(std::function<void(Scheduler &)> f)
return;
}
auto & pool = get_pool2(nthreads_);
// auto & pool = get_pool();
auto &pool = get_pool();
latch counter(nthreads_);
std::exception_ptr ex;
std::mutex ex_mut;
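The surrounding context (latch, exception_ptr, mutex) shows how thread_map waits for the pool's workers and re-raises a failure. Below is a generic, hedged sketch of that pattern using plain std::thread and C++20 std::latch instead of ducc0's pool; run_parallel is an invented name, not part of the library.

// Illustrative sketch only; not ducc0 code.
#include <cstddef>
#include <exception>
#include <functional>
#include <iostream>
#include <latch>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

void run_parallel(size_t nthreads, const std::function<void(size_t)> &f)
  {
  std::latch counter(nthreads);
  std::exception_ptr ex;
  std::mutex ex_mut;
  std::vector<std::thread> workers;
  for (size_t i=0; i<nthreads; ++i)
    workers.emplace_back([&, i]{
      try { f(i); }
      catch (...)
        {
        std::lock_guard<std::mutex> lock(ex_mut);
        if (!ex) ex = std::current_exception();  // keep the first failure only
        }
      counter.count_down();
      });
  // In ducc0 the workers come from the pool and are never joined, so a latch
  // is what signals completion; with plain std::thread we also join below.
  counter.wait();
  for (auto &t : workers) t.join();
  if (ex) std::rethrow_exception(ex);  // surface a worker failure to the caller
  }

int main()
  {
  try
    { run_parallel(4, [](size_t i){ if (i==2) throw std::runtime_error("worker 2 failed"); }); }
  catch (const std::exception &e)
    { std::cout << "caught: " << e.what() << "\n"; }
  }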
......
@@ -47,10 +47,11 @@ class Scheduler
virtual Range getNext() = 0;
};
size_t max_threads();
void set_default_nthreads(size_t new_default_nthreads);
size_t get_default_nthreads();
void set_pool_size(size_t new_pool_size);
void execSingle(size_t nwork,
std::function<void(Scheduler &)> func);
void execStatic(size_t nwork, size_t nthreads, size_t chunksize,
@@ -63,7 +64,7 @@ void execParallel(size_t nthreads, std::function<void(Scheduler &)> func);
} // end of namespace detail_threading
using detail_threading::max_threads;
using detail_threading::set_pool_size;
using detail_threading::get_default_nthreads;
using detail_threading::set_default_nthreads;
using detail_threading::Scheduler;
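A hedged usage sketch of the public interface after this change, combining the new set_pool_size with the existing exec* entry points. The include path, the use of ducc0:: as the enclosing namespace, Range's lo/hi members and its boolean conversion, and chunksize 0 meaning "pick automatically" are assumptions taken from the wider ducc0 sources, not shown in this diff.

// Illustrative sketch only; not part of the commit.
#include <cstddef>
#include <vector>
#include "ducc0/infra/threading.h"   // assumed include path

int main()
  {
  std::vector<double> data(1000000, 1.0);

  ducc0::set_default_nthreads(4);  // default used when callers pass nthreads==0
  ducc0::set_pool_size(4);         // new in this commit: resize the pool explicitly

  // parallel loop over [0, data.size()): each worker pulls chunks via its Scheduler
  ducc0::execStatic(data.size(), 4, 0, [&](ducc0::Scheduler &sched)
    {
    while (auto rng = sched.getNext())        // assumed: Range converts to bool
      for (auto i = rng.lo; i < rng.hi; ++i)  // assumed: Range exposes lo/hi
        data[i] *= 2.0;
    });
  return 0;
  }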
......
@@ -118,7 +118,7 @@ struct util // hack to avoid duplicate symbols
size_t parallel = size / (info.shape(axis) * vlen);
if (info.shape(axis) < 1000)
parallel /= 4;
size_t max_threads = (nthreads==0) ? ducc0::max_threads() : nthreads;
size_t max_threads = (nthreads==0) ? ducc0::get_default_nthreads() : nthreads;
return std::max(size_t(1), std::min(parallel, max_threads));
}
#endif
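The heuristic above caps the FFT's thread count by the available parallelism; the change only affects its fallback when nthreads==0, which now honours the run-time default instead of the hardware thread count. Below is a standalone, hedged restatement with a worked example (fft_nthreads is an invented name).

// Illustrative sketch only; standalone copy of the heuristic above.
#include <algorithm>
#include <cstddef>
#include <cstdio>

size_t fft_nthreads(size_t size, size_t shape_axis, size_t vlen,
                    size_t nthreads, size_t default_nthreads)
  {
  size_t parallel = size / (shape_axis * vlen);
  if (shape_axis < 1000)   // short transforms parallelize poorly
    parallel /= 4;
  size_t max_threads = (nthreads==0) ? default_nthreads : nthreads;
  return std::max(size_t(1), std::min(parallel, max_threads));
  }

int main()
  {
  // 4096x4096 array, transform axis of length 4096, vlen=4:
  // parallel = 16777216/(4096*4) = 1024, so the default (here 8) is the limit.
  std::printf("%zu\n", fft_nthreads(size_t(4096)*4096, 4096, 4, 0, 8));  // prints 8
  }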
......