|
Mixe for Privacy and Anonymity in the Internet
|
00001 /* 00002 Copyright (c) 2000, The JAP-Team 00003 All rights reserved. 00004 Redistribution and use in source and binary forms, with or without modification, 00005 are permitted provided that the following conditions are met: 00006 00007 - Redistributions of source code must retain the above copyright notice, 00008 this list of conditions and the following disclaimer. 00009 00010 - Redistributions in binary form must reproduce the above copyright notice, 00011 this list of conditions and the following disclaimer in the documentation and/or 00012 other materials provided with the distribution. 00013 00014 - Neither the name of the University of Technology Dresden, Germany nor the names of its contributors 00015 may be used to endorse or promote products derived from this software without specific 00016 prior written permission. 00017 00018 00019 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' AND ANY EXPRESS 00020 OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY 00021 AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS 00022 BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 00023 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 00024 OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER 00025 IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 00026 OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE 00027 */ 00028 #include "StdAfx.h" 00029 #ifndef ONLY_LOCAL_PROXY 00030 #include "CAQueue.hpp" 00031 #include "CAMsg.hpp" 00032 #include "CAUtil.hpp" 00033 #include "CAThread.hpp" 00034 00036 CAQueue::~CAQueue() 00037 { 00038 clean(); 00039 delete m_pcsQueue; 00040 m_pcsQueue = NULL; 00041 delete m_pconvarSize; 00042 m_pconvarSize = NULL; 00043 } 00044 00047 SINT32 CAQueue::clean() 00048 { 00049 m_pcsQueue->lock(); 00050 while(m_Queue!=NULL) 00051 { 00052 delete[] m_Queue->pBuff; 00053 m_Queue->pBuff = NULL; 00054 m_lastElem=m_Queue; 00055 m_Queue=m_Queue->next; 00056 delete m_lastElem; 00057 } 00058 /* while(m_pHeap!=NULL) 00059 { 00060 delete[] m_pHeap->pBuff; 00061 m_lastElem=m_pHeap; 00062 m_pHeap=m_pHeap->next; 00063 delete m_lastElem; 00064 }*/ 00065 m_nQueueSize=0; 00066 m_lastElem=NULL; 00067 m_pcsQueue->unlock(); 00068 return E_SUCCESS; 00069 } 00076 SINT32 CAQueue::add(const void* buff,UINT32 size) 00077 { 00078 if(size==0) 00079 return E_SUCCESS; 00080 if(buff==NULL) 00081 return E_UNKNOWN; 00082 m_pcsQueue->lock(); 00083 //if(m_pHeap==NULL) 00084 // incHeap(); 00085 #ifdef _DEBUG 00086 // if(m_nExpectedElementSize>0&&size>(m_nExpectedElementSize<<1)) 00087 /*if(size>1500) 00088 { 00089 CAMsg::printMsg(LOG_DEBUG,"CAQueue::add() WARNING: request for add %u bytes in a queue with expected element size of %u bytes !\n",size,m_nExpectedElementSize); 00090 }*/ 00091 #endif 00092 if(m_Queue==NULL) 00093 { 00094 /*m_Queue=m_pHeap; 00095 m_pHeap=m_pHeap->next; 00096 if(size>m_nExpectedElementSize) 00097 { 00098 delete[] m_Queue->pBuff; 00099 m_Queue->pBuff=new UINT8[size]; 00100 }*/ 00101 m_Queue=new QUEUE; 00102 m_Queue->pBuff=new UINT8[size]; 00103 m_Queue->next=NULL; 00104 m_Queue->index=0; 00105 m_Queue->size=size; 00106 memcpy(m_Queue->pBuff,buff,size); 00107 m_lastElem=m_Queue; 00108 } 00109 else 00110 { 00111 /* m_lastElem->next=m_pHeap; 00112 m_lastElem=m_pHeap; 00113 m_pHeap=m_pHeap->next; 00114 if(size>m_nExpectedElementSize) 00115 { 00116 delete[] m_lastElem->pBuff; 00117 m_lastElem->pBuff=new UINT8[size]; 00118 }*/ 00119 m_lastElem->next=new QUEUE; 00120 m_lastElem=m_lastElem->next; 00121 m_lastElem->pBuff=new UINT8[size]; 00122 m_lastElem->next=NULL; 00123 m_lastElem->size=size; 00124 m_lastElem->index=0; 00125 memcpy(m_lastElem->pBuff,buff,size); 00126 } 00127 m_nQueueSize+=size; 00128 #ifdef QUEUE_SIZE_LOG 00129 if(m_nLogSize!=0 && m_nQueueSize>m_nLogSize) 00130 { 00131 CAMsg::printMsg(LOG_DEBUG,"CAQueue::add() WARNING: queue size is now %u bytes which is above the expected maximum size of %u\n !\n",m_nQueueSize,m_nLogSize); 00132 } 00133 #endif 00134 m_pcsQueue->unlock(); 00135 m_pconvarSize->lock(); 00136 m_pconvarSize->signal(); 00137 m_pconvarSize->unlock(); 00138 return E_SUCCESS; 00139 } 00140 00149 SINT32 CAQueue::get(UINT8* pbuff,UINT32* psize) 00150 { 00151 if(pbuff==NULL||psize==NULL) 00152 return E_UNKNOWN; 00153 if(*psize==0) 00154 return E_SUCCESS; 00155 if(m_Queue==NULL) 00156 { 00157 *psize=0; 00158 return E_SUCCESS; 00159 } 00160 m_pcsQueue->lock(); 00161 UINT32 space=*psize; 00162 *psize=0; 00163 while(space>=m_Queue->size) 00164 { 00165 memcpy(pbuff,m_Queue->pBuff+m_Queue->index,m_Queue->size); 00166 *psize+=m_Queue->size; 00167 pbuff+=m_Queue->size; 00168 space-=m_Queue->size; 00169 m_nQueueSize-=m_Queue->size; 00170 QUEUE* tmp=m_Queue; 00171 m_Queue=m_Queue->next; 00172 //tmp->next=m_pHeap; 00173 //m_pHeap=tmp; 00174 delete[] tmp->pBuff; 00175 tmp->pBuff = NULL; 00176 delete tmp; 00177 if(m_Queue==NULL) 00178 { 00179 m_pcsQueue->unlock(); 00180 return E_SUCCESS; 00181 } 00182 } 00183 if(space>0) 00184 { 00185 memcpy(pbuff,m_Queue->pBuff+m_Queue->index,space); 00186 *psize+=space; 00187 m_Queue->size-=space; 00188 m_Queue->index+=space; 00189 m_nQueueSize-=space; 00190 //memmove(m_Queue->pBuff,m_Queue->pBuff+space,m_Queue->size); 00191 } 00192 m_pcsQueue->unlock(); 00193 return E_SUCCESS; 00194 } 00195 00205 SINT32 CAQueue::getOrWait(UINT8* pbuff,UINT32* psize) 00206 { 00207 m_pconvarSize->lock(); 00208 while(m_Queue==NULL) 00209 m_pconvarSize->wait(); 00210 SINT32 ret=get(pbuff,psize); 00211 m_pconvarSize->unlock(); 00212 return ret; 00213 } 00214 00226 SINT32 CAQueue::getOrWait(UINT8* pbuff,UINT32* psize,UINT32 msTimeout) 00227 { 00228 m_pconvarSize->lock(); 00229 SINT32 ret; 00230 while(m_Queue==NULL) 00231 { 00232 ret=m_pconvarSize->wait(msTimeout); 00233 if(ret==E_TIMEDOUT) 00234 { 00235 m_pconvarSize->unlock(); 00236 return E_TIMEDOUT; 00237 } 00238 } 00239 ret=get(pbuff,psize); 00240 m_pconvarSize->unlock(); 00241 return ret; 00242 } 00243 00252 SINT32 CAQueue::peek(UINT8* pbuff,UINT32* psize) 00253 { 00254 if(pbuff==NULL||psize==NULL) 00255 return E_UNKNOWN; 00256 if(*psize==0) 00257 return E_SUCCESS; 00258 m_pcsQueue->lock(); 00259 UINT32 space=*psize; 00260 *psize=0; 00261 if(m_Queue==NULL) 00262 { 00263 SINT32 ret=E_SUCCESS; 00264 if(m_bClosed) 00265 ret=E_CLOSED; 00266 m_pcsQueue->unlock(); 00267 return ret; 00268 } 00269 QUEUE* tmpQueue=m_Queue; 00270 while(space>=tmpQueue->size) 00271 { 00272 memcpy(pbuff,tmpQueue->pBuff+tmpQueue->index,tmpQueue->size); 00273 *psize+=tmpQueue->size; 00274 pbuff+=tmpQueue->size; 00275 space-=tmpQueue->size; 00276 tmpQueue=tmpQueue->next; 00277 if(tmpQueue==NULL) 00278 { 00279 m_pcsQueue->unlock(); 00280 return E_SUCCESS; 00281 } 00282 } 00283 memcpy(pbuff,tmpQueue->pBuff+tmpQueue->index,space); 00284 *psize+=space; 00285 m_pcsQueue->unlock(); 00286 return E_SUCCESS; 00287 } 00288 00289 00296 SINT32 CAQueue::remove(UINT32* psize) 00297 { 00298 if(m_Queue==NULL||psize==NULL) 00299 return E_UNKNOWN; 00300 if(*psize==0) 00301 return E_SUCCESS; 00302 m_pcsQueue->lock(); 00303 UINT32 space=*psize; 00304 *psize=0; 00305 while(space>=m_Queue->size) 00306 { 00307 *psize+=m_Queue->size; 00308 space-=m_Queue->size; 00309 m_nQueueSize-=m_Queue->size; 00310 QUEUE* tmp=m_Queue; 00311 m_Queue=m_Queue->next; 00312 // tmp->next=m_pHeap; 00313 // m_pHeap=tmp; 00314 delete[] tmp->pBuff; 00315 tmp->pBuff = NULL; 00316 delete tmp; 00317 if(m_Queue==NULL) 00318 { 00319 m_pcsQueue->unlock(); 00320 return E_SUCCESS; 00321 } 00322 } 00323 if(space>0) 00324 { 00325 *psize+=space; 00326 m_Queue->size-=space; 00327 m_nQueueSize-=space; 00328 m_Queue->index+=space; 00329 //memmove(m_Queue->pBuff,m_Queue->pBuff+space,m_Queue->size); 00330 } 00331 m_pcsQueue->unlock(); 00332 return E_SUCCESS; 00333 } 00334 00335 struct __queue_test 00336 { 00337 CAQueue* pQueue; 00338 UINT8* buff; 00339 SINT32 len; 00340 }; 00341 00342 THREAD_RETURN producer(void* param) 00343 { 00344 struct __queue_test* pTest=(struct __queue_test *)param; 00345 UINT32 count=0; 00346 UINT32 aktSize; 00347 while(pTest->len>10) 00348 { 00349 aktSize=rand(); 00350 aktSize%=0xFFFF; 00351 aktSize%=pTest->len; 00352 if(pTest->pQueue->add(pTest->buff+count,aktSize)!=E_SUCCESS) 00353 THREAD_RETURN_ERROR; 00354 count+=aktSize; 00355 pTest->len-=aktSize; 00356 msSleep(rand()%100); 00357 } 00358 if(pTest->pQueue->add(pTest->buff+count,pTest->len)!=E_SUCCESS) 00359 THREAD_RETURN_ERROR; 00360 THREAD_RETURN_SUCCESS; 00361 } 00362 00363 THREAD_RETURN consumer(void* param) 00364 { 00365 struct __queue_test* pTest=(struct __queue_test *)param; 00366 UINT32 count=0; 00367 UINT32 aktSize; 00368 do 00369 { 00370 aktSize=rand(); 00371 aktSize%=0xFFFF; 00372 if(pTest->pQueue->getOrWait(pTest->buff+count,&aktSize)!=E_SUCCESS) 00373 THREAD_RETURN_ERROR; 00374 count+=aktSize; 00375 pTest->len-=aktSize; 00376 }while(pTest->len>10); 00377 THREAD_RETURN_SUCCESS; 00378 } 00379 00380 SINT32 CAQueue::test() 00381 { 00382 CAQueue* pQueue=new CAQueue(1000); 00383 #define TEST_SIZE 1000000 00384 UINT8* source=new UINT8[TEST_SIZE]; 00385 UINT8* target=new UINT8[TEST_SIZE]; 00386 getRandom(source,TEST_SIZE); 00387 UINT32 count=0; 00388 UINT32 aktSize; 00389 00390 srand((unsigned)time( NULL )); 00391 00392 //Single Thread..... 00393 //adding 00394 00395 while(TEST_SIZE-count>10000) 00396 { 00397 aktSize=rand(); 00398 aktSize%=0xFFFF; 00399 aktSize%=(TEST_SIZE-count); 00400 if(pQueue->add(source+count,aktSize)!=E_SUCCESS) 00401 return E_UNKNOWN; 00402 count+=aktSize; 00403 if(pQueue->getSize()!=count) 00404 return E_UNKNOWN; 00405 } 00406 if(pQueue->add(source+count,TEST_SIZE-count)!=E_SUCCESS) 00407 return E_UNKNOWN; 00408 if(pQueue->getSize()!=TEST_SIZE) 00409 return E_UNKNOWN; 00410 00411 //getting 00412 count=0; 00413 while(!pQueue->isEmpty()) 00414 { 00415 aktSize=rand(); 00416 aktSize%=0xFFFF; 00417 if(pQueue->get(target+count,&aktSize)!=E_SUCCESS) 00418 return E_UNKNOWN; 00419 count+=aktSize; 00420 } 00421 if(count!=TEST_SIZE) 00422 return E_UNKNOWN; 00423 if(memcmp(source,target,TEST_SIZE)!=0) 00424 return E_UNKNOWN; 00425 00426 //Multiple Threads.... 00427 CAThread* pthreadProducer=new CAThread((UINT8*)"Queue Producer Thread"); 00428 CAThread* pthreadConsumer=new CAThread((UINT8*)"Queue Consumer Thread"); 00429 pthreadProducer->setMainLoop(producer); 00430 pthreadConsumer->setMainLoop(consumer); 00431 struct __queue_test t1,t2; 00432 t1.buff=source; 00433 t2.buff=target; 00434 t2.len=t1.len=TEST_SIZE; 00435 t2.pQueue=t1.pQueue=pQueue; 00436 pthreadProducer->start(&t1); 00437 pthreadConsumer->start(&t2); 00438 pthreadProducer->join(); 00439 pthreadConsumer->join(); 00440 delete pthreadProducer; 00441 pthreadProducer = NULL; 00442 delete pthreadConsumer; 00443 pthreadConsumer = NULL; 00444 delete pQueue; 00445 pQueue = NULL; 00446 if(memcmp(source,target,TEST_SIZE)!=0) 00447 return E_UNKNOWN; 00448 00449 delete []source; 00450 source = NULL; 00451 delete []target; 00452 target = NULL; 00453 return E_SUCCESS; 00454 } 00455 #endif //ONLY_LOCAL_PROXY
1.7.6.1