Mixe for Privacy and Anonymity in the Internet
CAFirstMixA.cpp
Go to the documentation of this file.
00001 /*
00002 Copyright (c) 2000, The JAP-Team
00003 All rights reserved.
00004 Redistribution and use in source and binary forms, with or without modification,
00005 are permitted provided that the following conditions are met:
00006 
00007   - Redistributions of source code must retain the above copyright notice,
00008     this list of conditions and the following disclaimer.
00009 
00010   - Redistributions in binary form must reproduce the above copyright notice,
00011     this list of conditions and the following disclaimer in the documentation and/or
00012     other materials provided with the distribution.
00013 
00014   - Neither the name of the University of Technology Dresden, Germany nor the names of its contributors
00015     may be used to endorse or promote products derived from this software without specific
00016     prior written permission.
00017 
00018 
00019 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
00020 OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
00021 AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS
00022 BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00023 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
00024 OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
00025 IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
00026 OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE
00027 */
00028 #include "../StdAfx.h"
00029 #ifndef ONLY_LOCAL_PROXY
00030 #include "CAFirstMixA.hpp"
00031 #include "../CALibProxytest.hpp"
00032 #include "../CAThread.hpp"
00033 #include "../CASingleSocketGroup.hpp"
00034 #include "../CAInfoService.hpp"
00035 #include "../CAPool.hpp"
00036 #include "../CACmdLnOptions.hpp"
00037 #include "../CAAccountingInstance.hpp"
00038 #include "../CAStatusManager.hpp"
00039 #ifdef HAVE_EPOLL
00040   #include "../CASocketGroupEpoll.hpp"
00041 #endif
00042 
00043 void CAFirstMixA::shutDown()
00044 {
00045   m_bIsShuttingDown = true;
00046 
00047 #ifdef PAYMENT
00048   UINT32 connectionsClosed = 0;
00049   fmHashTableEntry* timeoutHashEntry;
00050 
00051 
00052   /* make sure no reconnect is possible when shutting down */
00053   if(m_pthreadAcceptUsers!=NULL)
00054   {
00055     CAMsg::printMsg(LOG_CRIT,"Wait for LoopAcceptUsers!\n");
00056     m_bRestart=true;
00057     m_pthreadAcceptUsers->join();
00058     delete m_pthreadAcceptUsers;
00059   }
00060   m_pthreadAcceptUsers=NULL;
00061 
00062   if(m_pInfoService != NULL)
00063   {
00064     CAMsg::printMsg(LOG_DEBUG,"Shutting down infoservice.\n");
00065     m_pInfoService->stop();
00066   }
00067 
00068   if(m_pChannelList!=NULL) // may happen if mixes did not yet connect to each other
00069   {
00070     while ((timeoutHashEntry = m_pChannelList->popTimeoutEntry(true)) != NULL)
00071     {
00072       CAMsg::printMsg(LOG_DEBUG,"Shutting down, closing client connection.\n");
00073       connectionsClosed++;
00074       closeConnection(timeoutHashEntry);
00075     }
00076     CAMsg::printMsg(LOG_DEBUG,"Closed %i client connections.\n", connectionsClosed);
00077   }
00078 #endif
00079   m_bRestart = true;
00080   m_bIsShuttingDown = false;
00081 }
00082 
00083 
00084 
00085 SINT32 CAFirstMixA::closeConnection(fmHashTableEntry* pHashEntry)
00086 {
00087   if (pHashEntry == NULL)
00088   {
00089     return E_UNKNOWN;
00090   }
00091 
00092   INIT_STACK;
00093   BEGIN_STACK("CAFirstMixA::closeConnection");
00094 
00095 
00096   fmChannelListEntry* pEntry;
00097   tQueueEntry* pQueueEntry = new tQueueEntry;
00098   MIXPACKET* pMixPacket=&pQueueEntry->packet;
00099 
00100   #ifdef LOG_TRAFFIC_PER_USER
00101     UINT64 current_time;
00102     getcurrentTimeMillis(current_time);
00103     CAMsg::printMsg(LOG_DEBUG,"Removing Connection wiht ID: %Lu -- login time [ms] %Lu -- logout time [ms] %Lu -- Traffic was: IN: %u  --  OUT: %u\n",pHashEntry->id,pHashEntry->timeCreated,current_time,pHashEntry->trafficIn,pHashEntry->trafficOut);
00104   #endif
00105   m_pIPList->removeIP(pHashEntry->peerIP);
00106 
00107   m_psocketgroupUsersRead->remove(*(pHashEntry->pMuxSocket));
00108   m_psocketgroupUsersWrite->remove(*(pHashEntry->pMuxSocket));
00109   pEntry = m_pChannelList->getFirstChannelForSocket(pHashEntry->pMuxSocket);
00110 
00111   while(pEntry!=NULL)
00112   {
00113     getRandom(pMixPacket->data,DATA_SIZE);
00114     pMixPacket->flags=CHANNEL_CLOSE;
00115     pMixPacket->channel=pEntry->channelOut;
00116     #ifdef LOG_PACKET_TIMES
00117       setZero64(pQueueEntry->timestamp_proccessing_start);
00118     #endif
00119     m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
00120     delete pEntry->pCipher;
00121     pEntry->pCipher = NULL;
00122     pEntry=m_pChannelList->getNextChannel(pEntry);
00123 #ifdef CH_LOG_STUDY
00124     nrOfChOpMutex->lock();
00125     currentOpenedChannels--;
00126     nrOfChOpMutex->unlock();
00127 #endif //CH_LOG_STUDY
00128   }
00129   ASSERT(pHashEntry->pQueueSend!=NULL,"Send queue is NULL");
00130   delete pHashEntry->pQueueSend;
00131   pHashEntry->pQueueSend = NULL;
00132   delete pHashEntry->pSymCipher;
00133   pHashEntry->pSymCipher = NULL;
00134 
00135   #ifdef COUNTRY_STATS
00136     decUsers(pHashEntry);
00137   #else
00138     decUsers();
00139   #endif
00140 
00141   CAMuxSocket* pMuxSocket = pHashEntry->pMuxSocket;
00142   // Save the socket - its pointer will be deleted in this method!!! Crazy mad programming...
00143   m_pChannelList->remove(pHashEntry->pMuxSocket);
00144   delete pMuxSocket;
00145   pMuxSocket = NULL;
00146   //pHashEntry->pMuxSocket = NULL; // not needed now, but maybe in the future...
00147 
00148   delete pQueueEntry;
00149   pQueueEntry = NULL;
00150 
00151   FINISH_STACK("CAFirstMixA::closeConnection");
00152 
00153   return E_SUCCESS;
00154 }
00155 
/* Number of bytes of symmetric key material that loop() takes from the head
 * of a channel-open packet and passes to CASymCipher::setKeys().
 * The replacement list is parenthesized so that the macro evaluates as one
 * value in any expression (e.g. next to '/', '%' or '*'). */
#define FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS (2*KEY_SIZE)
00157 
00158 SINT32 CAFirstMixA::loop()
00159   {
00160 #ifndef NEW_MIX_TYPE
00161 #ifdef DELAY_USERS
00162     m_pChannelList->setDelayParameters( CALibProxytest::getOptions()->getDelayChannelUnlimitTraffic(),
00163                                       CALibProxytest::getOptions()->getDelayChannelBucketGrow(),
00164                                       CALibProxytest::getOptions()->getDelayChannelBucketGrowIntervall());
00165 #endif
00166 
00167   //  CASingleSocketGroup osocketgroupMixOut;
00168     SINT32 countRead=0;
00169     //#ifdef LOG_PACKET_TIMES
00170     //  tPoolEntry* pPoolEntry=new tPoolEntry;
00171     //  MIXPACKET* pMixPacket=&pPoolEntry->mixpacket;
00172     //#else
00173     tQueueEntry* pQueueEntry = new tQueueEntry;
00174     MIXPACKET* pMixPacket=&pQueueEntry->packet;
00175     //#endif
00176     m_nUser=0;
00177     SINT32 ret;
00178     //osocketgroupMixOut.add(*m_pMuxOut);
00179 
00180     UINT8* tmpBuff=new UINT8[sizeof(tQueueEntry)];
00181     CAMsg::printMsg(LOG_DEBUG,"Starting Message Loop... \n");
00182     bool bAktiv;
00183     UINT8 rsaBuff[RSA_SIZE];
00184 
00185 #ifdef LOG_TRAFFIC_PER_USER
00186     UINT64 current_time;
00187     UINT32 diff_time;
00188     CAMsg::printMsg(LOG_DEBUG,"Channel log formats:\n");
00189     CAMsg::printMsg(LOG_DEBUG,"1. Close received from user (times in micros) - 1:Channel-ID,Connection-ID,Channel open timestamp (microseconds),PacketsIn (only data and open),PacketsOut (only data),ChannelDuration (open packet received --> close packet put into send queue to next mix)\n");
00190     CAMsg::printMsg(LOG_DEBUG,"2. Channel close from Mix(times in micros)- 2.:Channel-ID,Connection-ID,Channel open timestamp (microseconds),PacketsIn (only data and open), PacketsOut (only data),ChannelDuration (open packet received)--> close packet put into send queue to next user\n");
00191 #endif
00192 
00193 #ifdef _DEBUG
00194     CAThread* pLogThread=new CAThread((UINT8*)"CAFirstMixA - LogLoop");
00195     pLogThread->setMainLoop(fm_loopLog);
00196     pLogThread->start(this);
00197 #endif
00198 
00199 #ifdef LOG_CRIME
00200     CASocketAddrINet* surveillanceIPs = CALibProxytest::getOptions()->getCrimeSurveillanceIPs();
00201     UINT32 nrOfSurveillanceIPs = CALibProxytest::getOptions()->getNrOfCrimeSurveillanceIPs();
00202 #endif
00203 //    CAThread threadReadFromUsers;
00204 //    threadReadFromUsers.setMainLoop(loopReadFromUsers);
00205 //    threadReadFromUsers.start(this);
00206 
00207     while(!m_bRestart) /* the main mix loop as long as there are things that are not handled by threads. */
00208       {
00209 
00210         bAktiv=false;
00211 
00212 //LOOP_START:
00213 #ifdef PAYMENT
00214         // while checking if there are connections to close: synch with login threads
00215         m_pmutexLogin->lock();
00216         checkUserConnections();
00217         m_pmutexLogin->unlock();
00218 #endif
00219 //First Step
00220 //Checking for new connections
00221 // Now in a separate Thread....
00222 
00223 // Second Step
00224 // Checking for data from users
00225 // Now in a separate Thread (see loopReadFromUsers())
00226 //Only proccess user data, if queue to next mix is not to long!!
00227 
00228         if(m_pQueueSendToMix->getSize()<MAX_NEXT_MIX_QUEUE_SIZE)
00229           {
00230             countRead=m_psocketgroupUsersRead->select(/*false,*/0);       // how many JAP<->mix connections have received data from their coresponding JAP
00231             if(countRead>0)
00232               bAktiv=true;
00233 #ifdef HAVE_EPOLL
00234             //if we have epoll we do not need to search the whole list
00235             //of connected JAPs to find the ones who have sent data
00236             //as epoll will return ONLY these connections.
00237             fmHashTableEntry* pHashEntry=(fmHashTableEntry*)m_psocketgroupUsersRead->getFirstSignaledSocketData();
00238             while(pHashEntry!=NULL)
00239               {
00240                 CAMuxSocket* pMuxSocket=pHashEntry->pMuxSocket;
00241 #else
00242             //if we do not have epoll we have to go to the whole
00243             //list of open connections to find the ones which
00244             //actually have sent some data
00245             fmHashTableEntry* pHashEntry=m_pChannelList->getFirst();
00246             while(pHashEntry!=NULL&&countRead>0)                      // iterate through all connections as long as there is at least one active left
00247               {
00248                 CAMuxSocket* pMuxSocket=pHashEntry->pMuxSocket;
00249                 if(m_psocketgroupUsersRead->isSignaled(*pMuxSocket))  // if this one seems to have data
00250                   {
00251 #endif
00252 /*#ifdef DELAY_USERS
00253  * Don't delay upstream
00254                 if( m_pChannelList->hasDelayBuckets(pHashEntry->delayBucketID) )
00255                 {
00256 #endif*/
00257                     countRead--;
00258                     ret=pMuxSocket->receive(pMixPacket,0);
00259 
00260                     #if defined LOG_PACKET_TIMES||defined(LOG_CHANNEL)
00261                       getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start);
00262                       set64(pQueueEntry->timestamp_proccessing_start_OP,pQueueEntry->timestamp_proccessing_start);
00263                     #endif
00264                     #ifdef DATA_RETENTION_LOG
00265                       pQueueEntry->dataRetentionLogEntry.t_in=htonl(time(NULL));
00266                     #endif
00267                     if(ret<0&&ret!=E_AGAIN/*||pHashEntry->accessUntil<time()*/)
00268                     {
00269                       // remove dead connections
00270                       closeConnection(pHashEntry);
00271                     }
00272                     else if(ret==MIXPACKET_SIZE)                      // we've read enough data for a whole mix packet. nice!
00273                       {
00274 #ifdef PAYMENT
00275                         if (pHashEntry->bRecoverTimeout)
00276                         {
00277                           // renew the timeout only if recovery is allowed
00278                           m_pChannelList->pushTimeoutEntry(pHashEntry);
00279                         }
00280 #endif
00281                         #ifdef LOG_TRAFFIC_PER_USER
00282                           pHashEntry->trafficIn++;
00283                         #endif
00284                         #ifdef COUNTRY_STATS
00285                           m_PacketsPerCountryIN[pHashEntry->countryID].inc();
00286                         #endif
00287                         //New control channel code...!
00288                         if(pMixPacket->channel>0&&pMixPacket->channel<256)
00289                         {
00290                           if (pHashEntry->pControlChannelDispatcher->proccessMixPacket(pMixPacket))
00291                           {
00292                             goto NEXT_USER;
00293                           }
00294                           else
00295                           {
00296                             CAMsg::printMsg(LOG_DEBUG, "Control channel packet is invalid and could not be processed!\n");
00297                             closeConnection(pHashEntry);
00298                             goto NEXT_USER;
00299                           }
00300                         }
00301 #ifdef PAYMENT
00302                         if(accountTrafficUpstream(pHashEntry) != E_SUCCESS) goto NEXT_USER;
00303 #endif
00304                         if(pMixPacket->flags==CHANNEL_DUMMY) // just a dummy to keep the connection alife in e.g. NAT gateways
00305                         {
00306                           CAMsg::printMsg(LOG_DEBUG,"received dummy traffic\n");
00307                           getRandom(pMixPacket->data,DATA_SIZE);
00308                           #ifdef LOG_PACKET_TIMES
00309                             setZero64(pQueueEntry->timestamp_proccessing_start);
00310                           #endif
00311                           #ifdef LOG_TRAFFIC_PER_USER
00312                             pHashEntry->trafficOut++;
00313                           #endif
00314                           #ifdef COUNTRY_STATS
00315                             m_PacketsPerCountryOUT[pHashEntry->countryID].inc();
00316                           #endif
00317                           pHashEntry->pQueueSend->add(pQueueEntry,sizeof(tQueueEntry));
00318                           #ifdef HAVE_EPOLL
00319                             //m_psocketgroupUsersWrite->add(*pMuxSocket,pHashEntry);
00320                           #else
00321                             m_psocketgroupUsersWrite->add(*pMuxSocket);
00322                           #endif
00323                         }
00324                         else if(pMixPacket->flags==CHANNEL_CLOSE)     // closing one mix-channel (not the JAP<->mix connection!)
00325                         {
00326                           fmChannelListEntry* pEntry;
00327                           pEntry=m_pChannelList->get(pMuxSocket,pMixPacket->channel);
00328                           if(pEntry!=NULL)
00329                           {
00330                             pMixPacket->channel=pEntry->channelOut;
00331                             getRandom(pMixPacket->data,DATA_SIZE);
00332                             #ifdef LOG_PACKET_TIMES
00333                               getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
00334                             #endif
00335                             m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
00336                             /* Don't delay upstream
00337                             #ifdef DELAY_USERS
00338                             m_pChannelList->decDelayBuckets(pHashEntry->delayBucketID);
00339                             #endif*/
00340                             #ifdef LOG_CHANNEL
00341                               //pEntry->packetsInFromUser++;
00342                               getcurrentTimeMicros(current_time);
00343                               diff_time=diff64(current_time,pEntry->timeCreated);
00344                               CAMsg::printMsg(LOG_DEBUG,"1:%u,%Lu,%Lu,%u,%u,%u\n",
00345                                                         pEntry->channelIn,pEntry->pHead->id,pEntry->timeCreated,pEntry->packetsInFromUser,pEntry->packetsOutToUser,
00346                                                         diff_time);
00347                             #endif
00348                             delete pEntry->pCipher;              // forget the symetric key of this connection
00349                             pEntry->pCipher = NULL;
00350                             m_pChannelList->removeChannel(pMuxSocket,pEntry->channelIn);
00351 
00352                             #ifdef CH_LOG_STUDY
00353                             nrOfChOpMutex->lock();
00354                             currentOpenedChannels--;
00355                             nrOfChOpMutex->unlock();
00356                             #endif //CH_LOG_STUDY
00357                           }
00358                           #ifdef _DEBUG
00359                           else
00360                           {
00361 //                            CAMsg::printMsg(LOG_DEBUG,"Invalid ID to close from Browser!\n");
00362                           }
00363                           #endif
00364                         }
00365                         else                                         // finally! a normal mix packet
00366                         {
00367                           CASymCipher* pCipher=NULL;
00368                           fmChannelListEntry* pEntry;
00369                           pEntry=m_pChannelList->get(pMuxSocket,pMixPacket->channel);
00370                           if(pEntry!=NULL&&pMixPacket->flags==CHANNEL_DATA)
00371                           {
00372                             pMixPacket->channel=pEntry->channelOut;
00373                             pCipher=pEntry->pCipher;
00374                             pCipher->crypt1(pMixPacket->data,pMixPacket->data,DATA_SIZE);
00375                                                  // queue the packet for sending to the next mix.
00376                             #ifdef LOG_PACKET_TIMES
00377                               getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
00378                             #endif
00379 
00380                             //check if this IP must be logged due to crime detection
00381                             #ifdef LOG_CRIME
00382                               crimeSurveillance(surveillanceIPs, nrOfSurveillanceIPs,
00383                                 pEntry->pHead->peerIP, pMixPacket);
00384                             #endif
00385                             m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
00386                             /* Don't delay upstream
00387                             #ifdef DELAY_USERS
00388                             m_pChannelList->decDelayBuckets(pHashEntry->delayBucketID);
00389                             #endif*/
00390                             incMixedPackets();
00391                             #ifdef LOG_CHANNEL
00392                               pEntry->packetsInFromUser++;
00393                             #endif
00394                           }
00395                           else if(pEntry==NULL&&pMixPacket->flags==CHANNEL_OPEN)  // open a new mix channel
00396                           { // stefan: muesste das nicht vor die behandlung von CHANNEL_DATA? oder gilt OPEN => !DATA ?
00397                             //es gilt: open -> data
00398                             pHashEntry->pSymCipher->crypt1(pMixPacket->data,rsaBuff,FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS);
00399                             #ifdef REPLAY_DETECTION
00400                             // replace time(NULL) with the real timestamp ()
00401                             // packet-timestamp*REPLAY_BASE + m_u64ReferenceTime
00402                               if(m_pReplayDB->insert(rsaBuff,time(NULL))!=E_SUCCESS)
00403                               {
00404                                 CAMsg::printMsg(LOG_INFO,"Replay: Duplicate packet ignored.\n");
00405                                 continue;
00406                               }
00407                             #endif
00408                             pCipher= new CASymCipher();
00409                             pCipher->setKeys(rsaBuff,FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS);
00410                             for(int i=0;i<16;i++)
00411                               rsaBuff[i]=0xFF;
00412                             pCipher->setIV2(rsaBuff);
00413                             pCipher->crypt1(pMixPacket->data+FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS,pMixPacket->data,DATA_SIZE-FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS);
00414                             getRandom(pMixPacket->data+DATA_SIZE-FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS,FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS);
00415                             #if defined (LOG_CHANNEL) ||defined(DATA_RETENTION_LOG)
00416                               HCHANNEL tmpC=pMixPacket->channel;
00417                             #endif
00418 
00419                             HCHANNEL inChannel = pMixPacket->channel;
00420                             if(m_pChannelList->addChannel(pMuxSocket,pMixPacket->channel,pCipher,&pMixPacket->channel)!=E_SUCCESS)
00421                             { //todo move up ?
00422                               delete pCipher;
00423                               pCipher = NULL;
00424                             }
00425                             else
00426                             {
00427                               #ifdef CH_LOG_STUDY
00428                               nrOfChOpMutex->lock();
00429                               if(pHashEntry->channelOpenedLastIntervalTS !=
00430                                 lastLogTime)
00431                               {
00432                                 pHashEntry->channelOpenedLastIntervalTS =
00433                                   lastLogTime;
00434                                 nrOfOpenedChannels++;
00435                               }
00436                               currentOpenedChannels++;
00437                               nrOfChOpMutex->unlock();
00438                               #endif //CH_LOG_STUDY
00439 
00440                               #ifdef LOG_PACKET_TIMES
00441                                 getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
00442                               #endif
00443                               #ifdef LOG_CHANNEL
00444                                 fmChannelListEntry* pTmpEntry=m_pChannelList->get(pMuxSocket,tmpC);
00445                                 pTmpEntry->packetsInFromUser++;
00446                                 set64(pTmpEntry->timeCreated,pQueueEntry->timestamp_proccessing_start);
00447                               #endif
00448                               #ifdef DATA_RETENTION_LOG
00449                                 pQueueEntry->dataRetentionLogEntry.entity.first.channelid=htonl(pMixPacket->channel);
00450                                 fmChannelListEntry* pTmpEntry1=m_pChannelList->get(pMuxSocket,tmpC);
00451                                 memcpy(pQueueEntry->dataRetentionLogEntry.entity.first.ip_in,pTmpEntry1->pHead->peerIP,4);
00452                                 pQueueEntry->dataRetentionLogEntry.entity.first.port_in=(UINT16)pTmpEntry1->pHead->peerPort;
00453                                 pQueueEntry->dataRetentionLogEntry.entity.first.port_in=htons(pQueueEntry->dataRetentionLogEntry.entity.first.port_in);
00454                               #endif
00455 
00456                               //check if this IP must be logged due to crime detection
00457                               #ifdef LOG_CRIME
00458 
00459                                 pEntry=m_pChannelList->get(pMuxSocket, inChannel);
00460                                 if(pEntry != NULL)
00461                                 {
00462                                   crimeSurveillance(surveillanceIPs, nrOfSurveillanceIPs,
00463                                       pEntry->pHead->peerIP, pMixPacket);
00464                                 }
00465                               #endif
00466                               m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
00467                               /* Don't delay upstream
00468                               #ifdef DELAY_USERS
00469                               m_pChannelList->decDelayBuckets(pHashEntry->delayBucketID);
00470                               #endif*/
00471                               incMixedPackets();
00472                               #ifdef _DEBUG
00473 //                                      CAMsg::printMsg(LOG_DEBUG,"Added out channel: %u\n",pMixPacket->channel);
00474                               #endif
00475                             }
00476                           }
00477                         }
00478                       }
00479 /*#ifdef DELAY_USERS
00480                   }
00481 #endif*/
00482                 #ifdef HAVE_EPOLL
00483 NEXT_USER:
00484                   pHashEntry=(fmHashTableEntry*)m_psocketgroupUsersRead->getNextSignaledSocketData();
00485                 #else
00486                   }//if is signaled
00487 NEXT_USER:
00488                   pHashEntry=m_pChannelList->getNext();
00489                 #endif
00490               }
00491               if(countRead>0)
00492               {
00493                 CAMsg::printMsg(LOG_DEBUG,"CAFirstMixA::loop() - read from user --> countRead >0 after processing all connections!\n");
00494               }
00495           }
00496 //Third step
00497 //Sending to next mix
00498 
00499 // Now in a separate Thread (see loopSendToMix())
00500 
00501 //Step 4
00502 //Step 4a Receiving from mix to queue now in a separate thread
00503 //Step 4b Processing MixPackets received from Mix
00504 //todo check for error!!!
00505         countRead=m_nUser+1;
00506         while(countRead>0&&m_pQueueReadFromMix->getSize()>=sizeof(tQueueEntry))
00507           {
00508             bAktiv=true;
00509             countRead--;
00510             ret=sizeof(tQueueEntry);
00511             m_pQueueReadFromMix->get((UINT8*)pQueueEntry,(UINT32*)&ret);
00512             #ifdef LOG_PACKET_TIMES
00513               getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start_OP);
00514             #endif
00515             if(pMixPacket->flags==CHANNEL_CLOSE) //close event
00516               {
00517                 #if defined(_DEBUG) && !defined(__MIX_TEST)
00518 //                  CAMsg::printMsg(LOG_DEBUG,"Closing Channel: %u ...\n",pMixPacket->channel);
00519                 #endif
00520                 fmChannelList* pEntry=m_pChannelList->get(pMixPacket->channel);
00521                 if(pEntry!=NULL)
00522                   {
00523                     /* a hack to solve the SSL problem:
00524                      * set channel of downstream packet to in channel after they are dequeued
00525                      * from pEntry->pQueueSend so we can retrieve the channel entry to decrement
00526                      * the per channel count of enqueued downstream bytes.
00527                      */
00528                     #ifndef SSL_HACK
00529                     pMixPacket->channel=pEntry->channelIn;
00530                     #endif
00531 
00532                     pEntry->pCipher->crypt2(pMixPacket->data,pMixPacket->data,DATA_SIZE);
00533                     //getRandom(pMixPacket->data,DATA_SIZE);
00534                     #ifdef LOG_PACKET_TIMES
00535                       getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
00536                     #endif
00537                     pEntry->pHead->pQueueSend->add(pQueueEntry, sizeof(tQueueEntry));
00538                     #ifdef LOG_TRAFFIC_PER_USER
00539                       pEntry->pHead->trafficOut++;
00540                     #endif
00541                     #ifdef COUNTRY_STATS
00542                       m_PacketsPerCountryOUT[pEntry->pHead->countryID].inc();
00543                     #endif
00544                     #ifdef SSL_HACK
00545                       /* a hack to solve the SSL problem:
00546                        * per channel count of enqueued downstream bytes
00547                        */
00548                       pEntry->downStreamBytes += sizeof(tQueueEntry);
00549                     #endif
00550                     #ifdef LOG_CHANNEL
00551                       pEntry->packetsOutToUser++;
00552                       getcurrentTimeMicros(current_time);
00553                       diff_time=diff64(current_time,pEntry->timeCreated);
00554                       CAMsg::printMsg(LOG_DEBUG,"2:%u,%Lu,%Lu,%u,%u,%u\n",
00555                                                 pEntry->channelIn,pEntry->pHead->id,pEntry->timeCreated,pEntry->packetsInFromUser,pEntry->packetsOutToUser,
00556                                                 diff_time);
00557                     #endif
00558 
00559                     #ifdef HAVE_EPOLL
00560                       //m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket,pEntry->pHead);
00561                     #else
00562                       m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket);
00563                     #endif
00564 
00565 
00566                     #ifndef SSL_HACK
00567                       delete pEntry->pCipher;              // forget the symetric key of this connection
00568                       pEntry->pCipher = NULL;
00569                       m_pChannelList->removeChannel(pEntry->pHead->pMuxSocket, pEntry->channelIn);
00570                     #ifdef CH_LOG_STUDY
00571                       nrOfChOpMutex->lock();
00572                       currentOpenedChannels--;
00573                       nrOfChOpMutex->unlock();
00574                     #endif
00575                     /* a hack to solve the SSL problem:
00576                      * remove channel after the close packet is enqueued
00577                      * from pEntry->pQueueSend
00578                      */
00579                     #endif
00580                   }
00581                   else
00582                   {
00583                     #ifdef DEBUG
00584                       CAMsg::printMsg(LOG_DEBUG, "CAFirstMixA: close channel -> client but channel does not exist.\n");
00585                     #endif
00586                   }
00587 
00588               }
00589             else
00590               {//flag !=close
00591                 #if defined(_DEBUG) && !defined(__MIX_TEST)
00592 //                  CAMsg::printMsg(LOG_DEBUG,"Sending Data to Browser!\n");
00593                 #endif
00594                 fmChannelList* pEntry=m_pChannelList->get(pMixPacket->channel);
00595 
00596                 if(pEntry!=NULL)
00597                   {
00598                     #ifdef LOG_CRIME
00599                       if((pMixPacket->flags&CHANNEL_SIG_CRIME)==CHANNEL_SIG_CRIME)
00600                         {
00601                           //UINT32 id=(pMixPacket->flags>>8)&0x000000FF;
00602                           int log=LOG_ENCRYPTED;
00603                           if(!CALibProxytest::getOptions()->isEncryptedLogEnabled())
00604                             log=LOG_CRIT;
00605                           CAMsg::printMsg(log,"Detecting crime activity - next mix channel: %u -- "
00606                               "In-IP is: %u.%u.%u.%u \n", pMixPacket->channel,
00607                               pEntry->pHead->peerIP[0],
00608                               pEntry->pHead->peerIP[1],
00609                               pEntry->pHead->peerIP[2],
00610                               pEntry->pHead->peerIP[3]);
00611                           continue;
00612                         }
00613                     #endif
00614 
00615                     /* a hack to solve the SSL problem:
00616                      * same as CHANNEL_CLOSE packets
00617                      */
00618                     #ifndef SSL_HACK
00619                     pMixPacket->channel=pEntry->channelIn;
00620                     #endif
00621 
00622                     pEntry->pCipher->crypt2(pMixPacket->data,pMixPacket->data,DATA_SIZE);
00623 
00624                     #ifdef LOG_PACKET_TIMES
00625                       getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
00626                     #endif
00627                     pEntry->pHead->pQueueSend->add(pQueueEntry, sizeof(tQueueEntry));
00628                     /*CAMsg::printMsg(
00629                         LOG_INFO,"adding data packet to queue: %x, queue size: %u bytes\n",
00630                         pEntry->pHead->pQueueSend, pEntry->pHead->pQueueSend->getSize());*/
00631                     #ifdef LOG_TRAFFIC_PER_USER
00632                       pEntry->pHead->trafficOut++;
00633                     #endif
00634                     #ifdef COUNTRY_STATS
00635                       m_PacketsPerCountryOUT[pEntry->pHead->countryID].inc();
00636                     #endif
00637                     #ifdef LOG_CHANNEL
00638                       pEntry->packetsOutToUser++;
00639                     #endif
00640                     #ifdef SSL_HACK
00641                       /* a hack to solve the SSL problem:
00642                        * per channel count of downstream packets in bytes
00643                        */
00644                       pEntry->downStreamBytes += sizeof(tQueueEntry);
00645                     #endif
00646 
00647                     #ifdef HAVE_EPOLL
00648                       /*int epret = m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket,pEntry->pHead);
00649                       if(epret == E_UNKNOWN)
00650                       {
00651                         epret=errno;
00652                         CAMsg::printMsg(LOG_INFO,"epoll_add returns: %s (return value: %d) \n", strerror(epret), epret);
00653                       }*/
00654                       #else
00655                       m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket);
00656                     #endif
00657 
00658                     incMixedPackets();
00659                   }
00660                 else
00661                   {
00662                     #ifdef _DEBUG
00663                       if(pMixPacket->flags!=CHANNEL_DUMMY)
00664                         {
00665 /*                          CAMsg::printMsg(LOG_DEBUG,"Error Sending Data to Browser -- "
00666                               "Channel-Id %u not valid!\n",pMixPacket->channel
00667                             );*/
00668                           #ifdef LOG_CHANNEL
00669                             CAMsg::printMsg(LOG_INFO,"Packet late arrive for channel: %u\n",pMixPacket->channel);
00670                           #endif
00671                         }
00672                     #endif
00673                   }
00674               }
00675           }
00676 
00679         bAktiv = sendToUsers();
00680 
00681         if(!bAktiv)
00682           msSleep(100);
00683       }
00684 //ERR:
00685     CAMsg::printMsg(LOG_CRIT,"Seems that we are restarting now!!\n");
00686     m_bRunLog=false;
00687     //clean();
00688     delete pQueueEntry;
00689     pQueueEntry = NULL;
00690     delete []tmpBuff;
00691     tmpBuff = NULL;
00692 #ifdef _DEBUG
00693     pLogThread->join();
00694     delete pLogThread;
00695     pLogThread = NULL;
00696 #endif
00697     CAMsg::printMsg(LOG_CRIT,"Main Loop exited!!\n");
00698 #endif//NEW_MIX_TYPE
00699     return E_UNKNOWN;
00700   }
00701 #endif //ONLY_LOCAL_PROXY
00702 
00703 /* last part of the main loop:
00704  * return true if the loop when at least one packet was sent
00705  * or false otherwise.
00706  */
00707 bool CAFirstMixA::sendToUsers()
00708 {
00709   SINT32 countRead = m_psocketgroupUsersWrite->select(/*true,*/0);
00710   tQueueEntry *packetToSend = NULL;
00711   SINT32 packetSize = sizeof(tQueueEntry);
00712   CAQueue *controlMessageUserQueue = NULL;
00713   CAQueue *dataMessageUserQueue = NULL;
00714 
00715   CAQueue *processedQueue = NULL; /* one the above queues that should be used for processing*/
00716   UINT32 extractSize = 0;
00717   bool bAktiv = false;
00718   UINT32 iSocketErrors = 0;
00719 
00720 /* Cyclic polling: gets all open sockets that will not block when invoking send()
00721  * but will only send at most one packet. After that control is returned to loop()
00722  */
00723 #ifdef HAVE_EPOLL
00724   fmHashTableEntry* pfmHashEntry=
00725     (fmHashTableEntry*) m_psocketgroupUsersWrite->getFirstSignaledSocketData();
00726 
00727   while(pfmHashEntry != NULL)
00728   {
00729 
00730 #else
00731   fmHashTableEntry* pfmHashEntry=m_pChannelList->getFirst();
00732   while( (countRead > 0) && (pfmHashEntry != NULL) )
00733   {
00734     if(m_psocketgroupUsersWrite->isSignaled(*pfmHashEntry->pMuxSocket))
00735     {
00736       countRead--;
00737 #endif
00738       /* loop turn init */
00739       extractSize = 0;
00740       processedQueue = NULL;
00741       packetToSend = &(pfmHashEntry->oQueueEntry);
00742       controlMessageUserQueue = pfmHashEntry->pControlMessageQueue;
00743       dataMessageUserQueue = pfmHashEntry->pQueueSend;
00744 
00745       //Control messages have a higher priority.
00746       if(controlMessageUserQueue->getSize() > 0)
00747       {
00748         processedQueue = controlMessageUserQueue;
00749         pfmHashEntry->bCountPacket = false;
00750       }
00751       else if( (dataMessageUserQueue->getSize() > 0)
00752 #ifdef DELAY_USERS
00753           && m_pChannelList->hasDelayBuckets(pfmHashEntry->delayBucketID)
00754 #endif
00755       )
00756       {
00757         processedQueue = dataMessageUserQueue;
00758         pfmHashEntry->bCountPacket = true;
00759       }
00760 
00761       if(processedQueue != NULL)
00762       {
00763         extractSize = packetSize;
00764         bAktiv=true;
00765 
00766         if(pfmHashEntry->uAlreadySendPacketSize == -1)
00767         {
00768           processedQueue->get((UINT8*) packetToSend, &extractSize);
00769 
00770           /* Hack for SSL BUG */
00771 #ifdef SSL_HACK
00772           finishPacket(pfmHashEntry);
00773 #endif //SSL_HACK
00774           pfmHashEntry->pMuxSocket->prepareForSend(&(packetToSend->packet));
00775           pfmHashEntry->uAlreadySendPacketSize = 0;
00776         }
00777       }
00778 
00779       if( (extractSize > 0) || (pfmHashEntry->uAlreadySendPacketSize > 0) )
00780       {
00781         SINT32 len =  MIXPACKET_SIZE - pfmHashEntry->uAlreadySendPacketSize;
00782         UINT8* packetToSendOffset = ((UINT8*)&(packetToSend->packet)) + pfmHashEntry->uAlreadySendPacketSize;
00783         CASocket* clientSocket = pfmHashEntry->pMuxSocket->getCASocket();
00784 
00785         SINT32 ret = clientSocket->send(packetToSendOffset, len);
00786 
00787         if(ret > 0)
00788         {
00789 #ifdef PAYMENT
00790           SINT32 accounting = E_SUCCESS;
00791 #endif
00792           pfmHashEntry->uAlreadySendPacketSize += ret;
00793 
00794           if(pfmHashEntry->uAlreadySendPacketSize == MIXPACKET_SIZE)
00795           {
00796             #ifdef DELAY_USERS
00797             if(processedQueue != controlMessageUserQueue)
00798             {
00799               m_pChannelList->decDelayBuckets(pfmHashEntry->delayBucketID);
00800             }
00801             #endif
00802 
00803             #ifdef LOG_PACKET_TIMES
00804               if(!isZero64(pfmHashEntry->oQueueEntry.timestamp_proccessing_start))
00805                 {
00806                   getcurrentTimeMicros(pfmHashEntry->oQueueEntry.timestamp_proccessing_end);
00807                   m_pLogPacketStats->addToTimeingStats(pfmHashEntry->oQueueEntry,CHANNEL_DATA,false);
00808                 }
00809             #endif
00810             pfmHashEntry->uAlreadySendPacketSize=-1;
00811 #ifdef PAYMENT
00812             /* count this packet for accounting */
00813             accounting = accountTrafficDownstream(pfmHashEntry);
00814 #endif
00815           }
00816 
00817         }
00818         else if(ret<0&&ret!=E_AGAIN)
00819         {
00820           iSocketErrors++;
00821           // if (iSocketErrors == 1) // show debug message only at the first error; otherwise, the log may get huge
00822           {
00823             SOCKET sock=clientSocket->getSocket();
00824             CAMsg::printMsg(LOG_DEBUG,"CAFirstMixA::sendtoUser() - send error %d on socket %d. Reason: %s (%i)\n", ret, sock, GET_NET_ERROR_STR(GET_NET_ERROR), GET_NET_ERROR);
00825           }
00826           // kick the user out - these only happens in extreme situations...
00827           closeConnection(pfmHashEntry);
00828         }
00829         //TODO error handling
00830       }
00831 
00832 #ifdef HAVE_EPOLL
00833     pfmHashEntry=(fmHashTableEntry*)m_psocketgroupUsersWrite->getNextSignaledSocketData();
00834   }
00835 #else
00836     }//if is socket signaled
00837     pfmHashEntry=m_pChannelList->getNext();
00838   }
00839 #endif
00840   if (iSocketErrors > 1)
00841   {
00842     CAMsg::printMsg(LOG_ERR, "CAFirstMixA::sendtoUser() - %d send errors on a socket occured!\n", iSocketErrors);
00843   }
00844   
00845   return bAktiv;
00846 }
00847 
#ifdef PAYMENT
/**
 * Lets the accounting instance evaluate an upstream packet of a user and
 * reacts on its verdict.
 *
 * - connection ok: the user's timeout entry is renewed
 * - prepare for closing: timeout recovery is disabled and a forced kickout is
 *   scheduled for the user; the packet must not be forwarded
 * - close connection: the user's connection is closed immediately
 *
 * Any other verdict of the accounting instance requires no further
 * processing here.
 *
 * @param pHashEntry the connection entry of the user who sent the packet
 * @retval E_SUCCESS if the packet may be forwarded
 * @retval E_UNKNOWN if the packet must not be forwarded (user is going to be,
 *         or already was, kicked out)
 */
