Rocksolid Light

Welcome to RetroBBS

mail  files  register  newsreader  groups  login

Message-ID:  

"Our vision is to speed up time, eventually eliminating it." -- Alex Schure


devel / comp.lang.c++ / The ultimate thread pool

The ultimate thread pool

<uou5ir$2cuko$2@raubtier-asyl.eternal-september.org>

  copy mid

https://www.rocksolidbbs.com/devel/article-flat.php?id=3086&group=comp.lang.c%2B%2B#3086

  copy link   Newsgroups: comp.lang.c++
Path: i2pn2.org!rocksolid2!news.neodome.net!news.mixmin.net!eternal-september.org!feeder3.eternal-september.org!news.eternal-september.org!raubtier-asyl.eternal-september.org!.POSTED!not-for-mail
From: Bonita.Montero@gmail.com (Bonita Montero)
Newsgroups: comp.lang.c++
Subject: The ultimate thread pool
Date: Thu, 25 Jan 2024 18:25:48 +0100
Organization: A noiseless patient Spider
Lines: 561
Message-ID: <uou5ir$2cuko$2@raubtier-asyl.eternal-september.org>
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8; format=flowed
Content-Transfer-Encoding: 7bit
Injection-Date: Thu, 25 Jan 2024 17:25:47 -0000 (UTC)
Injection-Info: raubtier-asyl.eternal-september.org; posting-host="ad0645c524da5c98158469efd5e7da33";
logging-data="2521752"; mail-complaints-to="abuse@eternal-september.org"; posting-account="U2FsdGVkX1+q/E63szeGa8qK2tU+tE6BPz7xrQmar1A="
User-Agent: Mozilla Thunderbird
Cancel-Lock: sha1:ZJExHsJu8Bog63ISnuDLjBIMrgk=
Content-Language: de-DE
 by: Bonita Montero - Thu, 25 Jan 2024 17:25 UTC

Once I've written a thread pool that has an upper limit of the number
threads and a timeout when idle threads end theirselfes. If you have
sth userpace CPU bound you'd specify the number of hardware-threads
as the upper limit, if you have much threads doing I/O you may go far
beyond since the hardware-threads aren't fully occupied anyway.
The problem with my initial thread pool class was that there may be
a large number of idle threads which could be used by other pools.
So I wrote a thread pool class where each pool has an upper limit of
the number of executing threads and there are no idle threads within
each pool. Instead the threads go idling in a global singleton pool
and attach to each pool which needs a new thread, thereby minimizing
the total number of threads.

This is the implementation

// header

#pragma once
#include <thread>
#include <mutex>
#include <condition_variable>
#include <deque>
#include <functional>
#include <chrono>

struct thread_pool
{ using void_fn = std::function<void ()>;
thread_pool( size_t maxThreads = 0 );
thread_pool( thread_pool const & ) = delete;
void operator =( thread_pool const & ) = delete;
~thread_pool();
uint64_t enqueue_task( void_fn &&task );
void_fn cancel( uint64_t queueId );
void wait_idle();
size_t max_threads();
size_t resize( size_t maxThreads );
bool clear_queue();
void_fn idle_callback( void_fn &&fn = {} );
std::pair<size_t, size_t> processing();
static typename std::chrono::milliseconds timeout(
std::chrono::milliseconds timeout );
private:
struct idle_node
{
idle_node *next;
bool notify;
};
using queue_item = std::pair<uint64_t, void_fn>;
using task_queue_t = std::deque<queue_item>;
bool m_quit;
size_t
m_maxThreads,
m_nThreadsExecuting;
uint64_t
m_lastIdleQueueId,
m_nextQueueId;
task_queue_t m_queue;
std::condition_variable m_idleCv;
std::shared_ptr<void_fn> m_idleCallback;
idle_node *m_idleList;
inline static struct global_t
{
std::mutex m_mtx;
std::chrono::milliseconds m_timeout = std::chrono::seconds( 1 );
std::condition_variable
m_cv,
m_quitCv;
bool m_quit;
size_t
m_nThreads,
m_nThreadsActive;
std::deque<thread_pool *> m_initiate;
void theThread();
global_t();
~global_t();
} global;
void processIdle( std::unique_lock<std::mutex> &lock );
std::unique_lock<std::mutex> waitIdle();
};

// translation unit

#include <cassert>
#include "thread_pool.h"
#include "invoke_on_destruct.h"

