Mixe for Privacy and Anonymity in the Internet
Protected Member Functions
CAFirstMixB Class Reference

#include <CAFirstMixB.hpp>

Inheritance diagram for CAFirstMixB:
[legend]
Collaboration diagram for CAFirstMixB:
[legend]

List of all members.

Protected Member Functions

SINT32 loop ()

Detailed Description

Definition at line 34 of file CAFirstMixB.hpp.


Member Function Documentation

SINT32 CAFirstMixB::loop ( ) [protected, virtual]

Implements CAFirstMix.

Definition at line 41 of file CAFirstMixB.cpp.

References CASocketGroupEpoll::add(), CAQueue::add(), CAFirstMixChannelList::addChannel(), ASSERT, t_fmhashtableentry::bCountPacket, t_MixPacket::channel, CHANNEL_CLOSE, CHANNEL_DATA, CHANNEL_DUMMY, CHANNEL_OPEN, CHANNEL_SIG_CRIME, t_firstmixchannellist::channelIn, t_firstmixchannellist::channelOut, CAMuxSocket::close(), t_fmhashtableentry::countryID, CASymCipher::crypt1(), CASymCipher::crypt2(), t_MixPacket::data, DATA_SIZE, CAFirstMix::decUsers(), t_fmhashtableentry::delayBucket, CAThreadPool::destroy(), diff64(), E_SUCCESS, E_UNKNOWN, t_MixPacket::flags, CAQueue::get(), CAFirstMixChannelList::get(), CAMuxSocket::getCASocket(), getcurrentTimeMicros(), getcurrentTimeMillis(), CAFirstMixChannelList::getFirst(), CAFirstMixChannelList::getFirstChannelForSocket(), CASocketGroupEpoll::getFirstSignaledSocketData(), getMemoryUsage(), CAFirstMixChannelList::getNext(), CAFirstMixChannelList::getNextChannel(), CASocketGroupEpoll::getNextSignaledSocketData(), getRandom(), CAQueue::getSize(), CAMuxSocket::getSocket(), CAAccountingInstance::handleJapPacket(), t_fmhashtableentry::id, tUINT32withLock::inc(), CAFirstMix::incMixedPackets(), CASocketGroupEpoll::isSignaled(), isZero64(), CAThread::join(), KEY_SIZE, len, LOG_ENCRYPTED, CAFirstMix::m_arrSocketsIn, CAFirstMix::m_bRestart, CAFirstMix::m_nSocketsIn, CAFirstMix::m_nUser, CAFirstMix::m_PacketsPerCountryIN, CAFirstMix::m_PacketsPerCountryOUT, CAFirstMix::m_pChannelList, CAFirstMix::m_pIPList, CAFirstMix::m_pMuxOut, CAFirstMix::m_pQueueReadFromMix, CAFirstMix::m_pQueueSendToMix, CAFirstMix::m_psocketgroupUsersRead, CAFirstMix::m_psocketgroupUsersWrite, CAFirstMix::m_pthreadAcceptUsers, CAFirstMix::m_pthreadReadFromMix, CAFirstMix::m_pthreadSendToMix, CAFirstMix::m_pthreadsLogin, MAX_NEXT_MIX_QUEUE_SIZE, MIXPACKET_SIZE, msSleep(), t_fmhashtableentry::oQueueEntry, t_queue_entry::packet, t_firstmixchannellist::pCipher, t_fmhashtableentry::pControlChannelDispatcher, t_fmhashtableentry::peerIP, t_firstmixchannellist::pHead, t_fmhashtableentry::pMuxSocket, t_fmhashtableentry::pQueueSend, CAMuxSocket::prepareForSend(), CAMsg::printMsg(), CAControlChannelDispatcher::proccessMixPacket(), t_fmhashtableentry::pSymCipher, CAMuxSocket::receive(), CASocketGroupEpoll::remove(), CAFirstMixChannelList::remove(), CAFirstMixChannelList::removeChannel(), CAIPList::removeIP(), RSA_SIZE, CASocketGroupEpoll::select(), CASocket::send(), set64(), CAFirstMixChannelList::setDelayParameters(), CASymCipher::setIV2(), CASymCipher::setKey(), setZero64(), SOCKET_ERROR, and t_fmhashtableentry::uAlreadySendPacketSize.

  {
#ifdef NEW_MIX_TYPE
    /* should only be compiled if new NEW_MIX_TYPE is defined */
#ifdef DELAY_USERS
    m_pChannelList->setDelayParameters(  pglobalOptions->getDelayChannelUnlimitTraffic(),
                                      pglobalOptions->getDelayChannelBucketGrow(),
                                      pglobalOptions->getDelayChannelBucketGrowIntervall());  
#endif    

  //  CASingleSocketGroup osocketgroupMixOut;
    SINT32 countRead;
    //#ifdef LOG_PACKET_TIMES
    //  tPoolEntry* pPoolEntry=new tPoolEntry;
    //  MIXPACKET* pMixPacket=&pPoolEntry->mixpacket;
    //#else
    tQueueEntry* pQueueEntry=new tQueueEntry;
    MIXPACKET* pMixPacket=&pQueueEntry->packet;
    //#endif  
    m_nUser=0;
    SINT32 ret;
    //osocketgroupMixOut.add(*m_pMuxOut);
  
    UINT8* tmpBuff=new UINT8[sizeof(tQueueEntry)];
    CAMsg::printMsg(LOG_DEBUG,"Starting Message Loop... \n");
    bool bAktiv;
    UINT8 rsaBuff[RSA_SIZE];
#ifdef LOG_TRAFFIC_PER_USER
    UINT64 current_time;
    UINT32 diff_time;
    CAMsg::printMsg(LOG_DEBUG,"Channel log formats:\n");
    CAMsg::printMsg(LOG_DEBUG,"1. Close received from user (times in micros) - 1:Channel-ID,Connection-ID,PacketsIn (only data and open),PacketsOut (only data),ChannelDuration (open packet received --> close packet put into send queue to next mix)\n");
    CAMsg::printMsg(LOG_DEBUG,"2. Channel close from Mix(times in micros)- 2.:Channel-ID,Connection-ID,PacketsIn (only data and open), PacketsOut (only data),ChannelDuration (open packet received)--> close packet put into send queue to next user\n");
#endif
//    CAThread threadReadFromUsers;
//    threadReadFromUsers.setMainLoop(loopReadFromUsers);
//    threadReadFromUsers.start(this);

    while(!m_bRestart)                                                            /* the main mix loop as long as there are things that are not handled by threads. */
      {
        bAktiv=false;
//LOOP_START:

//First Step
//Checking for new connections    
// Now in a separat Thread.... 

// Second Step 
// Checking for data from users
// Now in a separate Thread (see loopReadFromUsers())
//Only proccess user data, if queue to next mix is not to long!!
#define MAX_NEXT_MIX_QUEUE_SIZE 10000000 //10 MByte
        if(m_pQueueSendToMix->getSize()<MAX_NEXT_MIX_QUEUE_SIZE)
          {
            countRead=m_psocketgroupUsersRead->select(/*false,*/0);        // how many JAP<->mix connections have received data from their coresponding JAP
            if(countRead>0)
              bAktiv=true;
#ifdef HAVE_EPOLL
            //if we have epool we do not need to search the whole list
            //of connected JAPs to find the ones who have sent data
            //as epool will return ONLY these connections.
            fmHashTableEntry* pHashEntry=(fmHashTableEntry*)m_psocketgroupUsersRead->getFirstSignaledSocketData();
            while(pHashEntry!=NULL)
              {
                CAMuxSocket* pMuxSocket=pHashEntry->pMuxSocket;
#else
            //if we do not have epoll we have to go to the whole
            //list of open connections to find the ones which
            //actually have sent some data
            fmHashTableEntry* pHashEntry=m_pChannelList->getFirst();
            while(pHashEntry!=NULL&&countRead>0)                      // iterate through all connections as long as there is at least one active left
              {
                CAMuxSocket* pMuxSocket=pHashEntry->pMuxSocket;
                if(m_psocketgroupUsersRead->isSignaled(*pMuxSocket))  // if this one seems to have data
                  {
                    countRead--;
#endif
                    ret=pMuxSocket->receive(pMixPacket,0);
                    #if defined LOG_PACKET_TIMES||defined(LOG_CHANNEL)
                      getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start);
                      set64(pQueueEntry->timestamp_proccessing_start_OP,pQueueEntry->timestamp_proccessing_start);
                    #endif  
                    if(ret==SOCKET_ERROR/*||pHashEntry->accessUntil<time()*/) 
                      {  
                                                      // remove dead connections
                        #ifdef LOG_TRAFFIC_PER_USER
                          getcurrentTimeMillis(current_time);
                          diff_time=diff64(current_time,pHashEntry->timeCreated);
                          m_pIPList->removeIP(pHashEntry->peerIP,diff_time,pHashEntry->trafficIn,pHashEntry->trafficOut);
                        #else
                          m_pIPList->removeIP(pHashEntry->peerIP);
                        #endif
                        m_psocketgroupUsersRead->remove(pMuxSocket->getSocket());
                        m_psocketgroupUsersWrite->remove(pMuxSocket->getSocket());
                        ASSERT(pHashEntry->pQueueSend!=NULL,"Send queue is NULL");
                        delete pHashEntry->pQueueSend;
                        pHashEntry->pQueueSend = NULL;
                        delete pHashEntry->pSymCipher;
                        pHashEntry->pSymCipher = NULL;
                        #ifdef COUNTRY_STATS
                          decUsers(pHashEntry);
                        #else
                          decUsers();
                        #endif
                        /* remove the client part (MuxSocket) but keep the
                         * outgoing channels until a CHANNEL-CLOSE from the
                         * last mix is received
                         */
                        m_pChannelList->removeClientPart(pMuxSocket);
                        delete pMuxSocket;
                        pMuxSocket = NULL;
                      }
                    else if(ret==MIXPACKET_SIZE)                       // we've read enough data for a whole mix packet. nice!
                      {
                        #ifdef LOG_TRAFFIC_PER_USER
                          pHashEntry->trafficIn++;
                        #endif
                        #ifdef COUNTRY_STATS
                          m_PacketsPerCountryIN[pHashEntry->countryID].inc();
                        #endif  
                        //New control channel code...!
            SINT32 ret = 0;
            if(pMixPacket->channel>0&&pMixPacket->channel<256)
            {
              if (pHashEntry->pControlChannelDispatcher->proccessMixPacket(pMixPacket))
              {
                goto NEXT_USER;
              }
              else
              {
                CAMsg::printMsg(LOG_DEBUG, "Packet is invalid and could not be processed!\n");
                ret = 3;
              }
            }
#ifdef PAYMENT
                        // payment code added by Bastian Voigt
            if (ret == 0)
            {
              ret = CAAccountingInstance::handleJapPacket(pHashEntry);  
            }
            if (ret == 2)
            {
              goto NEXT_USER;
            }   
#endif                                                  
                        if(ret == 3) 
                          {
                            // this jap is evil! terminate connection and add IP to blacklist
                            CAMsg::printMsg(LOG_DEBUG, "CAFirstMixB: Detected evil Jap.. closing connection! Removing IP..\n", ret);
                            fmChannelListEntry* pEntry;
                            pEntry=m_pChannelList->getFirstChannelForSocket(pMuxSocket);
                            while(pEntry!=NULL)
                              {
                                getRandom(pMixPacket->data,DATA_SIZE);
                                pMixPacket->flags=CHANNEL_CLOSE;
                                pMixPacket->channel=pEntry->channelOut;
                                #ifdef LOG_PACKET_TIMES
                                  setZero64(pQueueEntry->timestamp_proccessing_start);
                                #endif
                                m_pQueueSendToMix->add(pMixPacket,sizeof(tQueueEntry));
                                delete pEntry->pCipher;
                                pEntry->pCipher = NULL;
                                pEntry=m_pChannelList->getNextChannel(pEntry);
                              }
                            m_pIPList->removeIP(pHashEntry->peerIP);
                            m_psocketgroupUsersRead->remove(pMuxSocket->getSocket());
                            m_psocketgroupUsersWrite->remove(pMuxSocket->getSocket());
                            delete pHashEntry->pQueueSend;
                            pHashEntry->pQueueSend = NULL;
                            delete pHashEntry->pSymCipher;
                            pHashEntry->pSymCipher = NULL;
                            m_pChannelList->remove(pMuxSocket);
                            delete pMuxSocket;
                            pMuxSocket = NULL;
                            decUsers();
                            goto NEXT_USER;
                          }

                        if(pMixPacket->flags==CHANNEL_DUMMY)          // just a dummy to keep the connection alife in e.g. NAT gateways 
                          { 
                            getRandom(pMixPacket->data,DATA_SIZE);
                            #ifdef LOG_PACKET_TIMES
                              setZero64(pQueueEntry->timestamp_proccessing_start);
                            #endif
                            pHashEntry->pQueueSend->add(pMixPacket,sizeof(tQueueEntry));
                            #ifdef HAVE_EPOLL
                              m_psocketgroupUsersWrite->add(*pMuxSocket,pHashEntry); 
                            #else
                              m_psocketgroupUsersWrite->add(*pMuxSocket); 
                            #endif
                          }
                        else                                         // finally! a normal mix packet
                          {
                            CASymCipher* pCipher=NULL;
                            fmChannelListEntry* pEntry;
                            pEntry=m_pChannelList->get(pMuxSocket,pMixPacket->channel);
                            if(pEntry!=NULL&&pMixPacket->flags==CHANNEL_DATA)
                              {
                                pMixPacket->channel=pEntry->channelOut;
                                pCipher=pEntry->pCipher;
                                pCipher->crypt1(pMixPacket->data,pMixPacket->data,DATA_SIZE);
                                                     // queue the packet for sending to the next mix.
                                #ifdef LOG_PACKET_TIMES
                                  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
                                #endif
                                m_pQueueSendToMix->add(pMixPacket,sizeof(tQueueEntry));
                                incMixedPackets();
                                #ifdef LOG_CHANNEL
                                  pEntry->packetsInFromUser++;
                                #endif
                              }
                            else if(pEntry==NULL&&pMixPacket->flags==CHANNEL_OPEN)  // open a new mix channel
                              { // stefan: muesste das nicht vor die behandlung von CHANNEL_DATA? oder gilt OPEN => !DATA ? 
                                //es gilt: open -> data
                                pHashEntry->pSymCipher->crypt1(pMixPacket->data,rsaBuff,KEY_SIZE);
                                pCipher= new CASymCipher();
                                pCipher->setKey(rsaBuff);
                                for(int i=0;i<16;i++)
                                  rsaBuff[i]=0xFF;
                                pCipher->setIV2(rsaBuff);
                                pCipher->crypt1(pMixPacket->data+KEY_SIZE,pMixPacket->data,DATA_SIZE-KEY_SIZE);
                                getRandom(pMixPacket->data+DATA_SIZE-KEY_SIZE,KEY_SIZE);
                                #ifdef LOG_CHANNEL
                                  HCHANNEL tmpC=pMixPacket->channel;
                                #endif
                                if(m_pChannelList->addChannel(pMuxSocket,pMixPacket->channel,pCipher,&pMixPacket->channel)!=E_SUCCESS)
                                  { //todo move up ?
                                    delete pCipher;
                                    pCipher = NULL;
                                  }
                                else
                                  {
                                    #ifdef LOG_PACKET_TIMES
                                      getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
                                    #endif
                                    #ifdef LOG_CHANNEL
                                      fmChannelListEntry* pTmpEntry=m_pChannelList->get(pMuxSocket,tmpC);
                                      pTmpEntry->packetsInFromUser++;
                                      set64(pTmpEntry->timeCreated,pQueueEntry->timestamp_proccessing_start);
                                    #endif
                                    m_pQueueSendToMix->add(pMixPacket,sizeof(tQueueEntry));
                                    incMixedPackets();
                                    #ifdef _DEBUG
//                                      CAMsg::printMsg(LOG_DEBUG,"Added out channel: %u\n",pMixPacket->channel);
                                    #endif
                                  }
                              }
                          }
                      }
                #ifdef HAVE_EPOLL
NEXT_USER:
                  pHashEntry=(fmHashTableEntry*)m_psocketgroupUsersRead->getNextSignaledSocketData();
                #else
                  }//if is signaled
NEXT_USER:
                  pHashEntry=m_pChannelList->getNext();
                #endif
              }
          }
//Third step
//Sending to next mix

// Now in a separate Thread (see loopSendToMix())

//Step 4
//Stepa 4a Receiving form Mix to Queue now in separat Thread
//Step 4b Proccesing MixPackets received from Mix
//todo check for error!!!
        countRead=m_nUser+1;
        while(countRead>0&&m_pQueueReadFromMix->getSize()>=sizeof(tQueueEntry))
          {
            bAktiv=true;
            countRead--;
            ret=sizeof(tQueueEntry);
            m_pQueueReadFromMix->get((UINT8*)pQueueEntry,(UINT32*)&ret);
            #ifdef LOG_PACKET_TIMES
              getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start_OP);
            #endif
            if(pMixPacket->flags==CHANNEL_CLOSE) //close event
              {
                #if defined(_DEBUG) && !defined(__MIX_TEST)
//                  CAMsg::printMsg(LOG_DEBUG,"Closing Channel: %u ...\n",pMixPacket->channel);
                #endif
                fmChannelList* pEntry=m_pChannelList->get(pMixPacket->channel);
                if(pEntry!=NULL)
                  {
                    if (pEntry->pHead != NULL) {
                      pMixPacket->channel=pEntry->channelIn;
                      getRandom(pMixPacket->data,DATA_SIZE);
                      #ifdef LOG_PACKET_TIMES
                        getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
                      #endif
                      pEntry->pHead->pQueueSend->add(pMixPacket,sizeof(tQueueEntry));
                      #ifdef LOG_TRAFFIC_PER_USER
                        pEntry->pHead->trafficOut++;
                      #endif
                      #ifdef COUNTRY_STATS
                        m_PacketsPerCountryOUT[pEntry->pHead->countryID].inc();
                      #endif  
                      #ifdef LOG_CHANNEL  
                        //pEntry->packetsOutToUser++;
                        getcurrentTimeMicros(current_time);
                        diff_time=diff64(current_time,pEntry->timeCreated);
                        CAMsg::printMsg(LOG_DEBUG,"2:%u,%Lu,%u,%u,%u\n",
                                                pEntry->channelIn,pEntry->pHead->id,pEntry->packetsInFromUser,pEntry->packetsOutToUser,
                                                diff_time);
                      #endif
                    
                      #ifdef HAVE_EPOLL
                        m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket,pEntry->pHead); 
                      #else
                        m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket); 
                      #endif                    
                      delete pEntry->pCipher;
                      pEntry->pCipher = NULL;
                      m_pChannelList->removeChannel(pEntry->pHead->pMuxSocket,pEntry->channelIn);
                    }
                    else {
                      /* the client has already closed the connection but we
                       * have waited for the CHANNEL-CLOSE packet from the
                       * last mix -> now we have it
                       */
                      #ifdef LOG_PACKET_TIMES
                        getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
                      #endif
                      delete pEntry->pCipher;
                      pEntry->pCipher = NULL;
                      m_pChannelList->removeVacantOutChannel(pEntry);
                      pEntry = NULL;
                    }
                }
              }
            else
              {//flag !=close
                #if defined(_DEBUG) && !defined(__MIX_TEST)
//                  CAMsg::printMsg(LOG_DEBUG,"Sending Data to Browser!\n");
                #endif
                fmChannelList* pEntry=m_pChannelList->get(pMixPacket->channel);
                if(pEntry!=NULL) {
                  if (pEntry->pHead != NULL) {
                    #ifdef LOG_CRIME
                      if((pMixPacket->flags&CHANNEL_SIG_CRIME)==CHANNEL_SIG_CRIME)
                        {
                          UINT32 id=(pMixPacket->flags>>8)&0x000000FF;
                          int log=LOG_ENCRYPTED;
                          if(!pglobalOptions->isEncryptedLogEnabled())
                            log=LOG_CRIT;
                          CAMsg::printMsg(log,"Detecting crime activity - ID: %u -- In-IP is: %u.%u.%u.%u \n",id,pEntry->pHead->peerIP[0],pEntry->pHead->peerIP[1],pEntry->pHead->peerIP[2],pEntry->pHead->peerIP[3]);
                          continue;
                        }
                    #endif
                    pMixPacket->channel=pEntry->channelIn;
                    pEntry->pCipher->crypt2(pMixPacket->data,pMixPacket->data,DATA_SIZE);
                    
                    #ifdef LOG_PACKET_TIMES
                      getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
                    #endif
                    pEntry->pHead->pQueueSend->add(pMixPacket,sizeof(tQueueEntry));
                    #ifdef LOG_TRAFFIC_PER_USER
                      pEntry->pHead->trafficOut++;
                    #endif
                    #ifdef COUNTRY_STATS
                      m_PacketsPerCountryOUT[pEntry->pHead->countryID].inc();
                    #endif  
                    #ifdef LOG_CGANNEL  
                      pEntry->packetsOutToUser++;
                    #endif
                    #ifdef HAVE_EPOLL
                      m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket,pEntry->pHead); 
                    #else
                      m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket); 
                    #endif    
                    incMixedPackets();
                  }
                  else {
                    /* connection to client is already closed -> we wait for
                     * CLOSE-CHANNEL from last mix (but this is no CLOSE-
                     * CHANNEL packet -> do only some compatibility things and
                     * ignore the packet)
                     */
                    #ifdef LOG_CRIME
                      /* we don't have the user-information any more (but also
                       * the user cannot receive any data); nevertheless write
                       * a log-message to show what happened
                       */
                      if((pMixPacket->flags&CHANNEL_SIG_CRIME)==CHANNEL_SIG_CRIME) {
                        UINT32 id = (pMixPacket->flags>>8)&0x000000FF;
                        int log = LOG_ENCRYPTED;
                        if (!pglobalOptions->isEncryptedLogEnabled()) {
                          log=LOG_CRIT;
                        }
                        CAMsg::printMsg(log,"Detecting crime activity - ID: %u -- In-IP is: not available (user has already closed connection)\n",id);
                        continue;
                      }
                    #endif
                    #ifdef LOG_PACKET_TIMES
                      getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
                    #endif
                  }
                }
                else
                  {                  
                    #ifdef _DEBUG
                      if(pMixPacket->flags!=CHANNEL_DUMMY)
                        {
/*                          CAMsg::printMsg(LOG_DEBUG,"Error Sending Data to Browser -- "
                              "Channel-Id %u not valid!\n",pMixPacket->channel
                            );*/
                          #ifdef LOG_CHANNEL
                            CAMsg::printMsg(LOG_INFO,"Packet late arrive for channel: %u\n",pMixPacket->channel);
                          #endif
                        }
                    #endif
                  }
              }
          }

