|
Mixe for Privacy and Anonymity in the Internet
|
00001 /******************************************************** 00002 * A Thread pool class inspired by: 00003 * "Using POSIX Threads: Programming with Pthreads" 00004 * by Brad nichols, Dick Buttlar, Jackie Farrell 00005 * O'Reilly & Associates, Inc. 00006 */ 00007 #include "StdAfx.h" 00008 #ifndef ONLY_LOCAL_PROXY 00009 #include "CAThreadPool.hpp" 00010 #include "CAMsg.hpp" 00011 void *tpool_thread(void *); 00012 00013 CAThreadPool::CAThreadPool( UINT32 num_worker_threads, 00014 UINT32 max_queue_size, 00015 bool b_do_not_block_when_full) 00016 { 00017 UINT i; 00018 00019 /* initialize fields */ 00020 m_NumThreads = num_worker_threads; 00021 m_MaxQueueSize = max_queue_size; 00022 m_bDoNotBlockWhenFull = b_do_not_block_when_full; 00023 m_parThreads=new CAThread*[num_worker_threads]; 00024 m_CurQueueSize = 0; 00025 m_pQueueHead = NULL; 00026 m_pQueueTail = NULL; 00027 m_bQueueClosed = false; 00028 m_bShutdown = false; 00029 m_pmutexQueue=new CAMutex(); 00030 m_pcondEmpty=new CAConditionVariable(); 00031 m_pcondNotEmpty=new CAConditionVariable(); 00032 m_pcondNotFull=new CAConditionVariable(); 00033 00034 char thread_str[24]; 00035 00036 /* create threads */ 00037 for (i = 0; i != num_worker_threads; i++) 00038 { 00039 snprintf(thread_str, 16, "Pool Thread %3d", i); 00040 m_parThreads[i]=new CAThread((UINT8*)thread_str); 00041 m_parThreads[i]->setMainLoop(worker_thread_main_loop); 00042 m_parThreads[i]->start(this); 00043 } 00044 } 00045 00051 SINT32 CAThreadPool::addRequest(THREAD_MAIN_TYP routine, void *args) 00052 { 00053 m_pmutexQueue->lock(); 00054 tpool_work_t *workp; 00055 00056 // no space and this caller doesn't want to wait 00057 if ((m_CurQueueSize == m_MaxQueueSize) && m_bDoNotBlockWhenFull) 00058 { 00059 m_pmutexQueue->unlock(); 00060 return E_SPACE; 00061 } 00062 00063 while((m_CurQueueSize == m_MaxQueueSize) && 00064 (!(m_bShutdown || m_bQueueClosed)) ) 00065 { 00066 CAMsg::printMsg(LOG_INFO,"CAThreadPool::addRequest() -the Thread pool is full...waiting!\n"); 00067 m_pcondNotFull->wait(m_pmutexQueue); 00068 } 00069 00070 // the pool is in the process of being destroyed 00071 if (m_bShutdown || m_bQueueClosed) 00072 { 00073 m_pmutexQueue->unlock(); 00074 return E_UNKNOWN; 00075 } 00076 00077 // allocate work structure 00078 workp = new tpool_work_t; 00079 workp->routine = routine; 00080 workp->arg = args; 00081 workp->next = NULL; 00082 00083 if (m_CurQueueSize == 0) 00084 { 00085 m_pQueueTail = m_pQueueHead = workp; 00086 m_pcondNotEmpty->broadcast(); 00087 } 00088 else 00089 { 00090 m_pQueueTail->next = workp; 00091 m_pQueueTail = workp; 00092 } 00093 00094 m_CurQueueSize++; 00095 m_pmutexQueue->unlock(); 00096 return E_SUCCESS; 00097 } 00098 00099 SINT32 CAThreadPool::destroy(bool bWaitForFinish) 00100 { 00101 tpool_work_t *cur_nodep; 00102 // Is a shutdown already in progress? 00103 if (m_bQueueClosed || m_bShutdown) 00104 { 00105 return E_SUCCESS; 00106 } 00107 00108 m_pmutexQueue->lock(); 00109 m_bQueueClosed = true; 00110 // If the finish flag is set, wait for workers to 00111 // drain queue 00112 if (bWaitForFinish) 00113 { 00114 while (m_CurQueueSize != 0) 00115 { 00116 m_pcondEmpty->wait(m_pmutexQueue); 00117 } 00118 } 00119 00120 m_bShutdown = true; 00121 m_pmutexQueue->unlock(); 00122 00123 // Wake up any workers so they recheck shutdown flag 00124 m_pcondNotEmpty->broadcast(); 00125 m_pcondNotFull->broadcast(); 00126 00127 // Wait for workers to exit 00128 for(UINT32 i=0; i < m_NumThreads; i++) 00129 { 00130 m_parThreads[i]->join(); 00131 delete m_parThreads[i]; 00132 m_parThreads[i] = NULL; 00133 } 00134 // Now free pool structures 00135 delete[] m_parThreads; 00136 m_parThreads = NULL; 00137 while(m_pQueueHead != NULL) 00138 { 00139 cur_nodep = m_pQueueHead->next; 00140 delete m_pQueueHead; 00141 m_pQueueHead = cur_nodep; 00142 } 00143 delete m_pmutexQueue; 00144 m_pmutexQueue = NULL; 00145 delete m_pcondEmpty; 00146 m_pcondEmpty = NULL; 00147 delete m_pcondNotEmpty; 00148 m_pcondNotEmpty = NULL; 00149 delete m_pcondNotFull; 00150 m_pcondNotFull = NULL; 00151 00152 return E_SUCCESS; 00153 } 00154 00155 THREAD_RETURN worker_thread_main_loop(void *arg) 00156 { 00157 CAThreadPool* pPool = (CAThreadPool*)arg; 00158 tpool_work_t *my_workp; 00159 00160 for(;;) 00161 { 00162 // Check queue for work 00163 pPool->m_pmutexQueue->lock(); 00164 while ((pPool->m_CurQueueSize == 0) && (!pPool->m_bShutdown)) 00165 { 00166 pPool->m_pcondNotEmpty->wait(pPool->m_pmutexQueue); 00167 } 00168 //sSleep(5); 00169 // Has a shutdown started while i was sleeping? 00170 if (pPool->m_bShutdown) 00171 { 00172 pPool->m_pmutexQueue->unlock(); 00173 THREAD_RETURN_SUCCESS; 00174 } 00175 00176 // Get to work, dequeue the next item 00177 my_workp = pPool->m_pQueueHead; 00178 pPool->m_CurQueueSize--; 00179 if (pPool->m_CurQueueSize == 0) 00180 pPool->m_pQueueHead = pPool->m_pQueueTail = NULL; 00181 else 00182 pPool->m_pQueueHead = my_workp->next; 00183 00184 // Handle waiting add_work threads 00185 if ((!pPool->m_bDoNotBlockWhenFull) && 00186 (pPool->m_CurQueueSize == (pPool->m_MaxQueueSize - 1))) 00187 pPool->m_pcondNotFull->broadcast(); 00188 // Handle waiting destroyer threads 00189 if (pPool->m_CurQueueSize == 0) 00190 pPool->m_pcondEmpty->signal(); 00191 pPool->m_pmutexQueue->unlock(); 00192 00193 // Do this work item 00194 (*(my_workp->routine))(my_workp->arg); 00195 delete my_workp; 00196 my_workp = NULL; 00197 } 00198 } 00199 #endif //ONLY_LOCAL_PROXY
1.7.6.1