SINT32 CAFirstMixA::accountTrafficUpstream(fmHashTableEntry* pHashEntry)
{
  const SINT32 handleResult = CAAccountingInstance::handleJapPacket(pHashEntry, false, false);

  if (handleResult == CAAccountingInstance::HANDLE_PACKET_CONNECTION_OK)
  {
    // renew the timeout
    //pHashEntry->bRecoverTimeout = true;
    m_pChannelList->pushTimeoutEntry(pHashEntry);
    return E_SUCCESS;
  }

  if (handleResult == CAAccountingInstance::HANDLE_PACKET_PREPARE_FOR_CLOSING_CONNECTION)
  {
    // do not forward this packet
    pHashEntry->bRecoverTimeout = false;
    m_pChannelList->setKickoutForced(pHashEntry, KICKOUT_FORCED);
    // pointers have to be printed with %p, %x expects an unsigned int
    CAMsg::printMsg(LOG_DEBUG, "CAFirstMixA: 1. setting bRecover timout to false for entry %p!\n",
        static_cast<void*>(pHashEntry));
    //m_pChannelList->pushTimeoutEntry(pHashEntry);
    //don't let any upstream data messages pass for this user.
    return E_UNKNOWN;
  }

  if (handleResult == CAAccountingInstance::HANDLE_PACKET_CLOSE_CONNECTION)
  {
    // kickout this user - he deserves it...
    CAMsg::printMsg(LOG_DEBUG, "CAFirstMixA: kickout upstream!\n");
    closeConnection(pHashEntry);
    return E_UNKNOWN;
  }

  //please remember that other values (e.g. HANDLE_PACKET_CONNECTION_UNCHECKED,
  //HANDLE_PACKET_HOLD_CONNECTION) also may be returned even though they do not
  //require any further processing
  return E_SUCCESS;
}
#endif
00888 
#ifdef PAYMENT
/**
 * Lets the accounting instance evaluate a downstream packet that was sent to
 * a user and reacts on its verdict.
 *
 * Whether the packet is treated as a control message by the accounting is
 * derived from pfmHashEntry->bCountPacket (a packet that is not to be counted
 * is passed as control message).
 *
 * - connection ok: the user's timeout entry is renewed
 * - prepare for closing: a forced kickout is scheduled; the connection is
 *   closed later, when all control messages are sent
 * - close connection: the user's connection is closed immediately
 *
 * Any other verdict of the accounting instance requires no further
 * processing here.
 *
 * @param pfmHashEntry the connection entry of the user the packet was sent to
 * @retval ERR_INTERN_SOCKET_CLOSED if the user's connection was closed by
 *         this call; pfmHashEntry must not be used any more in this case
 * @retval E_SUCCESS otherwise
 */
SINT32 CAFirstMixA::accountTrafficDownstream(fmHashTableEntry* pfmHashEntry)
{
  // count packet for payment
  const bool bIsControlMessage = !(pfmHashEntry->bCountPacket);
  const SINT32 ret = CAAccountingInstance::handleJapPacket(pfmHashEntry, bIsControlMessage, true);

  if (ret == CAAccountingInstance::HANDLE_PACKET_CLOSE_CONNECTION)
  {
    // close users connection immediately
    // (the message has no conversion specifier, so no further argument is passed)
    CAMsg::printMsg(LOG_DEBUG, "CAFirstMixA: Closing JAP connection due to illegal payment status!\n");
    closeConnection(pfmHashEntry);
    return ERR_INTERN_SOCKET_CLOSED;
  }

  if (ret == CAAccountingInstance::HANDLE_PACKET_CONNECTION_OK)
  {
    // renew the timeout
    //pfmHashEntry->bRecoverTimeout = true;
    m_pChannelList->pushTimeoutEntry(pfmHashEntry);
  }
  else if (ret == CAAccountingInstance::HANDLE_PACKET_PREPARE_FOR_CLOSING_CONNECTION)
  {
    // when all control messages are sent the users connection will be closed
    //pfmHashEntry->bRecoverTimeout = false;
    m_pChannelList->setKickoutForced(pfmHashEntry, KICKOUT_FORCED);
    //m_pChannelList->pushTimeoutEntry(pfmHashEntry);
  }
  //please remember that other values (e.g. HANDLE_PACKET_CONNECTION_UNCHECKED,
  //HANDLE_PACKET_HOLD_CONNECTION) also may be returned even though they do not
  //require any further processing
  return E_SUCCESS;
}
#endif
00924 
00925 void CAFirstMixA::notifyAllUserChannels(fmHashTableEntry *pfmHashEntry, UINT16 flags)
00926 {
00927   if(pfmHashEntry == NULL) 
00928     return;
00929   fmChannelListEntry* pEntry = m_pChannelList->getFirstChannelForSocket(pfmHashEntry->pMuxSocket);
00930   tQueueEntry* pQueueEntry=new tQueueEntry;
00931   MIXPACKET *notifyPacket = &(pQueueEntry->packet);
00932   memset(notifyPacket, 0, MIXPACKET_SIZE);
00933 
00934   notifyPacket->flags = flags;
00935   while(pEntry != NULL)
00936   {
00937     if(pEntry->bIsSuspended)
00938     {
00939       notifyPacket->channel = pEntry->channelOut;
00940       getRandom(notifyPacket->data,DATA_SIZE);
00941 #ifdef _DEBUG
00942       CAMsg::printMsg(LOG_INFO,"Sent flags %u for channel: %u\n", flags, notifyPacket->channel);
00943 #endif
00944       m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
00945       pEntry->bIsSuspended = false;
00946     }
00947     pEntry=m_pChannelList->getNextChannel(pEntry);
00948   }
00949   pfmHashEntry->cSuspend=0;
00950   delete pQueueEntry;
00951 }
00952 
//@todo: not a reliable solution. The bug that causes SSL connections to be reset
//while the same user performs large downloads still has to be found
//(it only occurs in cascades with more than two mixes).
#ifdef SSL_HACK

