Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Martin Reinecke
pypocketfft
Commits
8234f5c6
Commit
8234f5c6
authored
Sep 04, 2020
by
Martin Reinecke
Browse files
Merge branch 'threadpool-fixes' into 'master'
Threadpool fixes Closes
#14
See merge request
!40
parents
7a39f782
c1e7bdf9
Changes
1
Hide whitespace changes
Inline
Side-by-side
pocketfft_hdronly.h
View file @
8234f5c6
...
...
@@ -572,8 +572,8 @@ template <typename T> class concurrent_queue
void
push
(
T
val
)
{
++
size_
;
lock_t
lock
(
mut_
);
++
size_
;
q_
.
push
(
std
::
move
(
val
));
}
...
...
@@ -584,11 +584,13 @@ template <typename T> class concurrent_queue
// Queue might have been emptied while we acquired the lock
if
(
q_
.
empty
())
return
false
;
--
size_
;
val
=
std
::
move
(
q_
.
front
());
--
size_
;
q_
.
pop
();
return
true
;
}
bool
empty
()
const
{
return
size_
==
0
;
}
};
// C++ allocator with support for over-aligned types
...
...
@@ -605,17 +607,15 @@ template <typename T> struct aligned_allocator
return
static_cast
<
T
*>
(
mem
);
}
void
deallocate
(
T
*
p
,
size_t
n
)
void
deallocate
(
T
*
p
,
size_t
/*n*/
)
{
aligned_dealloc
(
p
);
}
};
class
thread_pool
{
#if __cpp_lib_hardware_interference_size >= 201603L
struct
alignas
(
std
::
hardware_destructive_interference_size
)
worker
#else
struct
alignas
(
64
)
worker
#endif
// A reasonable guess, probably close enough for most hardware
static
constexpr
size_t
cache_line_size
=
64
;
struct
alignas
(
cache_line_size
)
worker
{
std
::
thread
thread
;
std
::
condition_variable
work_ready
;
...
...
@@ -623,24 +623,49 @@ class thread_pool
std
::
atomic_flag
busy_flag
=
ATOMIC_FLAG_INIT
;
std
::
function
<
void
()
>
work
;
void
worker_main
(
std
::
atomic
<
bool
>
&
shutdown_flag
,
concurrent_queue
<
std
::
function
<
void
()
>>
&
overflow_work
)
void
worker_main
(
std
::
atomic
<
bool
>
&
shutdown_flag
,
std
::
atomic
<
size_t
>
&
unscheduled_tasks
,
concurrent_queue
<
std
::
function
<
void
()
>>
&
overflow_work
)
{
using
lock_t
=
std
::
unique_lock
<
std
::
mutex
>
;
lock_t
lock
(
mut
)
;
while
(
!
shutdown_flag
)
bool
expect_work
=
true
;
while
(
!
shutdown_flag
||
expect_work
)
{
// Wait to be woken by the thread pool with a piece of work
work_ready
.
wait
(
lock
,
[
&
]{
return
(
work
||
shutdown_flag
);
});
if
(
!
work
)
continue
;
work
();
std
::
function
<
void
()
>
local_work
;
if
(
expect_work
||
unscheduled_tasks
==
0
)
{
lock_t
lock
(
mut
);
// Wait until there is work to be executed
work_ready
.
wait
(
lock
,
[
&
]{
return
(
work
||
shutdown_flag
);
});
local_work
.
swap
(
work
);
expect_work
=
false
;
}
bool
marked_busy
=
false
;
if
(
local_work
)
{
marked_busy
=
true
;
local_work
();
}
// Execute any work which queued up while we were busy
while
(
overflow_work
.
try_pop
(
work
))
work
();
if
(
!
overflow_work
.
empty
())
{
if
(
!
marked_busy
&&
busy_flag
.
test_and_set
())
{
expect_work
=
true
;
continue
;
}
marked_busy
=
true
;
// Mark ourself as available before going back to sleep
work
=
nullptr
;
busy_flag
.
clear
();
while
(
overflow_work
.
try_pop
(
local_work
))
{
--
unscheduled_tasks
;
local_work
();
}
}
if
(
marked_busy
)
busy_flag
.
clear
();
}
}
};
...
...
@@ -649,6 +674,7 @@ class thread_pool
std
::
mutex
mut_
;
std
::
vector
<
worker
,
aligned_allocator
<
worker
>>
workers_
;
std
::
atomic
<
bool
>
shutdown_
;
std
::
atomic
<
size_t
>
unscheduled_tasks_
;
using
lock_t
=
std
::
lock_guard
<
std
::
mutex
>
;
void
create_threads
()
...
...
@@ -662,8 +688,10 @@ class thread_pool
auto
*
worker
=
&
workers_
[
i
];
worker
->
busy_flag
.
clear
();
worker
->
work
=
nullptr
;
worker
->
thread
=
std
::
thread
(
[
worker
,
this
]{
worker
->
worker_main
(
shutdown_
,
overflow_work_
);
});
worker
->
thread
=
std
::
thread
([
worker
,
this
]
{
worker
->
worker_main
(
shutdown_
,
unscheduled_tasks_
,
overflow_work_
);
});
}
catch
(...)
{
...
...
@@ -697,34 +725,25 @@ class thread_pool
{
lock_t
lock
(
mut_
);
if
(
shutdown_
)
throw
std
::
runtime_error
(
"Work item submitted after shutdown"
);
throw
std
::
runtime_error
(
"Work item submitted after shutdown"
);
auto
submit_to_idle
=
[
&
](
std
::
function
<
void
()
>
&
work
)
->
bool
{
for
(
auto
&
worker
:
workers_
)
if
(
!
worker
.
busy_flag
.
test_and_set
())
{
{
lock_t
lock
(
worker
.
mut
);
worker
.
work
=
std
::
move
(
work
);
}
worker
.
work_ready
.
notify_one
();
return
true
;
}
return
false
;
};
++
unscheduled_tasks_
;
// First check for any idle workers and wake those
if
(
submit_to_idle
(
work
))
return
;
for
(
auto
&
worker
:
workers_
)
if
(
!
worker
.
busy_flag
.
test_and_set
())
{
--
unscheduled_tasks_
;
{
lock_t
lock
(
worker
.
mut
);
worker
.
work
=
std
::
move
(
work
);
}
worker
.
work_ready
.
notify_one
();
return
;
}
// If no workers were idle, push onto the overflow queue for later
overflow_work_
.
push
(
std
::
move
(
work
));
// Possible race: All workers might have gone idle between the first
// submit attempt and pushing the work item into the queue. So, there
// could be no active workers to check the queue.
// Resolve with another check for idle workers.
std
::
function
<
void
()
>
dummy_work
=
[]{};
submit_to_idle
(
dummy_work
);
}
void
shutdown
()
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment