Mixe for Privacy and Anonymity in the Internet
CAFirstMixB.cpp
Go to the documentation of this file.
00001 /*
00002 Copyright (c) 2000, The JAP-Team 
00003 All rights reserved.
00004 Redistribution and use in source and binary forms, with or without modification, 
00005 are permitted provided that the following conditions are met:
00006 
00007   - Redistributions of source code must retain the above copyright notice, 
00008     this list of conditions and the following disclaimer.
00009 
00010   - Redistributions in binary form must reproduce the above copyright notice, 
00011     this list of conditions and the following disclaimer in the documentation and/or 
00012     other materials provided with the distribution.
00013 
00014   - Neither the name of the University of Technology Dresden, Germany nor the names of its contributors 
00015     may be used to endorse or promote products derived from this software without specific 
00016     prior written permission. 
00017 
00018   
00019 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' AND ANY EXPRESS 
00020 OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY 
00021 AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS
00022 BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00023 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
00024 OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER 
00025 IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 
00026 OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE
00027 */
00028 #include "../StdAfx.h"
00029 #ifndef ONLY_LOCAL_PROXY
00030 #include "CAFirstMixB.hpp"
00031 #include "../CAThread.hpp"
00032 #include "../CASingleSocketGroup.hpp"
00033 #include "../CAInfoService.hpp"
00034 #include "../CAPool.hpp"
00035 #include "../CACmdLnOptions.hpp"
00036 #include "../CAAccountingInstance.hpp"
00037 #ifdef HAVE_EPOLL
00038   #include "../CASocketGroupEpoll.hpp"
00039 #endif
00040 
00041 SINT32 CAFirstMixB::loop()
00042   {
00043 #ifdef NEW_MIX_TYPE
00044     /* should only be compiled if new NEW_MIX_TYPE is defined */
00045 #ifdef DELAY_USERS
00046     m_pChannelList->setDelayParameters(  pglobalOptions->getDelayChannelUnlimitTraffic(),
00047                                       pglobalOptions->getDelayChannelBucketGrow(),
00048                                       pglobalOptions->getDelayChannelBucketGrowIntervall());  
00049 #endif    
00050 
00051   //  CASingleSocketGroup osocketgroupMixOut;
00052     SINT32 countRead;
00053     //#ifdef LOG_PACKET_TIMES
00054     //  tPoolEntry* pPoolEntry=new tPoolEntry;
00055     //  MIXPACKET* pMixPacket=&pPoolEntry->mixpacket;
00056     //#else
00057     tQueueEntry* pQueueEntry=new tQueueEntry;
00058     MIXPACKET* pMixPacket=&pQueueEntry->packet;
00059     //#endif  
00060     m_nUser=0;
00061     SINT32 ret;
00062     //osocketgroupMixOut.add(*m_pMuxOut);
00063   
00064     UINT8* tmpBuff=new UINT8[sizeof(tQueueEntry)];
00065     CAMsg::printMsg(LOG_DEBUG,"Starting Message Loop... \n");
00066     bool bAktiv;
00067     UINT8 rsaBuff[RSA_SIZE];
00068 #ifdef LOG_TRAFFIC_PER_USER
00069     UINT64 current_time;
00070     UINT32 diff_time;
00071     CAMsg::printMsg(LOG_DEBUG,"Channel log formats:\n");
00072     CAMsg::printMsg(LOG_DEBUG,"1. Close received from user (times in micros) - 1:Channel-ID,Connection-ID,PacketsIn (only data and open),PacketsOut (only data),ChannelDuration (open packet received --> close packet put into send queue to next mix)\n");
00073     CAMsg::printMsg(LOG_DEBUG,"2. Channel close from Mix(times in micros)- 2.:Channel-ID,Connection-ID,PacketsIn (only data and open), PacketsOut (only data),ChannelDuration (open packet received)--> close packet put into send queue to next user\n");
00074 #endif
00075 //    CAThread threadReadFromUsers;
00076 //    threadReadFromUsers.setMainLoop(loopReadFromUsers);
00077 //    threadReadFromUsers.start(this);
00078 
00079     while(!m_bRestart)                                                            /* the main mix loop as long as there are things that are not handled by threads. */
00080       {
00081         bAktiv=false;
00082 //LOOP_START:
00083 
00084 //First Step
00085 //Checking for new connections    
00086 // Now in a separat Thread.... 
00087 
00088 // Second Step 
00089 // Checking for data from users
00090 // Now in a separate Thread (see loopReadFromUsers())
00091 //Only proccess user data, if queue to next mix is not to long!!
00092 #define MAX_NEXT_MIX_QUEUE_SIZE 10000000 //10 MByte
00093         if(m_pQueueSendToMix->getSize()<MAX_NEXT_MIX_QUEUE_SIZE)
00094           {
00095             countRead=m_psocketgroupUsersRead->select(/*false,*/0);        // how many JAP<->mix connections have received data from their coresponding JAP
00096             if(countRead>0)
00097               bAktiv=true;
00098 #ifdef HAVE_EPOLL
00099             //if we have epool we do not need to search the whole list
00100             //of connected JAPs to find the ones who have sent data
00101             //as epool will return ONLY these connections.
00102             fmHashTableEntry* pHashEntry=(fmHashTableEntry*)m_psocketgroupUsersRead->getFirstSignaledSocketData();
00103             while(pHashEntry!=NULL)
00104               {
00105                 CAMuxSocket* pMuxSocket=pHashEntry->pMuxSocket;
00106 #else
00107             //if we do not have epoll we have to go to the whole
00108             //list of open connections to find the ones which
00109             //actually have sent some data
00110             fmHashTableEntry* pHashEntry=m_pChannelList->getFirst();
00111             while(pHashEntry!=NULL&&countRead>0)                      // iterate through all connections as long as there is at least one active left
00112               {
00113                 CAMuxSocket* pMuxSocket=pHashEntry->pMuxSocket;
00114                 if(m_psocketgroupUsersRead->isSignaled(*pMuxSocket))  // if this one seems to have data
00115                   {
00116                     countRead--;
00117 #endif
00118                     ret=pMuxSocket->receive(pMixPacket,0);
00119                     #if defined LOG_PACKET_TIMES||defined(LOG_CHANNEL)
00120                       getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start);
00121                       set64(pQueueEntry->timestamp_proccessing_start_OP,pQueueEntry->timestamp_proccessing_start);
00122                     #endif  
00123                     if(ret==SOCKET_ERROR/*||pHashEntry->accessUntil<time()*/) 
00124                       {  
00125                                                       // remove dead connections
00126                         #ifdef LOG_TRAFFIC_PER_USER
00127                           getcurrentTimeMillis(current_time);
00128                           diff_time=diff64(current_time,pHashEntry->timeCreated);
00129                           m_pIPList->removeIP(pHashEntry->peerIP,diff_time,pHashEntry->trafficIn,pHashEntry->trafficOut);
00130                         #else
00131                           m_pIPList->removeIP(pHashEntry->peerIP);
00132                         #endif
00133                         m_psocketgroupUsersRead->remove(pMuxSocket->getSocket());
00134                         m_psocketgroupUsersWrite->remove(pMuxSocket->getSocket());
00135                         ASSERT(pHashEntry->pQueueSend!=NULL,"Send queue is NULL");
00136                         delete pHashEntry->pQueueSend;
00137                         pHashEntry->pQueueSend = NULL;
00138                         delete pHashEntry->pSymCipher;
00139                         pHashEntry->pSymCipher = NULL;
00140                         #ifdef COUNTRY_STATS
00141                           decUsers(pHashEntry);
00142                         #else
00143                           decUsers();
00144                         #endif
00145                         /* remove the client part (MuxSocket) but keep the
00146                          * outgoing channels until a CHANNEL-CLOSE from the
00147                          * last mix is received
00148                          */
00149                         m_pChannelList->removeClientPart(pMuxSocket);
00150                         delete pMuxSocket;
00151                         pMuxSocket = NULL;
00152                       }
00153                     else if(ret==MIXPACKET_SIZE)                       // we've read enough data for a whole mix packet. nice!
00154                       {
00155                         #ifdef LOG_TRAFFIC_PER_USER
00156                           pHashEntry->trafficIn++;
00157                         #endif
00158                         #ifdef COUNTRY_STATS
00159                           m_PacketsPerCountryIN[pHashEntry->countryID].inc();
00160                         #endif  
00161                         //New control channel code...!
00162             SINT32 ret = 0;
00163             if(pMixPacket->channel>0&&pMixPacket->channel<256)
00164             {
00165               if (pHashEntry->pControlChannelDispatcher->proccessMixPacket(pMixPacket))
00166               {
00167                 goto NEXT_USER;
00168               }
00169               else
00170               {
00171                 CAMsg::printMsg(LOG_DEBUG, "Packet is invalid and could not be processed!\n");
00172                 ret = 3;
00173               }
00174             }
00175 #ifdef PAYMENT
00176                         // payment code added by Bastian Voigt
00177             if (ret == 0)
00178             {
00179               ret = CAAccountingInstance::handleJapPacket(pHashEntry);  
00180             }
00181             if (ret == 2)
00182             {
00183               goto NEXT_USER;
00184             }   
00185 #endif                                                  
00186                         if(ret == 3) 
00187                           {
00188                             // this jap is evil! terminate connection and add IP to blacklist
00189                             CAMsg::printMsg(LOG_DEBUG, "CAFirstMixB: Detected evil Jap.. closing connection! Removing IP..\n", ret);
00190                             fmChannelListEntry* pEntry;
00191                             pEntry=m_pChannelList->getFirstChannelForSocket(pMuxSocket);
00192                             while(pEntry!=NULL)
00193                               {
00194                                 getRandom(pMixPacket->data,DATA_SIZE);
00195                                 pMixPacket->flags=CHANNEL_CLOSE;
00196                                 pMixPacket->channel=pEntry->channelOut;
00197                                 #ifdef LOG_PACKET_TIMES
00198                                   setZero64(pQueueEntry->timestamp_proccessing_start);
00199                                 #endif
00200                                 m_pQueueSendToMix->add(pMixPacket,sizeof(tQueueEntry));
00201                                 delete pEntry->pCipher;
00202                                 pEntry->pCipher = NULL;
00203                                 pEntry=m_pChannelList->getNextChannel(pEntry);
00204                               }
00205                             m_pIPList->removeIP(pHashEntry->peerIP);
00206                             m_psocketgroupUsersRead->remove(pMuxSocket->getSocket());
00207                             m_psocketgroupUsersWrite->remove(pMuxSocket->getSocket());
00208                             delete pHashEntry->pQueueSend;
00209                             pHashEntry->pQueueSend = NULL;
00210                             delete pHashEntry->pSymCipher;
00211                             pHashEntry->pSymCipher = NULL;
00212                             m_pChannelList->remove(pMuxSocket);
00213                             delete pMuxSocket;
00214                             pMuxSocket = NULL;
00215                             decUsers();
00216                             goto NEXT_USER;
00217                           }
00218 
00219                         if(pMixPacket->flags==CHANNEL_DUMMY)          // just a dummy to keep the connection alife in e.g. NAT gateways 
00220                           { 
00221                             getRandom(pMixPacket->data,DATA_SIZE);
00222                             #ifdef LOG_PACKET_TIMES
00223                               setZero64(pQueueEntry->timestamp_proccessing_start);
00224                             #endif
00225                             pHashEntry->pQueueSend->add(pMixPacket,sizeof(tQueueEntry));
00226                             #ifdef HAVE_EPOLL
00227                               m_psocketgroupUsersWrite->add(*pMuxSocket,pHashEntry); 
00228                             #else
00229                               m_psocketgroupUsersWrite->add(*pMuxSocket); 
00230                             #endif
00231                           }
00232                         else                                         // finally! a normal mix packet
00233                           {
00234                             CASymCipher* pCipher=NULL;
00235                             fmChannelListEntry* pEntry;
00236                             pEntry=m_pChannelList->get(pMuxSocket,pMixPacket->channel);
00237                             if(pEntry!=NULL&&pMixPacket->flags==CHANNEL_DATA)
00238                               {
00239                                 pMixPacket->channel=pEntry->channelOut;
00240                                 pCipher=pEntry->pCipher;
00241                                 pCipher->crypt1(pMixPacket->data,pMixPacket->data,DATA_SIZE);
00242                                                      // queue the packet for sending to the next mix.
00243                                 #ifdef LOG_PACKET_TIMES
00244                                   getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
00245                                 #endif
00246                                 m_pQueueSendToMix->add(pMixPacket,sizeof(tQueueEntry));
00247                                 incMixedPackets();
00248                                 #ifdef LOG_CHANNEL
00249                                   pEntry->packetsInFromUser++;
00250                                 #endif
00251                               }
00252                             else if(pEntry==NULL&&pMixPacket->flags==CHANNEL_OPEN)  // open a new mix channel
00253                               { // stefan: muesste das nicht vor die behandlung von CHANNEL_DATA? oder gilt OPEN => !DATA ? 
00254                                 //es gilt: open -> data
00255                                 pHashEntry->pSymCipher->crypt1(pMixPacket->data,rsaBuff,KEY_SIZE);
00256                                 pCipher= new CASymCipher();
00257                                 pCipher->setKey(rsaBuff);
00258                                 for(int i=0;i<16;i++)
00259                                   rsaBuff[i]=0xFF;
00260                                 pCipher->setIV2(rsaBuff);
00261                                 pCipher->crypt1(pMixPacket->data+KEY_SIZE,pMixPacket->data,DATA_SIZE-KEY_SIZE);
00262                                 getRandom(pMixPacket->data+DATA_SIZE-KEY_SIZE,KEY_SIZE);
00263                                 #ifdef LOG_CHANNEL
00264                                   HCHANNEL tmpC=pMixPacket->channel;
00265                                 #endif
00266                                 if(m_pChannelList->addChannel(pMuxSocket,pMixPacket->channel,pCipher,&pMixPacket->channel)!=E_SUCCESS)
00267                                   { //todo move up ?
00268                                     delete pCipher;
00269                                     pCipher = NULL;
00270                                   }
00271                                 else
00272                                   {
00273                                     #ifdef LOG_PACKET_TIMES
00274                                       getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
00275                                     #endif
00276                                     #ifdef LOG_CHANNEL
00277                                       fmChannelListEntry* pTmpEntry=m_pChannelList->get(pMuxSocket,tmpC);
00278                                       pTmpEntry->packetsInFromUser++;
00279                                       set64(pTmpEntry->timeCreated,pQueueEntry->timestamp_proccessing_start);
00280                                     #endif
00281                                     m_pQueueSendToMix->add(pMixPacket,sizeof(tQueueEntry));
00282                                     incMixedPackets();
00283                                     #ifdef _DEBUG
00284 //                                      CAMsg::printMsg(LOG_DEBUG,"Added out channel: %u\n",pMixPacket->channel);
00285                                     #endif
00286                                   }
00287                               }
00288                           }
00289                       }
00290                 #ifdef HAVE_EPOLL
00291 NEXT_USER:
00292                   pHashEntry=(fmHashTableEntry*)m_psocketgroupUsersRead->getNextSignaledSocketData();
00293                 #else
00294                   }//if is signaled
00295 NEXT_USER:
00296                   pHashEntry=m_pChannelList->getNext();
00297                 #endif
00298               }
00299           }
00300 //Third step
00301 //Sending to next mix
00302 
00303 // Now in a separate Thread (see loopSendToMix())
00304 
00305 //Step 4
00306 //Stepa 4a Receiving form Mix to Queue now in separat Thread
00307 //Step 4b Proccesing MixPackets received from Mix
00308 //todo check for error!!!
00309         countRead=m_nUser+1;
00310         while(countRead>0&&m_pQueueReadFromMix->getSize()>=sizeof(tQueueEntry))
00311           {
00312             bAktiv=true;
00313             countRead--;
00314             ret=sizeof(tQueueEntry);
00315             m_pQueueReadFromMix->get((UINT8*)pQueueEntry,(UINT32*)&ret);
00316             #ifdef LOG_PACKET_TIMES
00317               getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start_OP);
00318             #endif
00319             if(pMixPacket->flags==CHANNEL_CLOSE) //close event
00320               {
00321                 #if defined(_DEBUG) && !defined(__MIX_TEST)
00322 //                  CAMsg::printMsg(LOG_DEBUG,"Closing Channel: %u ...\n",pMixPacket->channel);
00323                 #endif
00324                 fmChannelList* pEntry=m_pChannelList->get(pMixPacket->channel);
00325                 if(pEntry!=NULL)
00326                   {
00327                     if (pEntry->pHead != NULL) {
00328                       pMixPacket->channel=pEntry->channelIn;
00329                       getRandom(pMixPacket->data,DATA_SIZE);
00330                       #ifdef LOG_PACKET_TIMES
00331                         getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
00332                       #endif
00333                       pEntry->pHead->pQueueSend->add(pMixPacket,sizeof(tQueueEntry));
00334                       #ifdef LOG_TRAFFIC_PER_USER
00335                         pEntry->pHead->trafficOut++;
00336                       #endif
00337                       #ifdef COUNTRY_STATS
00338                         m_PacketsPerCountryOUT[pEntry->pHead->countryID].inc();
00339                       #endif  
00340                       #ifdef LOG_CHANNEL  
00341                         //pEntry->packetsOutToUser++;
00342                         getcurrentTimeMicros(current_time);
00343                         diff_time=diff64(current_time,pEntry->timeCreated);
00344                         CAMsg::printMsg(LOG_DEBUG,"2:%u,%Lu,%u,%u,%u\n",
00345                                                 pEntry->channelIn,pEntry->pHead->id,pEntry->packetsInFromUser,pEntry->packetsOutToUser,
00346                                                 diff_time);
00347                       #endif
00348                     
00349                       #ifdef HAVE_EPOLL
00350                         m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket,pEntry->pHead); 
00351                       #else
00352                         m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket); 
00353                       #endif                    
00354                       delete pEntry->pCipher;
00355                       pEntry->pCipher = NULL;
00356                       m_pChannelList->removeChannel(pEntry->pHead->pMuxSocket,pEntry->channelIn);
00357                     }
00358                     else {
00359                       /* the client has already closed the connection but we
00360                        * have waited for the CHANNEL-CLOSE packet from the
00361                        * last mix -> now we have it
00362                        */
00363                       #ifdef LOG_PACKET_TIMES
00364                         getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
00365                       #endif
00366                       delete pEntry->pCipher;
00367                       pEntry->pCipher = NULL;
00368                       m_pChannelList->removeVacantOutChannel(pEntry);
00369                       pEntry = NULL;
00370                     }
00371                 }
00372               }
00373             else
00374               {//flag !=close
00375                 #if defined(_DEBUG) && !defined(__MIX_TEST)
00376 //                  CAMsg::printMsg(LOG_DEBUG,"Sending Data to Browser!\n");
00377                 #endif
00378                 fmChannelList* pEntry=m_pChannelList->get(pMixPacket->channel);
00379                 if(pEntry!=NULL) {
00380                   if (pEntry->pHead != NULL) {
00381                     #ifdef LOG_CRIME
00382                       if((pMixPacket->flags&CHANNEL_SIG_CRIME)==CHANNEL_SIG_CRIME)
00383                         {
00384                           UINT32 id=(pMixPacket->flags>>8)&0x000000FF;
00385                           int log=LOG_ENCRYPTED;
00386                           if(!pglobalOptions->isEncryptedLogEnabled())
00387                             log=LOG_CRIT;
00388                           CAMsg::printMsg(log,"Detecting crime activity - ID: %u -- In-IP is: %u.%u.%u.%u \n",id,pEntry->pHead->peerIP[0],pEntry->pHead->peerIP[1],pEntry->pHead->peerIP[2],pEntry->pHead->peerIP[3]);
00389                           continue;
00390                         }
00391                     #endif
00392                     pMixPacket->channel=pEntry->channelIn;
00393                     pEntry->pCipher->crypt2(pMixPacket->data,pMixPacket->data,DATA_SIZE);
00394                     
00395                     #ifdef LOG_PACKET_TIMES
00396                       getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
00397                     #endif
00398                     pEntry->pHead->pQueueSend->add(pMixPacket,sizeof(tQueueEntry));
00399                     #ifdef LOG_TRAFFIC_PER_USER
00400                       pEntry->pHead->trafficOut++;
00401                     #endif
00402                     #ifdef COUNTRY_STATS
00403                       m_PacketsPerCountryOUT[pEntry->pHead->countryID].inc();
00404                     #endif  
00405                     #ifdef LOG_CGANNEL  
00406                       pEntry->packetsOutToUser++;
00407                     #endif
00408                     #ifdef HAVE_EPOLL
00409                       m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket,pEntry->pHead); 
00410                     #else
00411                       m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket); 
00412                     #endif    
00413                     incMixedPackets();
00414                   }
00415                   else {
00416                     /* connection to client is already closed -> we wait for
00417                      * CLOSE-CHANNEL from last mix (but this is no CLOSE-
00418                      * CHANNEL packet -> do only some compatibility things and
00419                      * ignore the packet)
00420                      */
00421                     #ifdef LOG_CRIME
00422                       /* we don't have the user-information any more (but also
00423                        * the user cannot receive any data); nevertheless write
00424                        * a log-message to show what happened
00425                        */
00426                       if((pMixPacket->flags&CHANNEL_SIG_CRIME)==CHANNEL_SIG_CRIME) {
00427                         UINT32 id = (pMixPacket->flags>>8)&0x000000FF;
00428                         int log = LOG_ENCRYPTED;
00429                         if (!pglobalOptions->isEncryptedLogEnabled()) {
00430                           log=LOG_CRIT;
00431                         }
00432                         CAMsg::printMsg(log,"Detecting crime activity - ID: %u -- In-IP is: not available (user has already closed connection)\n",id);
00433                         continue;
00434                       }
00435                     #endif
00436                     #ifdef LOG_PACKET_TIMES
00437                       getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
00438                     #endif
00439                   }
00440                 }
00441                 else
00442                   {                  
00443                     #ifdef _DEBUG
00444                       if(pMixPacket->flags!=CHANNEL_DUMMY)
00445                         {
00446 /*                          CAMsg::printMsg(LOG_DEBUG,"Error Sending Data to Browser -- "
00447                               "Channel-Id %u not valid!\n",pMixPacket->channel
00448                             );*/
00449                           #ifdef LOG_CHANNEL
00450                             CAMsg::printMsg(LOG_INFO,"Packet late arrive for channel: %u\n",pMixPacket->channel);
00451                           #endif
00452                         }
00453                     #endif
00454                   }
00455               }
00456           }
00457 
00458 //Step 5 
00459 //Writing to users...
00460         countRead=m_psocketgroupUsersWrite->select(/*true,*/0);
00461 #ifdef HAVE_EPOLL    
00462         fmHashTableEntry* pfmHashEntry=(fmHashTableEntry*)m_psocketgroupUsersWrite->getFirstSignaledSocketData();
00463         while(pfmHashEntry!=NULL)
00464           {
00465 #else
00466         fmHashTableEntry* pfmHashEntry=m_pChannelList->getFirst();
00467         while(countRead>0&&pfmHashEntry!=NULL)
00468           {
00469             if(m_psocketgroupUsersWrite->isSignaled(*pfmHashEntry->pMuxSocket))
00470               {
00471                 countRead--;
00472 #endif
00473 #ifdef DELAY_USERS
00474                 if(pfmHashEntry->delayBucket>0)
00475                 {
00476 #endif
00477                 if(pfmHashEntry->pQueueSend->getSize()>0)
00478                 {
00479                 bAktiv=true;
00480                 UINT32 len=sizeof(tQueueEntry);
00481                 if(pfmHashEntry->uAlreadySendPacketSize==-1)
00482                   {
00483                     pfmHashEntry->pQueueSend->get((UINT8*)&pfmHashEntry->oQueueEntry,&len); 
00484                     #ifdef PAYMENT
00485                       //do not count control channel packets!
00486                       if(pfmHashEntry->oQueueEntry.packet.channel>0&&pfmHashEntry->oQueueEntry.packet.channel<256)
00487                         pfmHashEntry->bCountPacket=false;
00488                       else
00489                         pfmHashEntry->bCountPacket=true;
00490                     #endif
00491                     pfmHashEntry->pMuxSocket->prepareForSend(&(pfmHashEntry->oQueueEntry.packet));
00492                     pfmHashEntry->uAlreadySendPacketSize=0;
00493                   }
00494                 len=MIXPACKET_SIZE-pfmHashEntry->uAlreadySendPacketSize;
00495                 ret=pfmHashEntry->pMuxSocket->getCASocket()->send(((UINT8*)&(pfmHashEntry->oQueueEntry))+pfmHashEntry->uAlreadySendPacketSize,len);
00496                 if(ret>0)
00497                   {
00498                     pfmHashEntry->uAlreadySendPacketSize+=ret;
00499                     if(pfmHashEntry->uAlreadySendPacketSize==MIXPACKET_SIZE)
00500                       {
00501                         #ifdef PAYMENT
00502                           if(pfmHashEntry->bCountPacket)
00503                             {
00504                               // count packet for payment
00505                 if (CAAccountingInstance::handleJapPacket(pfmHashEntry) == 2)
00506                 {
00507                   goto NEXT_USER_WRITING;
00508                 }
00509                             }
00510                         #endif
00511                         #ifdef DELAY_USERS
00512                           pfmHashEntry->delayBucket--;
00513                         #endif
00514                         pfmHashEntry->uAlreadySendPacketSize=-1;
00515                         #ifdef LOG_PACKET_TIMES
00516                           if(!isZero64(pfmHashEntry->oQueueEntry.timestamp_proccessing_start))
00517                             {
00518                               getcurrentTimeMicros(pfmHashEntry->oQueueEntry.timestamp_proccessing_end);
00519                               m_pLogPacketStats->addToTimeingStats(pfmHashEntry->oQueueEntry,CHANNEL_DATA,false);
00520                             }
00521                         #endif
00522                       }
00523                    }
00524                 }
00525 #ifdef DELAY_USERS
00526                 }
00527 #endif
00528                   //todo error handling
00529 #ifdef HAVE_EPOLL
00530 NEXT_USER_WRITING:
00531             pfmHashEntry=(fmHashTableEntry*)m_psocketgroupUsersWrite->getNextSignaledSocketData();
00532 #else
00533               }//if is socket signaled
00534 NEXT_USER_WRITING:              
00535             pfmHashEntry=m_pChannelList->getNext();
00536 #endif
00537           }
00538         if(!bAktiv)
00539           msSleep(100);
00540       }
00541 //ERR:
00542 //@todo move cleanup to clean() !
00543     CAMsg::printMsg(LOG_CRIT,"Seams that we are restarting now!!\n");
00544     m_bRestart=true;
00545     m_pMuxOut->close();
00546     for(UINT32 i=0;i<m_nSocketsIn;i++)
00547       m_arrSocketsIn[i].close();
00548     //writng some bytes to the queue...
00549     UINT8 b[sizeof(tQueueEntry)+1];
00550     m_pQueueSendToMix->add(b,sizeof(tQueueEntry)+1);
00551 //#if !defined(_DEBUG) && !defined(NO_LOOPACCEPTUSER)
00552     CAMsg::printMsg(LOG_CRIT,"Wait for LoopAcceptUsers!\n");
00553     m_pthreadAcceptUsers->join();
00554 //#endif
00555     CAMsg::printMsg(LOG_CRIT,"Wait for LoopSendToMix!\n");
00556     m_pthreadSendToMix->join(); //will not join if queue is empty (and so wating)!!!
00557     CAMsg::printMsg(LOG_CRIT,"Wait for LoopReadFromMix!\n");
00558     m_pthreadReadFromMix->join();
00559     #ifdef LOG_PACKET_TIMES
00560       CAMsg::printMsg(LOG_CRIT,"Wait for LoopLogPacketStats to terminate!\n");
00561       m_pLogPacketStats->stop();
00562     #endif  
00563     //waits until all login threads terminates....
00564     // we have to be sure that the Accept thread was alread stoped!
00565     m_pthreadsLogin->destroy(true);
00566     CAMsg::printMsg(LOG_CRIT,"Before deleting CAFirstMixChannelList()!\n");
00567     CAMsg::printMsg  (LOG_CRIT,"Memeory usage before: %u\n",getMemoryUsage());  
00568     fmHashTableEntry* pHashEntry=m_pChannelList->getFirst();
00569     while(pHashEntry!=NULL)
00570       {
00571         CAMuxSocket * pMuxSocket=pHashEntry->pMuxSocket;
00572         delete pHashEntry->pQueueSend;
00573         pHashEntry->pQueueSend = NULL;
00574         delete pHashEntry->pSymCipher;
00575         pHashEntry->pSymCipher = NULL; 
00576 
00577         fmChannelListEntry* pEntry=m_pChannelList->getFirstChannelForSocket(pHashEntry->pMuxSocket);
00578         while(pEntry!=NULL)
00579           {
00580             delete pEntry->pCipher;
00581             pEntry->pCipher = NULL;
00582   
00583             pEntry=m_pChannelList->getNextChannel(pEntry);
00584           }
00585         m_pChannelList->remove(pHashEntry->pMuxSocket);
00586         pMuxSocket->close();
00587         delete pMuxSocket;
00588         pMuxSocket = NULL;
00589         pHashEntry=m_pChannelList->getNext();
00590       }
00591     /* clean all vacant out-channels (the connection from the client was
00592      * closed but we've waited for the CHANNEL-CLOSE from the last mix)
00593      */
00594     m_pChannelList->cleanVacantOutChannels();
00595     CAMsg::printMsg  (LOG_CRIT,"Memory usage after: %u\n",getMemoryUsage());  
00596     delete pQueueEntry;
00597     pQueueEntry = NULL;
00598     delete []tmpBuff;
00599     tmpBuff = NULL;
00600     CAMsg::printMsg(LOG_CRIT,"Main Loop exited!!\n");
00601 #endif // NEW_MIX_TYPE
00602     return E_UNKNOWN;
00603   }
00604 #endif //ONLY_LOCAL_PROXY