|
Mixe for Privacy and Anonymity in the Internet
|
00001 /* 00002 * Copyright (c) 2006, The JAP-Team 00003 * All rights reserved. 00004 * Redistribution and use in source and binary forms, with or without 00005 * modification, are permitted provided that the following conditions are met: 00006 * 00007 * - Redistributions of source code must retain the above copyright notice, 00008 * this list of conditions and the following disclaimer. 00009 * 00010 * - Redistributions in binary form must reproduce the above copyright 00011 * notice, this list of conditions and the following disclaimer in the 00012 * documentation and/or other materials provided with the distribution. 00013 * 00014 * - Neither the name of the University of Technology Dresden, Germany nor 00015 * the names of its contributors may be used to endorse or promote 00016 * products derived from this software without specific prior written 00017 * permission. 00018 * 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00021 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 00022 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 00023 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE 00024 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00025 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00026 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00027 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00028 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00029 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00030 * POSSIBILITY OF SUCH DAMAGE 00031 */ 00032 #include "../StdAfx.h" 00033 #ifndef ONLY_LOCAL_PROXY 00034 #include "CAChain.hpp" 00035 #include "typedefsb.hpp" 00036 #include "CALastMixBChannelList.hpp" 00037 #include "../CAUtil.hpp" 00038 00039 #ifndef DELAY_CHANNELS 00040 CAChain::CAChain(UINT8* a_chainId) { 00041 #else 00042 CAChain::CAChain(UINT8* a_chainId, CAMutex* a_delayBucketMutex, SINT32* a_delayBucket) { 00043 #endif 00044 m_chainId = a_chainId; 00045 m_firstChannel = NULL; 00046 m_socket = NULL; 00047 m_lastAccessTime = -1; 00048 m_upstreamSendQueue = new CAQueue(DATA_SIZE); 00049 m_upstreamClosed = false; 00050 m_downstreamClosed = false; 00051 m_firstSocketGroup = NULL; 00052 m_connectionError = false; 00053 m_unknownChainId = false; 00054 m_firstDownstreamPacket = true; 00055 #ifdef LOG_CHAIN_STATISTICS 00056 m_packetsFromUser = 0; 00057 m_bytesFromUser = 0; 00058 m_packetsToUser = 0; 00059 m_bytesToUser = 0; 00060 getcurrentTimeMicros(m_creationTime); 00061 #endif 00062 #ifdef DELAY_CHANNELS 00063 m_pDelayBucketMutex = a_delayBucketMutex; 00064 m_pDelayBucket = a_delayBucket; 00065 #endif 00066 } 00067 00068 CAChain::~CAChain(void) { 00069 if (m_socket != NULL) { 00070 removeFromAllSocketGroupsInternal(); 00071 m_socket->close(); 00072 delete m_socket; 00073 m_socket = NULL; 00074 } 00075 m_upstreamSendQueue->clean(); 00076 delete m_upstreamSendQueue; 00077 m_upstreamSendQueue = NULL; 00078 /* remove all associated channels (normally there shouldn't be any, but in 00079 * case of a shutdown, some channels may be still open) 00080 */ 00081 while (m_firstChannel != NULL) { 00082 t_channelEntry* channelEntry = m_firstChannel; 00083 /* remove the entry from the channel-table */ 00084 channelEntry->channel->associatedChannelList->removeFromTable(channelEntry->channel); 00085 /* remove all deadlines */ 00086 while (channelEntry->channel->firstResponseDeadline != NULL) { 00087 t_deadlineEntry* currentDeadline = channelEntry->channel->firstResponseDeadline; 00088 channelEntry->channel->firstResponseDeadline = currentDeadline->nextDeadline; 00089 delete currentDeadline; 00090 currentDeadline = NULL; 00091 } 00092 /* remove the channel-cipher */ 00093 delete channelEntry->channel->channelCipher; 00094 channelEntry->channel->channelCipher = NULL; 00095 m_firstChannel = channelEntry->nextChannel; 00096 delete channelEntry; 00097 channelEntry = NULL; 00098 } 00099 #ifdef LOG_CHAIN_STATISTICS 00100 /* log chain-statistics with format: 00101 * Chain-ID, Chain duration [micros], Upload (bytes), Download (bytes), Packets from user, Packets to user 00102 */ 00103 UINT64 currentTime; 00104 getcurrentTimeMicros(currentTime); 00105 UINT32 duration = diff64(currentTime, m_creationTime); 00106 UINT8* chainId = getPrintableChainId(); 00107 CAMsg::printMsg(LOG_DEBUG, "%s,%u,%u,%u,%u,%u\n", chainId, duration, m_bytesFromUser, m_bytesToUser, m_packetsFromUser, m_packetsToUser); 00108 delete []chainId; 00109 chainId = NULL; 00110 #endif 00111 delete []m_chainId; 00112 m_chainId = NULL; 00113 #ifdef DELAY_CHANNELS 00114 /* free the delay-bucket (set it to -1), don't delete the mutex because it 00115 * is used for all delay-buckets 00116 */ 00117 m_pDelayBucketMutex->lock(); 00118 *m_pDelayBucket = -1; 00119 m_pDelayBucketMutex->unlock(); 00120 #endif 00121 } 00122 00123 UINT8* CAChain::getChainId() { 00124 return m_chainId; 00125 } 00126 00127 #ifdef LOG_CHAIN_STATISTICS 00128 void CAChain::setSocket(CASocket* a_socket, UINT32 a_alreadyProcessedPackets, UINT32 a_alreadyProcessedBytes) { 00129 m_socket = a_socket; 00130 m_bytesFromUser = a_alreadyProcessedBytes; 00131 m_packetsFromUser = a_alreadyProcessedPackets; 00132 } 00133 #else 00134 void CAChain::setSocket(CASocket* a_socket) { 00135 m_socket = a_socket; 00136 } 00137 #endif 00138 00145 #ifdef HAVE_EPOLL 00146 SINT32 CAChain::processDownstream(CASocketGroupEpoll* a_signalingGroup, MIXPACKET* a_downstreamPacket, UINT32* a_processedBytes) { 00147 #else 00148 SINT32 CAChain::processDownstream(CASocketGroup* a_signalingGroup, MIXPACKET* a_downstreamPacket, UINT32* a_processedBytes) { 00149 #endif 00150 *a_processedBytes = 0; 00151 /* first: get the time - we will need it */ 00152 timespec currentTime; 00153 getcurrentTime(currentTime); 00154 if (m_lastAccessTime != -1) { 00155 /* currently we dont't have an associated channel -> check whether the 00156 * access-timeout is reached 00157 */ 00158 if (m_lastAccessTime + CHAIN_TIMEOUT < currentTime.tv_sec) { 00159 /* timeout is reached */ 00160 closeChainInternal(); 00161 return 3; 00162 } 00163 /* there is currently no channel associated -> we can't do anything */ 00164 return 1; 00165 } 00166 /* we have at least one associated channel */ 00167 /* check whether we have to drop packages because of outdated deadlines */ 00168 t_deadlineEntry* testedDeadlineEntry = m_firstChannel->channel->firstResponseDeadline; 00169 if (((testedDeadlineEntry->deadline.tv_sec + DEADLINE_TIMEOUT) < currentTime.tv_sec) || (((testedDeadlineEntry->deadline.tv_sec + DEADLINE_TIMEOUT) == currentTime.tv_sec) && (testedDeadlineEntry->deadline.tv_nsec <= currentTime.tv_nsec))) { 00170 /* we are too late, it wouldn't make sense to send the packet -> we will 00171 * reduce traffic by dropping the packet (and all following packets of the 00172 * channel) -> currently we have to send at least a CHANNEL-CLOSE, so keep 00173 * one packet in the channel 00174 */ 00175 if (m_firstChannel->channel->remainingDownstreamPackets > 1) { 00176 /* we will really loose packets -> synchronization between client and 00177 * server is destroyed -> signal connection error and close the chain 00178 */ 00179 signalConnectionError(); 00180 UINT8* chainId = getPrintableChainId(); 00181 CAMsg::printMsg(LOG_INFO, "Dropped downstream-packets from chain '%s'!\n", chainId); 00182 delete []chainId; 00183 chainId = NULL; 00184 while (m_firstChannel->channel->remainingDownstreamPackets > 1) { 00185 m_firstChannel->channel->remainingDownstreamPackets--; 00186 m_firstChannel->channel->firstResponseDeadline = testedDeadlineEntry->nextDeadline; 00187 delete testedDeadlineEntry; 00188 testedDeadlineEntry = m_firstChannel->channel->firstResponseDeadline; 00189 } 00190 } 00191 } 00192 /* now try to send something */ 00193 t_downstreamChainCell* pChainCell = (t_downstreamChainCell*)(a_downstreamPacket->data); 00194 if ((m_socket != NULL) && (!m_downstreamClosed) && (m_firstChannel != NULL)) { 00195 if (m_firstChannel->channel->remainingDownstreamPackets > 1) { 00196 /* we are able to send data to the client -> look whether data is 00197 * available at the socket 00198 */ 00199 if (isSignaledInSocketGroup(a_signalingGroup)) { 00200 /* there is something available -> check how much data we can process 00201 */ 00202 UINT16 payloadData = MAX_SEQUEL_DOWNSTREAM_CHAINCELL_PAYLOAD; 00203 if (m_firstDownstreamPacket) { 00204 payloadData = MAX_FIRST_DOWNSTREAM_CHAINCELL_PAYLOAD; 00205 } 00206 #ifdef DELAY_CHANNELS 00207 payloadData = min(payloadData, (UINT16)getDelayBucketInternal()); 00208 #endif 00209 if (payloadData > 0) { 00210 /* we will receive something */ 00211 /* if the packet isn't filled fully, some randomness for the 00212 * remainging space would be great 00213 */ 00214 getRandom(a_downstreamPacket->data, DATA_SIZE); 00215 SINT32 bytesReceived; 00216 if (m_firstDownstreamPacket) { 00217 bytesReceived = m_socket->receive(pChainCell->firstCell.data, payloadData); 00218 } 00219 else { 00220 bytesReceived = m_socket->receive(pChainCell->sequelCell.data, payloadData); 00221 } 00222 if (bytesReceived >= 0) { 00223 if (bytesReceived == 0) { 00224 /* seems to be the end of the data-stream */ 00225 closeDownstream(); 00226 } 00227 else { 00228 /* we have received some bytes -> create the packet */ 00229 #ifdef DELAY_CHANNELS 00230 removeFromDelayBucketInternal(bytesReceived); 00231 #endif 00232 if (m_firstDownstreamPacket) { 00233 /* also we have to send the Chain-ID */ 00234 memcpy(pChainCell->firstCell.chainId, m_chainId, CHAIN_ID_LENGTH); 00235 m_firstDownstreamPacket = false; 00236 } 00237 pChainCell->lengthAndFlags = htons((UINT16)bytesReceived); 00238 a_downstreamPacket->channel = m_firstChannel->channel->channelId; 00239 a_downstreamPacket->flags = CHANNEL_DATA; 00240 m_firstChannel->channel->channelCipher->crypt2(a_downstreamPacket->data, a_downstreamPacket->data, DATA_SIZE); 00241 m_firstChannel->channel->remainingDownstreamPackets--; 00242 t_deadlineEntry* currentDeadline = m_firstChannel->channel->firstResponseDeadline; 00243 m_firstChannel->channel->firstResponseDeadline = currentDeadline->nextDeadline; 00244 delete currentDeadline; 00245 currentDeadline = NULL; 00246 *a_processedBytes = (UINT32)bytesReceived; 00247 #ifdef LOG_CHAIN_STATISTICS 00248 m_packetsToUser++; 00249 m_bytesToUser = m_bytesToUser + (UINT32)bytesReceived; 00250 #endif 00251 return 0; 00252 } 00253 } 00254 else { 00255 /* there was a connection error */ 00256 signalConnectionError(); 00257 } 00258 } 00259 } 00260 } 00261 } 00262 /* we cannot send any real data, but maybe we have to send some protocol data */ 00263 if (m_firstChannel->channel->remainingDownstreamPackets == 1) { 00264 /* currently we have to send a CHANNEL-CLOSE */ 00265 getRandom(a_downstreamPacket->data, DATA_SIZE); 00266 a_downstreamPacket->channel = m_firstChannel->channel->channelId; 00267 a_downstreamPacket->flags = CHANNEL_CLOSE; 00268 /* delete channel-resources */ 00269 t_lastMixBChannelListEntry* currentChannel = m_firstChannel->channel; 00270 currentChannel->associatedChannelList->removeFromTable(currentChannel); 00271 delete currentChannel->firstResponseDeadline; 00272 currentChannel->firstResponseDeadline = NULL; 00273 delete currentChannel->channelCipher; 00274 currentChannel->channelCipher = NULL; 00275 delete currentChannel; 00276 currentChannel = NULL; 00277 t_channelEntry* currentChannelEntry = m_firstChannel; 00278 /* change to the next channel */ 00279 m_firstChannel = m_firstChannel->nextChannel; 00280 delete currentChannelEntry; 00281 currentChannelEntry = NULL; 00282 #ifdef LOG_CHAIN_STATISTICS 00283 /* a packet (CHANNEL_CLOSE) without payload is sent */ 00284 m_packetsToUser++; 00285 #endif 00286 if (m_firstChannel == NULL) { 00287 if (m_downstreamClosed && m_upstreamClosed) { 00288 /* it was the last channel and the chain is closed -> it can be 00289 * removed from the table 00290 */ 00291 return 2; 00292 } 00293 else { 00294 /* it was the last channel, but the chain isn't closed -> start the 00295 * access timeout 00296 */ 00297 timespec currentTime; 00298 getcurrentTime(currentTime); 00299 m_lastAccessTime = currentTime.tv_sec; 00300 return 0; 00301 } 00302 } 00303 /* we've sent a close but it wasn't the last channel */ 00304 return 0; 00305 } 00306 /* no data, no channel-close, but maybe we have to send a packet because of 00307 * a deadline 00308 */ 00309 if ((m_firstChannel->channel->firstResponseDeadline->deadline.tv_sec < currentTime.tv_sec) || ((m_firstChannel->channel->firstResponseDeadline->deadline.tv_sec == currentTime.tv_sec) && (m_firstChannel->channel->firstResponseDeadline->deadline.tv_nsec <= currentTime.tv_nsec))) { 00310 /* deadline reached */ 00311 getRandom(a_downstreamPacket->data, DATA_SIZE); 00312 pChainCell->lengthAndFlags = 0; 00313 if (m_firstDownstreamPacket) { 00314 /* also we have to send the Chain-ID */ 00315 memcpy(pChainCell->firstCell.chainId, m_chainId, CHAIN_ID_LENGTH); 00316 m_firstDownstreamPacket = false; 00317 } 00318 /* maybe we have to set some flags */ 00319 if (m_unknownChainId) { 00320 pChainCell->lengthAndFlags = pChainCell->lengthAndFlags | CHAINFLAG_UNKNOWN_CHAIN; 00321 /* reset the flag */ 00322 m_unknownChainId = false; 00323 } 00324 if (m_connectionError) { 00325 pChainCell->lengthAndFlags = pChainCell->lengthAndFlags | CHAINFLAG_CONNECTION_ERROR; 00326 /* reset the flag */ 00327 m_connectionError = false; 00328 } 00329 if (m_downstreamClosed) { 00330 pChainCell->lengthAndFlags = pChainCell->lengthAndFlags | CHAINFLAG_STREAM_CLOSED; 00331 /* don't reset the flag */ 00332 } 00333 /* ensure correct byte order */ 00334 pChainCell->lengthAndFlags = htons(pChainCell->lengthAndFlags); 00335 /* finalize packet */ 00336 a_downstreamPacket->channel = m_firstChannel->channel->channelId; 00337 a_downstreamPacket->flags = CHANNEL_DATA; 00338 m_firstChannel->channel->channelCipher->crypt2(a_downstreamPacket->data, a_downstreamPacket->data, DATA_SIZE); 00339 /* clean up */ 00340 m_firstChannel->channel->remainingDownstreamPackets--; 00341 t_deadlineEntry* currentDeadline = m_firstChannel->channel->firstResponseDeadline; 00342 m_firstChannel->channel->firstResponseDeadline = currentDeadline->nextDeadline; 00343 delete currentDeadline; 00344 currentDeadline = NULL; 00345 #ifdef LOG_CHAIN_STATISTICS 00346 /* a packet without payload is sent */ 00347 m_packetsToUser++; 00348 #endif 00349 return 0; 00350 } 00351 /* no deadline reached and nothing else to do */ 00352 return 1; 00353 } 00354 00355 #ifdef HAVE_EPOLL 00356 bool CAChain::isSignaledInSocketGroup(CASocketGroupEpoll* a_socketGroup) { 00357 if (m_socket != NULL) { 00358 return a_socketGroup->isSignaled(this); 00359 } 00360 return false; 00361 } 00362 #else 00363 bool CAChain::isSignaledInSocketGroup(CASocketGroup* a_socketGroup) { 00364 if (m_socket != NULL) { 00365 return a_socketGroup->isSignaled(*m_socket); 00366 } 00367 return false; 00368 } 00369 #endif 00370 00371 #ifdef HAVE_EPOLL 00372 void CAChain::addToSocketGroup(CASocketGroupEpoll* a_socketGroup) { 00373 #else 00374 void CAChain::addToSocketGroup(CASocketGroup* a_socketGroup) { 00375 #endif 00376 if (m_socket != NULL) { 00377 /* check whether our socket isn't already in the specified socket-group 00378 */ 00379 t_socketGroupEntry* currentEntry = m_firstSocketGroup; 00380 t_socketGroupEntry** previousNextEntryPointer = &m_firstSocketGroup; 00381 bool alreadyIncluded = false; 00382 while ((currentEntry != NULL) && (!alreadyIncluded)) { 00383 if (currentEntry->socketGroup == a_socketGroup) { 00384 alreadyIncluded = true; 00385 } 00386 else { 00387 previousNextEntryPointer = &(currentEntry->nextSocketGroup); 00388 currentEntry = currentEntry->nextSocketGroup; 00389 } 00390 } 00391 if (!alreadyIncluded) { 00392 #ifdef HAVE_EPOLL 00393 a_socketGroup->add(*m_socket, this); 00394 #else 00395 a_socketGroup->add(*m_socket); 00396 #endif 00397 currentEntry = new t_socketGroupEntry; 00398 currentEntry->nextSocketGroup = NULL; 00399 currentEntry->socketGroup = a_socketGroup; 00400 *previousNextEntryPointer = currentEntry; 00401 } 00402 } 00403 } 00404 00405 #ifdef HAVE_EPOLL 00406 void CAChain::removeFromSocketGroup(CASocketGroupEpoll* a_socketGroup) { 00407 #else 00408 void CAChain::removeFromSocketGroup(CASocketGroup* a_socketGroup) { 00409 #endif 00410 if (m_socket != NULL) { 00411 /* check whether our socket is in the specified socket-group */ 00412 t_socketGroupEntry* currentEntry = m_firstSocketGroup; 00413 t_socketGroupEntry** previousNextEntryPointer = &m_firstSocketGroup; 00414 while (currentEntry != NULL) { 00415 if (currentEntry->socketGroup == a_socketGroup) { 00416 /* we are in the specified socket group -> remove occurance */ 00417 a_socketGroup->remove(*m_socket); 00418 *previousNextEntryPointer = currentEntry->nextSocketGroup; 00419 delete currentEntry; 00420 currentEntry = NULL; 00421 } 00422 else { 00423 previousNextEntryPointer = &(currentEntry->nextSocketGroup); 00424 currentEntry = currentEntry->nextSocketGroup; 00425 } 00426 } 00427 } 00428 } 00429 00430 #ifdef HAVE_EPOLL 00431 UINT32 CAChain::sendUpstreamData(UINT32 a_maxLength, CASocketGroupEpoll* a_removedSocketGroup) { 00432 #else 00433 UINT32 CAChain::sendUpstreamData(UINT32 a_maxLength, CASocketGroup* a_removedSocketGroup) { 00434 #endif 00435 UINT32 processedBytes = sendUpstreamDataInternal(a_maxLength); 00436 if (m_upstreamSendQueue->isEmpty()) { 00437 /* queue is empty -> we can remove the entry from the socketgroup */ 00438 removeFromSocketGroup(a_removedSocketGroup); 00439 } 00440 return processedBytes; 00441 } 00442 00443 00444 void CAChain::addDataToUpstreamQueue(UINT8* a_buffer, UINT32 a_size) { 00445 if (!m_upstreamClosed) { 00446 /* only add data if upstream isn't closed */ 00447 m_upstreamSendQueue->add(a_buffer, a_size); 00448 #ifdef LOG_CHAIN_STATISTICS 00449 m_packetsFromUser++; 00450 m_bytesFromUser = m_bytesFromUser + a_size; 00451 #endif 00452 } 00453 } 00454 00455 void CAChain::closeUpstream() { 00456 /* currently we will close the whole chain immediately */ 00457 closeChainInternal(); 00458 } 00459 00460 void CAChain::closeDownstream() { 00461 /* currently we will close the whole chain immediately */ 00462 closeChainInternal(); 00463 } 00464 00465 void CAChain::signalConnectionError() { 00466 m_connectionError = true; 00467 /* we will also close the chain */ 00468 closeChainInternal(); 00469 } 00470 00471 void CAChain::signalUnknownChain() { 00472 m_unknownChainId = true; 00473 /* we will not send any chain-id -> disable m_firstDownstreamPacket */ 00474 m_firstDownstreamPacket = false; 00475 /* we will also close the chain */ 00476 closeChainInternal(); 00477 } 00478 00479 UINT8* CAChain::getPrintableChainId() { 00480 UINT8* printableChainId = bytes2hex(m_chainId, CHAIN_ID_LENGTH); 00481 strtrim(printableChainId); 00482 return printableChainId; 00483 } 00484 00485 void CAChain::addChannel(t_lastMixBChannelListEntry* a_channel, bool a_fastResponse) { 00486 t_channelEntry* lastChannel = NULL; 00487 bool invalidChannel = false; 00488 if (m_firstChannel != NULL) { 00489 if (m_firstChannel->nextChannel != NULL) { 00490 /* somebody is trying to add a third channel to the chain but currently 00491 * only 2 channels can be associated to a data-chain -> ignore the new 00492 * channel (attention: currently we have to send at least a 00493 * CHANNEL-CLOSE because there is no channel-timeout at first and middle 00494 * mixes), send an IOException and close the chain 00495 */ 00496 invalidChannel = true; 00497 signalConnectionError(); 00498 /* find the last associated channel */ 00499 lastChannel = m_firstChannel->nextChannel; 00500 while (lastChannel->nextChannel != NULL) { 00501 lastChannel = lastChannel->nextChannel; 00502 } 00503 } 00504 else { 00505 lastChannel = m_firstChannel; 00506 } 00507 } 00508 t_channelEntry* newChannel = new t_channelEntry; 00509 /* initialize the fields */ 00510 newChannel->nextChannel = NULL; 00511 newChannel->channel = a_channel; 00512 if (lastChannel != NULL) { 00513 /* close all previous channels immediately */ 00514 forceImmediateResponsesInternal(); 00515 /* now add the new channel */ 00516 lastChannel->nextChannel = newChannel; 00517 } 00518 else { 00519 m_firstChannel = newChannel; 00520 } 00521 timespec currentTime; 00522 getcurrentTime(currentTime); 00523 if (!invalidChannel) { 00524 if ((!(m_upstreamClosed && m_downstreamClosed)) && (m_lastAccessTime != -1)) { 00525 /* if not downstream and upstream is closed and also an access-timeout 00526 * is running -> stop that access-timeout 00527 */ 00528 m_lastAccessTime = -1; 00529 } 00530 a_channel->remainingDownstreamPackets = CHANNEL_DOWNSTREAM_PACKETS; 00531 /* create deadlines for the new downstream-packets */ 00532 t_deadlineEntry** lastNextDeadlinePointer = &(a_channel->firstResponseDeadline); 00533 for (UINT32 i = 0; i < CHANNEL_DOWNSTREAM_PACKETS; i++) { 00534 t_deadlineEntry* currentDeadline = new t_deadlineEntry; 00535 currentDeadline->nextDeadline = NULL; 00536 if (!m_downstreamClosed) { 00537 if (a_fastResponse && (i == 0)) { 00538 /* we shall send a fast response -> send back the first packet of 00539 * the new channel immediately 00540 */ 00541 currentDeadline->deadline.tv_sec = currentTime.tv_sec; 00542 } 00543 else { 00544 /* use normal channel-timeout */ 00545 currentDeadline->deadline.tv_sec = currentTime.tv_sec + CHANNEL_TIMEOUT; 00546 } 00547 } 00548 else { 00549 /* downstream is already closed -> send packets back immediately */ 00550 currentDeadline->deadline.tv_sec = currentTime.tv_sec; 00551 } 00552 currentDeadline->deadline.tv_nsec = currentTime.tv_nsec; 00553 *lastNextDeadlinePointer = currentDeadline; 00554 lastNextDeadlinePointer = &(currentDeadline->nextDeadline); 00555 } 00556 } 00557 else { 00558 /* send only one packet (will be CHANNEL-CLOSE) */ 00559 a_channel->remainingDownstreamPackets = 1; 00560 a_channel->firstResponseDeadline = new t_deadlineEntry; 00561 a_channel->firstResponseDeadline->nextDeadline = NULL; 00562 a_channel->firstResponseDeadline->deadline.tv_sec = currentTime.tv_sec; 00563 a_channel->firstResponseDeadline->deadline.tv_nsec = currentTime.tv_nsec; 00564 } 00565 } 00566 00567 void CAChain::closeChainInternal() { 00568 m_upstreamClosed = true; 00569 m_downstreamClosed = true; 00570 m_upstreamSendQueue->clean(); 00571 if (m_socket != NULL) { 00572 removeFromAllSocketGroupsInternal(); 00573 m_socket->close(); 00574 delete m_socket; 00575 m_socket = NULL; 00576 } 00577 /* send back all response-packets immediately */ 00578 forceImmediateResponsesInternal(); 00579 } 00580 00581 void CAChain::forceImmediateResponsesInternal() { 00582 /* set all deadlines to the current time (if they don't have a previous 00583 * time) */ 00584 timespec currentTime; 00585 getcurrentTime(currentTime); 00586 t_channelEntry* currentChannel = m_firstChannel; 00587 while (currentChannel != NULL) { 00588 t_deadlineEntry* currentDeadline = currentChannel->channel->firstResponseDeadline; 00589 while (currentDeadline != NULL) { 00590 if (currentDeadline->deadline.tv_sec > currentTime.tv_sec) { 00591 currentDeadline->deadline.tv_sec = currentTime.tv_sec; 00592 currentDeadline->deadline.tv_nsec = currentTime.tv_nsec; 00593 } 00594 else { 00595 if ((currentDeadline->deadline.tv_sec == currentTime.tv_sec) && (currentDeadline->deadline.tv_nsec > currentTime.tv_nsec)) { 00596 currentDeadline->deadline.tv_nsec = currentTime.tv_nsec; 00597 } 00598 } 00599 currentDeadline = currentDeadline->nextDeadline; 00600 } 00601 currentChannel = currentChannel->nextChannel; 00602 } 00603 } 00604 00605 void CAChain::removeFromAllSocketGroupsInternal() { 00606 t_socketGroupEntry* currentEntry = m_firstSocketGroup; 00607 m_firstSocketGroup = NULL; 00608 while (currentEntry != NULL) { 00609 currentEntry->socketGroup->remove(*m_socket); 00610 t_socketGroupEntry* nextEntry = currentEntry->nextSocketGroup; 00611 delete currentEntry; 00612 currentEntry = nextEntry; 00613 } 00614 } 00615 00616 UINT32 CAChain::sendUpstreamDataInternal(UINT32 a_maxLength) { 00617 UINT32 bytesSent = 0; 00618 if ((!m_upstreamClosed) && (m_socket != NULL)) { 00619 UINT32 length = a_maxLength; 00620 UINT8* buffer = new UINT8[length]; 00621 if (m_upstreamSendQueue->peek(buffer, &length) == E_SUCCESS) { 00622 /* queue has filled the buffer */ 00623 SINT32 errorCode = m_socket->send(buffer, length); 00624 if (errorCode >= 0) { 00625 length = (UINT32)errorCode; 00626 bytesSent = length; 00627 m_upstreamSendQueue->remove(&length); 00628 } 00629 else { 00630 /* error while sending data */ 00631 signalConnectionError(); 00632 } 00633 } 00634 } 00635 return bytesSent; 00636 } 00637 00638 #ifdef DELAY_CHANNELS 00639 SINT32 CAChain::getDelayBucketInternal() { 00640 SINT32 delayBucket; 00641 m_pDelayBucketMutex->lock(); 00642 delayBucket = *m_pDelayBucket; 00643 m_pDelayBucketMutex->unlock(); 00644 return delayBucket; 00645 } 00646 00647 void CAChain::removeFromDelayBucketInternal(SINT32 a_bytesToRemove) { 00648 m_pDelayBucketMutex->lock(); 00649 *m_pDelayBucket = (*m_pDelayBucket) + a_bytesToRemove; 00650 m_pDelayBucketMutex->unlock(); 00651 } 00652 #endif 00653 #endif //ONLY_LOCAL_PROXY
1.7.6.1