//Step 5 
//Writing to users...
        countRead=m_psocketgroupUsersWrite->select(/*true,*/0);
#ifdef HAVE_EPOLL    
        fmHashTableEntry* pfmHashEntry=(fmHashTableEntry*)m_psocketgroupUsersWrite->getFirstSignaledSocketData();
        while(pfmHashEntry!=NULL)
          {
#else
        fmHashTableEntry* pfmHashEntry=m_pChannelList->getFirst();
        while(countRead>0&&pfmHashEntry!=NULL)
          {
            if(m_psocketgroupUsersWrite->isSignaled(*pfmHashEntry->pMuxSocket))
              {
                countRead--;
#endif
#ifdef DELAY_USERS
                if(pfmHashEntry->delayBucket>0)
                {
#endif
                if(pfmHashEntry->pQueueSend->getSize()>0)
                {
                bAktiv=true;
                UINT32 len=sizeof(tQueueEntry);
                if(pfmHashEntry->uAlreadySendPacketSize==-1)
                  {
                    pfmHashEntry->pQueueSend->get((UINT8*)&pfmHashEntry->oQueueEntry,&len); 
                    #ifdef PAYMENT
                      //do not count control channel packets!
                      if(pfmHashEntry->oQueueEntry.packet.channel>0&&pfmHashEntry->oQueueEntry.packet.channel<256)
                        pfmHashEntry->bCountPacket=false;
                      else
                        pfmHashEntry->bCountPacket=true;
                    #endif
                    pfmHashEntry->pMuxSocket->prepareForSend(&(pfmHashEntry->oQueueEntry.packet));
                    pfmHashEntry->uAlreadySendPacketSize=0;
                  }
                len=MIXPACKET_SIZE-pfmHashEntry->uAlreadySendPacketSize;
                ret=pfmHashEntry->pMuxSocket->getCASocket()->send(((UINT8*)&(pfmHashEntry->oQueueEntry))+pfmHashEntry->uAlreadySendPacketSize,len);
                if(ret>0)
                  {
                    pfmHashEntry->uAlreadySendPacketSize+=ret;
                    if(pfmHashEntry->uAlreadySendPacketSize==MIXPACKET_SIZE)
                      {
                        #ifdef PAYMENT
                          if(pfmHashEntry->bCountPacket)
                            {
                              // count packet for payment
                if (CAAccountingInstance::handleJapPacket(pfmHashEntry) == 2)
                {
                  goto NEXT_USER_WRITING;
                }
                            }
                        #endif
                        #ifdef DELAY_USERS
                          pfmHashEntry->delayBucket--;
                        #endif
                        pfmHashEntry->uAlreadySendPacketSize=-1;
                        #ifdef LOG_PACKET_TIMES
                          if(!isZero64(pfmHashEntry->oQueueEntry.timestamp_proccessing_start))
                            {
                              getcurrentTimeMicros(pfmHashEntry->oQueueEntry.timestamp_proccessing_end);
                              m_pLogPacketStats->addToTimeingStats(pfmHashEntry->oQueueEntry,CHANNEL_DATA,false);
                            }
                        #endif
                      }
                   }
                }
#ifdef DELAY_USERS
                }
#endif
                  //todo error handling
#ifdef HAVE_EPOLL
NEXT_USER_WRITING:
            pfmHashEntry=(fmHashTableEntry*)m_psocketgroupUsersWrite->getNextSignaledSocketData();
#else
              }//if is socket signaled
NEXT_USER_WRITING:              
            pfmHashEntry=m_pChannelList->getNext();
#endif
          }
        if(!bAktiv)
          msSleep(100);
      }
