Mixe for Privacy and Anonymity in the Internet
CAChain.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2006, The JAP-Team 
00003  * All rights reserved.
00004  * Redistribution and use in source and binary forms, with or without
00005  * modification, are permitted provided that the following conditions are met:
00006  *
00007  *   - Redistributions of source code must retain the above copyright notice, 
00008  *     this list of conditions and the following disclaimer.
00009  *
00010  *   - Redistributions in binary form must reproduce the above copyright
00011  *     notice, this list of conditions and the following disclaimer in the
00012  *     documentation and/or other materials provided with the distribution.
00013  *
00014  *   - Neither the name of the University of Technology Dresden, Germany nor
00015  *     the names of its contributors may be used to endorse or promote
00016  *     products derived from this software without specific prior written
00017  *     permission. 
00018  *
00019  *
00020  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
00022  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
00023  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE
00024  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00025  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00026  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00027  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00028  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00029  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00030  * POSSIBILITY OF SUCH DAMAGE
00031  */
00032 #include "../StdAfx.h"
00033 #ifndef ONLY_LOCAL_PROXY
00034 #include "CAChain.hpp"
00035 #include "typedefsb.hpp"
00036 #include "CALastMixBChannelList.hpp"
00037 #include "../CAUtil.hpp"
00038 
00039 #ifndef DELAY_CHANNELS
00040 CAChain::CAChain(UINT8* a_chainId) {
00041 #else
00042 CAChain::CAChain(UINT8* a_chainId, CAMutex* a_delayBucketMutex, SINT32* a_delayBucket) {
00043 #endif
00044   m_chainId = a_chainId;
00045   m_firstChannel = NULL;
00046   m_socket = NULL;
00047   m_lastAccessTime = -1;
00048   m_upstreamSendQueue = new CAQueue(DATA_SIZE);
00049   m_upstreamClosed = false;
00050   m_downstreamClosed = false;
00051   m_firstSocketGroup = NULL;
00052   m_connectionError = false;
00053   m_unknownChainId = false;
00054   m_firstDownstreamPacket = true;
00055   #ifdef LOG_CHAIN_STATISTICS
00056     m_packetsFromUser = 0;
00057     m_bytesFromUser = 0;
00058     m_packetsToUser = 0;
00059     m_bytesToUser = 0;
00060     getcurrentTimeMicros(m_creationTime);
00061   #endif
00062   #ifdef DELAY_CHANNELS
00063     m_pDelayBucketMutex = a_delayBucketMutex;
00064     m_pDelayBucket = a_delayBucket;
00065   #endif
00066 }
00067 
00068 CAChain::~CAChain(void) {  
00069   if (m_socket != NULL) {
00070     removeFromAllSocketGroupsInternal();
00071     m_socket->close();
00072     delete m_socket;
00073     m_socket = NULL;
00074   }
00075   m_upstreamSendQueue->clean();
00076   delete m_upstreamSendQueue;
00077   m_upstreamSendQueue = NULL;
00078   /* remove all associated channels (normally there shouldn't be any, but in
00079    * case of a shutdown, some channels may be still open)
00080    */
00081   while (m_firstChannel != NULL) {
00082     t_channelEntry* channelEntry = m_firstChannel;
00083     /* remove the entry from the channel-table */
00084     channelEntry->channel->associatedChannelList->removeFromTable(channelEntry->channel);
00085     /* remove all deadlines */
00086     while (channelEntry->channel->firstResponseDeadline != NULL) {
00087       t_deadlineEntry* currentDeadline = channelEntry->channel->firstResponseDeadline;
00088       channelEntry->channel->firstResponseDeadline = currentDeadline->nextDeadline;
00089       delete currentDeadline;
00090       currentDeadline = NULL;
00091     }
00092     /* remove the channel-cipher */
00093     delete channelEntry->channel->channelCipher;
00094     channelEntry->channel->channelCipher = NULL;
00095     m_firstChannel = channelEntry->nextChannel;
00096     delete channelEntry;
00097     channelEntry = NULL;
00098   }
00099   #ifdef LOG_CHAIN_STATISTICS
00100     /* log chain-statistics with format:
00101      * Chain-ID, Chain duration [micros], Upload (bytes), Download (bytes), Packets from user, Packets to user
00102      */
00103      UINT64 currentTime;
00104      getcurrentTimeMicros(currentTime);
00105      UINT32 duration = diff64(currentTime, m_creationTime);
00106      UINT8* chainId = getPrintableChainId();
00107      CAMsg::printMsg(LOG_DEBUG, "%s,%u,%u,%u,%u,%u\n", chainId, duration, m_bytesFromUser, m_bytesToUser, m_packetsFromUser, m_packetsToUser);
00108      delete []chainId;
00109      chainId = NULL;
00110   #endif
00111   delete []m_chainId;
00112   m_chainId = NULL;
00113   #ifdef DELAY_CHANNELS
00114     /* free the delay-bucket (set it to -1), don't delete the mutex because it
00115      * is used for all delay-buckets
00116      */
00117     m_pDelayBucketMutex->lock();
00118     *m_pDelayBucket = -1;
00119     m_pDelayBucketMutex->unlock();
00120   #endif
00121 }
00122 
00123 UINT8* CAChain::getChainId() {
00124   return m_chainId;
00125 }
00126 
00127 #ifdef LOG_CHAIN_STATISTICS
00128   void CAChain::setSocket(CASocket* a_socket, UINT32 a_alreadyProcessedPackets, UINT32 a_alreadyProcessedBytes) {
00129     m_socket = a_socket;
00130     m_bytesFromUser = a_alreadyProcessedBytes;
00131     m_packetsFromUser = a_alreadyProcessedPackets;
00132   }
00133 #else
00134   void CAChain::setSocket(CASocket* a_socket) {
00135     m_socket = a_socket;
00136   }
00137 #endif
00138 
00145 #ifdef HAVE_EPOLL
00146 SINT32 CAChain::processDownstream(CASocketGroupEpoll* a_signalingGroup, MIXPACKET* a_downstreamPacket, UINT32* a_processedBytes) {
00147 #else
00148 SINT32 CAChain::processDownstream(CASocketGroup* a_signalingGroup, MIXPACKET* a_downstreamPacket, UINT32* a_processedBytes) {
00149 #endif
00150   *a_processedBytes = 0;
00151   /* first: get the time - we will need it */
00152   timespec currentTime;
00153   getcurrentTime(currentTime);
00154   if (m_lastAccessTime != -1) {
00155     /* currently we dont't have an associated channel -> check whether the
00156      * access-timeout is reached
00157      */
00158     if (m_lastAccessTime + CHAIN_TIMEOUT < currentTime.tv_sec) {
00159       /* timeout is reached */
00160       closeChainInternal();
00161       return 3;
00162     }
00163     /* there is currently no channel associated -> we can't do anything */
00164     return 1;
00165   }
00166   /* we have at least one associated channel */
00167   /* check whether we have to drop packages because of outdated deadlines */
00168   t_deadlineEntry* testedDeadlineEntry = m_firstChannel->channel->firstResponseDeadline;
00169   if (((testedDeadlineEntry->deadline.tv_sec + DEADLINE_TIMEOUT) < currentTime.tv_sec) || (((testedDeadlineEntry->deadline.tv_sec + DEADLINE_TIMEOUT) == currentTime.tv_sec) && (testedDeadlineEntry->deadline.tv_nsec <= currentTime.tv_nsec))) {
00170     /* we are too late, it wouldn't make sense to send the packet -> we will
00171      * reduce traffic by dropping the packet (and all following packets of the
00172      * channel) -> currently we have to send at least a CHANNEL-CLOSE, so keep
00173      * one packet in the channel
00174      */
00175     if (m_firstChannel->channel->remainingDownstreamPackets > 1) {
00176       /* we will really loose packets -> synchronization between client and
00177        * server is destroyed -> signal connection error and close the chain
00178        */
00179       signalConnectionError();
00180       UINT8* chainId = getPrintableChainId();
00181       CAMsg::printMsg(LOG_INFO, "Dropped downstream-packets from chain '%s'!\n", chainId);
00182       delete []chainId;
00183       chainId = NULL;
00184       while (m_firstChannel->channel->remainingDownstreamPackets > 1) {
00185         m_firstChannel->channel->remainingDownstreamPackets--;
00186         m_firstChannel->channel->firstResponseDeadline = testedDeadlineEntry->nextDeadline;
00187         delete testedDeadlineEntry;
00188         testedDeadlineEntry = m_firstChannel->channel->firstResponseDeadline;
00189       }
00190     }
00191   }
00192   /* now try to send something */
00193   t_downstreamChainCell* pChainCell = (t_downstreamChainCell*)(a_downstreamPacket->data);
00194   if ((m_socket != NULL) && (!m_downstreamClosed) && (m_firstChannel != NULL)) {
00195     if (m_firstChannel->channel->remainingDownstreamPackets > 1) {
00196       /* we are able to send data to the client -> look whether data is
00197        * available at the socket
00198        */
00199       if (isSignaledInSocketGroup(a_signalingGroup)) {
00200         /* there is something available -> check how much data we can process
00201          */
00202         UINT16 payloadData = MAX_SEQUEL_DOWNSTREAM_CHAINCELL_PAYLOAD;
00203         if (m_firstDownstreamPacket) {
00204           payloadData = MAX_FIRST_DOWNSTREAM_CHAINCELL_PAYLOAD;
00205         }
00206         #ifdef DELAY_CHANNELS
00207           payloadData = min(payloadData, (UINT16)getDelayBucketInternal());
00208         #endif
00209         if (payloadData > 0) {
00210           /* we will receive something */
00211           /* if the packet isn't filled fully, some randomness for the
00212            * remainging space would be great
00213            */
00214           getRandom(a_downstreamPacket->data, DATA_SIZE);
00215           SINT32 bytesReceived;
00216           if (m_firstDownstreamPacket) {
00217             bytesReceived = m_socket->receive(pChainCell->firstCell.data, payloadData);
00218           }
00219           else {
00220             bytesReceived = m_socket->receive(pChainCell->sequelCell.data, payloadData);
00221           }
00222           if (bytesReceived >= 0) {
00223             if (bytesReceived == 0) {
00224               /* seems to be the end of the data-stream */
00225               closeDownstream();
00226             }
00227             else {
00228               /* we have received some bytes -> create the packet */
00229               #ifdef DELAY_CHANNELS
00230                 removeFromDelayBucketInternal(bytesReceived);
00231               #endif
00232               if (m_firstDownstreamPacket) {
00233                 /* also we have to send the Chain-ID */
00234                 memcpy(pChainCell->firstCell.chainId, m_chainId, CHAIN_ID_LENGTH);
00235                 m_firstDownstreamPacket = false;
00236               }
00237               pChainCell->lengthAndFlags = htons((UINT16)bytesReceived);
00238               a_downstreamPacket->channel = m_firstChannel->channel->channelId;
00239               a_downstreamPacket->flags = CHANNEL_DATA;
00240               m_firstChannel->channel->channelCipher->crypt2(a_downstreamPacket->data, a_downstreamPacket->data, DATA_SIZE);
00241               m_firstChannel->channel->remainingDownstreamPackets--;
00242               t_deadlineEntry* currentDeadline = m_firstChannel->channel->firstResponseDeadline;
00243               m_firstChannel->channel->firstResponseDeadline = currentDeadline->nextDeadline;
00244               delete currentDeadline;
00245               currentDeadline = NULL;
00246               *a_processedBytes = (UINT32)bytesReceived;
00247               #ifdef LOG_CHAIN_STATISTICS
00248                 m_packetsToUser++;
00249                 m_bytesToUser = m_bytesToUser + (UINT32)bytesReceived;
00250               #endif
00251               return 0;
00252             }
00253           }
00254           else {
00255             /* there was a connection error */
00256             signalConnectionError();
00257           }
00258         }
00259       }
00260     }
00261   }
00262   /* we cannot send any real data, but maybe we have to send some protocol data */
00263   if (m_firstChannel->channel->remainingDownstreamPackets == 1) {
00264     /* currently we have to send a CHANNEL-CLOSE */
00265     getRandom(a_downstreamPacket->data, DATA_SIZE);
00266     a_downstreamPacket->channel = m_firstChannel->channel->channelId;
00267     a_downstreamPacket->flags = CHANNEL_CLOSE;
00268     /* delete channel-resources */
00269     t_lastMixBChannelListEntry* currentChannel = m_firstChannel->channel;
00270     currentChannel->associatedChannelList->removeFromTable(currentChannel);
00271     delete currentChannel->firstResponseDeadline;
00272     currentChannel->firstResponseDeadline = NULL;
00273     delete currentChannel->channelCipher;
00274     currentChannel->channelCipher = NULL;
00275     delete currentChannel;
00276     currentChannel = NULL;
00277     t_channelEntry* currentChannelEntry = m_firstChannel;
00278     /* change to the next channel */
00279     m_firstChannel = m_firstChannel->nextChannel;
00280     delete currentChannelEntry;
00281     currentChannelEntry = NULL;
00282     #ifdef LOG_CHAIN_STATISTICS
00283       /* a packet (CHANNEL_CLOSE) without payload is sent */
00284       m_packetsToUser++;
00285     #endif
00286     if (m_firstChannel == NULL) {
00287       if (m_downstreamClosed && m_upstreamClosed) {
00288         /* it was the last channel and the chain is closed -> it can be
00289          * removed from the table
00290          */
00291         return 2;
00292       }
00293       else {
00294         /* it was the last channel, but the chain isn't closed -> start the
00295          * access timeout
00296          */
00297         timespec currentTime;
00298         getcurrentTime(currentTime);
00299         m_lastAccessTime = currentTime.tv_sec;
00300         return 0;
00301       }
00302     }
00303     /* we've sent a close but it wasn't the last channel */
00304     return 0;
00305   }
00306   /* no data, no channel-close, but maybe we have to send a packet because of
00307    * a deadline
00308    */
00309   if ((m_firstChannel->channel->firstResponseDeadline->deadline.tv_sec < currentTime.tv_sec) || ((m_firstChannel->channel->firstResponseDeadline->deadline.tv_sec == currentTime.tv_sec) && (m_firstChannel->channel->firstResponseDeadline->deadline.tv_nsec <= currentTime.tv_nsec))) {
00310     /* deadline reached */
00311     getRandom(a_downstreamPacket->data, DATA_SIZE);
00312     pChainCell->lengthAndFlags = 0;
00313     if (m_firstDownstreamPacket) {
00314       /* also we have to send the Chain-ID */
00315       memcpy(pChainCell->firstCell.chainId, m_chainId, CHAIN_ID_LENGTH);
00316       m_firstDownstreamPacket = false;
00317     }
00318     /* maybe we have to set some flags */
00319     if (m_unknownChainId) {
00320       pChainCell->lengthAndFlags = pChainCell->lengthAndFlags | CHAINFLAG_UNKNOWN_CHAIN;
00321       /* reset the flag */
00322       m_unknownChainId = false;
00323     }
00324     if (m_connectionError) {
00325       pChainCell->lengthAndFlags = pChainCell->lengthAndFlags | CHAINFLAG_CONNECTION_ERROR;
00326       /* reset the flag */
00327       m_connectionError = false;
00328     }
00329     if (m_downstreamClosed) {
00330       pChainCell->lengthAndFlags = pChainCell->lengthAndFlags | CHAINFLAG_STREAM_CLOSED;
00331       /* don't reset the flag */
00332     }
00333     /* ensure correct byte order */
00334     pChainCell->lengthAndFlags = htons(pChainCell->lengthAndFlags);
00335     /* finalize packet */
00336     a_downstreamPacket->channel = m_firstChannel->channel->channelId;
00337     a_downstreamPacket->flags = CHANNEL_DATA;
00338     m_firstChannel->channel->channelCipher->crypt2(a_downstreamPacket->data, a_downstreamPacket->data, DATA_SIZE);
00339     /* clean up */
00340     m_firstChannel->channel->remainingDownstreamPackets--;
00341     t_deadlineEntry* currentDeadline = m_firstChannel->channel->firstResponseDeadline;
00342     m_firstChannel->channel->firstResponseDeadline = currentDeadline->nextDeadline;
00343     delete currentDeadline;
00344     currentDeadline = NULL;
00345     #ifdef LOG_CHAIN_STATISTICS
00346       /* a packet without payload is sent */
00347       m_packetsToUser++;
00348     #endif
00349     return 0;
00350   }
00351   /* no deadline reached and nothing else to do */
00352   return 1;
00353 }
00354 
00355 #ifdef HAVE_EPOLL
00356 bool CAChain::isSignaledInSocketGroup(CASocketGroupEpoll* a_socketGroup) {
00357   if (m_socket != NULL) {
00358     return a_socketGroup->isSignaled(this);
00359   }
00360   return false;
00361 }
00362 #else
00363 bool CAChain::isSignaledInSocketGroup(CASocketGroup* a_socketGroup) {
00364   if (m_socket != NULL) {
00365     return a_socketGroup->isSignaled(*m_socket);
00366   }
00367   return false;
00368 }
00369 #endif
00370 
00371 #ifdef HAVE_EPOLL
00372 void CAChain::addToSocketGroup(CASocketGroupEpoll* a_socketGroup) {
00373 #else
00374 void CAChain::addToSocketGroup(CASocketGroup* a_socketGroup) {
00375 #endif
00376   if (m_socket != NULL) {
00377     /* check whether our socket isn't already in the specified socket-group
00378     */
00379     t_socketGroupEntry* currentEntry = m_firstSocketGroup;
00380     t_socketGroupEntry** previousNextEntryPointer = &m_firstSocketGroup;
00381     bool alreadyIncluded = false;
00382     while ((currentEntry != NULL) && (!alreadyIncluded)) {
00383       if (currentEntry->socketGroup == a_socketGroup) {
00384         alreadyIncluded = true;
00385       }
00386       else {
00387         previousNextEntryPointer = &(currentEntry->nextSocketGroup);
00388         currentEntry = currentEntry->nextSocketGroup;
00389       }
00390     }
00391     if (!alreadyIncluded) {
00392       #ifdef HAVE_EPOLL
00393         a_socketGroup->add(*m_socket, this);
00394       #else
00395         a_socketGroup->add(*m_socket);
00396       #endif
00397       currentEntry = new t_socketGroupEntry;
00398       currentEntry->nextSocketGroup = NULL;
00399       currentEntry->socketGroup = a_socketGroup;
00400       *previousNextEntryPointer = currentEntry;
00401     }
00402   }
00403 }
00404 
00405 #ifdef HAVE_EPOLL
00406 void CAChain::removeFromSocketGroup(CASocketGroupEpoll* a_socketGroup) {
00407 #else
00408 void CAChain::removeFromSocketGroup(CASocketGroup* a_socketGroup) {
00409 #endif
00410   if (m_socket != NULL) {
00411     /* check whether our socket is in the specified socket-group */
00412     t_socketGroupEntry* currentEntry = m_firstSocketGroup;
00413     t_socketGroupEntry** previousNextEntryPointer = &m_firstSocketGroup;
00414     while (currentEntry != NULL) {
00415       if (currentEntry->socketGroup == a_socketGroup) {
00416         /* we are in the specified socket group -> remove occurance */
00417         a_socketGroup->remove(*m_socket);
00418         *previousNextEntryPointer = currentEntry->nextSocketGroup;
00419         delete currentEntry;
00420         currentEntry = NULL;
00421       }
00422       else {
00423         previousNextEntryPointer = &(currentEntry->nextSocketGroup);
00424         currentEntry = currentEntry->nextSocketGroup;
00425       }
00426     }
00427   }
00428 }
00429 
00430 #ifdef HAVE_EPOLL
00431 UINT32 CAChain::sendUpstreamData(UINT32 a_maxLength, CASocketGroupEpoll* a_removedSocketGroup) {
00432 #else
00433 UINT32 CAChain::sendUpstreamData(UINT32 a_maxLength, CASocketGroup* a_removedSocketGroup) {
00434 #endif
00435   UINT32 processedBytes = sendUpstreamDataInternal(a_maxLength);
00436   if (m_upstreamSendQueue->isEmpty()) {
00437     /* queue is empty -> we can remove the entry from the socketgroup */
00438     removeFromSocketGroup(a_removedSocketGroup);
00439   }
00440   return processedBytes;
00441 }
00442 
00443 
00444 void CAChain::addDataToUpstreamQueue(UINT8* a_buffer, UINT32 a_size) {
00445   if (!m_upstreamClosed) {
00446     /* only add data if upstream isn't closed */
00447     m_upstreamSendQueue->add(a_buffer, a_size);
00448     #ifdef LOG_CHAIN_STATISTICS
00449       m_packetsFromUser++;
00450       m_bytesFromUser = m_bytesFromUser + a_size;
00451     #endif
00452   }
00453 }
00454 
00455 void CAChain::closeUpstream() {
00456   /* currently we will close the whole chain immediately */
00457   closeChainInternal();
00458 }
00459 
00460 void CAChain::closeDownstream() {
00461   /* currently we will close the whole chain immediately */
00462   closeChainInternal();
00463 }
00464 
00465 void CAChain::signalConnectionError() {
00466   m_connectionError = true;
00467   /* we will also close the chain */
00468   closeChainInternal();
00469 }
00470 
00471 void CAChain::signalUnknownChain() {
00472   m_unknownChainId = true;
00473   /* we will not send any chain-id -> disable m_firstDownstreamPacket */
00474   m_firstDownstreamPacket = false;
00475   /* we will also close the chain */
00476   closeChainInternal();
00477 }
00478 
00479 UINT8* CAChain::getPrintableChainId() {
00480   UINT8* printableChainId = bytes2hex(m_chainId, CHAIN_ID_LENGTH);
00481   strtrim(printableChainId);
00482   return printableChainId;
00483 }
00484 
00485 void CAChain::addChannel(t_lastMixBChannelListEntry* a_channel, bool a_fastResponse) {
00486   t_channelEntry* lastChannel = NULL;
00487   bool invalidChannel = false;
00488   if (m_firstChannel != NULL) {
00489     if (m_firstChannel->nextChannel != NULL) {
00490       /* somebody is trying to add a third channel to the chain but currently
00491        * only 2 channels can be associated to a data-chain -> ignore the new
00492        * channel (attention: currently we have to send at least a
00493        * CHANNEL-CLOSE because there is no channel-timeout at first and middle
00494        * mixes), send an IOException and close the chain
00495        */
00496       invalidChannel = true;
00497       signalConnectionError();
00498       /* find the last associated channel */
00499       lastChannel = m_firstChannel->nextChannel;
00500       while (lastChannel->nextChannel != NULL) {
00501         lastChannel = lastChannel->nextChannel;
00502       }   
00503     }
00504     else {
00505       lastChannel = m_firstChannel;
00506     }
00507   }
00508   t_channelEntry* newChannel = new t_channelEntry;
00509   /* initialize the fields */
00510   newChannel->nextChannel = NULL;
00511   newChannel->channel = a_channel;
00512   if (lastChannel != NULL) {
00513     /* close all previous channels immediately */
00514     forceImmediateResponsesInternal();
00515     /* now add the new channel */
00516     lastChannel->nextChannel = newChannel;
00517   }
00518   else {
00519     m_firstChannel = newChannel;
00520   }
00521   timespec currentTime;
00522   getcurrentTime(currentTime);
00523   if (!invalidChannel) {
00524     if ((!(m_upstreamClosed && m_downstreamClosed)) && (m_lastAccessTime != -1)) {
00525       /* if not downstream and upstream is closed and also an access-timeout
00526        * is running -> stop that access-timeout
00527        */
00528       m_lastAccessTime = -1;
00529     }
00530     a_channel->remainingDownstreamPackets = CHANNEL_DOWNSTREAM_PACKETS;
00531     /* create deadlines for the new downstream-packets */
00532     t_deadlineEntry** lastNextDeadlinePointer = &(a_channel->firstResponseDeadline);
00533     for (UINT32 i = 0; i < CHANNEL_DOWNSTREAM_PACKETS; i++) {
00534       t_deadlineEntry* currentDeadline = new t_deadlineEntry;
00535       currentDeadline->nextDeadline = NULL;
00536       if (!m_downstreamClosed) {
00537         if (a_fastResponse && (i == 0)) {
00538           /* we shall send a fast response -> send back the first packet of
00539            * the new channel immediately
00540            */
00541           currentDeadline->deadline.tv_sec = currentTime.tv_sec;
00542         }
00543         else {
00544           /* use normal channel-timeout */
00545           currentDeadline->deadline.tv_sec = currentTime.tv_sec + CHANNEL_TIMEOUT;
00546         }
00547       }
00548       else {
00549         /* downstream is already closed -> send packets back immediately */
00550         currentDeadline->deadline.tv_sec = currentTime.tv_sec;
00551       }
00552       currentDeadline->deadline.tv_nsec = currentTime.tv_nsec;
00553       *lastNextDeadlinePointer = currentDeadline;
00554       lastNextDeadlinePointer = &(currentDeadline->nextDeadline);
00555     }
00556   }
00557   else {
00558     /* send only one packet (will be CHANNEL-CLOSE) */
00559     a_channel->remainingDownstreamPackets = 1;
00560     a_channel->firstResponseDeadline = new t_deadlineEntry;
00561     a_channel->firstResponseDeadline->nextDeadline = NULL;
00562     a_channel->firstResponseDeadline->deadline.tv_sec = currentTime.tv_sec;
00563     a_channel->firstResponseDeadline->deadline.tv_nsec = currentTime.tv_nsec;
00564   }
00565 }
00566 
00567 void CAChain::closeChainInternal() {
00568   m_upstreamClosed = true;
00569   m_downstreamClosed = true;
00570   m_upstreamSendQueue->clean();
00571   if (m_socket != NULL) {
00572     removeFromAllSocketGroupsInternal();
00573     m_socket->close();
00574     delete m_socket;
00575     m_socket = NULL;
00576   }
00577   /* send back all response-packets immediately */
00578   forceImmediateResponsesInternal();
00579 }
00580 
00581 void CAChain::forceImmediateResponsesInternal() {
00582   /* set all deadlines to the current time (if they don't have a previous
00583    * time) */
00584   timespec currentTime;
00585   getcurrentTime(currentTime);
00586   t_channelEntry* currentChannel = m_firstChannel;
00587   while (currentChannel != NULL) {     
00588     t_deadlineEntry* currentDeadline = currentChannel->channel->firstResponseDeadline;
00589     while (currentDeadline != NULL) {
00590       if (currentDeadline->deadline.tv_sec > currentTime.tv_sec) {
00591         currentDeadline->deadline.tv_sec = currentTime.tv_sec;
00592         currentDeadline->deadline.tv_nsec = currentTime.tv_nsec;
00593       }
00594       else {
00595         if ((currentDeadline->deadline.tv_sec == currentTime.tv_sec) && (currentDeadline->deadline.tv_nsec > currentTime.tv_nsec)) {
00596           currentDeadline->deadline.tv_nsec = currentTime.tv_nsec;
00597         }
00598       }
00599       currentDeadline = currentDeadline->nextDeadline;
00600     }
00601     currentChannel = currentChannel->nextChannel;
00602   }
00603 }  
00604 
00605 void CAChain::removeFromAllSocketGroupsInternal() {
00606   t_socketGroupEntry* currentEntry = m_firstSocketGroup;
00607   m_firstSocketGroup = NULL;
00608   while (currentEntry != NULL) {
00609     currentEntry->socketGroup->remove(*m_socket);
00610     t_socketGroupEntry* nextEntry = currentEntry->nextSocketGroup;
00611     delete currentEntry;
00612     currentEntry = nextEntry;
00613   }
00614 }
00615 
00616 UINT32 CAChain::sendUpstreamDataInternal(UINT32 a_maxLength) {
00617   UINT32 bytesSent = 0;
00618   if ((!m_upstreamClosed) && (m_socket != NULL)) {
00619     UINT32 length = a_maxLength;
00620     UINT8* buffer = new UINT8[length];
00621     if (m_upstreamSendQueue->peek(buffer, &length) == E_SUCCESS) {
00622       /* queue has filled the buffer */
00623       SINT32 errorCode = m_socket->send(buffer, length);
00624       if (errorCode >= 0) {
00625         length = (UINT32)errorCode;
00626         bytesSent = length;
00627         m_upstreamSendQueue->remove(&length);
00628       }
00629       else {
00630         /* error while sending data */
00631         signalConnectionError();
00632       }
00633     }
00634   }
00635   return bytesSent;
00636 }
00637 
00638 #ifdef DELAY_CHANNELS
00639   SINT32 CAChain::getDelayBucketInternal() {
00640     SINT32 delayBucket;
00641     m_pDelayBucketMutex->lock();
00642     delayBucket = *m_pDelayBucket;
00643     m_pDelayBucketMutex->unlock();
00644     return delayBucket;
00645   }
00646 
00647   void CAChain::removeFromDelayBucketInternal(SINT32 a_bytesToRemove) {
00648     m_pDelayBucketMutex->lock();
00649     *m_pDelayBucket = (*m_pDelayBucket) + a_bytesToRemove;
00650     m_pDelayBucketMutex->unlock();
00651   }
00652 #endif
00653 #endif //ONLY_LOCAL_PROXY