Mixe for Privacy and Anonymity in the Internet
CAQueue.cpp
Go to the documentation of this file.
00001 /*
00002 Copyright (c) 2000, The JAP-Team 
00003 All rights reserved.
00004 Redistribution and use in source and binary forms, with or without modification, 
00005 are permitted provided that the following conditions are met:
00006 
00007   - Redistributions of source code must retain the above copyright notice, 
00008     this list of conditions and the following disclaimer.
00009 
00010   - Redistributions in binary form must reproduce the above copyright notice, 
00011     this list of conditions and the following disclaimer in the documentation and/or 
00012     other materials provided with the distribution.
00013 
00014   - Neither the name of the University of Technology Dresden, Germany nor the names of its contributors 
00015     may be used to endorse or promote products derived from this software without specific 
00016     prior written permission. 
00017 
00018   
00019 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' AND ANY EXPRESS 
00020 OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY 
00021 AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS
00022 BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00023 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
00024 OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER 
00025 IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 
00026 OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE
00027 */
00028 #include "StdAfx.h"
00029 #ifndef ONLY_LOCAL_PROXY
00030 #include "CAQueue.hpp"
00031 #include "CAMsg.hpp"
00032 #include "CAUtil.hpp"
00033 #include "CAThread.hpp"
00034 
00036 CAQueue::~CAQueue()
00037   {
00038     clean();
00039     delete m_pcsQueue;
00040     m_pcsQueue = NULL;
00041     delete m_pconvarSize;
00042     m_pconvarSize = NULL;
00043   }
00044 
00047 SINT32 CAQueue::clean()
00048   {
00049     m_pcsQueue->lock();
00050     while(m_Queue!=NULL)
00051       {
00052         delete[] m_Queue->pBuff;
00053         m_Queue->pBuff = NULL;
00054         m_lastElem=m_Queue;
00055         m_Queue=m_Queue->next;
00056         delete m_lastElem;
00057       }
00058 /*    while(m_pHeap!=NULL)
00059       {
00060         delete[] m_pHeap->pBuff;
00061         m_lastElem=m_pHeap;
00062         m_pHeap=m_pHeap->next;
00063         delete m_lastElem;
00064       }*/
00065     m_nQueueSize=0;
00066     m_lastElem=NULL;
00067     m_pcsQueue->unlock();
00068     return E_SUCCESS;
00069   }
00076 SINT32 CAQueue::add(const void* buff,UINT32 size)
00077   {
00078     if(size==0)
00079       return E_SUCCESS;
00080     if(buff==NULL)
00081       return E_UNKNOWN;
00082     m_pcsQueue->lock();
00083     //if(m_pHeap==NULL)
00084     //  incHeap();
00085 #ifdef _DEBUG
00086 //    if(m_nExpectedElementSize>0&&size>(m_nExpectedElementSize<<1))
00087     /*if(size>1500)
00088       {
00089         CAMsg::printMsg(LOG_DEBUG,"CAQueue::add() WARNING: request for add %u bytes in a queue with expected element size of %u bytes !\n",size,m_nExpectedElementSize);
00090       }*/
00091 #endif
00092     if(m_Queue==NULL)
00093       {
00094         /*m_Queue=m_pHeap;
00095         m_pHeap=m_pHeap->next;
00096         if(size>m_nExpectedElementSize)
00097           {
00098             delete[] m_Queue->pBuff;
00099             m_Queue->pBuff=new UINT8[size];
00100           }*/
00101         m_Queue=new QUEUE;
00102         m_Queue->pBuff=new UINT8[size];
00103         m_Queue->next=NULL;
00104         m_Queue->index=0;
00105         m_Queue->size=size;
00106         memcpy(m_Queue->pBuff,buff,size);
00107         m_lastElem=m_Queue;
00108       }
00109     else
00110       {
00111 /*        m_lastElem->next=m_pHeap;
00112         m_lastElem=m_pHeap;
00113         m_pHeap=m_pHeap->next;
00114         if(size>m_nExpectedElementSize)
00115           {
00116             delete[] m_lastElem->pBuff;
00117             m_lastElem->pBuff=new UINT8[size];
00118           }*/
00119         m_lastElem->next=new QUEUE;
00120         m_lastElem=m_lastElem->next;
00121         m_lastElem->pBuff=new UINT8[size];
00122         m_lastElem->next=NULL;
00123         m_lastElem->size=size;
00124         m_lastElem->index=0;
00125         memcpy(m_lastElem->pBuff,buff,size);
00126       }
00127     m_nQueueSize+=size;
00128 #ifdef QUEUE_SIZE_LOG
00129     if(m_nLogSize!=0 && m_nQueueSize>m_nLogSize)
00130       {
00131         CAMsg::printMsg(LOG_DEBUG,"CAQueue::add() WARNING: queue size is now %u bytes which is above the expected maximum size of %u\n !\n",m_nQueueSize,m_nLogSize);
00132       }
00133 #endif
00134     m_pcsQueue->unlock();
00135     m_pconvarSize->lock();
00136     m_pconvarSize->signal();
00137     m_pconvarSize->unlock();
00138     return E_SUCCESS;
00139   }
00140 
00149 SINT32 CAQueue::get(UINT8* pbuff,UINT32* psize)
00150   {
00151     if(pbuff==NULL||psize==NULL)
00152       return E_UNKNOWN;
00153     if(*psize==0)
00154       return E_SUCCESS;
00155     if(m_Queue==NULL)
00156       {
00157         *psize=0;
00158         return E_SUCCESS;
00159       }
00160     m_pcsQueue->lock();
00161     UINT32 space=*psize;
00162     *psize=0;
00163     while(space>=m_Queue->size)
00164       {
00165         memcpy(pbuff,m_Queue->pBuff+m_Queue->index,m_Queue->size);
00166         *psize+=m_Queue->size;
00167         pbuff+=m_Queue->size;
00168         space-=m_Queue->size;
00169         m_nQueueSize-=m_Queue->size;
00170         QUEUE* tmp=m_Queue;
00171         m_Queue=m_Queue->next;
00172         //tmp->next=m_pHeap;
00173         //m_pHeap=tmp;
00174         delete[] tmp->pBuff;
00175         tmp->pBuff = NULL;
00176         delete tmp;
00177         if(m_Queue==NULL)
00178           {
00179             m_pcsQueue->unlock();
00180             return E_SUCCESS;
00181           }
00182       }
00183     if(space>0)
00184       {
00185         memcpy(pbuff,m_Queue->pBuff+m_Queue->index,space);
00186         *psize+=space;
00187         m_Queue->size-=space;
00188         m_Queue->index+=space;
00189         m_nQueueSize-=space;
00190         //memmove(m_Queue->pBuff,m_Queue->pBuff+space,m_Queue->size);
00191       }
00192     m_pcsQueue->unlock();
00193     return E_SUCCESS;
00194   }
00195 
00205 SINT32 CAQueue::getOrWait(UINT8* pbuff,UINT32* psize)
00206   {
00207     m_pconvarSize->lock();
00208     while(m_Queue==NULL)
00209       m_pconvarSize->wait();
00210     SINT32 ret=get(pbuff,psize);
00211     m_pconvarSize->unlock();
00212     return ret;
00213   }
00214 
00226 SINT32 CAQueue::getOrWait(UINT8* pbuff,UINT32* psize,UINT32 msTimeout)
00227   {
00228     m_pconvarSize->lock();
00229     SINT32 ret;
00230     while(m_Queue==NULL)
00231       {
00232         ret=m_pconvarSize->wait(msTimeout);
00233         if(ret==E_TIMEDOUT)
00234           {
00235             m_pconvarSize->unlock();
00236             return E_TIMEDOUT;
00237           }
00238       }   
00239     ret=get(pbuff,psize);
00240     m_pconvarSize->unlock();
00241     return ret;
00242   }
00243 
00252 SINT32 CAQueue::peek(UINT8* pbuff,UINT32* psize)
00253   {
00254     if(pbuff==NULL||psize==NULL)
00255       return E_UNKNOWN;
00256     if(*psize==0)
00257       return E_SUCCESS;
00258     m_pcsQueue->lock();
00259     UINT32 space=*psize;
00260     *psize=0;
00261     if(m_Queue==NULL)
00262       {
00263         SINT32 ret=E_SUCCESS;
00264         if(m_bClosed)
00265           ret=E_CLOSED;
00266         m_pcsQueue->unlock();
00267         return ret;
00268       }
00269     QUEUE* tmpQueue=m_Queue;
00270     while(space>=tmpQueue->size)
00271       {
00272         memcpy(pbuff,tmpQueue->pBuff+tmpQueue->index,tmpQueue->size);
00273         *psize+=tmpQueue->size;
00274         pbuff+=tmpQueue->size;
00275         space-=tmpQueue->size;
00276         tmpQueue=tmpQueue->next;
00277         if(tmpQueue==NULL)
00278           {
00279             m_pcsQueue->unlock();
00280             return E_SUCCESS;
00281           }
00282       }
00283     memcpy(pbuff,tmpQueue->pBuff+tmpQueue->index,space);
00284     *psize+=space;
00285     m_pcsQueue->unlock();
00286     return E_SUCCESS;
00287   } 
00288   
00289   
00296 SINT32 CAQueue::remove(UINT32* psize)
00297   {
00298     if(m_Queue==NULL||psize==NULL)
00299       return E_UNKNOWN;
00300     if(*psize==0)
00301       return E_SUCCESS;
00302     m_pcsQueue->lock();
00303     UINT32 space=*psize;
00304     *psize=0;
00305     while(space>=m_Queue->size)
00306       {
00307         *psize+=m_Queue->size;
00308         space-=m_Queue->size;
00309         m_nQueueSize-=m_Queue->size;
00310         QUEUE* tmp=m_Queue;
00311         m_Queue=m_Queue->next;
00312 //        tmp->next=m_pHeap;
00313 //        m_pHeap=tmp;
00314         delete[] tmp->pBuff;
00315         tmp->pBuff = NULL;
00316         delete tmp;
00317         if(m_Queue==NULL)
00318           {
00319             m_pcsQueue->unlock();
00320             return E_SUCCESS;
00321           }
00322       }
00323     if(space>0)
00324       {
00325         *psize+=space;
00326         m_Queue->size-=space;
00327         m_nQueueSize-=space;
00328         m_Queue->index+=space;
00329         //memmove(m_Queue->pBuff,m_Queue->pBuff+space,m_Queue->size);
00330       }
00331     m_pcsQueue->unlock();
00332     return E_SUCCESS;
00333   }
00334 
00335 struct __queue_test
00336   {
00337     CAQueue* pQueue;
00338     UINT8* buff;
00339     SINT32 len;
00340   };
00341 
00342 THREAD_RETURN producer(void* param)
00343   {
00344     struct __queue_test* pTest=(struct __queue_test *)param;
00345     UINT32 count=0;
00346     UINT32 aktSize;
00347     while(pTest->len>10)
00348         {
00349           aktSize=rand();
00350           aktSize%=0xFFFF;
00351           aktSize%=pTest->len;
00352           if(pTest->pQueue->add(pTest->buff+count,aktSize)!=E_SUCCESS)
00353             THREAD_RETURN_ERROR;
00354           count+=aktSize;
00355           pTest->len-=aktSize;
00356           msSleep(rand()%100);
00357         }
00358     if(pTest->pQueue->add(pTest->buff+count,pTest->len)!=E_SUCCESS)
00359       THREAD_RETURN_ERROR;
00360     THREAD_RETURN_SUCCESS;
00361   }
00362 
00363 THREAD_RETURN consumer(void* param)
00364   {
00365     struct __queue_test* pTest=(struct __queue_test *)param;
00366     UINT32 count=0;
00367     UINT32 aktSize;
00368     do
00369       {
00370         aktSize=rand();
00371         aktSize%=0xFFFF;
00372         if(pTest->pQueue->getOrWait(pTest->buff+count,&aktSize)!=E_SUCCESS)
00373           THREAD_RETURN_ERROR;
00374         count+=aktSize;
00375         pTest->len-=aktSize;
00376       }while(pTest->len>10);
00377     THREAD_RETURN_SUCCESS;
00378   }
00379 
00380 SINT32 CAQueue::test()
00381   {
00382     CAQueue* pQueue=new CAQueue(1000);
00383     #define TEST_SIZE 1000000
00384     UINT8* source=new UINT8[TEST_SIZE];
00385     UINT8* target=new UINT8[TEST_SIZE];
00386     getRandom(source,TEST_SIZE);
00387     UINT32 count=0;
00388     UINT32 aktSize;
00389     
00390     srand((unsigned)time( NULL ));
00391     
00392     //Single Thread.....
00393     //adding
00394     
00395     while(TEST_SIZE-count>10000)
00396         {
00397           aktSize=rand();
00398           aktSize%=0xFFFF;
00399           aktSize%=(TEST_SIZE-count);
00400           if(pQueue->add(source+count,aktSize)!=E_SUCCESS)
00401             return E_UNKNOWN;
00402           count+=aktSize;
00403           if(pQueue->getSize()!=count)
00404             return E_UNKNOWN;
00405         }
00406     if(pQueue->add(source+count,TEST_SIZE-count)!=E_SUCCESS)
00407       return E_UNKNOWN;
00408     if(pQueue->getSize()!=TEST_SIZE)
00409       return E_UNKNOWN;
00410     
00411     //getting
00412     count=0;
00413     while(!pQueue->isEmpty())
00414       {
00415         aktSize=rand();
00416         aktSize%=0xFFFF;
00417         if(pQueue->get(target+count,&aktSize)!=E_SUCCESS)
00418           return E_UNKNOWN;
00419         count+=aktSize;
00420       }
00421     if(count!=TEST_SIZE)
00422       return E_UNKNOWN;
00423     if(memcmp(source,target,TEST_SIZE)!=0)
00424       return E_UNKNOWN;
00425     
00426     //Multiple Threads....
00427     CAThread* pthreadProducer=new CAThread((UINT8*)"Queue Producer Thread");
00428     CAThread* pthreadConsumer=new CAThread((UINT8*)"Queue Consumer Thread");
00429     pthreadProducer->setMainLoop(producer);
00430     pthreadConsumer->setMainLoop(consumer);
00431     struct __queue_test t1,t2;
00432     t1.buff=source;
00433     t2.buff=target;
00434     t2.len=t1.len=TEST_SIZE;
00435     t2.pQueue=t1.pQueue=pQueue;
00436     pthreadProducer->start(&t1);
00437     pthreadConsumer->start(&t2);
00438     pthreadProducer->join();
00439     pthreadConsumer->join();
00440     delete pthreadProducer;
00441     pthreadProducer = NULL;
00442     delete pthreadConsumer;
00443     pthreadConsumer = NULL;
00444     delete pQueue;
00445     pQueue = NULL;
00446     if(memcmp(source,target,TEST_SIZE)!=0)
00447       return E_UNKNOWN;
00448     
00449     delete []source;
00450     source = NULL;
00451     delete []target;
00452     target = NULL;
00453     return E_SUCCESS;
00454   }
00455 #endif //ONLY_LOCAL_PROXY