Mixe for Privacy and Anonymity in the Internet
CAQueue Class Reference

This is a simple FIFO-Queue.

#include <CAQueue.hpp>

Public Member Functions

 CAQueue (UINT32 expectedElementSize=0)
 Takes the amount of data you expect to add in one step.
 ~CAQueue ()
 Deletes this Queue and all stored data.
SINT32 add (const void *buff, UINT32 size)
 Adds data to the Queue.
SINT32 close ()
 Closes the Queue (for writing).
SINT32 get (UINT8 *pbuff, UINT32 *psize)
 Gets up to *psize bytes from the Queue.
SINT32 getOrWait (UINT8 *pbuff, UINT32 *psize)
 Gets data from the Queue; if the Queue is empty, waits until some data is available.
SINT32 getOrWait (UINT8 *pbuff, UINT32 *psize, UINT32 msTimeOut)
 Gets data from the Queue; if the Queue is empty, waits until some data is available or the timeout is reached.
SINT32 peek (UINT8 *pbuff, UINT32 *psize)
 Peeks data from the Queue.
SINT32 remove (UINT32 *psize)
 Removes data from the Queue.
SINT32 clean ()
 Removes any stored data from the Queue.
UINT32 getSize ()
 Returns the size of the stored data in bytes.
bool isEmpty ()
 Returns true if the Queue is empty.
bool isClosed ()
 Returns true if the Queue is closed.

Static Public Member Functions

static SINT32 test ()
 Method to test the Queue.

Private Attributes

QUEUE * m_Queue
QUEUE * m_lastElem
volatile UINT32 m_nQueueSize
volatile bool m_bClosed
UINT32 m_nExpectedElementSize
CAMutex * m_pcsQueue
CAConditionVariable * m_pconvarSize

Detailed Description

This is a simple FIFO-Queue.

You can add data and get it back. This class is thread safe. TODO: The handling of getOrWait() is not correct because remove() could intervene. Maybe we do not need a Mutex other than the ConVar.

Definition at line 49 of file CAQueue.hpp.


Constructor & Destructor Documentation

CAQueue::CAQueue ( UINT32  expectedElementSize = 0) [inline]

Takes the amount of data you expect to add in one step.

Used for optimizations. Use expectedElementSize=0 if you do not know the typical amount of data added in one call to add().

Definition at line 57 of file CAQueue.hpp.

References m_bClosed, m_lastElem, m_nExpectedElementSize, m_nQueueSize, m_pconvarSize, m_pcsQueue, and m_Queue.

Referenced by test().

        {
          m_Queue=NULL;
          m_lastElem=NULL;
          m_nExpectedElementSize=expectedElementSize;
          m_nQueueSize=0;
          m_pcsQueue=new CAMutex();
          m_pconvarSize=new CAConditionVariable();
          m_bClosed=false;
#ifdef QUEUE_SIZE_LOG
          m_nLogSize=0;
#endif
          //m_pHeap=NULL;
          //incHeap();
        }

CAQueue::~CAQueue ( )

Deletes this Queue and all stored data.

Definition at line 36 of file CAQueue.cpp.

References clean(), m_pconvarSize, and m_pcsQueue.

  {
    clean();
    delete m_pcsQueue;
    m_pcsQueue = NULL;
    delete m_pconvarSize;
    m_pconvarSize = NULL;
  }



Member Function Documentation

SINT32 CAQueue::add ( const void *  buff,
UINT32  size 
)

Adds data to the Queue.

Parameters:
    buff    pointer to the data buffer
    size    size of the data to add
Return values:
    E_UNKNOWN    in case of an error
    E_SUCCESS    if successful

Definition at line 76 of file CAQueue.cpp.

References E_SUCCESS, E_UNKNOWN, _t_queue::index, CAMutex::lock(), m_lastElem, m_nQueueSize, m_pconvarSize, m_pcsQueue, m_Queue, _t_queue::next, _t_queue::pBuff, CAMsg::printMsg(), CAConditionVariable::signal(), _t_queue::size, and CAMutex::unlock().

Referenced by CAChain::addDataToUpstreamQueue(), CALastMix::clean(), CAFirstMix::clean(), CAFirstMixA::closeConnection(), MemFormatTarget::dumpMem(), fm_loopReadFromMix(), lm_loopReadFromMix(), CALastMixA::loop(), CAFirstMixB::loop(), CAFirstMixA::loop(), CALastMixB::loop(), mm_loopReadFromMixAfter(), mm_loopReadFromMixBefore(), CAFirstMixA::notifyAllUserChannels(), producer(), CAControlChannelDispatcher::sendMessages(), test(), and MemFormatTarget::writeChars().

  {
    if(size==0)
      return E_SUCCESS;
    if(buff==NULL)
      return E_UNKNOWN;
    m_pcsQueue->lock();
    //if(m_pHeap==NULL)
    //  incHeap();
#ifdef _DEBUG
//    if(m_nExpectedElementSize>0&&size>(m_nExpectedElementSize<<1))
    /*if(size>1500)
      {
        CAMsg::printMsg(LOG_DEBUG,"CAQueue::add() WARNING: request for add %u bytes in a queue with expected element size of %u bytes !\n",size,m_nExpectedElementSize);
      }*/
#endif
    if(m_Queue==NULL)
      {
        /*m_Queue=m_pHeap;
        m_pHeap=m_pHeap->next;
        if(size>m_nExpectedElementSize)
          {
            delete[] m_Queue->pBuff;
            m_Queue->pBuff=new UINT8[size];
          }*/
        m_Queue=new QUEUE;
        m_Queue->pBuff=new UINT8[size];
        m_Queue->next=NULL;
        m_Queue->index=0;
        m_Queue->size=size;
        memcpy(m_Queue->pBuff,buff,size);
        m_lastElem=m_Queue;
      }
    else
      {
/*        m_lastElem->next=m_pHeap;
        m_lastElem=m_pHeap;
        m_pHeap=m_pHeap->next;
        if(size>m_nExpectedElementSize)
          {
            delete[] m_lastElem->pBuff;
            m_lastElem->pBuff=new UINT8[size];
          }*/
        m_lastElem->next=new QUEUE;
        m_lastElem=m_lastElem->next;
        m_lastElem->pBuff=new UINT8[size];
        m_lastElem->next=NULL;
        m_lastElem->size=size;
        m_lastElem->index=0;
        memcpy(m_lastElem->pBuff,buff,size);
      }
    m_nQueueSize+=size;
#ifdef QUEUE_SIZE_LOG
    if(m_nLogSize!=0 && m_nQueueSize>m_nLogSize)
      {
        CAMsg::printMsg(LOG_DEBUG,"CAQueue::add() WARNING: queue size is now %u bytes which is above the expected maximum size of %u\n !\n",m_nQueueSize,m_nLogSize);
      }
#endif
    m_pcsQueue->unlock();
    m_pconvarSize->lock();
    m_pconvarSize->signal();
    m_pconvarSize->unlock();
    return E_SUCCESS;
  }
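
The listing above appends one heap-allocated node per call and keeps a pointer to the last element so that appending does not require walking the list. The following is a minimal single-threaded sketch of that bookkeeping using the C++ standard library. It is not the CAQueue code: ChunkNode, ChunkList, totalSize() and chunkCount() are hypothetical names, locking is omitted, and ownership is expressed with std::unique_ptr instead of new/delete.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <utility>

// Hypothetical stand-in for the QUEUE node: one heap buffer per add() call.
struct ChunkNode {
    std::unique_ptr<std::uint8_t[]> data;  // private copy of the caller's bytes
    std::size_t size = 0;                  // unread bytes left in this chunk
    std::size_t index = 0;                 // offset of the first unread byte
    std::unique_ptr<ChunkNode> next;       // following chunk, or null at the tail
};

class ChunkList {
public:
    ~ChunkList() {
        // Unlink iteratively so a long list cannot overflow the stack
        // through recursive unique_ptr destruction.
        while (head_) head_ = std::move(head_->next);
    }

    // Returns true on success; false mirrors the E_UNKNOWN case for a null buffer.
    bool add(const void* buff, std::size_t size) {
        if (size == 0) return true;   // nothing to store, still a success
        if (buff == nullptr) return false;
        auto node = std::make_unique<ChunkNode>();
        node->data = std::make_unique<std::uint8_t[]>(size);
        std::memcpy(node->data.get(), buff, size);
        node->size = size;
        ChunkNode* raw = node.get();
        if (!head_) head_ = std::move(node);   // first element
        else tail_->next = std::move(node);    // append behind the current tail
        tail_ = raw;
        assert(tail_->next == nullptr);
        total_ += size;
        return true;
    }

    std::size_t totalSize() const { return total_; }

    std::size_t chunkCount() const {
        std::size_t n = 0;
        for (const ChunkNode* p = head_.get(); p != nullptr; p = p->next.get()) ++n;
        return n;
    }

private:
    std::unique_ptr<ChunkNode> head_;
    ChunkNode* tail_ = nullptr;   // non-owning pointer to the last node
    std::size_t total_ = 0;       // sum of all stored bytes
};
```

As in the listing, a zero-length add succeeds without creating a node, and a null buffer with a non-zero size is rejected.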

SINT32 CAQueue::clean ( )

Removes any stored data from the Queue.

Definition at line 47 of file CAQueue.cpp.

References E_SUCCESS, CAMutex::lock(), m_lastElem, m_nQueueSize, m_pcsQueue, m_Queue, _t_queue::next, _t_queue::pBuff, and CAMutex::unlock().

Referenced by CAFirstMixA::checkUserConnections(), CAChain::closeChainInternal(), CAChain::~CAChain(), and ~CAQueue().

  {
    m_pcsQueue->lock();
    while(m_Queue!=NULL)
      {
        delete[] m_Queue->pBuff;
        m_Queue->pBuff = NULL;
        m_lastElem=m_Queue;
        m_Queue=m_Queue->next;
        delete m_lastElem;
      }
/*    while(m_pHeap!=NULL)
      {
        delete[] m_pHeap->pBuff;
        m_lastElem=m_pHeap;
        m_pHeap=m_pHeap->next;
        delete m_lastElem;
      }*/
    m_nQueueSize=0;
    m_lastElem=NULL;
    m_pcsQueue->unlock();
    return E_SUCCESS;
  }


SINT32 CAQueue::close ( ) [inline]

Closes the Queue (for writing).

One can still read the remaining bytes out of the queue.

Definition at line 77 of file CAQueue.hpp.

References E_SUCCESS, CAMutex::lock(), m_bClosed, m_pcsQueue, and CAMutex::unlock().

Referenced by CALastMixA::loop().

        {
          m_pcsQueue->lock();
          m_bClosed=true;
          m_pcsQueue->unlock();
          return E_SUCCESS;
        }


SINT32 CAQueue::get ( UINT8 *  pbuff,
UINT32 *  psize 
)

Gets up to *psize bytes from the Queue.

The data is removed from the Queue.

Parameters:
    pbuff    pointer to a buffer where the data should be stored
    psize    on call, contains the size of pbuff; on return, contains the size of the returned data
Return values:
    E_SUCCESS    if successful
    E_UNKNOWN    in case of an error

Definition at line 149 of file CAQueue.cpp.

References E_SUCCESS, E_UNKNOWN, _t_queue::index, CAMutex::lock(), m_nQueueSize, m_pcsQueue, m_Queue, _t_queue::next, _t_queue::pBuff, _t_queue::size, and CAMutex::unlock().

Referenced by CAFirstMix::doUserLogin_internal(), CALastMixA::loop(), CAFirstMixB::loop(), CAFirstMixA::loop(), CALastMixB::loop(), CAFirstMixA::sendToUsers(), and test().

  {
    if(pbuff==NULL||psize==NULL)
      return E_UNKNOWN;
    if(*psize==0)
      return E_SUCCESS;
    if(m_Queue==NULL)
      {
        *psize=0;
        return E_SUCCESS;
      }
    m_pcsQueue->lock();
    UINT32 space=*psize;
    *psize=0;
    while(space>=m_Queue->size)
      {
        memcpy(pbuff,m_Queue->pBuff+m_Queue->index,m_Queue->size);
        *psize+=m_Queue->size;
        pbuff+=m_Queue->size;
        space-=m_Queue->size;
        m_nQueueSize-=m_Queue->size;
        QUEUE* tmp=m_Queue;
        m_Queue=m_Queue->next;
        //tmp->next=m_pHeap;
        //m_pHeap=tmp;
        delete[] tmp->pBuff;
        tmp->pBuff = NULL;
        delete tmp;
        if(m_Queue==NULL)
          {
            m_pcsQueue->unlock();
            return E_SUCCESS;
          }
      }
    if(space>0)
      {
        memcpy(pbuff,m_Queue->pBuff+m_Queue->index,space);
        *psize+=space;
        m_Queue->size-=space;
        m_Queue->index+=space;
        m_nQueueSize-=space;
        //memmove(m_Queue->pBuff,m_Queue->pBuff+space,m_Queue->size);
      }
    m_pcsQueue->unlock();
    return E_SUCCESS;
  }
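
The listing above consumes whole elements while they fit into the caller's buffer and otherwise advances an offset into the first element, so a later call continues where the previous one stopped. The following single-threaded sketch models only that behaviour. ChunkReader and its members are hypothetical names, locking is left out, and a std::deque of std::vector chunks replaces the hand-written list.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <deque>
#include <vector>

// Hypothetical single-threaded model of the chunk bookkeeping in get().
class ChunkReader {
public:
    void add(const void* buff, std::size_t size) {
        if (buff == nullptr || size == 0) return;
        const auto* p = static_cast<const std::uint8_t*>(buff);
        chunks_.push_back(Chunk{std::vector<std::uint8_t>(p, p + size), 0});
        total_ += size;
    }

    // Copies up to `capacity` bytes into `out` and returns the number copied.
    std::size_t get(std::uint8_t* out, std::size_t capacity) {
        assert(out != nullptr);
        std::size_t copied = 0;
        while (copied < capacity && !chunks_.empty()) {
            Chunk& front = chunks_.front();
            const std::size_t unread = front.data.size() - front.index;
            const std::size_t n = std::min(unread, capacity - copied);
            std::memcpy(out + copied, front.data.data() + front.index, n);
            copied += n;
            front.index += n;            // partial read: only advance the offset
            if (front.index == front.data.size())
                chunks_.pop_front();     // chunk fully consumed: release it
        }
        total_ -= copied;
        return copied;
    }

    std::size_t size() const { return total_; }
    bool empty() const { return chunks_.empty(); }

private:
    struct Chunk {
        std::vector<std::uint8_t> data;
        std::size_t index;               // offset of the first unread byte
    };
    std::deque<Chunk> chunks_;
    std::size_t total_ = 0;
};
```

With the chunks "abc" and "defg" stored, a read of five bytes yields "abcde" and leaves two bytes in the second chunk for the next call.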


SINT32 CAQueue::getOrWait ( UINT8 *  pbuff,
UINT32 *  psize 
)

Gets data from the Queue; if the Queue is empty, waits until some data is available.

The data is removed from the Queue.

Parameters:
    pbuff    pointer to a buffer where the data should be stored
    psize    on call, contains the size of pbuff; on return, contains the size of the returned data
Return values:
    E_SUCCESS    if successful
    E_UNKNOWN    in case of an error

Definition at line 205 of file CAQueue.cpp.

References CAMutex::lock(), m_pconvarSize, m_Queue, CAMutex::unlock(), and CAConditionVariable::wait().

Referenced by consumer(), fm_loopSendToMix(), lm_loopSendToMix(), mm_loopSendToMixAfter(), and mm_loopSendToMixBefore().

  {
    m_pconvarSize->lock();
    while(m_Queue==NULL)
      m_pconvarSize->wait();
    SINT32 ret=get(pbuff,psize);
    m_pconvarSize->unlock();
    return ret;
  }
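
In the listing above the wait holds the lock of the condition variable, while get() and remove() take the separate queue mutex; this is the situation the TODO in the detailed description refers to. One common way to avoid that kind of interleaving is to protect both the data and the wait with a single mutex. The following is a minimal sketch of that idea with the C++ standard library, under the assumption that one lock is acceptable for all operations. It is not the CAQueue implementation; BlockingByteQueue and its members are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>

// Hypothetical byte queue: one mutex protects the data and the wait.
class BlockingByteQueue {
public:
    void add(const std::uint8_t* buff, std::size_t size) {
        assert(buff != nullptr || size == 0);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            bytes_.insert(bytes_.end(), buff, buff + size);
        }
        notEmpty_.notify_one();   // wake one waiting reader
    }

    // Blocks until at least one byte is stored, then copies up to `capacity`
    // bytes into `out` and returns the number copied.
    std::size_t getOrWait(std::uint8_t* out, std::size_t capacity) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate is re-checked under the lock after every wakeup, so
        // spurious wakeups and competing readers are handled.
        notEmpty_.wait(lock, [this] { return !bytes_.empty(); });
        const std::size_t n = std::min(capacity, bytes_.size());
        std::copy_n(bytes_.begin(), n, out);
        bytes_.erase(bytes_.begin(),
                     bytes_.begin() + static_cast<std::ptrdiff_t>(n));
        return n;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return bytes_.size();
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable notEmpty_;
    std::deque<std::uint8_t> bytes_;
};
```

Because the emptiness check and the removal of the bytes happen under the same lock, no other thread can empty the queue between the wakeup and the copy. In typical use a producer thread calls add() while a consumer thread blocks in getOrWait().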


SINT32 CAQueue::getOrWait ( UINT8 *  pbuff,
UINT32 *  psize,
UINT32  msTimeout 
)

Gets data from the Queue; if the Queue is empty, waits until some data is available or the timeout is reached.

The data is removed from the Queue.

Parameters:
    pbuff        pointer to a buffer where the data should be stored
    psize        on call, contains the size of pbuff; on return, contains the size of the returned data
    msTimeout    timeout in milliseconds
Return values:
    E_SUCCESS     if successful
    E_TIMEDOUT    if the timeout was reached
    E_UNKNOWN     in case of an error

Definition at line 226 of file CAQueue.cpp.

References E_TIMEDOUT, CAMutex::lock(), m_pconvarSize, m_Queue, CAMutex::unlock(), and CAConditionVariable::wait().

  {
    m_pconvarSize->lock();
    SINT32 ret;
    while(m_Queue==NULL)
      {
        ret=m_pconvarSize->wait(msTimeout);
        if(ret==E_TIMEDOUT)
          {
            m_pconvarSize->unlock();
            return E_TIMEDOUT;
          }
      }   
    ret=get(pbuff,psize);
    m_pconvarSize->unlock();
    return ret;
  }
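
The timed variant above returns E_TIMEDOUT when no data arrives in time. The following sketch shows the same contract with std::condition_variable::wait_for and a predicate. It is an illustration under stated assumptions, not the CAQueue code: TimedByteQueue and WaitResult are hypothetical names, a single mutex is used, and *psize is set to 0 on timeout, which is a choice of this sketch (the listing leaves it unchanged).

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>

// Stand-ins for E_SUCCESS and E_TIMEDOUT.
enum class WaitResult { Success, TimedOut };

// Hypothetical byte queue with a timed blocking read.
class TimedByteQueue {
public:
    void add(const std::uint8_t* buff, std::size_t size) {
        assert(buff != nullptr || size == 0);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            bytes_.insert(bytes_.end(), buff, buff + size);
        }
        notEmpty_.notify_one();
    }

    // Waits at most `timeout` for data. On Success, *psize holds the number
    // of bytes copied; on TimedOut it is set to 0 (assumption of this sketch).
    WaitResult getOrWait(std::uint8_t* out, std::size_t* psize,
                         std::chrono::milliseconds timeout) {
        assert(out != nullptr && psize != nullptr);
        std::unique_lock<std::mutex> lock(mutex_);
        // wait_for with a predicate returns false only if the queue is still
        // empty once the timeout has expired.
        if (!notEmpty_.wait_for(lock, timeout,
                                [this] { return !bytes_.empty(); })) {
            *psize = 0;
            return WaitResult::TimedOut;
        }
        const std::size_t n = std::min(*psize, bytes_.size());
        std::copy_n(bytes_.begin(), n, out);
        bytes_.erase(bytes_.begin(),
                     bytes_.begin() + static_cast<std::ptrdiff_t>(n));
        *psize = n;
        return WaitResult::Success;
    }

private:
    std::mutex mutex_;
    std::condition_variable notEmpty_;
    std::deque<std::uint8_t> bytes_;
};
```

One design difference worth noting: the loop in the listing starts a fresh wait of msTimeout on every iteration, whereas wait_for with a predicate applies one overall deadline across all wakeups.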


UINT32 CAQueue::getSize ( ) [inline]

Returns the size of the stored data in bytes.

Returns:
size of Queue

Definition at line 95 of file CAQueue.hpp.

References CAMutex::lock(), m_nQueueSize, m_pcsQueue, and CAMutex::unlock().

Referenced by CAFirstMixA::checkUserConnections(), CAFirstMix::doUserLogin_internal(), MemFormatTarget::dumpMem(), fm_loopReadFromMix(), lm_loopReadFromMix(), CALastMixA::loop(), CAFirstMixB::loop(), CAFirstMixA::loop(), CALastMixB::loop(), mm_loopReadFromMixAfter(), mm_loopReadFromMixBefore(), CAFirstMixA::sendToUsers(), and test().

        {
          if (m_pcsQueue)
          {
            m_pcsQueue->lock();
            UINT32 s=m_nQueueSize;
            m_pcsQueue->unlock();
            return s;
          }
          return 0;
        }


bool CAQueue::isClosed ( ) [inline]

Returns true if the Queue is closed.

Return values:
    true     if the Queue is closed
    false    otherwise

Definition at line 120 of file CAQueue.hpp.

References m_bClosed.

Referenced by CALastMixA::loop().

        {
          return m_bClosed;
        }

bool CAQueue::isEmpty ( ) [inline]

Returns true if the Queue is empty.

Return values:
    true     if the Queue is empty
    false    if the Queue contains data

Definition at line 111 of file CAQueue.hpp.

References m_Queue.

Referenced by CALastMixA::loop(), CAChain::sendUpstreamData(), and test().

        {
          return (m_Queue==NULL);
        }

SINT32 CAQueue::peek ( UINT8 *  pbuff,
UINT32 *  psize 
)

Peeks data from the Queue.

The data is NOT removed from the Queue.

Parameters:
    pbuff    pointer to a buffer where the data should be stored
    psize    on call, contains the size of pbuff; on return, contains the size of the returned data
Return values:
    E_SUCCESS    if successful
    E_CLOSED     if the queue is already empty AND closed
    E_UNKNOWN    in case of an error

Definition at line 252 of file CAQueue.cpp.

References E_CLOSED, E_SUCCESS, E_UNKNOWN, _t_queue::index, CAMutex::lock(), m_bClosed, m_pcsQueue, m_Queue, _t_queue::next, _t_queue::pBuff, _t_queue::size, and CAMutex::unlock().

Referenced by MemFormatTarget::dumpMem(), CALastMixA::loop(), and CAChain::sendUpstreamDataInternal().

  {
    if(pbuff==NULL||psize==NULL)
      return E_UNKNOWN;
    if(*psize==0)
      return E_SUCCESS;
    m_pcsQueue->lock();
    UINT32 space=*psize;
    *psize=0;
    if(m_Queue==NULL)
      {
        SINT32 ret=E_SUCCESS;
        if(m_bClosed)
          ret=E_CLOSED;
        m_pcsQueue->unlock();
        return ret;
      }
    QUEUE* tmpQueue=m_Queue;
    while(space>=tmpQueue->size)
      {
        memcpy(pbuff,tmpQueue->pBuff+tmpQueue->index,tmpQueue->size);
        *psize+=tmpQueue->size;
        pbuff+=tmpQueue->size;
        space-=tmpQueue->size;
        tmpQueue=tmpQueue->next;
        if(tmpQueue==NULL)
          {
            m_pcsQueue->unlock();
            return E_SUCCESS;
          }
      }
    memcpy(pbuff,tmpQueue->pBuff+tmpQueue->index,space);
    *psize+=space;
    m_pcsQueue->unlock();
    return E_SUCCESS;
  } 
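
Because peek() leaves the data in place and remove() discards a given number of bytes, the two can be combined to forward data to a receiver that may accept only part of it. Callers such as CAChain::sendUpstreamDataInternal() reference both functions, which suggests a pattern of this kind, although the page does not show it. The following single-threaded sketch illustrates the idea. PeekableQueue, LimitedSink and flushOnce are hypothetical names, and PeekResult stands in for E_SUCCESS and E_CLOSED.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

enum class PeekResult { Success, Closed };

// Hypothetical single-threaded model of peek(), remove() and close().
class PeekableQueue {
public:
    void add(const std::uint8_t* buff, std::size_t size) {
        assert(buff != nullptr || size == 0);
        bytes_.insert(bytes_.end(), buff, buff + size);
    }
    void close() { closed_ = true; }

    // Copies up to *psize bytes without consuming them.
    PeekResult peek(std::uint8_t* out, std::size_t* psize) const {
        assert(out != nullptr && psize != nullptr);
        if (bytes_.empty()) {
            *psize = 0;
            return closed_ ? PeekResult::Closed : PeekResult::Success;
        }
        *psize = std::min(*psize, bytes_.size());
        std::copy_n(bytes_.begin(), *psize, out);
        return PeekResult::Success;
    }

    // Drops up to `count` bytes from the front; returns how many were dropped.
    std::size_t remove(std::size_t count) {
        const std::size_t n = std::min(count, bytes_.size());
        bytes_.erase(bytes_.begin(),
                     bytes_.begin() + static_cast<std::ptrdiff_t>(n));
        return n;
    }

    std::size_t size() const { return bytes_.size(); }

private:
    std::deque<std::uint8_t> bytes_;
    bool closed_ = false;
};

// Hypothetical sink that accepts at most `limit` bytes per call, similar to
// a non-blocking socket with a nearly full send buffer.
struct LimitedSink {
    std::size_t limit;
    std::vector<std::uint8_t> received;
    std::size_t write(const std::uint8_t* data, std::size_t size) {
        const std::size_t n = std::min(size, limit);
        received.insert(received.end(), data, data + n);
        return n;
    }
};

// Peek, try to write, then remove only what the sink actually took.
inline std::size_t flushOnce(PeekableQueue& queue, LimitedSink& sink) {
    std::uint8_t buffer[64];
    std::size_t available = sizeof buffer;
    if (queue.peek(buffer, &available) != PeekResult::Success || available == 0)
        return 0;
    const std::size_t written = sink.write(buffer, available);
    return queue.remove(written);
}
```

Bytes the sink did not accept stay in the queue and are offered again on the next call. Once the queue is closed and drained, peek() reports Closed, so the caller can tell "nothing yet" apart from "nothing ever again".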

SINT32 CAQueue::remove ( UINT32 *  psize )

Removes data from the Queue.

Parameters:
    psize    on call, contains the size of the data to remove; on return, contains the size of the removed data
Return values:
    E_SUCCESS    if successful
    E_UNKNOWN    in case of an error

Definition at line 296 of file CAQueue.cpp.

References E_SUCCESS, E_UNKNOWN, _t_queue::index, CAMutex::lock(), m_nQueueSize, m_pcsQueue, m_Queue, _t_queue::next, _t_queue::pBuff, _t_queue::size, and CAMutex::unlock().

Referenced by CALastMixA::loop(), and CAChain::sendUpstreamDataInternal().

  {
    if(m_Queue==NULL||psize==NULL)
      return E_UNKNOWN;
    if(*psize==0)
      return E_SUCCESS;
    m_pcsQueue->lock();
    UINT32 space=*psize;
    *psize=0;
    while(space>=m_Queue->size)
      {
        *psize+=m_Queue->size;
        space-=m_Queue->size;
        m_nQueueSize-=m_Queue->size;
        QUEUE* tmp=m_Queue;
        m_Queue=m_Queue->next;
//        tmp->next=m_pHeap;
//        m_pHeap=tmp;
        delete[] tmp->pBuff;
        tmp->pBuff = NULL;
        delete tmp;
        if(m_Queue==NULL)
          {
            m_pcsQueue->unlock();
            return E_SUCCESS;
          }
      }
    if(space>0)
      {
        *psize+=space;
        m_Queue->size-=space;
        m_nQueueSize-=space;
        m_Queue->index+=space;
        //memmove(m_Queue->pBuff,m_Queue->pBuff+space,m_Queue->size);
      }
    m_pcsQueue->unlock();
    return E_SUCCESS;
  }


SINT32 CAQueue::test ( ) [static]

Method to test the Queue.

Return values:
    E_SUCCESS    if the Queue implementation seems to be OK

Definition at line 380 of file CAQueue.cpp.

References add(), __queue_test::buff, CAQueue(), consumer(), E_SUCCESS, E_UNKNOWN, get(), getRandom(), getSize(), isEmpty(), CAThread::join(), __queue_test::len, __queue_test::pQueue, producer(), CAThread::setMainLoop(), CAThread::start(), and TEST_SIZE.

Referenced by main().

  {
    CAQueue* pQueue=new CAQueue(1000);
    #define TEST_SIZE 1000000
    UINT8* source=new UINT8[TEST_SIZE];
    UINT8* target=new UINT8[TEST_SIZE];
    getRandom(source,TEST_SIZE);
    UINT32 count=0;
    UINT32 aktSize;
    
    srand((unsigned)time( NULL ));
    
    //Single Thread.....
    //adding
    
    while(TEST_SIZE-count>10000)
        {
          aktSize=rand();
          aktSize%=0xFFFF;
          aktSize%=(TEST_SIZE-count);
          if(pQueue->add(source+count,aktSize)!=E_SUCCESS)
            return E_UNKNOWN;
          count+=aktSize;
          if(pQueue->getSize()!=count)
            return E_UNKNOWN;
        }
    if(pQueue->add(source+count,TEST_SIZE-count)!=E_SUCCESS)
      return E_UNKNOWN;
    if(pQueue->getSize()!=TEST_SIZE)
      return E_UNKNOWN;
    
    //getting
    count=0;
    while(!pQueue->isEmpty())
      {
        aktSize=rand();
        aktSize%=0xFFFF;
        if(pQueue->get(target+count,&aktSize)!=E_SUCCESS)
          return E_UNKNOWN;
        count+=aktSize;
      }
    if(count!=TEST_SIZE)
      return E_UNKNOWN;
    if(memcmp(source,target,TEST_SIZE)!=0)
      return E_UNKNOWN;
    
    //Multiple Threads....
    CAThread* pthreadProducer=new CAThread((UINT8*)"Queue Producer Thread");
    CAThread* pthreadConsumer=new CAThread((UINT8*)"Queue Consumer Thread");
    pthreadProducer->setMainLoop(producer);
    pthreadConsumer->setMainLoop(consumer);
    struct __queue_test t1,t2;
    t1.buff=source;
    t2.buff=target;
    t2.len=t1.len=TEST_SIZE;
    t2.pQueue=t1.pQueue=pQueue;
    pthreadProducer->start(&t1);
    pthreadConsumer->start(&t2);
    pthreadProducer->join();
    pthreadConsumer->join();
    delete pthreadProducer;
    pthreadProducer = NULL;
    delete pthreadConsumer;
    pthreadConsumer = NULL;
    delete pQueue;
    pQueue = NULL;
    if(memcmp(source,target,TEST_SIZE)!=0)
      return E_UNKNOWN;
    
    delete []source;
    source = NULL;
    delete []target;
    target = NULL;
    return E_SUCCESS;
  }



Member Data Documentation

volatile bool CAQueue::m_bClosed [private]

Definition at line 141 of file CAQueue.hpp.

Referenced by CAQueue(), close(), isClosed(), and peek().

QUEUE* CAQueue::m_lastElem [private]

Definition at line 139 of file CAQueue.hpp.

Referenced by add(), CAQueue(), and clean().

UINT32 CAQueue::m_nExpectedElementSize [private]

Definition at line 142 of file CAQueue.hpp.

Referenced by CAQueue().

volatile UINT32 CAQueue::m_nQueueSize [private]

Definition at line 140 of file CAQueue.hpp.

Referenced by add(), CAQueue(), clean(), get(), getSize(), and remove().

CAConditionVariable* CAQueue::m_pconvarSize [private]

Definition at line 145 of file CAQueue.hpp.

Referenced by add(), CAQueue(), getOrWait(), and ~CAQueue().

CAMutex* CAQueue::m_pcsQueue [private]

Definition at line 144 of file CAQueue.hpp.

Referenced by add(), CAQueue(), clean(), close(), get(), getSize(), peek(), remove(), and ~CAQueue().

QUEUE* CAQueue::m_Queue [private]

Definition at line 138 of file CAQueue.hpp.

Referenced by add(), CAQueue(), clean(), get(), getOrWait(), isEmpty(), peek(), and remove().


The documentation for this class was generated from the following files: