|
Mixe for Privacy and Anonymity in the Internet
|
This is a simple FIFO-Queue. More...
#include <CAQueue.hpp>
Public Member Functions | |
| CAQueue (UINT32 expectedElementSize=0) | |
| Give the size of the data that you expect to add in one step. | |
| ~CAQueue () | |
| Deletes this Queue and all stored data. | |
| SINT32 | add (const void *buff, UINT32 size) |
| Adds data to the Queue. | |
| SINT32 | close () |
| Closes the Queue (for writing). | |
| SINT32 | get (UINT8 *pbuff, UINT32 *psize) |
| Gets up to psize number of bytes from the Queue. | |
| SINT32 | getOrWait (UINT8 *pbuff, UINT32 *psize) |
| Gets data from the Queue or waits until some data is available, if the Queue is empty. | |
| SINT32 | getOrWait (UINT8 *pbuff, UINT32 *psize, UINT32 msTimeOut) |
| Gets data from the Queue or waits until some data is available, if the Queue is empty or a timeout is reached. | |
| SINT32 | peek (UINT8 *pbuff, UINT32 *psize) |
| Peeks data from the Queue. | |
| SINT32 | remove (UINT32 *psize) |
| Removes data from the Queue. | |
| SINT32 | clean () |
| Removes any stored data from the Queue. | |
| UINT32 | getSize () |
| Returns the size of the stored data in bytes. | |
| bool | isEmpty () |
| Returns true, if the Queue is empty. | |
| bool | isClosed () |
| Returns true, if the Queue is closed. | |
Static Public Member Functions | |
| static SINT32 | test () |
| Method to test the Queue. | |
Private Attributes | |
| QUEUE * | m_Queue |
| QUEUE * | m_lastElem |
| volatile UINT32 | m_nQueueSize |
| volatile bool | m_bClosed |
| UINT32 | m_nExpectedElementSize |
| CAMutex * | m_pcsQueue |
| CAConditionVariable * | m_pconvarSize |
This is a simple FIFO-Queue.
You can add data and get it back. This class is thread safe. TODO: The handling of getOrWait() is not correct because remove() could intercept... Maybe we do not need another mutex besides the one of the ConVar...
Definition at line 49 of file CAQueue.hpp.
| CAQueue::CAQueue | ( | UINT32 | expectedElementSize = 0 | ) | [inline] |
Give the size of the data that you expect to add in one step.
Used for optimizations. Use expectedElementSize=0 if you have no idea about the typical amount of data added in one call to add().
Definition at line 57 of file CAQueue.hpp.
References m_bClosed, m_lastElem, m_nExpectedElementSize, m_nQueueSize, m_pconvarSize, m_pcsQueue, and m_Queue.
Referenced by test().
{
	// A new queue starts out empty and open for writing.
	m_Queue=m_lastElem=NULL;
	m_nQueueSize=0;
	m_bClosed=false;
	// The size hint is only stored here; the element heap it was meant for
	// (m_pHeap / incHeap()) is currently disabled.
	m_nExpectedElementSize=expectedElementSize;
	// Synchronisation objects; the destructor deletes both of them.
	m_pcsQueue=new CAMutex();
	m_pconvarSize=new CAConditionVariable();
#ifdef QUEUE_SIZE_LOG
	// A log threshold of 0 switches the size warning in add() off.
	m_nLogSize=0;
#endif
}
Deletes this Queue and all stored data.
Definition at line 36 of file CAQueue.cpp.
References clean(), m_pconvarSize, and m_pcsQueue.
{
	// Release all queued elements first: clean() locks m_pcsQueue, so the
	// mutex must still exist at this point.
	clean();

	// Now the synchronisation objects can go. The pointers are reset so that
	// a stale access sees NULL instead of a dangling pointer.
	delete m_pcsQueue;
	m_pcsQueue=NULL;

	delete m_pconvarSize;
	m_pconvarSize=NULL;
}
| SINT32 CAQueue::add | ( | const void * | buff, |
| UINT32 | size | ||
| ) |
Adds data to the Queue.
| buff | pointer to the data buffer |
| size | size of data to add |
| E_UNKNOWN | in case of an error |
| E_SUCCESS | if successful |
Definition at line 76 of file CAQueue.cpp.
References E_SUCCESS, E_UNKNOWN, _t_queue::index, CAMutex::lock(), m_lastElem, m_nQueueSize, m_pconvarSize, m_pcsQueue, m_Queue, _t_queue::next, _t_queue::pBuff, CAMsg::printMsg(), CAConditionVariable::signal(), _t_queue::size, and CAMutex::unlock().
Referenced by CAChain::addDataToUpstreamQueue(), CALastMix::clean(), CAFirstMix::clean(), CAFirstMixA::closeConnection(), MemFormatTarget::dumpMem(), fm_loopReadFromMix(), lm_loopReadFromMix(), CALastMixA::loop(), CAFirstMixB::loop(), CAFirstMixA::loop(), CALastMixB::loop(), mm_loopReadFromMixAfter(), mm_loopReadFromMixBefore(), CAFirstMixA::notifyAllUserChannels(), producer(), CAControlChannelDispatcher::sendMessages(), test(), and MemFormatTarget::writeChars().
{
	// Appends a copy of buff[0..size) as one new element at the tail of the
	// queue and signals the condition variable afterwards.
	//
	// Returns E_SUCCESS if the data was queued (or size==0, which is a no-op)
	// and E_UNKNOWN if buff is NULL.
	//
	// NOTE(review): m_bClosed is not consulted here, so data can still be
	// added after close() - confirm whether that is intended.
	if(size==0)
		return E_SUCCESS;
	if(buff==NULL)
		return E_UNKNOWN;

	// Build the complete element before taking the lock: allocation and the
	// copy do not touch shared state, so other threads are not blocked while
	// they run, and the element is fully initialised before it is linked.
	QUEUE* pNewElem=new QUEUE;
	pNewElem->pBuff=new UINT8[size];
	pNewElem->next=NULL;
	pNewElem->index=0;
	pNewElem->size=size;
	memcpy(pNewElem->pBuff,buff,size);

	m_pcsQueue->lock();
	if(m_Queue==NULL)
		m_Queue=pNewElem;            // first element: it becomes the head
	else
		m_lastElem->next=pNewElem;   // otherwise link it behind the old tail
	m_lastElem=pNewElem;
	m_nQueueSize+=size;
#ifdef QUEUE_SIZE_LOG
	if(m_nLogSize!=0 && m_nQueueSize>m_nLogSize)
		{
			CAMsg::printMsg(LOG_DEBUG,"CAQueue::add() WARNING: queue size is now %u bytes which is above the expected maximum size of %u\n !\n",m_nQueueSize,m_nLogSize);
		}
#endif
	m_pcsQueue->unlock();

	// Notify a consumer waiting in getOrWait(). The signal is sent while
	// holding the condition variable's own lock, after the queue mutex has
	// been released.
	m_pconvarSize->lock();
	m_pconvarSize->signal();
	m_pconvarSize->unlock();
	return E_SUCCESS;
}
| SINT32 CAQueue::clean | ( | ) |
Removes any stored data from the Queue.
Definition at line 47 of file CAQueue.cpp.
References E_SUCCESS, CAMutex::lock(), m_lastElem, m_nQueueSize, m_pcsQueue, m_Queue, _t_queue::next, _t_queue::pBuff, and CAMutex::unlock().
Referenced by CAFirstMixA::checkUserConnections(), CAChain::closeChainInternal(), CAChain::~CAChain(), and ~CAQueue().
{
	// Drops every element of the queue under the queue mutex and resets the
	// bookkeeping to the "empty" state. Always returns E_SUCCESS.
	m_pcsQueue->lock();
	while(m_Queue!=NULL)
		{
			// Unlink the head first, then free its payload and the node.
			QUEUE* pDead=m_Queue;
			m_Queue=pDead->next;
			delete[] pDead->pBuff;
			delete pDead;
		}
	m_lastElem=NULL;
	m_nQueueSize=0;
	m_pcsQueue->unlock();
	return E_SUCCESS;
}
| SINT32 CAQueue::close | ( | ) | [inline] |
Closes the Queue (for writing).
One can still read the remaining bytes out of the queue.
Definition at line 77 of file CAQueue.hpp.
References E_SUCCESS, CAMutex::lock(), m_bClosed, m_pcsQueue, and CAMutex::unlock().
Referenced by CALastMixA::loop().
{
	// Marks the queue as closed; the flag is set under the queue mutex.
	// Stored data is left untouched. Always returns E_SUCCESS.
	//
	// NOTE(review): the condition variable is not signalled here, so a thread
	// blocked in getOrWait() is not woken up by close() - confirm callers do
	// not rely on that.
	m_pcsQueue->lock();
	m_bClosed=true;
	m_pcsQueue->unlock();

	return E_SUCCESS;
}
| SINT32 CAQueue::get | ( | UINT8 * | pbuff, |
| UINT32 * | psize | ||
| ) |
Gets up to psize number of bytes from the Queue.
The data is removed from the Queue.
| pbuff | pointer to a buffer, where the data should be stored |
| psize | on call contains the size of pbuff, on return contains the size of returned data |
| E_SUCCESS | if successful |
| E_UNKNOWN | in case of an error |
Definition at line 149 of file CAQueue.cpp.
References E_SUCCESS, E_UNKNOWN, _t_queue::index, CAMutex::lock(), m_nQueueSize, m_pcsQueue, m_Queue, _t_queue::next, _t_queue::pBuff, _t_queue::size, and CAMutex::unlock().
Referenced by CAFirstMix::doUserLogin_internal(), CALastMixA::loop(), CAFirstMixB::loop(), CAFirstMixA::loop(), CALastMixB::loop(), CAFirstMixA::sendToUsers(), and test().
{
	// Copies up to *psize bytes from the head of the queue into pbuff and
	// removes them from the queue.
	//
	// pbuff  destination buffer with room for at least *psize bytes
	// psize  in: capacity of pbuff; out: number of bytes actually copied
	//        (0 if the queue is empty)
	//
	// Returns E_UNKNOWN if pbuff or psize is NULL, E_SUCCESS otherwise.
	if(pbuff==NULL||psize==NULL)
		return E_UNKNOWN;
	if(*psize==0)
		return E_SUCCESS;

	// The emptiness check must happen under the lock. Checking m_Queue first
	// and locking afterwards lets another thread drain the queue in between,
	// which would make the loop below dereference a NULL head.
	m_pcsQueue->lock();
	UINT32 space=*psize;
	*psize=0;

	// Consume every element that fits completely into the remaining space.
	while(m_Queue!=NULL&&space>=m_Queue->size)
		{
			memcpy(pbuff,m_Queue->pBuff+m_Queue->index,m_Queue->size);
			*psize+=m_Queue->size;
			pbuff+=m_Queue->size;
			space-=m_Queue->size;
			m_nQueueSize-=m_Queue->size;
			QUEUE* tmp=m_Queue;
			m_Queue=m_Queue->next;
			delete[] tmp->pBuff;
			delete tmp;
		}

	// Partial read: the head element is larger than the space that is left.
	// Copy what fits and advance the element's read offset instead of moving
	// the remaining bytes.
	if(m_Queue!=NULL&&space>0)
		{
			memcpy(pbuff,m_Queue->pBuff+m_Queue->index,space);
			*psize+=space;
			m_Queue->size-=space;
			m_Queue->index+=space;
			m_nQueueSize-=space;
		}
	m_pcsQueue->unlock();
	return E_SUCCESS;
}
| SINT32 CAQueue::getOrWait | ( | UINT8 * | pbuff, |
| UINT32 * | psize | ||
| ) |
Gets data from the Queue or waits until some data is available, if the Queue is empty.
The data is removed from the Queue.
| pbuff | pointer to a buffer, where the data should be stored |
| psize | on call contains the size of pbuff, on return contains the size of returned data |
| E_SUCCESS | if successful |
| E_UNKNOWN | in case of an error |
Definition at line 205 of file CAQueue.cpp.
References CAMutex::lock(), m_pconvarSize, m_Queue, CAMutex::unlock(), and CAConditionVariable::wait().
Referenced by consumer(), fm_loopSendToMix(), lm_loopSendToMix(), mm_loopSendToMixAfter(), and mm_loopSendToMixBefore().
{
// Blocking read: waits on the condition variable until the queue has a head
// element, then delegates to get(), which copies and removes the data.
// The result of get() is returned unchanged.
m_pconvarSize->lock();
// Re-check after every wake-up: m_Queue may still be (or again be) NULL.
// NOTE(review): m_Queue is read here under the condition variable's lock,
// while add()/get()/remove() modify it under m_pcsQueue - see the class-level
// TODO about this handling.
while(m_Queue==NULL)
m_pconvarSize->wait();
// get() locks m_pcsQueue itself; it is called while m_pconvarSize is still
// locked.
SINT32 ret=get(pbuff,psize);
m_pconvarSize->unlock();
return ret;
}
| SINT32 CAQueue::getOrWait | ( | UINT8 * | pbuff, |
| UINT32 * | psize, | ||
| UINT32 | msTimeout | ||
| ) |
Gets data from the Queue or waits until some data is available, if the Queue is empty or a timeout is reached.
The data is removed from the Queue.
| pbuff | pointer to a buffer, where the data should be stored |
| psize | on call contains the size of pbuff, on return contains the size of returned data |
| msTimeout | timeout in milli seconds |
| E_SUCCESS | if successful |
| E_TIMEDOUT | if timeout was reached |
| E_UNKNOWN | in case of an error |
Definition at line 226 of file CAQueue.cpp.
References E_TIMEDOUT, CAMutex::lock(), m_pconvarSize, m_Queue, CAMutex::unlock(), and CAConditionVariable::wait().
{
// Timed read: like the blocking getOrWait(), but gives up with E_TIMEDOUT
// when a wait on the condition variable times out. On timeout pbuff and
// *psize are left untouched.
m_pconvarSize->lock();
SINT32 ret;
// Re-check after every wake-up: m_Queue may still be (or again be) NULL.
while(m_Queue==NULL)
{
// NOTE(review): every iteration waits up to msTimeout again, so the total
// blocking time can exceed msTimeout if the thread is woken while the queue
// is still empty - confirm whether that is acceptable.
ret=m_pconvarSize->wait(msTimeout);
if(ret==E_TIMEDOUT)
{
// Release the condition variable's lock before reporting the timeout.
m_pconvarSize->unlock();
return E_TIMEDOUT;
}
}
// Data is available: get() copies and removes it (taking m_pcsQueue itself).
ret=get(pbuff,psize);
m_pconvarSize->unlock();
return ret;
}
| UINT32 CAQueue::getSize | ( | ) | [inline] |
Returns the size of the stored data in bytes.
Definition at line 95 of file CAQueue.hpp.
References CAMutex::lock(), m_nQueueSize, m_pcsQueue, and CAMutex::unlock().
Referenced by CAFirstMixA::checkUserConnections(), CAFirstMix::doUserLogin_internal(), MemFormatTarget::dumpMem(), fm_loopReadFromMix(), lm_loopReadFromMix(), CALastMixA::loop(), CAFirstMixB::loop(), CAFirstMixA::loop(), CALastMixB::loop(), mm_loopReadFromMixAfter(), mm_loopReadFromMixBefore(), CAFirstMixA::sendToUsers(), and test().
{
	// Without a mutex (the destructor resets the pointer to NULL) there is
	// nothing to read safely; report an empty queue.
	if(m_pcsQueue==NULL)
		return 0;

	// Read the byte counter under the queue mutex.
	m_pcsQueue->lock();
	const UINT32 nBytes=m_nQueueSize;
	m_pcsQueue->unlock();
	return nBytes;
}
| bool CAQueue::isClosed | ( | ) | [inline] |
Returns true, if the Queue is closed.
| true,if | Queue is closed |
| false,otherwise |
Definition at line 120 of file CAQueue.hpp.
References m_bClosed.
Referenced by CALastMixA::loop().
{
	// Plain read of the volatile flag; no mutex is taken here (close() sets
	// the flag under m_pcsQueue).
	const bool bClosed=m_bClosed;
	return bClosed;
}
| bool CAQueue::isEmpty | ( | ) | [inline] |
Returns true, if the Queue is empty.
| true,if | Queue is empty |
| false,if | Queue contains data |
Definition at line 111 of file CAQueue.hpp.
References m_Queue.
Referenced by CALastMixA::loop(), CAChain::sendUpstreamData(), and test().
{
	// The queue is empty exactly when it has no head element. The pointer is
	// read without taking the queue mutex, so the answer is only a snapshot.
	if(m_Queue!=NULL)
		return false;
	return true;
}
| SINT32 CAQueue::peek | ( | UINT8 * | pbuff, |
| UINT32 * | psize | ||
| ) |
Peeks data from the Queue.
The data is NOT removed from the Queue.
| pbuff | pointer to a buffer, where the data should be stored |
| psize | on call contains the size of pbuff, on return contains the size of returned data |
| E_SUCCESS | if successful |
| E_CLOSED | if the queue is already empty AND closed |
| E_UNKNOWN | in case of an error |
Definition at line 252 of file CAQueue.cpp.
References E_CLOSED, E_SUCCESS, E_UNKNOWN, _t_queue::index, CAMutex::lock(), m_bClosed, m_pcsQueue, m_Queue, _t_queue::next, _t_queue::pBuff, _t_queue::size, and CAMutex::unlock().
Referenced by MemFormatTarget::dumpMem(), CALastMixA::loop(), and CAChain::sendUpstreamDataInternal().
{
	// Copies up to *psize bytes from the head of the queue into pbuff without
	// removing anything.
	//
	// pbuff  destination buffer with room for at least *psize bytes
	// psize  in: capacity of pbuff; out: number of bytes copied
	//
	// Returns E_UNKNOWN for NULL arguments, E_CLOSED if the queue is empty
	// and closed, E_SUCCESS otherwise.
	if(pbuff==NULL||psize==NULL)
		return E_UNKNOWN;
	if(*psize==0)
		return E_SUCCESS;

	m_pcsQueue->lock();
	UINT32 nRemaining=*psize;
	*psize=0;
	SINT32 result=E_SUCCESS;
	if(m_Queue==NULL)
		{
			// Nothing to peek at; tell the caller if nothing will ever come.
			if(m_bClosed)
				result=E_CLOSED;
		}
	else
		{
			// Walk the list with a cursor so that the queue itself stays
			// unmodified; copy every element that fits completely.
			QUEUE* pCursor=m_Queue;
			while(pCursor!=NULL&&nRemaining>=pCursor->size)
				{
					memcpy(pbuff,pCursor->pBuff+pCursor->index,pCursor->size);
					*psize+=pCursor->size;
					pbuff+=pCursor->size;
					nRemaining-=pCursor->size;
					pCursor=pCursor->next;
				}
			// The buffer ended inside an element: copy its leading part.
			if(pCursor!=NULL)
				{
					memcpy(pbuff,pCursor->pBuff+pCursor->index,nRemaining);
					*psize+=nRemaining;
				}
		}
	m_pcsQueue->unlock();
	return result;
}
| SINT32 CAQueue::remove | ( | UINT32 * | psize | ) |
Removes data from the Queue.
| psize | on call contains the size of data to remove, on return contains the size of removed data |
| E_SUCCESS | if successful |
| E_UNKNOWN | in case of an error |
Definition at line 296 of file CAQueue.cpp.
References E_SUCCESS, E_UNKNOWN, _t_queue::index, CAMutex::lock(), m_nQueueSize, m_pcsQueue, m_Queue, _t_queue::next, _t_queue::pBuff, _t_queue::size, and CAMutex::unlock().
Referenced by CALastMixA::loop(), and CAChain::sendUpstreamDataInternal().
{
	// Discards up to *psize bytes from the head of the queue without copying
	// them anywhere.
	//
	// psize  in: number of bytes to remove; out: number of bytes removed
	//
	// Returns E_UNKNOWN if psize is NULL or the queue is empty (*psize is
	// left unchanged in that case), E_SUCCESS otherwise.
	if(psize==NULL)
		return E_UNKNOWN;

	// The emptiness check must happen under the lock. Checking m_Queue first
	// and locking afterwards lets another thread drain the queue in between,
	// which would make the loop below dereference a NULL head.
	m_pcsQueue->lock();
	if(m_Queue==NULL)
		{
			m_pcsQueue->unlock();
			return E_UNKNOWN;
		}
	if(*psize==0)
		{
			m_pcsQueue->unlock();
			return E_SUCCESS;
		}
	UINT32 space=*psize;
	*psize=0;

	// Drop every element that is covered completely by the request.
	while(m_Queue!=NULL&&space>=m_Queue->size)
		{
			*psize+=m_Queue->size;
			space-=m_Queue->size;
			m_nQueueSize-=m_Queue->size;
			QUEUE* tmp=m_Queue;
			m_Queue=m_Queue->next;
			delete[] tmp->pBuff;
			delete tmp;
		}

	// The request ends inside the head element: skip its leading bytes by
	// advancing the read offset instead of moving the remaining data.
	if(m_Queue!=NULL&&space>0)
		{
			*psize+=space;
			m_Queue->size-=space;
			m_nQueueSize-=space;
			m_Queue->index+=space;
		}
	m_pcsQueue->unlock();
	return E_SUCCESS;
}
| SINT32 CAQueue::test | ( | ) | [static] |
Method to test the Queue.
| E_SUCCESS,if | Queue implementation seems to be ok |
Definition at line 380 of file CAQueue.cpp.
References add(), __queue_test::buff, CAQueue(), consumer(), E_SUCCESS, E_UNKNOWN, get(), getRandom(), getSize(), isEmpty(), CAThread::join(), __queue_test::len, __queue_test::pQueue, producer(), CAThread::setMainLoop(), CAThread::start(), and TEST_SIZE.
Referenced by main().
{
	// Self-test of the queue. First a single-threaded round trip (random
	// sized add() calls followed by random sized get() calls), then a
	// producer/consumer run with two threads.
	//
	// Returns E_SUCCESS if all checks passed and E_UNKNOWN otherwise. The
	// queue and both buffers are released on every path.
	CAQueue* pQueue=new CAQueue(1000);
#define TEST_SIZE 1000000
	UINT8* source=new UINT8[TEST_SIZE];
	UINT8* target=new UINT8[TEST_SIZE];
	getRandom(source,TEST_SIZE);
	UINT32 count=0;
	UINT32 aktSize;
	srand((unsigned)time( NULL ));
	// Once a check fails, ok stays false and all remaining steps are skipped
	// until the common cleanup at the end.
	bool ok=true;

	//Single Thread.....
	//adding: random sized chunks until less than 10000 bytes are left
	while(ok&&TEST_SIZE-count>10000)
		{
			aktSize=rand();
			aktSize%=0xFFFF;
			aktSize%=(TEST_SIZE-count);
			ok=(pQueue->add(source+count,aktSize)==E_SUCCESS);
			if(ok)
				{
					count+=aktSize;
					ok=(pQueue->getSize()==count);
				}
		}
	// ...then the remainder in one piece
	if(ok)
		ok=(pQueue->add(source+count,TEST_SIZE-count)==E_SUCCESS);
	if(ok)
		ok=(pQueue->getSize()==TEST_SIZE);

	//getting: random sized reads until the queue is drained
	if(ok)
		{
			count=0;
			while(ok&&!pQueue->isEmpty())
				{
					aktSize=rand();
					aktSize%=0xFFFF;
					ok=(pQueue->get(target+count,&aktSize)==E_SUCCESS);
					if(ok)
						count+=aktSize;
				}
		}
	if(ok)
		ok=(count==TEST_SIZE);
	if(ok)
		ok=(memcmp(source,target,TEST_SIZE)==0);

	//Multiple Threads....
	if(ok)
		{
			CAThread* pthreadProducer=new CAThread((UINT8*)"Queue Producer Thread");
			CAThread* pthreadConsumer=new CAThread((UINT8*)"Queue Consumer Thread");
			pthreadProducer->setMainLoop(producer);
			pthreadConsumer->setMainLoop(consumer);
			struct __queue_test t1,t2;
			t1.buff=source;
			t2.buff=target;
			t2.len=t1.len=TEST_SIZE;
			t2.pQueue=t1.pQueue=pQueue;
			pthreadProducer->start(&t1);
			pthreadConsumer->start(&t2);
			pthreadProducer->join();
			pthreadConsumer->join();
			delete pthreadProducer;
			pthreadProducer = NULL;
			delete pthreadConsumer;
			pthreadConsumer = NULL;
			// NOTE(review): target still holds the data copied in the
			// single-threaded phase, so this comparison would also pass if
			// the consumer wrote nothing - consider clearing target before
			// the threads are started.
			ok=(memcmp(source,target,TEST_SIZE)==0);
		}

	// Common cleanup for success and failure.
	delete pQueue;
	pQueue = NULL;
	delete []source;
	source = NULL;
	delete []target;
	target = NULL;
	return ok?E_SUCCESS:E_UNKNOWN;
}
volatile bool CAQueue::m_bClosed [private] |
Definition at line 141 of file CAQueue.hpp.
Referenced by CAQueue(), close(), isClosed(), and peek().
QUEUE* CAQueue::m_lastElem [private] |
Definition at line 139 of file CAQueue.hpp.
UINT32 CAQueue::m_nExpectedElementSize [private] |
Definition at line 142 of file CAQueue.hpp.
Referenced by CAQueue().
volatile UINT32 CAQueue::m_nQueueSize [private] |
CAConditionVariable* CAQueue::m_pconvarSize [private] |
Definition at line 145 of file CAQueue.hpp.
Referenced by add(), CAQueue(), getOrWait(), and ~CAQueue().
CAMutex* CAQueue::m_pcsQueue [private] |
QUEUE* CAQueue::m_Queue [private] |
Definition at line 138 of file CAQueue.hpp.
Referenced by add(), CAQueue(), clean(), get(), getOrWait(), isEmpty(), peek(), and remove().
1.7.6.1