Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Martin Reinecke
ducc
Commits
b96e949e
Unverified
Commit
b96e949e
authored
Sep 04, 2020
by
Test2, Test1 (martin.reinecke1@gmx.de)
Committed by
GitHub
Sep 04, 2020
Browse files
Merge pull request
#6
from peterbell10/threading-issues
Threading issues
parents
fbca8d81
5c9ae868
Changes
1
Hide whitespace changes
Inline
Side-by-side
src/ducc0/infra/threading.cc
View file @
b96e949e
...
...
@@ -89,8 +89,8 @@ template <typename T> class concurrent_queue
public:
void
push
(
T
val
)
{
++
size_
;
lock_t
lock
(
mut_
);
++
size_
;
q_
.
push
(
std
::
move
(
val
));
}
...
...
@@ -101,22 +101,21 @@ template <typename T> class concurrent_queue
// Queue might have been emptied while we acquired the lock
if
(
q_
.
empty
())
return
false
;
--
size_
;
val
=
std
::
move
(
q_
.
front
());
--
size_
;
q_
.
pop
();
return
true
;
}
bool
empty
()
const
{
return
size_
==
0
;
}
};
class
thread_pool
{
private:
//FIXME: temporary ... OSX seems to set the macro, but not to have the variable
//#if __cpp_lib_hardware_interference_size >= 201603
// struct alignas(std::hardware_destructive_interference_size) worker
//#else
struct
alignas
(
64
)
worker
//#endif
// A reasonable guess, probably close enough for most hardware
static
constexpr
size_t
cache_line_size
=
64
;
struct
alignas
(
cache_line_size
)
worker
{
std
::
thread
thread
;
std
::
condition_variable
work_ready
;
...
...
@@ -124,24 +123,49 @@ class thread_pool
std
::
atomic_flag
busy_flag
=
ATOMIC_FLAG_INIT
;
std
::
function
<
void
()
>
work
;
void
worker_main
(
std
::
atomic
<
bool
>
&
shutdown_flag
,
void
worker_main
(
std
::
atomic
<
bool
>
&
shutdown_flag
,
std
::
atomic
<
size_t
>
&
unscheduled_tasks
,
concurrent_queue
<
std
::
function
<
void
()
>>
&
overflow_work
)
{
using
lock_t
=
std
::
unique_lock
<
std
::
mutex
>
;
lock_t
lock
(
mut
)
;
while
(
!
shutdown_flag
)
bool
expect_work
=
true
;
while
(
!
shutdown_flag
||
expect_work
)
{
// Wait to be woken by the thread pool with a piece of work
work_ready
.
wait
(
lock
,
[
&
]{
return
(
work
||
shutdown_flag
);
});
if
(
!
work
)
continue
;
work
();
std
::
function
<
void
()
>
local_work
;
if
(
expect_work
||
unscheduled_tasks
==
0
)
{
lock_t
lock
(
mut
);
// Wait until there is work to be executed
work_ready
.
wait
(
lock
,
[
&
]{
return
(
work
||
shutdown_flag
);
});
local_work
.
swap
(
work
);
expect_work
=
false
;
}
bool
marked_busy
=
false
;
if
(
local_work
)
{
marked_busy
=
true
;
local_work
();
}
// Execute any work which queued up while we were busy
while
(
overflow_work
.
try_pop
(
work
))
work
();
if
(
!
overflow_work
.
empty
())
{
if
(
!
marked_busy
&&
busy_flag
.
test_and_set
())
{
expect_work
=
true
;
continue
;
}
marked_busy
=
true
;
while
(
overflow_work
.
try_pop
(
local_work
))
{
--
unscheduled_tasks
;
local_work
();
}
}
// Mark ourself as available before going back to sleep
work
=
nullptr
;
busy_flag
.
clear
();
if
(
marked_busy
)
busy_flag
.
clear
();
}
}
};
...
...
@@ -150,6 +174,7 @@ class thread_pool
std
::
mutex
mut_
;
std
::
vector
<
worker
>
workers_
;
std
::
atomic
<
bool
>
shutdown_
;
std
::
atomic
<
size_t
>
unscheduled_tasks_
;
using
lock_t
=
std
::
lock_guard
<
std
::
mutex
>
;
void
create_threads
()
...
...
@@ -164,7 +189,7 @@ class thread_pool
worker
->
busy_flag
.
clear
();
worker
->
work
=
nullptr
;
worker
->
thread
=
std
::
thread
(
[
worker
,
this
]{
worker
->
worker_main
(
shutdown_
,
overflow_work_
);
});
[
worker
,
this
]{
worker
->
worker_main
(
shutdown_
,
unscheduled_tasks_
,
overflow_work_
);
});
}
catch
(...)
{
...
...
@@ -200,32 +225,23 @@ class thread_pool
if
(
shutdown_
)
throw
std
::
runtime_error
(
"Work item submitted after shutdown"
);
auto
submit_to_idle
=
[
&
](
std
::
function
<
void
()
>
&
work
)
->
bool
{
for
(
auto
&
worker
:
workers_
)
if
(
!
worker
.
busy_flag
.
test_and_set
())
{
{
lock_t
lock
(
worker
.
mut
);
worker
.
work
=
std
::
move
(
work
);
}
worker
.
work_ready
.
notify_one
();
return
true
;
}
return
false
;
};
++
unscheduled_tasks_
;
// First check for any idle workers and wake those
if
(
submit_to_idle
(
work
))
return
;
for
(
auto
&
worker
:
workers_
)
if
(
!
worker
.
busy_flag
.
test_and_set
())
{
--
unscheduled_tasks_
;
{
lock_t
lock
(
worker
.
mut
);
worker
.
work
=
std
::
move
(
work
);
}
worker
.
work_ready
.
notify_one
();
return
;
}
// If no workers were idle, push onto the overflow queue for later
overflow_work_
.
push
(
std
::
move
(
work
));
// Possible race: All workers might have gone idle between the first
// submit attempt and pushing the work item into the queue. So, there
// could be no active workers to check the queue.
// Resolve with another check for idle workers.
std
::
function
<
void
()
>
dummy_work
=
[]{};
submit_to_idle
(
dummy_work
);
}
void
shutdown
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment