Mixe for Privacy and Anonymity in the Internet
Public Member Functions | Private Attributes | Friends
CAThreadPool Class Reference

A pool of worker threads that executes queued requests (tasks). More...

#include <CAThreadPool.hpp>

Collaboration diagram for CAThreadPool:
[legend]

List of all members.

Public Member Functions

 CAThreadPool (UINT32 num_worker_threads, UINT32 max_queue_size, bool b_do_not_block_when_full)
 ~CAThreadPool ()
SINT32 destroy (bool bWaitForFinish)
SINT32 addRequest (THREAD_MAIN_TYP, void *args)
 Adds a new request (task) to this threadpool.
UINT32 countRequests ()

Private Attributes

UINT32 m_NumThreads
UINT32 m_MaxQueueSize
bool m_bDoNotBlockWhenFull
CAThread ** m_parThreads
volatile UINT32 m_CurQueueSize
tpool_work_t * m_pQueueHead
tpool_work_t * m_pQueueTail
volatile bool m_bQueueClosed
volatile bool m_bShutdown
CAMutex * m_pmutexQueue
CAConditionVariable * m_pcondNotEmpty
CAConditionVariable * m_pcondNotFull
CAConditionVariable * m_pcondEmpty

Friends

THREAD_RETURN worker_thread_main_loop (void *args)

Detailed Description

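A pool of worker threads that executes queued requests (tasks). Requests are added with addRequest() and stored in a bounded queue of at most max_queue_size entries; each worker thread repeatedly takes the next request from the queue and runs it. destroy() shuts the pool down, optionally waiting until the queue has been drained.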

Definition at line 31 of file CAThreadPool.hpp.


Constructor & Destructor Documentation

CAThreadPool::CAThreadPool ( UINT32  num_worker_threads,
UINT32  max_queue_size,
bool  b_do_not_block_when_full 
)

Definition at line 13 of file CAThreadPool.cpp.

References m_bDoNotBlockWhenFull, m_bQueueClosed, m_bShutdown, m_CurQueueSize, m_MaxQueueSize, m_NumThreads, m_parThreads, m_pcondEmpty, m_pcondNotEmpty, m_pcondNotFull, m_pmutexQueue, m_pQueueHead, m_pQueueTail, CAThread::setMainLoop(), CAThread::start(), and worker_thread_main_loop.

  {
    /* Record the pool configuration. */
    m_NumThreads = num_worker_threads;
    m_MaxQueueSize = max_queue_size;
    m_bDoNotBlockWhenFull = b_do_not_block_when_full;

    /* Start with an empty queue that is open for new requests. */
    m_CurQueueSize = 0;
    m_pQueueHead = NULL;
    m_pQueueTail = NULL;
    m_bQueueClosed = false;
    m_bShutdown = false;

    /* Synchronisation objects shared by addRequest(), destroy() and the
       worker threads. Everything the workers read is set up before the
       first thread is started below. */
    m_pmutexQueue = new CAMutex();
    m_pcondEmpty = new CAConditionVariable();
    m_pcondNotEmpty = new CAConditionVariable();
    m_pcondNotFull = new CAConditionVariable();

    m_parThreads = new CAThread*[num_worker_threads];

    /* Scratch buffer for the thread name; it is reused for every thread.
       NOTE(review): this presumably relies on CAThread copying the name -
       confirm against CAThread. */
    char thread_str[24];

    /* Create and start the worker threads. Each one runs
       worker_thread_main_loop() with this pool as its argument. */
    for (UINT32 i = 0; i != num_worker_threads; i++)
      {
        /* The index is unsigned, so it is printed with %u (the former %d
           was a mismatched conversion). The output stays capped at
           16 bytes including the terminating NUL, exactly as before. */
        snprintf(thread_str, 16, "Pool Thread %3u", (unsigned int)i);
        m_parThreads[i] = new CAThread((UINT8*)thread_str);
        m_parThreads[i]->setMainLoop(worker_thread_main_loop);
        m_parThreads[i]->start(this);
      }
  }

Here is the call graph for this function:

CAThreadPool::~CAThreadPool ( )

Definition at line 37 of file CAThreadPool.hpp.

References destroy().

        {
          // Shut the pool down; passing true asks destroy() to wait until
          // the request queue has been drained before stopping the workers.
          const bool bWaitForFinish = true;
          destroy(bWaitForFinish);
        }

Here is the call graph for this function:


Member Function Documentation

SINT32 CAThreadPool::addRequest ( THREAD_MAIN_TYP  routine,
void *  args 
)

Adds a new request (task) to this threadpool.

Return values:
E_SPACE if there was no more space in the waiting queue and we do not want to wait for another request to finish
E_SUCCESS if this request was added to the working queue

Definition at line 51 of file CAThreadPool.cpp.

References tpool_work::arg, CAConditionVariable::broadcast(), E_SPACE, E_SUCCESS, E_UNKNOWN, CAMutex::lock(), m_bDoNotBlockWhenFull, m_bQueueClosed, m_bShutdown, m_CurQueueSize, m_MaxQueueSize, m_pcondNotEmpty, m_pcondNotFull, m_pmutexQueue, m_pQueueHead, m_pQueueTail, tpool_work::next, CAMsg::printMsg(), tpool_work::routine, CAMutex::unlock(), and CAConditionVariable::wait().

Referenced by fm_loopAcceptUsers().

{
  // Everything below runs with the queue mutex held; every exit path
  // releases it again before returning.
  m_pmutexQueue->lock();

  // Non-blocking mode: a full queue is reported to the caller right away.
  if ((m_CurQueueSize == m_MaxQueueSize) && m_bDoNotBlockWhenFull)
    {
      m_pmutexQueue->unlock();
      return E_SPACE;
    }

  // Blocking mode: sleep until a worker frees a slot or the pool starts
  // to shut down. The condition is re-evaluated after every wake-up.
  for (;;)
    {
      const bool bQueueFull = (m_CurQueueSize == m_MaxQueueSize);
      if (!bQueueFull || m_bShutdown || m_bQueueClosed)
        break;
      CAMsg::printMsg(LOG_INFO,"CAThreadPool::addRequest() -the Thread pool is full...waiting!\n");
      m_pcondNotFull->wait(m_pmutexQueue);
    }

  // destroy() has closed the queue or flagged shutdown: refuse new work.
  const bool bTearingDown = (m_bShutdown || m_bQueueClosed);
  if (bTearingDown)
    {
      m_pmutexQueue->unlock();
      return E_UNKNOWN;
    }

  // Build the queue node describing this request.
  tpool_work_t *pItem = new tpool_work_t;
  pItem->routine = routine;
  pItem->arg = args;
  pItem->next = NULL;

  // Append the node to the tail of the singly linked queue.
  const bool bWasEmpty = (m_CurQueueSize == 0);
  if (bWasEmpty)
    m_pQueueHead = pItem;
  else
    m_pQueueTail->next = pItem;
  m_pQueueTail = pItem;

  // Workers wait on m_pcondNotEmpty only while the queue is empty, so they
  // need to be woken only on the empty -> non-empty transition.
  if (bWasEmpty)
    m_pcondNotEmpty->broadcast();

  ++m_CurQueueSize;
  m_pmutexQueue->unlock();
  return E_SUCCESS;
}

Here is the call graph for this function:

UINT32 CAThreadPool::countRequests ( )

Definition at line 46 of file CAThreadPool.hpp.

References m_CurQueueSize.

Referenced by fm_loopAcceptUsers().

      {
        // Number of requests currently held in the queue. The counter is
        // read without taking m_pmutexQueue.
        const UINT32 queuedRequests = m_CurQueueSize;
        return queuedRequests;
      }
SINT32 CAThreadPool::destroy ( bool  bWaitForFinish)

Definition at line 99 of file CAThreadPool.cpp.

References CAConditionVariable::broadcast(), E_SUCCESS, CAThread::join(), CAMutex::lock(), m_bQueueClosed, m_bShutdown, m_CurQueueSize, m_NumThreads, m_parThreads, m_pcondEmpty, m_pcondNotEmpty, m_pcondNotFull, m_pmutexQueue, m_pQueueHead, tpool_work::next, CAMutex::unlock(), and CAConditionVariable::wait().

Referenced by CAFirstMixB::loop(), and ~CAThreadPool().

  {
    // Shuts the pool down and releases everything it owns.
    //
    // Steps: close the queue for new requests, optionally wait until the
    // queue is empty, flag shutdown, wake all waiting threads, join and
    // delete the worker threads, then free any remaining queue nodes and
    // the synchronisation objects.
    //
    // bWaitForFinish: if true, block until m_CurQueueSize has dropped to 0
    //   before flagging shutdown. If false, requests still in the queue are
    //   deleted below without being run (workers return as soon as they see
    //   m_bShutdown).
    // Returns E_SUCCESS on every path.
    tpool_work_t *cur_nodep;
    // Is a shutdown already in progress?
    // This also makes a repeated call a no-op, e.g. ~CAThreadPool() calling
    // destroy(true) after an explicit destroy(); without it the code below
    // would dereference the already deleted (NULL) m_pmutexQueue.
    // NOTE(review): the flags are tested before the mutex is taken, so two
    // threads calling destroy() at the same moment could both get past this
    // check - confirm that destroy() is only ever called from one thread.
    if (m_bQueueClosed || m_bShutdown)
      {
        return E_SUCCESS;
      }

    m_pmutexQueue->lock();
    // From here on addRequest() rejects new requests with E_UNKNOWN.
    m_bQueueClosed = true;
    // If the finish flag is set, wait for workers to 
    //   drain queue  
    // (a worker signals m_pcondEmpty when it dequeues the last request)
    if (bWaitForFinish)
      {
        while (m_CurQueueSize != 0)
          {
            m_pcondEmpty->wait(m_pmutexQueue);
          }
      }

    m_bShutdown = true;
    m_pmutexQueue->unlock();

    // Wake up any workers so they recheck shutdown flag 
    // (m_pcondNotEmpty), and any addRequest() callers blocked on a full
    // queue (m_pcondNotFull).
    m_pcondNotEmpty->broadcast();
    m_pcondNotFull->broadcast();

    // Wait for workers to exit 
    for(UINT32 i=0; i < m_NumThreads; i++) 
      {
        m_parThreads[i]->join();
        delete m_parThreads[i];
        m_parThreads[i] = NULL;
      }
    // Now free pool structures 
    delete[] m_parThreads;
    m_parThreads = NULL;
    // Free requests that were still queued and never picked up by a worker.
    // NOTE(review): m_pQueueTail and m_CurQueueSize are not reset here, so
    // after destroy(false) with pending requests the tail pointer dangles
    // and countRequests() still reports the old count.
    while(m_pQueueHead != NULL)
    {
      cur_nodep = m_pQueueHead->next;
      delete m_pQueueHead; 
      m_pQueueHead = cur_nodep;     
    }
    // Release the synchronisation objects; the pointers are set to NULL so
    // they are not left dangling.
    delete m_pmutexQueue;
    m_pmutexQueue = NULL;
    delete m_pcondEmpty;
    m_pcondEmpty = NULL;
    delete m_pcondNotEmpty;
    m_pcondNotEmpty = NULL;
    delete m_pcondNotFull;
    m_pcondNotFull = NULL;

    return E_SUCCESS;
  }

Here is the call graph for this function:


Friends And Related Function Documentation

THREAD_RETURN worker_thread_main_loop ( void *  args) [friend]

Definition at line 155 of file CAThreadPool.cpp.

Referenced by CAThreadPool().

{
  // Main loop executed by every worker thread of a CAThreadPool.
  // The thread argument is the owning pool (the constructor starts each
  // thread with start(this)). The loop repeatedly takes the request at the
  // head of the pool's queue and runs it, until the pool's m_bShutdown flag
  // is set.
  // NOTE(review): the documented signature names the parameter `args`, but
  // the body reads `arg` - confirm against CAThreadPool.cpp.
  CAThreadPool* pPool = (CAThreadPool*)arg; 
  tpool_work_t  *my_workp;
  
  for(;;)
    {
    // Check queue for work  
      pPool->m_pmutexQueue->lock();
      // Sleep while there is nothing to do; the condition is re-checked
      // after every wake-up. addRequest() and destroy() broadcast on
      // m_pcondNotEmpty.
      while ((pPool->m_CurQueueSize == 0) && (!pPool->m_bShutdown))
        {
          pPool->m_pcondNotEmpty->wait(pPool->m_pmutexQueue);
        }
      //sSleep(5); 
      // Has a shutdown started while i was sleeping? 
      // If so, leave without dequeuing; requests still queued are freed by
      // destroy().
      if (pPool->m_bShutdown)
        {
          pPool->m_pmutexQueue->unlock();
          // presumably a macro that returns from the thread function -
          // confirm its definition
          THREAD_RETURN_SUCCESS;
        }

      // Get to work, dequeue the next item  
      my_workp = pPool->m_pQueueHead;
      pPool->m_CurQueueSize--;
      if (pPool->m_CurQueueSize == 0)
        pPool->m_pQueueHead = pPool->m_pQueueTail = NULL;
      else
        pPool->m_pQueueHead = my_workp->next;
 
      // Handle waiting add_work threads 
      // Only needed in blocking mode, and only when the queue has just gone
      // from full to one free slot - that is the only state in which
      // addRequest() callers can be waiting on m_pcondNotFull.
      if ((!pPool->m_bDoNotBlockWhenFull) &&
          (pPool->m_CurQueueSize ==  (pPool->m_MaxQueueSize - 1))) 
          pPool->m_pcondNotFull->broadcast();
      // Handle waiting destroyer threads 
      // destroy(true) waits on m_pcondEmpty until the queue is empty.
      if (pPool->m_CurQueueSize == 0)
        pPool->m_pcondEmpty->signal();
      pPool->m_pmutexQueue->unlock();
      
      // Do this work item 
      // The mutex has been released above, so the request runs without
      // holding the queue lock. The node is owned by this thread now and is
      // freed once the routine returns.
      (*(my_workp->routine))(my_workp->arg);
      delete my_workp;
      my_workp = NULL;
  } 
}

Member Data Documentation

Definition at line 55 of file CAThreadPool.hpp.

Referenced by addRequest(), CAThreadPool(), and worker_thread_main_loop().

volatile bool CAThreadPool::m_bQueueClosed [private]

Definition at line 61 of file CAThreadPool.hpp.

Referenced by addRequest(), CAThreadPool(), and destroy().

volatile bool CAThreadPool::m_bShutdown [private]

Definition at line 62 of file CAThreadPool.hpp.

Referenced by addRequest(), CAThreadPool(), destroy(), and worker_thread_main_loop().

Definition at line 54 of file CAThreadPool.hpp.

Referenced by addRequest(), CAThreadPool(), and worker_thread_main_loop().

Definition at line 53 of file CAThreadPool.hpp.

Referenced by CAThreadPool(), and destroy().

Definition at line 57 of file CAThreadPool.hpp.

Referenced by CAThreadPool(), and destroy().

Definition at line 67 of file CAThreadPool.hpp.

Referenced by CAThreadPool(), destroy(), and worker_thread_main_loop().

Definition at line 65 of file CAThreadPool.hpp.

Referenced by addRequest(), CAThreadPool(), destroy(), and worker_thread_main_loop().

Definition at line 66 of file CAThreadPool.hpp.

Referenced by addRequest(), CAThreadPool(), destroy(), and worker_thread_main_loop().

Definition at line 64 of file CAThreadPool.hpp.

Referenced by addRequest(), CAThreadPool(), destroy(), and worker_thread_main_loop().

Definition at line 59 of file CAThreadPool.hpp.

Referenced by addRequest(), CAThreadPool(), destroy(), and worker_thread_main_loop().

Definition at line 60 of file CAThreadPool.hpp.

Referenced by addRequest(), CAThreadPool(), and worker_thread_main_loop().


The documentation for this class was generated from the following files: