Mixe for Privacy and Anonymity in the Internet
CAThreadPool.cpp
Go to the documentation of this file.
00001 /********************************************************
00002  * A Thread pool class inspired by:
00003  * "Using POSIX Threads: Programming with Pthreads"
 *     by Brad Nichols, Dick Buttlar, Jackie Farrell
00005  *     O'Reilly & Associates, Inc.
00006  */
00007 #include "StdAfx.h"
00008 #ifndef ONLY_LOCAL_PROXY
00009 #include "CAThreadPool.hpp"
00010 #include "CAMsg.hpp"
// NOTE(review): tpool_thread is only declared here; it is neither defined nor
// referenced in the visible part of this file (the workers are started with
// worker_thread_main_loop instead) -- confirm whether this declaration is
// still needed.
void *tpool_thread(void *);
00012 
00013 CAThreadPool::CAThreadPool( UINT32 num_worker_threads, 
00014                             UINT32 max_queue_size,
00015                             bool   b_do_not_block_when_full)
00016   {
00017     UINT i;
00018     
00019     /* initialize fields */
00020     m_NumThreads = num_worker_threads;
00021     m_MaxQueueSize = max_queue_size;
00022     m_bDoNotBlockWhenFull = b_do_not_block_when_full;
00023     m_parThreads=new CAThread*[num_worker_threads];
00024     m_CurQueueSize = 0;
00025     m_pQueueHead = NULL; 
00026     m_pQueueTail = NULL;
00027     m_bQueueClosed = false;  
00028     m_bShutdown = false; 
00029     m_pmutexQueue=new CAMutex();
00030     m_pcondEmpty=new CAConditionVariable();
00031     m_pcondNotEmpty=new CAConditionVariable();
00032     m_pcondNotFull=new CAConditionVariable();
00033 
00034     char thread_str[24];
00035     
00036     /* create threads */
00037     for (i = 0; i != num_worker_threads; i++) 
00038       {
00039         snprintf(thread_str, 16, "Pool Thread %3d", i);
00040         m_parThreads[i]=new CAThread((UINT8*)thread_str);
00041         m_parThreads[i]->setMainLoop(worker_thread_main_loop);
00042         m_parThreads[i]->start(this);
00043       }
00044   }
00045 
00051 SINT32 CAThreadPool::addRequest(THREAD_MAIN_TYP routine, void *args)
00052 {
00053   m_pmutexQueue->lock();
00054   tpool_work_t *workp;
00055 
00056   // no space and this caller doesn't want to wait 
00057   if ((m_CurQueueSize == m_MaxQueueSize) && m_bDoNotBlockWhenFull) 
00058     {
00059       m_pmutexQueue->unlock();
00060       return E_SPACE;
00061     }
00062 
00063   while((m_CurQueueSize == m_MaxQueueSize) && 
00064         (!(m_bShutdown || m_bQueueClosed))  )
00065     {
00066       CAMsg::printMsg(LOG_INFO,"CAThreadPool::addRequest() -the Thread pool is full...waiting!\n");
00067       m_pcondNotFull->wait(m_pmutexQueue);
00068     }
00069 
00070   // the pool is in the process of being destroyed 
00071   if (m_bShutdown || m_bQueueClosed)
00072     {
00073       m_pmutexQueue->unlock();
00074       return E_UNKNOWN;
00075     }
00076 
00077   // allocate work structure 
00078   workp = new tpool_work_t;
00079   workp->routine = routine;
00080   workp->arg = args;
00081   workp->next = NULL;
00082 
00083   if (m_CurQueueSize == 0)
00084     {
00085       m_pQueueTail = m_pQueueHead = workp;
00086       m_pcondNotEmpty->broadcast();
00087     }
00088   else
00089     {
00090       m_pQueueTail->next = workp;
00091       m_pQueueTail = workp;
00092     }
00093 
00094   m_CurQueueSize++; 
00095   m_pmutexQueue->unlock();
00096   return E_SUCCESS;
00097 }
00098 
00099 SINT32 CAThreadPool::destroy(bool bWaitForFinish)
00100   {
00101     tpool_work_t *cur_nodep;
00102     // Is a shutdown already in progress?
00103     if (m_bQueueClosed || m_bShutdown)
00104       {
00105         return E_SUCCESS;
00106       }
00107 
00108     m_pmutexQueue->lock();
00109     m_bQueueClosed = true;
00110     // If the finish flag is set, wait for workers to 
00111     //   drain queue  
00112     if (bWaitForFinish)
00113       {
00114         while (m_CurQueueSize != 0)
00115           {
00116             m_pcondEmpty->wait(m_pmutexQueue);
00117           }
00118       }
00119 
00120     m_bShutdown = true;
00121     m_pmutexQueue->unlock();
00122 
00123     // Wake up any workers so they recheck shutdown flag 
00124     m_pcondNotEmpty->broadcast();
00125     m_pcondNotFull->broadcast();
00126 
00127     // Wait for workers to exit 
00128     for(UINT32 i=0; i < m_NumThreads; i++) 
00129       {
00130         m_parThreads[i]->join();
00131         delete m_parThreads[i];
00132         m_parThreads[i] = NULL;
00133       }
00134     // Now free pool structures 
00135     delete[] m_parThreads;
00136     m_parThreads = NULL;
00137     while(m_pQueueHead != NULL)
00138     {
00139       cur_nodep = m_pQueueHead->next;
00140       delete m_pQueueHead; 
00141       m_pQueueHead = cur_nodep;     
00142     }
00143     delete m_pmutexQueue;
00144     m_pmutexQueue = NULL;
00145     delete m_pcondEmpty;
00146     m_pcondEmpty = NULL;
00147     delete m_pcondNotEmpty;
00148     m_pcondNotEmpty = NULL;
00149     delete m_pcondNotFull;
00150     m_pcondNotFull = NULL;
00151 
00152     return E_SUCCESS;
00153   }
00154 
00155 THREAD_RETURN worker_thread_main_loop(void *arg)
00156 {
00157   CAThreadPool* pPool = (CAThreadPool*)arg; 
00158   tpool_work_t  *my_workp;
00159   
00160   for(;;)
00161     {
00162     // Check queue for work  
00163       pPool->m_pmutexQueue->lock();
00164       while ((pPool->m_CurQueueSize == 0) && (!pPool->m_bShutdown))
00165         {
00166           pPool->m_pcondNotEmpty->wait(pPool->m_pmutexQueue);
00167         }
00168       //sSleep(5); 
00169       // Has a shutdown started while i was sleeping? 
00170       if (pPool->m_bShutdown)
00171         {
00172           pPool->m_pmutexQueue->unlock();
00173           THREAD_RETURN_SUCCESS;
00174         }
00175 
00176       // Get to work, dequeue the next item  
00177       my_workp = pPool->m_pQueueHead;
00178       pPool->m_CurQueueSize--;
00179       if (pPool->m_CurQueueSize == 0)
00180         pPool->m_pQueueHead = pPool->m_pQueueTail = NULL;
00181       else
00182         pPool->m_pQueueHead = my_workp->next;
00183  
00184       // Handle waiting add_work threads 
00185       if ((!pPool->m_bDoNotBlockWhenFull) &&
00186           (pPool->m_CurQueueSize ==  (pPool->m_MaxQueueSize - 1))) 
00187           pPool->m_pcondNotFull->broadcast();
00188       // Handle waiting destroyer threads 
00189       if (pPool->m_CurQueueSize == 0)
00190         pPool->m_pcondEmpty->signal();
00191       pPool->m_pmutexQueue->unlock();
00192       
00193       // Do this work item 
00194       (*(my_workp->routine))(my_workp->arg);
00195       delete my_workp;
00196       my_workp = NULL;
00197   } 
00198 }
00199 #endif //ONLY_LOCAL_PROXY