/**
 * Post-processes the packet stored in pfmHashEntry->oQueueEntry right before
 * it is sent to the user (part of the SSL hack).
 *
 * The packet's channel id is used to look up the channel. If the channel is
 * found, the id is replaced by the channel's incoming id and the channel's
 * downstream byte counter is decreased by the size of one queue entry. If the
 * packet is a CHANNEL_CLOSE packet, the channel's cipher is deleted and the
 * channel is removed from the channel list.
 * Nothing happens if the channel is not found.
 *
 * @param pfmHashEntry the connection entry of the user whose current packet
 *        is to be processed
 */
void CAFirstMixA::finishPacket(fmHashTableEntry *pfmHashEntry)
{
  MIXPACKET* pPacket = &(pfmHashEntry->oQueueEntry.packet);
  fmChannelList* pChannel = m_pChannelList->get(pPacket->channel);
  if(pChannel == NULL)
  {
    return;
  }

  pPacket->channel = pChannel->channelIn;
  pChannel->downStreamBytes -= sizeof(tQueueEntry);
#ifdef DEBUG
  CAMsg::printMsg(LOG_DEBUG, "CAFirstMixA: channels of current packet, in: %u, out: %u, count: %u, flags: 0x%x\n",
      pChannel->channelIn, pChannel->channelOut, pChannel->downStreamBytes,
      pPacket->flags);
#endif

  if(pPacket->flags != CHANNEL_CLOSE)
  {
    return;
  }

  /* the close packet is about to leave: now the channel can be dropped */
  delete pChannel->pCipher;
  pChannel->pCipher = NULL;
  m_pChannelList->removeChannel(pfmHashEntry->pMuxSocket, pChannel->channelIn);
#ifdef CH_LOG_STUDY
  nrOfChOpMutex->lock();
  currentOpenedChannels--;
  nrOfChOpMutex->unlock();
#endif //CH_LOG_STUDY
}