#if defined(_WIN32)
#pragma warning(disable: 26110) // Caller failing to hold lock
'lock' before calling function 'func'.
#pragma warning(disable: 26111) // Caller failing to release lock
'lock' before calling function 'func'.
#pragma warning(disable: 26115) // Failing to release lock 'lock'
in function 'func'.
#pragma warning(disable: 26117) // Releasing unheld lock 'lock' in
function 'func'.
#pragma warning(disable: 26800) // Use of a moved from object:
'object'.
#endif
#if defined(__llvm__)
#pragma clang diagnostic ignored "-Wparentheses"
#pragma clang diagnostic ignored "-Wunqualified-std-cast-call"
#endif

using namespace std;
using namespace chrono;

thread_pool::thread_pool( size_t maxThreads ) :
m_quit( false ),
m_maxThreads( [&]()
{
if( size_t hc; !maxThreads )
hc = jthread::hardware_concurrency(),
maxThreads = hc ? hc : 1;
return maxThreads;
}() ),
m_nThreadsExecuting( 0 ),
m_lastIdleQueueId( 0 ),
m_nextQueueId( 0 ),
m_idleList( nullptr )
{ }

// may throw system_error

thread_pool::~thread_pool()
{ // wait for idling and inherit the lock
unique_lock lock( waitIdle() );
// erase all outstanding spurious initiations to our pool;
// spurious because of stolen wakeups, non-cancelled initiations or
queue-clears
erase( global.m_initiate, this );
}

void thread_pool::global_t::theThread()
{ try
{
unique_lock lock( m_mtx );
// the first thread never waits with a timeout
bool reserve = m_nThreads == 1;
// reseved timeout which determines the beginning of a wait period
constexpr auto BEGIN_WAIT = time_point<steady_clock>(
steady_clock::duration( -1 ) );
auto start = BEGIN_WAIT;
for( ; ; )
{
while( !m_initiate.size() )
{
// quitting ?
if( m_quit )
{
// the initiation-queue is empty, we can quit;
// we're the last thread ?
if( !--m_nThreads )
// yes: notify the quitting thread's destructor
m_quitCv.notify_one();
return;
}
// it's our first round waiting, i.e. we're not coming
from a spurious initiation ?
if( start.time_since_epoch().count() < 0 )
// yes: set wating start time
start = steady_clock::now();
// we don't have any timeout or we're the reserve-thread ?
if( m_timeout.count() <= 0 || reserve )
// yes: wait for condition without timeout
m_cv.wait( lock );
else
{
// wait for condition with timeout;
// timeout and there are surplus threads and we're
not quitting ?
auto availAssert = []( size_t a ) {
assert((ptrdiff_t)a >= 0); return a; };
if( m_cv.wait_until( lock, start + m_timeout ) !=
cv_status::no_timeout
&& availAssert( m_nThreads - m_nThreadsActive )
> m_initiate.size() && !m_quit )
// yes: release thread
return (void)--m_nThreads;
}
}
// pop first initiate-entry
thread_pool *pool = m_initiate.front();
m_initiate.pop_front();
// pool is empty (spurious or stolen wakeup) or there are
enough running
// threads because the maximum number of threads has been
changed meanwhile ?
if( !pool->m_queue.size() || pool->m_nThreadsExecuting >=
pool->m_maxThreads )
// continue to wait at last start, keeping timeout due time
continue;
// our thread is marked running globally
++m_nThreadsActive;
// our thread is running inside a pool
++pool->m_nThreadsExecuting;
do
{
// pop worker-function
void_fn fn( move( pool->m_queue.front().second ) );
pool->m_queue.pop_front();
// call worker-item unlocked
lock.unlock();
fn();
lock.lock();
// repeat while pool still has items and the maximum
number of threads isn't exhausted;
// both pool-values could have changed while we were
unlocked
} while( pool->m_queue.size() && pool->m_nThreadsExecuting
<= pool->m_maxThreads );
// our thread is removed from the pool
--pool->m_nThreadsExecuting;
// our thread is available again
--m_nThreadsActive;
// ilde-notify
pool->processIdle( lock );
if( !lock )
lock.lock();
// continue with new timeout
start = BEGIN_WAIT;
}
}
catch( ... )
{
terminate();
}
}

void thread_pool::processIdle( std::unique_lock<mutex> &lock )
{ assert(lock);
// we're idling ?
if( m_queue.size() || m_nThreadsExecuting )
// no idle processing
return;
// update idle queue-id
m_lastIdleQueueId = m_nextQueueId;
// there are waiting synchronous idlers in the idle-list ?
if( idle_node *idler = m_idleList; idler )
{
// notify all synchronous idlers
do
idler->notify = true;
while( idler = idler->next );
// doesn't throw
m_idleCv.notify_all();
// idle-list is empty now
m_idleList = nullptr;
}
// there's no asynchronous idle-callback ?
if( !m_idleCallback )
return;
// lock current idle-pointer
shared_ptr<void_fn> idleCallback( m_idleCallback );
lock.unlock();
// call asynchronous idler
try
{
(*idleCallback)();
}
catch( ... )
{
terminate();
}
}

