|
Mixe for Privacy and Anonymity in the Internet
|
00001 /* 00002 Copyright (c) 2000, The JAP-Team 00003 All rights reserved. 00004 Redistribution and use in source and binary forms, with or without modification, 00005 are permitted provided that the following conditions are met: 00006 00007 - Redistributions of source code must retain the above copyright notice, 00008 this list of conditions and the following disclaimer. 00009 00010 - Redistributions in binary form must reproduce the above copyright notice, 00011 this list of conditions and the following disclaimer in the documentation and/or 00012 other materials provided with the distribution. 00013 00014 - Neither the name of the University of Technology Dresden, Germany nor the names of its contributors 00015 may be used to endorse or promote products derived from this software without specific 00016 prior written permission. 00017 00018 00019 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' AND ANY EXPRESS 00020 OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY 00021 AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS 00022 BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 00023 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 00024 OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER 00025 IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 00026 OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE 00027 */ 00028 #include "../StdAfx.h" 00029 #ifndef ONLY_LOCAL_PROXY 00030 #include "CAFirstMixB.hpp" 00031 #include "../CAThread.hpp" 00032 #include "../CASingleSocketGroup.hpp" 00033 #include "../CAInfoService.hpp" 00034 #include "../CAPool.hpp" 00035 #include "../CACmdLnOptions.hpp" 00036 #include "../CAAccountingInstance.hpp" 00037 #ifdef HAVE_EPOLL 00038 #include "../CASocketGroupEpoll.hpp" 00039 #endif 00040 00041 SINT32 CAFirstMixB::loop() 00042 { 00043 #ifdef NEW_MIX_TYPE 00044 /* should only be compiled if new NEW_MIX_TYPE is defined */ 00045 #ifdef DELAY_USERS 00046 m_pChannelList->setDelayParameters( pglobalOptions->getDelayChannelUnlimitTraffic(), 00047 pglobalOptions->getDelayChannelBucketGrow(), 00048 pglobalOptions->getDelayChannelBucketGrowIntervall()); 00049 #endif 00050 00051 // CASingleSocketGroup osocketgroupMixOut; 00052 SINT32 countRead; 00053 //#ifdef LOG_PACKET_TIMES 00054 // tPoolEntry* pPoolEntry=new tPoolEntry; 00055 // MIXPACKET* pMixPacket=&pPoolEntry->mixpacket; 00056 //#else 00057 tQueueEntry* pQueueEntry=new tQueueEntry; 00058 MIXPACKET* pMixPacket=&pQueueEntry->packet; 00059 //#endif 00060 m_nUser=0; 00061 SINT32 ret; 00062 //osocketgroupMixOut.add(*m_pMuxOut); 00063 00064 UINT8* tmpBuff=new UINT8[sizeof(tQueueEntry)]; 00065 CAMsg::printMsg(LOG_DEBUG,"Starting Message Loop... \n"); 00066 bool bAktiv; 00067 UINT8 rsaBuff[RSA_SIZE]; 00068 #ifdef LOG_TRAFFIC_PER_USER 00069 UINT64 current_time; 00070 UINT32 diff_time; 00071 CAMsg::printMsg(LOG_DEBUG,"Channel log formats:\n"); 00072 CAMsg::printMsg(LOG_DEBUG,"1. Close received from user (times in micros) - 1:Channel-ID,Connection-ID,PacketsIn (only data and open),PacketsOut (only data),ChannelDuration (open packet received --> close packet put into send queue to next mix)\n"); 00073 CAMsg::printMsg(LOG_DEBUG,"2. Channel close from Mix(times in micros)- 2.:Channel-ID,Connection-ID,PacketsIn (only data and open), PacketsOut (only data),ChannelDuration (open packet received)--> close packet put into send queue to next user\n"); 00074 #endif 00075 // CAThread threadReadFromUsers; 00076 // threadReadFromUsers.setMainLoop(loopReadFromUsers); 00077 // threadReadFromUsers.start(this); 00078 00079 while(!m_bRestart) /* the main mix loop as long as there are things that are not handled by threads. */ 00080 { 00081 bAktiv=false; 00082 //LOOP_START: 00083 00084 //First Step 00085 //Checking for new connections 00086 // Now in a separat Thread.... 00087 00088 // Second Step 00089 // Checking for data from users 00090 // Now in a separate Thread (see loopReadFromUsers()) 00091 //Only proccess user data, if queue to next mix is not to long!! 00092 #define MAX_NEXT_MIX_QUEUE_SIZE 10000000 //10 MByte 00093 if(m_pQueueSendToMix->getSize()<MAX_NEXT_MIX_QUEUE_SIZE) 00094 { 00095 countRead=m_psocketgroupUsersRead->select(/*false,*/0); // how many JAP<->mix connections have received data from their coresponding JAP 00096 if(countRead>0) 00097 bAktiv=true; 00098 #ifdef HAVE_EPOLL 00099 //if we have epool we do not need to search the whole list 00100 //of connected JAPs to find the ones who have sent data 00101 //as epool will return ONLY these connections. 00102 fmHashTableEntry* pHashEntry=(fmHashTableEntry*)m_psocketgroupUsersRead->getFirstSignaledSocketData(); 00103 while(pHashEntry!=NULL) 00104 { 00105 CAMuxSocket* pMuxSocket=pHashEntry->pMuxSocket; 00106 #else 00107 //if we do not have epoll we have to go to the whole 00108 //list of open connections to find the ones which 00109 //actually have sent some data 00110 fmHashTableEntry* pHashEntry=m_pChannelList->getFirst(); 00111 while(pHashEntry!=NULL&&countRead>0) // iterate through all connections as long as there is at least one active left 00112 { 00113 CAMuxSocket* pMuxSocket=pHashEntry->pMuxSocket; 00114 if(m_psocketgroupUsersRead->isSignaled(*pMuxSocket)) // if this one seems to have data 00115 { 00116 countRead--; 00117 #endif 00118 ret=pMuxSocket->receive(pMixPacket,0); 00119 #if defined LOG_PACKET_TIMES||defined(LOG_CHANNEL) 00120 getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start); 00121 set64(pQueueEntry->timestamp_proccessing_start_OP,pQueueEntry->timestamp_proccessing_start); 00122 #endif 00123 if(ret==SOCKET_ERROR/*||pHashEntry->accessUntil<time()*/) 00124 { 00125 // remove dead connections 00126 #ifdef LOG_TRAFFIC_PER_USER 00127 getcurrentTimeMillis(current_time); 00128 diff_time=diff64(current_time,pHashEntry->timeCreated); 00129 m_pIPList->removeIP(pHashEntry->peerIP,diff_time,pHashEntry->trafficIn,pHashEntry->trafficOut); 00130 #else 00131 m_pIPList->removeIP(pHashEntry->peerIP); 00132 #endif 00133 m_psocketgroupUsersRead->remove(pMuxSocket->getSocket()); 00134 m_psocketgroupUsersWrite->remove(pMuxSocket->getSocket()); 00135 ASSERT(pHashEntry->pQueueSend!=NULL,"Send queue is NULL"); 00136 delete pHashEntry->pQueueSend; 00137 pHashEntry->pQueueSend = NULL; 00138 delete pHashEntry->pSymCipher; 00139 pHashEntry->pSymCipher = NULL; 00140 #ifdef COUNTRY_STATS 00141 decUsers(pHashEntry); 00142 #else 00143 decUsers(); 00144 #endif 00145 /* remove the client part (MuxSocket) but keep the 00146 * outgoing channels until a CHANNEL-CLOSE from the 00147 * last mix is received 00148 */ 00149 m_pChannelList->removeClientPart(pMuxSocket); 00150 delete pMuxSocket; 00151 pMuxSocket = NULL; 00152 } 00153 else if(ret==MIXPACKET_SIZE) // we've read enough data for a whole mix packet. nice! 00154 { 00155 #ifdef LOG_TRAFFIC_PER_USER 00156 pHashEntry->trafficIn++; 00157 #endif 00158 #ifdef COUNTRY_STATS 00159 m_PacketsPerCountryIN[pHashEntry->countryID].inc(); 00160 #endif 00161 //New control channel code...! 00162 SINT32 ret = 0; 00163 if(pMixPacket->channel>0&&pMixPacket->channel<256) 00164 { 00165 if (pHashEntry->pControlChannelDispatcher->proccessMixPacket(pMixPacket)) 00166 { 00167 goto NEXT_USER; 00168 } 00169 else 00170 { 00171 CAMsg::printMsg(LOG_DEBUG, "Packet is invalid and could not be processed!\n"); 00172 ret = 3; 00173 } 00174 } 00175 #ifdef PAYMENT 00176 // payment code added by Bastian Voigt 00177 if (ret == 0) 00178 { 00179 ret = CAAccountingInstance::handleJapPacket(pHashEntry); 00180 } 00181 if (ret == 2) 00182 { 00183 goto NEXT_USER; 00184 } 00185 #endif 00186 if(ret == 3) 00187 { 00188 // this jap is evil! terminate connection and add IP to blacklist 00189 CAMsg::printMsg(LOG_DEBUG, "CAFirstMixB: Detected evil Jap.. closing connection! Removing IP..\n", ret); 00190 fmChannelListEntry* pEntry; 00191 pEntry=m_pChannelList->getFirstChannelForSocket(pMuxSocket); 00192 while(pEntry!=NULL) 00193 { 00194 getRandom(pMixPacket->data,DATA_SIZE); 00195 pMixPacket->flags=CHANNEL_CLOSE; 00196 pMixPacket->channel=pEntry->channelOut; 00197 #ifdef LOG_PACKET_TIMES 00198 setZero64(pQueueEntry->timestamp_proccessing_start); 00199 #endif 00200 m_pQueueSendToMix->add(pMixPacket,sizeof(tQueueEntry)); 00201 delete pEntry->pCipher; 00202 pEntry->pCipher = NULL; 00203 pEntry=m_pChannelList->getNextChannel(pEntry); 00204 } 00205 m_pIPList->removeIP(pHashEntry->peerIP); 00206 m_psocketgroupUsersRead->remove(pMuxSocket->getSocket()); 00207 m_psocketgroupUsersWrite->remove(pMuxSocket->getSocket()); 00208 delete pHashEntry->pQueueSend; 00209 pHashEntry->pQueueSend = NULL; 00210 delete pHashEntry->pSymCipher; 00211 pHashEntry->pSymCipher = NULL; 00212 m_pChannelList->remove(pMuxSocket); 00213 delete pMuxSocket; 00214 pMuxSocket = NULL; 00215 decUsers(); 00216 goto NEXT_USER; 00217 } 00218 00219 if(pMixPacket->flags==CHANNEL_DUMMY) // just a dummy to keep the connection alife in e.g. NAT gateways 00220 { 00221 getRandom(pMixPacket->data,DATA_SIZE); 00222 #ifdef LOG_PACKET_TIMES 00223 setZero64(pQueueEntry->timestamp_proccessing_start); 00224 #endif 00225 pHashEntry->pQueueSend->add(pMixPacket,sizeof(tQueueEntry)); 00226 #ifdef HAVE_EPOLL 00227 m_psocketgroupUsersWrite->add(*pMuxSocket,pHashEntry); 00228 #else 00229 m_psocketgroupUsersWrite->add(*pMuxSocket); 00230 #endif 00231 } 00232 else // finally! a normal mix packet 00233 { 00234 CASymCipher* pCipher=NULL; 00235 fmChannelListEntry* pEntry; 00236 pEntry=m_pChannelList->get(pMuxSocket,pMixPacket->channel); 00237 if(pEntry!=NULL&&pMixPacket->flags==CHANNEL_DATA) 00238 { 00239 pMixPacket->channel=pEntry->channelOut; 00240 pCipher=pEntry->pCipher; 00241 pCipher->crypt1(pMixPacket->data,pMixPacket->data,DATA_SIZE); 00242 // queue the packet for sending to the next mix. 00243 #ifdef LOG_PACKET_TIMES 00244 getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP); 00245 #endif 00246 m_pQueueSendToMix->add(pMixPacket,sizeof(tQueueEntry)); 00247 incMixedPackets(); 00248 #ifdef LOG_CHANNEL 00249 pEntry->packetsInFromUser++; 00250 #endif 00251 } 00252 else if(pEntry==NULL&&pMixPacket->flags==CHANNEL_OPEN) // open a new mix channel 00253 { // stefan: muesste das nicht vor die behandlung von CHANNEL_DATA? oder gilt OPEN => !DATA ? 00254 //es gilt: open -> data 00255 pHashEntry->pSymCipher->crypt1(pMixPacket->data,rsaBuff,KEY_SIZE); 00256 pCipher= new CASymCipher(); 00257 pCipher->setKey(rsaBuff); 00258 for(int i=0;i<16;i++) 00259 rsaBuff[i]=0xFF; 00260 pCipher->setIV2(rsaBuff); 00261 pCipher->crypt1(pMixPacket->data+KEY_SIZE,pMixPacket->data,DATA_SIZE-KEY_SIZE); 00262 getRandom(pMixPacket->data+DATA_SIZE-KEY_SIZE,KEY_SIZE); 00263 #ifdef LOG_CHANNEL 00264 HCHANNEL tmpC=pMixPacket->channel; 00265 #endif 00266 if(m_pChannelList->addChannel(pMuxSocket,pMixPacket->channel,pCipher,&pMixPacket->channel)!=E_SUCCESS) 00267 { //todo move up ? 00268 delete pCipher; 00269 pCipher = NULL; 00270 } 00271 else 00272 { 00273 #ifdef LOG_PACKET_TIMES 00274 getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP); 00275 #endif 00276 #ifdef LOG_CHANNEL 00277 fmChannelListEntry* pTmpEntry=m_pChannelList->get(pMuxSocket,tmpC); 00278 pTmpEntry->packetsInFromUser++; 00279 set64(pTmpEntry->timeCreated,pQueueEntry->timestamp_proccessing_start); 00280 #endif 00281 m_pQueueSendToMix->add(pMixPacket,sizeof(tQueueEntry)); 00282 incMixedPackets(); 00283 #ifdef _DEBUG 00284 // CAMsg::printMsg(LOG_DEBUG,"Added out channel: %u\n",pMixPacket->channel); 00285 #endif 00286 } 00287 } 00288 } 00289 } 00290 #ifdef HAVE_EPOLL 00291 NEXT_USER: 00292 pHashEntry=(fmHashTableEntry*)m_psocketgroupUsersRead->getNextSignaledSocketData(); 00293 #else 00294 }//if is signaled 00295 NEXT_USER: 00296 pHashEntry=m_pChannelList->getNext(); 00297 #endif 00298 } 00299 } 00300 //Third step 00301 //Sending to next mix 00302 00303 // Now in a separate Thread (see loopSendToMix()) 00304 00305 //Step 4 00306 //Stepa 4a Receiving form Mix to Queue now in separat Thread 00307 //Step 4b Proccesing MixPackets received from Mix 00308 //todo check for error!!! 00309 countRead=m_nUser+1; 00310 while(countRead>0&&m_pQueueReadFromMix->getSize()>=sizeof(tQueueEntry)) 00311 { 00312 bAktiv=true; 00313 countRead--; 00314 ret=sizeof(tQueueEntry); 00315 m_pQueueReadFromMix->get((UINT8*)pQueueEntry,(UINT32*)&ret); 00316 #ifdef LOG_PACKET_TIMES 00317 getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start_OP); 00318 #endif 00319 if(pMixPacket->flags==CHANNEL_CLOSE) //close event 00320 { 00321 #if defined(_DEBUG) && !defined(__MIX_TEST) 00322 // CAMsg::printMsg(LOG_DEBUG,"Closing Channel: %u ...\n",pMixPacket->channel); 00323 #endif 00324 fmChannelList* pEntry=m_pChannelList->get(pMixPacket->channel); 00325 if(pEntry!=NULL) 00326 { 00327 if (pEntry->pHead != NULL) { 00328 pMixPacket->channel=pEntry->channelIn; 00329 getRandom(pMixPacket->data,DATA_SIZE); 00330 #ifdef LOG_PACKET_TIMES 00331 getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP); 00332 #endif 00333 pEntry->pHead->pQueueSend->add(pMixPacket,sizeof(tQueueEntry)); 00334 #ifdef LOG_TRAFFIC_PER_USER 00335 pEntry->pHead->trafficOut++; 00336 #endif 00337 #ifdef COUNTRY_STATS 00338 m_PacketsPerCountryOUT[pEntry->pHead->countryID].inc(); 00339 #endif 00340 #ifdef LOG_CHANNEL 00341 //pEntry->packetsOutToUser++; 00342 getcurrentTimeMicros(current_time); 00343 diff_time=diff64(current_time,pEntry->timeCreated); 00344 CAMsg::printMsg(LOG_DEBUG,"2:%u,%Lu,%u,%u,%u\n", 00345 pEntry->channelIn,pEntry->pHead->id,pEntry->packetsInFromUser,pEntry->packetsOutToUser, 00346 diff_time); 00347 #endif 00348 00349 #ifdef HAVE_EPOLL 00350 m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket,pEntry->pHead); 00351 #else 00352 m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket); 00353 #endif 00354 delete pEntry->pCipher; 00355 pEntry->pCipher = NULL; 00356 m_pChannelList->removeChannel(pEntry->pHead->pMuxSocket,pEntry->channelIn); 00357 } 00358 else { 00359 /* the client has already closed the connection but we 00360 * have waited for the CHANNEL-CLOSE packet from the 00361 * last mix -> now we have it 00362 */ 00363 #ifdef LOG_PACKET_TIMES 00364 getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP); 00365 #endif 00366 delete pEntry->pCipher; 00367 pEntry->pCipher = NULL; 00368 m_pChannelList->removeVacantOutChannel(pEntry); 00369 pEntry = NULL; 00370 } 00371 } 00372 } 00373 else 00374 {//flag !=close 00375 #if defined(_DEBUG) && !defined(__MIX_TEST) 00376 // CAMsg::printMsg(LOG_DEBUG,"Sending Data to Browser!\n"); 00377 #endif 00378 fmChannelList* pEntry=m_pChannelList->get(pMixPacket->channel); 00379 if(pEntry!=NULL) { 00380 if (pEntry->pHead != NULL) { 00381 #ifdef LOG_CRIME 00382 if((pMixPacket->flags&CHANNEL_SIG_CRIME)==CHANNEL_SIG_CRIME) 00383 { 00384 UINT32 id=(pMixPacket->flags>>8)&0x000000FF; 00385 int log=LOG_ENCRYPTED; 00386 if(!pglobalOptions->isEncryptedLogEnabled()) 00387 log=LOG_CRIT; 00388 CAMsg::printMsg(log,"Detecting crime activity - ID: %u -- In-IP is: %u.%u.%u.%u \n",id,pEntry->pHead->peerIP[0],pEntry->pHead->peerIP[1],pEntry->pHead->peerIP[2],pEntry->pHead->peerIP[3]); 00389 continue; 00390 } 00391 #endif 00392 pMixPacket->channel=pEntry->channelIn; 00393 pEntry->pCipher->crypt2(pMixPacket->data,pMixPacket->data,DATA_SIZE); 00394 00395 #ifdef LOG_PACKET_TIMES 00396 getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP); 00397 #endif 00398 pEntry->pHead->pQueueSend->add(pMixPacket,sizeof(tQueueEntry)); 00399 #ifdef LOG_TRAFFIC_PER_USER 00400 pEntry->pHead->trafficOut++; 00401 #endif 00402 #ifdef COUNTRY_STATS 00403 m_PacketsPerCountryOUT[pEntry->pHead->countryID].inc(); 00404 #endif 00405 #ifdef LOG_CGANNEL 00406 pEntry->packetsOutToUser++; 00407 #endif 00408 #ifdef HAVE_EPOLL 00409 m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket,pEntry->pHead); 00410 #else 00411 m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket); 00412 #endif 00413 incMixedPackets(); 00414 } 00415 else { 00416 /* connection to client is already closed -> we wait for 00417 * CLOSE-CHANNEL from last mix (but this is no CLOSE- 00418 * CHANNEL packet -> do only some compatibility things and 00419 * ignore the packet) 00420 */ 00421 #ifdef LOG_CRIME 00422 /* we don't have the user-information any more (but also 00423 * the user cannot receive any data); nevertheless write 00424 * a log-message to show what happened 00425 */ 00426 if((pMixPacket->flags&CHANNEL_SIG_CRIME)==CHANNEL_SIG_CRIME) { 00427 UINT32 id = (pMixPacket->flags>>8)&0x000000FF; 00428 int log = LOG_ENCRYPTED; 00429 if (!pglobalOptions->isEncryptedLogEnabled()) { 00430 log=LOG_CRIT; 00431 } 00432 CAMsg::printMsg(log,"Detecting crime activity - ID: %u -- In-IP is: not available (user has already closed connection)\n",id); 00433 continue; 00434 } 00435 #endif 00436 #ifdef LOG_PACKET_TIMES 00437 getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP); 00438 #endif 00439 } 00440 } 00441 else 00442 { 00443 #ifdef _DEBUG 00444 if(pMixPacket->flags!=CHANNEL_DUMMY) 00445 { 00446 /* CAMsg::printMsg(LOG_DEBUG,"Error Sending Data to Browser -- " 00447 "Channel-Id %u not valid!\n",pMixPacket->channel 00448 );*/ 00449 #ifdef LOG_CHANNEL 00450 CAMsg::printMsg(LOG_INFO,"Packet late arrive for channel: %u\n",pMixPacket->channel); 00451 #endif 00452 } 00453 #endif 00454 } 00455 } 00456 } 00457 00458 //Step 5 00459 //Writing to users... 00460 countRead=m_psocketgroupUsersWrite->select(/*true,*/0); 00461 #ifdef HAVE_EPOLL 00462 fmHashTableEntry* pfmHashEntry=(fmHashTableEntry*)m_psocketgroupUsersWrite->getFirstSignaledSocketData(); 00463 while(pfmHashEntry!=NULL) 00464 { 00465 #else 00466 fmHashTableEntry* pfmHashEntry=m_pChannelList->getFirst(); 00467 while(countRead>0&&pfmHashEntry!=NULL) 00468 { 00469 if(m_psocketgroupUsersWrite->isSignaled(*pfmHashEntry->pMuxSocket)) 00470 { 00471 countRead--; 00472 #endif 00473 #ifdef DELAY_USERS 00474 if(pfmHashEntry->delayBucket>0) 00475 { 00476 #endif 00477 if(pfmHashEntry->pQueueSend->getSize()>0) 00478 { 00479 bAktiv=true; 00480 UINT32 len=sizeof(tQueueEntry); 00481 if(pfmHashEntry->uAlreadySendPacketSize==-1) 00482 { 00483 pfmHashEntry->pQueueSend->get((UINT8*)&pfmHashEntry->oQueueEntry,&len); 00484 #ifdef PAYMENT 00485 //do not count control channel packets! 00486 if(pfmHashEntry->oQueueEntry.packet.channel>0&&pfmHashEntry->oQueueEntry.packet.channel<256) 00487 pfmHashEntry->bCountPacket=false; 00488 else 00489 pfmHashEntry->bCountPacket=true; 00490 #endif 00491 pfmHashEntry->pMuxSocket->prepareForSend(&(pfmHashEntry->oQueueEntry.packet)); 00492 pfmHashEntry->uAlreadySendPacketSize=0; 00493 } 00494 len=MIXPACKET_SIZE-pfmHashEntry->uAlreadySendPacketSize; 00495 ret=pfmHashEntry->pMuxSocket->getCASocket()->send(((UINT8*)&(pfmHashEntry->oQueueEntry))+pfmHashEntry->uAlreadySendPacketSize,len); 00496 if(ret>0) 00497 { 00498 pfmHashEntry->uAlreadySendPacketSize+=ret; 00499 if(pfmHashEntry->uAlreadySendPacketSize==MIXPACKET_SIZE) 00500 { 00501 #ifdef PAYMENT 00502 if(pfmHashEntry->bCountPacket) 00503 { 00504 // count packet for payment 00505 if (CAAccountingInstance::handleJapPacket(pfmHashEntry) == 2) 00506 { 00507 goto NEXT_USER_WRITING; 00508 } 00509 } 00510 #endif 00511 #ifdef DELAY_USERS 00512 pfmHashEntry->delayBucket--; 00513 #endif 00514 pfmHashEntry->uAlreadySendPacketSize=-1; 00515 #ifdef LOG_PACKET_TIMES 00516 if(!isZero64(pfmHashEntry->oQueueEntry.timestamp_proccessing_start)) 00517 { 00518 getcurrentTimeMicros(pfmHashEntry->oQueueEntry.timestamp_proccessing_end); 00519 m_pLogPacketStats->addToTimeingStats(pfmHashEntry->oQueueEntry,CHANNEL_DATA,false); 00520 } 00521 #endif 00522 } 00523 } 00524 } 00525 #ifdef DELAY_USERS 00526 } 00527 #endif 00528 //todo error handling 00529 #ifdef HAVE_EPOLL 00530 NEXT_USER_WRITING: 00531 pfmHashEntry=(fmHashTableEntry*)m_psocketgroupUsersWrite->getNextSignaledSocketData(); 00532 #else 00533 }//if is socket signaled 00534 NEXT_USER_WRITING: 00535 pfmHashEntry=m_pChannelList->getNext(); 00536 #endif 00537 } 00538 if(!bAktiv) 00539 msSleep(100); 00540 } 00541 //ERR: 00542 //@todo move cleanup to clean() ! 00543 CAMsg::printMsg(LOG_CRIT,"Seams that we are restarting now!!\n"); 00544 m_bRestart=true; 00545 m_pMuxOut->close(); 00546 for(UINT32 i=0;i<m_nSocketsIn;i++) 00547 m_arrSocketsIn[i].close(); 00548 //writng some bytes to the queue... 00549 UINT8 b[sizeof(tQueueEntry)+1]; 00550 m_pQueueSendToMix->add(b,sizeof(tQueueEntry)+1); 00551 //#if !defined(_DEBUG) && !defined(NO_LOOPACCEPTUSER) 00552 CAMsg::printMsg(LOG_CRIT,"Wait for LoopAcceptUsers!\n"); 00553 m_pthreadAcceptUsers->join(); 00554 //#endif 00555 CAMsg::printMsg(LOG_CRIT,"Wait for LoopSendToMix!\n"); 00556 m_pthreadSendToMix->join(); //will not join if queue is empty (and so wating)!!! 00557 CAMsg::printMsg(LOG_CRIT,"Wait for LoopReadFromMix!\n"); 00558 m_pthreadReadFromMix->join(); 00559 #ifdef LOG_PACKET_TIMES 00560 CAMsg::printMsg(LOG_CRIT,"Wait for LoopLogPacketStats to terminate!\n"); 00561 m_pLogPacketStats->stop(); 00562 #endif 00563 //waits until all login threads terminates.... 00564 // we have to be sure that the Accept thread was alread stoped! 00565 m_pthreadsLogin->destroy(true); 00566 CAMsg::printMsg(LOG_CRIT,"Before deleting CAFirstMixChannelList()!\n"); 00567 CAMsg::printMsg (LOG_CRIT,"Memeory usage before: %u\n",getMemoryUsage()); 00568 fmHashTableEntry* pHashEntry=m_pChannelList->getFirst(); 00569 while(pHashEntry!=NULL) 00570 { 00571 CAMuxSocket * pMuxSocket=pHashEntry->pMuxSocket; 00572 delete pHashEntry->pQueueSend; 00573 pHashEntry->pQueueSend = NULL; 00574 delete pHashEntry->pSymCipher; 00575 pHashEntry->pSymCipher = NULL; 00576 00577 fmChannelListEntry* pEntry=m_pChannelList->getFirstChannelForSocket(pHashEntry->pMuxSocket); 00578 while(pEntry!=NULL) 00579 { 00580 delete pEntry->pCipher; 00581 pEntry->pCipher = NULL; 00582 00583 pEntry=m_pChannelList->getNextChannel(pEntry); 00584 } 00585 m_pChannelList->remove(pHashEntry->pMuxSocket); 00586 pMuxSocket->close(); 00587 delete pMuxSocket; 00588 pMuxSocket = NULL; 00589 pHashEntry=m_pChannelList->getNext(); 00590 } 00591 /* clean all vacant out-channels (the connection from the client was 00592 * closed but we've waited for the CHANNEL-CLOSE from the last mix) 00593 */ 00594 m_pChannelList->cleanVacantOutChannels(); 00595 CAMsg::printMsg (LOG_CRIT,"Memory usage after: %u\n",getMemoryUsage()); 00596 delete pQueueEntry; 00597 pQueueEntry = NULL; 00598 delete []tmpBuff; 00599 tmpBuff = NULL; 00600 CAMsg::printMsg(LOG_CRIT,"Main Loop exited!!\n"); 00601 #endif // NEW_MIX_TYPE 00602 return E_UNKNOWN; 00603 } 00604 #endif //ONLY_LOCAL_PROXY
1.7.6.1