|
Mixe for Privacy and Anonymity in the Internet
|
This class bla bla. More...
#include <CAThreadPool.hpp>
Public Member Functions | |
| CAThreadPool (UINT32 num_worker_threads, UINT32 max_queue_size, bool b_do_not_block_when_full) | |
| ~CAThreadPool () | |
| SINT32 | destroy (bool bWaitForFinish) |
| SINT32 | addRequest (THREAD_MAIN_TYP, void *args) |
| Adds a new request (task) to this threadpool. | |
| UINT32 | countRequests () |
Private Attributes | |
| UINT32 | m_NumThreads |
| UINT32 | m_MaxQueueSize |
| bool | m_bDoNotBlockWhenFull |
| CAThread ** | m_parThreads |
| volatile UINT32 | m_CurQueueSize |
| tpool_work_t * | m_pQueueHead |
| tpool_work_t * | m_pQueueTail |
| volatile bool | m_bQueueClosed |
| volatile bool | m_bShutdown |
| CAMutex * | m_pmutexQueue |
| CAConditionVariable * | m_pcondNotEmpty |
| CAConditionVariable * | m_pcondNotFull |
| CAConditionVariable * | m_pcondEmpty |
Friends | |
| THREAD_RETURN | worker_thread_main_loop (void *args) |
This class bla bla.
Definition at line 31 of file CAThreadPool.hpp.
| CAThreadPool::CAThreadPool | ( | UINT32 | num_worker_threads, |
| UINT32 | max_queue_size, | ||
| bool | b_do_not_block_when_full | ||
| ) |
Definition at line 13 of file CAThreadPool.cpp.
References m_bDoNotBlockWhenFull, m_bQueueClosed, m_bShutdown, m_CurQueueSize, m_MaxQueueSize, m_NumThreads, m_parThreads, m_pcondEmpty, m_pcondNotEmpty, m_pcondNotFull, m_pmutexQueue, m_pQueueHead, m_pQueueTail, CAThread::setMainLoop(), CAThread::start(), and worker_thread_main_loop.
{
UINT i;
/* initialize fields */
m_NumThreads = num_worker_threads;
m_MaxQueueSize = max_queue_size;
m_bDoNotBlockWhenFull = b_do_not_block_when_full;
m_parThreads=new CAThread*[num_worker_threads];
m_CurQueueSize = 0;
m_pQueueHead = NULL;
m_pQueueTail = NULL;
m_bQueueClosed = false;
m_bShutdown = false;
m_pmutexQueue=new CAMutex();
m_pcondEmpty=new CAConditionVariable();
m_pcondNotEmpty=new CAConditionVariable();
m_pcondNotFull=new CAConditionVariable();
char thread_str[24];
/* create threads */
for (i = 0; i != num_worker_threads; i++)
{
snprintf(thread_str, 16, "Pool Thread %3d", i);
m_parThreads[i]=new CAThread((UINT8*)thread_str);
m_parThreads[i]->setMainLoop(worker_thread_main_loop);
m_parThreads[i]->start(this);
}
}
| CAThreadPool::~CAThreadPool | ( | ) | [inline] |
Definition at line 37 of file CAThreadPool.hpp.
References destroy().
{
destroy(true);
}
| SINT32 CAThreadPool::addRequest | ( | THREAD_MAIN_TYP | routine, |
| void * | args | ||
| ) |
Adds a new request (task) to this threadpool.
| E_SPACe | if there was no more space in the waiting queue and we do not want to wait for an other request to finish |
| E_SUCCESS | if this request was added to the working queue |
Definition at line 51 of file CAThreadPool.cpp.
References tpool_work::arg, CAConditionVariable::broadcast(), E_SPACE, E_SUCCESS, E_UNKNOWN, CAMutex::lock(), m_bDoNotBlockWhenFull, m_bQueueClosed, m_bShutdown, m_CurQueueSize, m_MaxQueueSize, m_pcondNotEmpty, m_pcondNotFull, m_pmutexQueue, m_pQueueHead, m_pQueueTail, tpool_work::next, CAMsg::printMsg(), tpool_work::routine, CAMutex::unlock(), and CAConditionVariable::wait().
Referenced by fm_loopAcceptUsers().
{
m_pmutexQueue->lock();
tpool_work_t *workp;
// no space and this caller doesn't want to wait
if ((m_CurQueueSize == m_MaxQueueSize) && m_bDoNotBlockWhenFull)
{
m_pmutexQueue->unlock();
return E_SPACE;
}
while((m_CurQueueSize == m_MaxQueueSize) &&
(!(m_bShutdown || m_bQueueClosed)) )
{
CAMsg::printMsg(LOG_INFO,"CAThreadPool::addRequest() -the Thread pool is full...waiting!\n");
m_pcondNotFull->wait(m_pmutexQueue);
}
// the pool is in the process of being destroyed
if (m_bShutdown || m_bQueueClosed)
{
m_pmutexQueue->unlock();
return E_UNKNOWN;
}
// allocate work structure
workp = new tpool_work_t;
workp->routine = routine;
workp->arg = args;
workp->next = NULL;
if (m_CurQueueSize == 0)
{
m_pQueueTail = m_pQueueHead = workp;
m_pcondNotEmpty->broadcast();
}
else
{
m_pQueueTail->next = workp;
m_pQueueTail = workp;
}
m_CurQueueSize++;
m_pmutexQueue->unlock();
return E_SUCCESS;
}
| UINT32 CAThreadPool::countRequests | ( | ) | [inline] |
Definition at line 46 of file CAThreadPool.hpp.
References m_CurQueueSize.
Referenced by fm_loopAcceptUsers().
{
return m_CurQueueSize;
}
| SINT32 CAThreadPool::destroy | ( | bool | bWaitForFinish | ) |
Definition at line 99 of file CAThreadPool.cpp.
References CAConditionVariable::broadcast(), E_SUCCESS, CAThread::join(), CAMutex::lock(), m_bQueueClosed, m_bShutdown, m_CurQueueSize, m_NumThreads, m_parThreads, m_pcondEmpty, m_pcondNotEmpty, m_pcondNotFull, m_pmutexQueue, m_pQueueHead, tpool_work::next, CAMutex::unlock(), and CAConditionVariable::wait().
Referenced by CAFirstMixB::loop(), and ~CAThreadPool().
{
tpool_work_t *cur_nodep;
// Is a shutdown already in progress?
if (m_bQueueClosed || m_bShutdown)
{
return E_SUCCESS;
}
m_pmutexQueue->lock();
m_bQueueClosed = true;
// If the finish flag is set, wait for workers to
// drain queue
if (bWaitForFinish)
{
while (m_CurQueueSize != 0)
{
m_pcondEmpty->wait(m_pmutexQueue);
}
}
m_bShutdown = true;
m_pmutexQueue->unlock();
// Wake up any workers so they recheck shutdown flag
m_pcondNotEmpty->broadcast();
m_pcondNotFull->broadcast();
// Wait for workers to exit
for(UINT32 i=0; i < m_NumThreads; i++)
{
m_parThreads[i]->join();
delete m_parThreads[i];
m_parThreads[i] = NULL;
}
// Now free pool structures
delete[] m_parThreads;
m_parThreads = NULL;
while(m_pQueueHead != NULL)
{
cur_nodep = m_pQueueHead->next;
delete m_pQueueHead;
m_pQueueHead = cur_nodep;
}
delete m_pmutexQueue;
m_pmutexQueue = NULL;
delete m_pcondEmpty;
m_pcondEmpty = NULL;
delete m_pcondNotEmpty;
m_pcondNotEmpty = NULL;
delete m_pcondNotFull;
m_pcondNotFull = NULL;
return E_SUCCESS;
}
| THREAD_RETURN worker_thread_main_loop | ( | void * | args | ) | [friend] |
Definition at line 155 of file CAThreadPool.cpp.
Referenced by CAThreadPool().
{
CAThreadPool* pPool = (CAThreadPool*)arg;
tpool_work_t *my_workp;
for(;;)
{
// Check queue for work
pPool->m_pmutexQueue->lock();
while ((pPool->m_CurQueueSize == 0) && (!pPool->m_bShutdown))
{
pPool->m_pcondNotEmpty->wait(pPool->m_pmutexQueue);
}
//sSleep(5);
// Has a shutdown started while i was sleeping?
if (pPool->m_bShutdown)
{
pPool->m_pmutexQueue->unlock();
THREAD_RETURN_SUCCESS;
}
// Get to work, dequeue the next item
my_workp = pPool->m_pQueueHead;
pPool->m_CurQueueSize--;
if (pPool->m_CurQueueSize == 0)
pPool->m_pQueueHead = pPool->m_pQueueTail = NULL;
else
pPool->m_pQueueHead = my_workp->next;
// Handle waiting add_work threads
if ((!pPool->m_bDoNotBlockWhenFull) &&
(pPool->m_CurQueueSize == (pPool->m_MaxQueueSize - 1)))
pPool->m_pcondNotFull->broadcast();
// Handle waiting destroyer threads
if (pPool->m_CurQueueSize == 0)
pPool->m_pcondEmpty->signal();
pPool->m_pmutexQueue->unlock();
// Do this work item
(*(my_workp->routine))(my_workp->arg);
delete my_workp;
my_workp = NULL;
}
}
bool CAThreadPool::m_bDoNotBlockWhenFull [private] |
Definition at line 55 of file CAThreadPool.hpp.
Referenced by addRequest(), CAThreadPool(), and worker_thread_main_loop().
volatile bool CAThreadPool::m_bQueueClosed [private] |
Definition at line 61 of file CAThreadPool.hpp.
Referenced by addRequest(), CAThreadPool(), and destroy().
volatile bool CAThreadPool::m_bShutdown [private] |
Definition at line 62 of file CAThreadPool.hpp.
Referenced by addRequest(), CAThreadPool(), destroy(), and worker_thread_main_loop().
volatile UINT32 CAThreadPool::m_CurQueueSize [private] |
Definition at line 58 of file CAThreadPool.hpp.
Referenced by addRequest(), CAThreadPool(), countRequests(), destroy(), and worker_thread_main_loop().
UINT32 CAThreadPool::m_MaxQueueSize [private] |
Definition at line 54 of file CAThreadPool.hpp.
Referenced by addRequest(), CAThreadPool(), and worker_thread_main_loop().
UINT32 CAThreadPool::m_NumThreads [private] |
Definition at line 53 of file CAThreadPool.hpp.
Referenced by CAThreadPool(), and destroy().
CAThread** CAThreadPool::m_parThreads [private] |
Definition at line 57 of file CAThreadPool.hpp.
Referenced by CAThreadPool(), and destroy().
CAConditionVariable* CAThreadPool::m_pcondEmpty [private] |
Definition at line 67 of file CAThreadPool.hpp.
Referenced by CAThreadPool(), destroy(), and worker_thread_main_loop().
CAConditionVariable* CAThreadPool::m_pcondNotEmpty [private] |
Definition at line 65 of file CAThreadPool.hpp.
Referenced by addRequest(), CAThreadPool(), destroy(), and worker_thread_main_loop().
CAConditionVariable* CAThreadPool::m_pcondNotFull [private] |
Definition at line 66 of file CAThreadPool.hpp.
Referenced by addRequest(), CAThreadPool(), destroy(), and worker_thread_main_loop().
CAMutex* CAThreadPool::m_pmutexQueue [private] |
Definition at line 64 of file CAThreadPool.hpp.
Referenced by addRequest(), CAThreadPool(), destroy(), and worker_thread_main_loop().
tpool_work_t* CAThreadPool::m_pQueueHead [private] |
Definition at line 59 of file CAThreadPool.hpp.
Referenced by addRequest(), CAThreadPool(), destroy(), and worker_thread_main_loop().
tpool_work_t* CAThreadPool::m_pQueueTail [private] |
Definition at line 60 of file CAThreadPool.hpp.
Referenced by addRequest(), CAThreadPool(), and worker_thread_main_loop().
1.7.6.1