// may throw system_error

void thread_pool::wait_idle()
{ (void)waitIdle();
}

// may throw system_error

unique_lock<mutex> thread_pool::waitIdle()
{ unique_lock lock( global.m_mtx );
// there are no running threads in our pool and the queue is empty ?
if( !m_nThreadsExecuting && !m_queue.size() )
// we're allready idling
return lock;
// register idle-node
idle_node idleNode { m_idleList, false };
m_idleList = &idleNode;
// wait for idle-notification
do
// doesn't throw
m_idleCv.wait( lock );
while( !idleNode.notify );
return lock;
}

// may throw system_error
// may throw bad_alloc

uint64_t thread_pool::enqueue_task( void_fn &&task )
{ unique_lock lock( global.m_mtx );
assert(m_maxThreads);
// get next queue-id
uint64_t queueId = m_nextQueueId++;
// revert queue-id on exception
invoke_on_destruct revertQueueId( [&] { --m_nextQueueId; } );
// eneue ...
m_queue.emplace_back( queueId, move( task ) );
// enough threads running ?
if( m_nThreadsExecuting >= m_maxThreads )
// yes: no need to notify thread from global pool
return revertQueueId.disable(), queueId;
invoke_on_destruct moveBackTask( [&] { task = move(
m_queue.back().second ); m_queue.pop_back(); } );
// enqueue new initiation
global.m_initiate.emplace_back( this );
global.m_cv.notify_one();
// initiation succeeded, there'll be at least one thread
// which will process it, so disable reverting the initiation
moveBackTask.disable();
revertQueueId.disable();
assert(global.m_nThreads >= global.m_nThreadsActive);
// are there less free threads than initiations ?
if( global.m_nThreads - global.m_nThreadsActive >=
global.m_initiate.size() )
// no: no need to create addtional thread
return queueId;
// assure there's at least the reserve thread alive
assert(global.m_nThreads);
// yes: try to spawn new thread unlocked since this may take a
longer time and we won't lock the queues meanwhile;
// there is at least one thread, so the thead-counter can be
inacurrate since thread-creation may fail
++global.m_nThreads;
lock.unlock();
try
{
// create and detach pool-thread ?
thread( &global_t::theThread, &global ).detach();
}
catch( ... )
{
// failed: re-adjust threads counter;
// at least the reserve-thread is dedicated to us
lock.lock(),
--global.m_nThreads;
}
return queueId;
}

typename thread_pool::void_fn thread_pool::cancel( uint64_t queueId )
{ unique_lock<mutex> lock( global.m_mtx );
// reverse-find entry to cancel
auto cancelThis = find_if( m_queue.rbegin(), m_queue.rend(), [&](
pair<uint64_t, void_fn> &q ) { return q.first == queueId; } ).base();
// not found ?
if( cancelThis == m_queue.begin() )
// entry couldn't be cancelled
return void_fn();
// save function object for return-value
void_fn savedFn( std::move( (--cancelThis)->second ) );
// erase queue-item;
// leave initiation alive, the thread is already signalled anyway
and will remove the
// initiations in-order while we would remove them within the
initiation-queue (more expensive)
m_queue.erase( cancelThis );
// idle-notify
processIdle( lock );
return savedFn;
}

// may throw system_error

size_t thread_pool::max_threads()
{ lock_guard lock( global.m_mtx );
return m_maxThreads;
}

// may throw system_error

size_t thread_pool::resize( size_t maxThreads )
{ unique_lock lock( global.m_mtx );
// remember before number of maximum threads
size_t before = m_maxThreads;
// maximum number of threads determined by hardware concurrency ?
if( size_t hc; maxThreads )
// no: set max threads according to given value
m_maxThreads = maxThreads;
else
// yes: set max threads according to hardware concurrency
hc = jthread::hardware_concurrency(),
// if hardware concurrency is zero, use at least one thread
m_maxThreads = hc ? hc : 1;
// number of max threads has shrunken ?
if( maxThreads <= before )
// yes: no need to notify additional threads
return before;
// additional initiations over allowed initiations before;
// "before - m_nThreadsExecuting" may be negative because of recent
resizes
size_t addInitiate = m_queue.size() - (before - m_nThreadsExecuting);
// enough threads before ?
if( (ptrdiff_t)addInitiate <= 0 )
// yes: no need for additional initiations
return before;
// clip additional initiatons to maximum number of threads
size_t maxAddThreads = maxThreads - before;
addInitiate = addInitiate <= maxAddThreads ? addInitiate :
maxAddThreads;
// issue additiona initiations
auto &init = global.m_initiate;
for( size_t remainingInitiations = addInitiate;
remainingInitiations; --remainingInitiations )
{
try
{
// append initiation to the global initiation queue
init.emplace_back( this );
}
catch( ... )
{
// failed: we had at least one initiation or there was an
initiation to us before ?
if( remainingInitiations != addInitiate || find(
init.begin(), init.end(), this ) != init.end() )
{
// yes: that's sufficient, at least one thread is
dedicated to us;
// rebember number of successful innitiations
addInitiate -= remainingInitiations;
break;
}
else
// no: there's no initiation to us in the global queue;
// rethrow exception
throw;
}
global.m_cv.notify_one();
}
// additional thresds to create
assert(global.m_nThreads >= global.m_nThreadsActive);
size_t addCreate = addInitiate - (global.m_nThreads -
global.m_nThreadsActive);
if( (ptrdiff_t)addCreate <= 0 )
// no: let available threads do the work
return before;
// increase the number of threads to be created, creation may fail,
number may become inaccurate
global.m_nThreads += addCreate;
// create threads unlocked
lock.unlock();
for( ; addCreate; --addCreate )
try
{
// exception while creating thread ?
thread( &global_t::theThread, &global ).detach();
}
catch( ... )
{
// yes: there'll at least the reserve thread dedicated to us;
// adjust the threads counter, making it accurate again
lock.lock(),
global.m_nThreads -= addCreate;
break;
}
// threads counter is accurate now, return the before number of threads
return before;
}

// may throw system_error

bool thread_pool::clear_queue()
{ unique_lock lock( global.m_mtx );
// the queue is alreay empty ?
if( !m_queue.size() )
// yes: nothing to do
return false;
// clear the queue, leaving initiations alive since the thread is
awakened
// anyway and it's more efficient to remove the initiations from
the left
m_queue.clear();
// idle-notify
processIdle( lock );
return true;
}

// may throw system_error
// may throw bad_alloc

typename thread_pool::void_fn thread_pool::idle_callback( void_fn &&fn )
{ // create the new shared_ptr to the idle-callback unlocked
shared_ptr<void_fn> ptr;
if( fn )
ptr = make_shared<void_fn>( move( fn ) );
// apply the new idle-callback locked
lock_guard lock( global.m_mtx );
swap( ptr, m_idleCallback );
return move( *ptr );
}

// may throw system_error

pair<size_t, size_t> thread_pool::processing()
{ lock_guard lock( global.m_mtx );
return { m_queue.size(), m_nThreadsExecuting };
}

thread_pool::global_t::global_t() :
m_quit( false ),
m_nThreads( 1 ),
m_nThreadsActive( 0 )
{ thread( &global_t::theThread, this ).detach();
}

thread_pool::global_t::~global_t()
{ unique_lock lock( m_mtx );
// set quit flag for the threads waiting for initiations;
// remaining initiations are processed before quitting
m_quit = true;
// wait for all threads to quit
while( m_nThreads )
// notify_all() and wait() don't throw
m_cv.notify_all(),
m_quitCv.wait( lock );
}

typename std::chrono::milliseconds thread_pool::timeout(
std::chrono::milliseconds timeout )
{ lock_guard lock( global.m_mtx );
auto before = global.m_timeout;
global.m_timeout = timeout;
// awake all threads in global pool to adjust their due time
global.m_cv.notify_all();
return before;
}

Issuing a new thread is as easy as creating a normal C++11-thread:
pool.enqueue_task( functionObj [,... parameters] );
The pools maximum number of threads can be adjusted with resize().
Each task gets a 64 bit unique_id which is returned after enqueuing
a task. The task can be cancelled with thead_pool::cancel() if it
hasn't been fetched by a worker thread. The method wait_idle() waits
untill all tasks are executed. There's idle_callback() which is given
a function-object which is executed asynchronously in the last thread
before a pool is going to idle. The method processing() reports the
queue size and the number of currently executed tasks.

SubjectRepliesAuthor
o The ultimate thread pool

By: Bonita Montero on Thu, 25 Jan 2024

49Bonita Montero
server_pubkey.txt

rocksolid light 0.9.81
clearnet tor