#endif //SSL_HACK
00985 
#ifdef PAYMENT
/**
 * Walks once through the timeout list of all user connections and closes
 * those connections that are timed out or that are marked for a forced
 * kickout.
 *
 * Entries are popped from the timeout list one by one. Entries that stay
 * alive are pushed back; the first entry pushed back is remembered, and the
 * walk ends as soon as it is popped again (or when the list is empty).
 *
 * - Entry without forced kickout: closed if it is timed out.
 * - Entry with forced kickout: its downstream data queue is emptied. The
 *   connection is closed if no control message bytes are pending or if no
 *   send retries are left. Otherwise the retry counter is decreased and the
 *   entry is kept, so that the user can still receive his remaining control
 *   messages (which in most cases contain the reason for the kickout).
 */
void CAFirstMixA::checkUserConnections()
{
  // check the timeout for all connections
  fmHashTableEntry* timeoutHashEntry;
  fmHashTableEntry* firstIteratorEntry = NULL;
  bool currentEntryKickoutForced = false;
  /* this check also includes forced kickouts which have not bRecoverTimeout set. */
  while ( (timeoutHashEntry = m_pChannelList->popTimeoutEntry(true)) != NULL )
  {
    currentEntryKickoutForced = m_pChannelList->isKickoutForced(timeoutHashEntry);
    if(firstIteratorEntry == timeoutHashEntry)
    {
      // we are back at the first surviving entry: every entry was visited once
      m_pChannelList->pushTimeoutEntry(timeoutHashEntry, currentEntryKickoutForced);
      break;
    }

    if (!currentEntryKickoutForced)
    {
      //CAMsg::printMsg(LOG_ERR, "%p\n, ", timeoutHashEntry);
      if(m_pChannelList->isTimedOut(timeoutHashEntry) )
      {
        CAMsg::printMsg(LOG_DEBUG,"Client connection closed due to timeout.\n");
        closeConnection(timeoutHashEntry);
        continue;
      }
    }
    else
    {
      //A user to be kicked out: empty his downstream data queue.
      timeoutHashEntry->pQueueSend->clean();

      if( (timeoutHashEntry->pControlMessageQueue->getSize() == 0) ||
        (timeoutHashEntry->kickoutSendRetries <= 0) )
      {
        // pointers have to be printed with %p, %x expects an unsigned int
        CAMsg::printMsg(LOG_WARNING, "Kickout immediately owner %p!\n",
            static_cast<void*>(timeoutHashEntry));
        UINT32 authFlags = CAAccountingInstance::getAuthFlags(timeoutHashEntry);
        if (authFlags > 0)
        {
          CAMsg::printMsg(LOG_WARNING,"Client connection closed due to forced timeout! Payment auth flags: %u\n", authFlags);
        }
        else
        {
          CAMsg::printMsg(LOG_WARNING,"Client connection closed due to forced timeout!\n");
        }
        //CAAccountingInstance::setPrepaidBytesToZero(timeoutHashEntry->pAccountingInfo);
        closeConnection(timeoutHashEntry);
        continue;
      }

      //Note: this counter is initialized by calling CAFirstMixChannelList::add
      //and accessed by this thread; both never run concurrently.
      //So we can avoid locking.
      timeoutHashEntry->kickoutSendRetries--;
      CAMsg::printMsg(LOG_INFO, "Size of control message queue for user to be kicked out: %u bytes, retries %d.\n",
          timeoutHashEntry->pControlMessageQueue->getSize(), timeoutHashEntry->kickoutSendRetries);

      // Let the client obtain all his remaining control message packets
      // (which in most cases contain the error message with the kickout reason).
      CAMsg::printMsg(LOG_WARNING,"A kickout is supposed to happen. Let the user get his %u control message bytes before...\n",
          timeoutHashEntry->pControlMessageQueue->getSize());
    }

    // the entry survives this round: remember the first one as end marker
    if(firstIteratorEntry == NULL)
    {
      firstIteratorEntry = timeoutHashEntry;
    }
    m_pChannelList->pushTimeoutEntry(timeoutHashEntry, currentEntryKickoutForced);
  }
}
#endif
01057 
#ifdef LOG_CRIME
/**
 * Checks whether the IP of a user is under surveillance and, if so, marks
 * the given packet with the CHANNEL_SIG_CRIME flag and writes a log message.
 *
 * The peer IP is compared with every entry of the surveillance list; each
 * matching entry produces one log message.
 *
 * @param surveillanceIPs array of the addresses under surveillance (may be NULL)
 * @param nrOfSurveillanceIPs number of entries in surveillanceIPs
 * @param peerIP IP of the user; its first four bytes are logged
 * @param pMixPacket the packet whose flags are set on a match
 */
void CAFirstMixA::crimeSurveillance(CASocketAddrINet* surveillanceIPs, UINT32 nrOfSurveillanceIPs,UINT8 *peerIP, MIXPACKET *pMixPacket)
{
  if( (surveillanceIPs == NULL) || (nrOfSurveillanceIPs == 0) )
  {
    // nothing to compare with
    return;
  }

  CASocketAddrINet* const pListEnd = surveillanceIPs + nrOfSurveillanceIPs;
  for(CASocketAddrINet* pCurrentIP = surveillanceIPs; pCurrentIP != pListEnd; ++pCurrentIP)
  {
    if(!pCurrentIP->equalsIP(peerIP))
    {
      continue;
    }
    CAMsg::printMsg(LOG_CRIT,"Crime detection: User surveillance, IP %u.%u.%u.%u with next mix channel %u\n",peerIP[0], peerIP[1], peerIP[2], peerIP[3],pMixPacket->channel);
    pMixPacket->flags |= CHANNEL_SIG_CRIME;
  }
}
#endif