//ERR:
//@todo move cleanup to clean() !
    CAMsg::printMsg(LOG_CRIT,"Seams that we are restarting now!!\n");
    m_bRestart=true;
    m_pMuxOut->close();
    for(UINT32 i=0;i<m_nSocketsIn;i++)
      m_arrSocketsIn[i].close();
    //writng some bytes to the queue...
    UINT8 b[sizeof(tQueueEntry)+1];
    m_pQueueSendToMix->add(b,sizeof(tQueueEntry)+1);
//#if !defined(_DEBUG) && !defined(NO_LOOPACCEPTUSER)
    CAMsg::printMsg(LOG_CRIT,"Wait for LoopAcceptUsers!\n");
    m_pthreadAcceptUsers->join();
//#endif
    CAMsg::printMsg(LOG_CRIT,"Wait for LoopSendToMix!\n");
    m_pthreadSendToMix->join(); //will not join if queue is empty (and so wating)!!!
    CAMsg::printMsg(LOG_CRIT,"Wait for LoopReadFromMix!\n");
    m_pthreadReadFromMix->join();
    #ifdef LOG_PACKET_TIMES
      CAMsg::printMsg(LOG_CRIT,"Wait for LoopLogPacketStats to terminate!\n");
      m_pLogPacketStats->stop();
    #endif  
    //waits until all login threads terminates....
    // we have to be sure that the Accept thread was alread stoped!
    m_pthreadsLogin->destroy(true);
    CAMsg::printMsg(LOG_CRIT,"Before deleting CAFirstMixChannelList()!\n");
    CAMsg::printMsg  (LOG_CRIT,"Memeory usage before: %u\n",getMemoryUsage());  
    fmHashTableEntry* pHashEntry=m_pChannelList->getFirst();
    while(pHashEntry!=NULL)
      {
        CAMuxSocket * pMuxSocket=pHashEntry->pMuxSocket;
        delete pHashEntry->pQueueSend;
        pHashEntry->pQueueSend = NULL;
        delete pHashEntry->pSymCipher;
        pHashEntry->pSymCipher = NULL; 

        fmChannelListEntry* pEntry=m_pChannelList->getFirstChannelForSocket(pHashEntry->pMuxSocket);
        while(pEntry!=NULL)
          {
            delete pEntry->pCipher;
            pEntry->pCipher = NULL;
  
            pEntry=m_pChannelList->getNextChannel(pEntry);
          }
        m_pChannelList->remove(pHashEntry->pMuxSocket);
        pMuxSocket->close();
        delete pMuxSocket;
        pMuxSocket = NULL;
        pHashEntry=m_pChannelList->getNext();
      }
    /* clean all vacant out-channels (the connection from the client was
     * closed but we've waited for the CHANNEL-CLOSE from the last mix)
     */
    m_pChannelList->cleanVacantOutChannels();
    CAMsg::printMsg  (LOG_CRIT,"Memory usage after: %u\n",getMemoryUsage());  
    delete pQueueEntry;
    pQueueEntry = NULL;
    delete []tmpBuff;
    tmpBuff = NULL;
    CAMsg::printMsg(LOG_CRIT,"Main Loop exited!!\n");
#endif // NEW_MIX_TYPE
    return E_UNKNOWN;
  }

Here is the call graph for this function:


The documentation for this class was generated from the following files: