00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include "../StdAfx.h"
00033 #ifndef ONLY_LOCAL_PROXY
00034 #include "CAChain.hpp"
00035 #include "typedefsb.hpp"
00036 #include "CALastMixBChannelList.hpp"
00037 #include "../CAUtil.hpp"
00038
00039 #ifndef DELAY_CHANNELS
00040 CAChain::CAChain(UINT8* a_chainId) {
00041 #else
00042 CAChain::CAChain(UINT8* a_chainId, CAMutex* a_delayBucketMutex, SINT32* a_delayBucket) {
00043 #endif
00044 m_chainId = a_chainId;
00045 m_firstChannel = NULL;
00046 m_socket = NULL;
00047 m_lastAccessTime = -1;
00048 m_upstreamSendQueue = new CAQueue(DATA_SIZE);
00049 m_upstreamClosed = false;
00050 m_downstreamClosed = false;
00051 m_firstSocketGroup = NULL;
00052 m_connectionError = false;
00053 m_unknownChainId = false;
00054 m_firstDownstreamPacket = true;
00055 #ifdef LOG_CHAIN_STATISTICS
00056 m_packetsFromUser = 0;
00057 m_bytesFromUser = 0;
00058 m_packetsToUser = 0;
00059 m_bytesToUser = 0;
00060 getcurrentTimeMicros(m_creationTime);
00061 #endif
00062 #ifdef DELAY_CHANNELS
00063 m_pDelayBucketMutex = a_delayBucketMutex;
00064 m_pDelayBucket = a_delayBucket;
00065 #endif
00066 }
00067
00068 CAChain::~CAChain(void) {
00069 if (m_socket != NULL) {
00070 removeFromAllSocketGroupsInternal();
00071 m_socket->close();
00072 delete m_socket;
00073 m_socket = NULL;
00074 }
00075 m_upstreamSendQueue->clean();
00076 delete m_upstreamSendQueue;
00077 m_upstreamSendQueue = NULL;
00078
00079
00080
00081 while (m_firstChannel != NULL) {
00082 t_channelEntry* channelEntry = m_firstChannel;
00083
00084 channelEntry->channel->associatedChannelList->removeFromTable(channelEntry->channel);
00085
00086 while (channelEntry->channel->firstResponseDeadline != NULL) {
00087 t_deadlineEntry* currentDeadline = channelEntry->channel->firstResponseDeadline;
00088 channelEntry->channel->firstResponseDeadline = currentDeadline->nextDeadline;
00089 delete currentDeadline;
00090 currentDeadline = NULL;
00091 }
00092
00093 delete channelEntry->channel->channelCipher;
00094 channelEntry->channel->channelCipher = NULL;
00095 m_firstChannel = channelEntry->nextChannel;
00096 delete channelEntry;
00097 channelEntry = NULL;
00098 }
00099 #ifdef LOG_CHAIN_STATISTICS
00100
00101
00102
00103 UINT64 currentTime;
00104 getcurrentTimeMicros(currentTime);
00105 UINT32 duration = diff64(currentTime, m_creationTime);
00106 UINT8* chainId = getPrintableChainId();
00107 CAMsg::printMsg(LOG_DEBUG, "%s,%u,%u,%u,%u,%u\n", chainId, duration, m_bytesFromUser, m_bytesToUser, m_packetsFromUser, m_packetsToUser);
00108 delete []chainId;
00109 chainId = NULL;
00110 #endif
00111 delete []m_chainId;
00112 m_chainId = NULL;
00113 #ifdef DELAY_CHANNELS
00114
00115
00116
00117 m_pDelayBucketMutex->lock();
00118 *m_pDelayBucket = -1;
00119 m_pDelayBucketMutex->unlock();
00120 #endif
00121 }
00122
00123 UINT8* CAChain::getChainId() {
00124 return m_chainId;
00125 }
00126
00127 #ifdef LOG_CHAIN_STATISTICS
00128 void CAChain::setSocket(CASocket* a_socket, UINT32 a_alreadyProcessedPackets, UINT32 a_alreadyProcessedBytes) {
00129 m_socket = a_socket;
00130 m_bytesFromUser = a_alreadyProcessedBytes;
00131 m_packetsFromUser = a_alreadyProcessedPackets;
00132 }
00133 #else
00134 void CAChain::setSocket(CASocket* a_socket) {
00135 m_socket = a_socket;
00136 }
00137 #endif
00138
00145 #ifdef HAVE_EPOLL
00146 SINT32 CAChain::processDownstream(CASocketGroupEpoll* a_signalingGroup, MIXPACKET* a_downstreamPacket, UINT32* a_processedBytes) {
00147 #else
00148 SINT32 CAChain::processDownstream(CASocketGroup* a_signalingGroup, MIXPACKET* a_downstreamPacket, UINT32* a_processedBytes) {
00149 #endif
00150 *a_processedBytes = 0;
00151
00152 timespec currentTime;
00153 getcurrentTime(currentTime);
00154 if (m_lastAccessTime != -1) {
00155
00156
00157
00158 if (m_lastAccessTime + CHAIN_TIMEOUT < currentTime.tv_sec) {
00159
00160 closeChainInternal();
00161 return 3;
00162 }
00163
00164 return 1;
00165 }
00166
00167
00168 t_deadlineEntry* testedDeadlineEntry = m_firstChannel->channel->firstResponseDeadline;
00169 if (((testedDeadlineEntry->deadline.tv_sec + DEADLINE_TIMEOUT) < currentTime.tv_sec) || (((testedDeadlineEntry->deadline.tv_sec + DEADLINE_TIMEOUT) == currentTime.tv_sec) && (testedDeadlineEntry->deadline.tv_nsec <= currentTime.tv_nsec))) {
00170
00171
00172
00173
00174
00175 if (m_firstChannel->channel->remainingDownstreamPackets > 1) {
00176
00177
00178
00179 signalConnectionError();
00180 UINT8* chainId = getPrintableChainId();
00181 CAMsg::printMsg(LOG_INFO, "Dropped downstream-packets from chain '%s'!\n", chainId);
00182 delete []chainId;
00183 chainId = NULL;
00184 while (m_firstChannel->channel->remainingDownstreamPackets > 1) {
00185 m_firstChannel->channel->remainingDownstreamPackets--;
00186 m_firstChannel->channel->firstResponseDeadline = testedDeadlineEntry->nextDeadline;
00187 delete testedDeadlineEntry;
00188 testedDeadlineEntry = m_firstChannel->channel->firstResponseDeadline;
00189 }
00190 }
00191 }
00192
00193 t_downstreamChainCell* pChainCell = (t_downstreamChainCell*)(a_downstreamPacket->data);
00194 if ((m_socket != NULL) && (!m_downstreamClosed) && (m_firstChannel != NULL)) {
00195 if (m_firstChannel->channel->remainingDownstreamPackets > 1) {
00196
00197
00198
00199 if (isSignaledInSocketGroup(a_signalingGroup)) {
00200
00201
00202 UINT16 payloadData = MAX_SEQUEL_DOWNSTREAM_CHAINCELL_PAYLOAD;
00203 if (m_firstDownstreamPacket) {
00204 payloadData = MAX_FIRST_DOWNSTREAM_CHAINCELL_PAYLOAD;
00205 }
00206 #ifdef DELAY_CHANNELS
00207 payloadData = min(payloadData, (UINT16)getDelayBucketInternal());
00208 #endif
00209 if (payloadData > 0) {
00210
00211
00212
00213
00214 getRandom(a_downstreamPacket->data, DATA_SIZE);
00215 SINT32 bytesReceived;
00216 if (m_firstDownstreamPacket) {
00217 bytesReceived = m_socket->receive(pChainCell->firstCell.data, payloadData);
00218 }
00219 else {
00220 bytesReceived = m_socket->receive(pChainCell->sequelCell.data, payloadData);
00221 }
00222 if (bytesReceived >= 0) {
00223 if (bytesReceived == 0) {
00224
00225 closeDownstream();
00226 }
00227 else {
00228
00229 #ifdef DELAY_CHANNELS
00230 removeFromDelayBucketInternal(bytesReceived);
00231 #endif
00232 if (m_firstDownstreamPacket) {
00233
00234 memcpy(pChainCell->firstCell.chainId, m_chainId, CHAIN_ID_LENGTH);
00235 m_firstDownstreamPacket = false;
00236 }
00237 pChainCell->lengthAndFlags = htons((UINT16)bytesReceived);
00238 a_downstreamPacket->channel = m_firstChannel->channel->channelId;
00239 a_downstreamPacket->flags = CHANNEL_DATA;
00240 m_firstChannel->channel->channelCipher->crypt2(a_downstreamPacket->data, a_downstreamPacket->data, DATA_SIZE);
00241 m_firstChannel->channel->remainingDownstreamPackets--;
00242 t_deadlineEntry* currentDeadline = m_firstChannel->channel->firstResponseDeadline;
00243 m_firstChannel->channel->firstResponseDeadline = currentDeadline->nextDeadline;
00244 delete currentDeadline;
00245 currentDeadline = NULL;
00246 *a_processedBytes = (UINT32)bytesReceived;
00247 #ifdef LOG_CHAIN_STATISTICS
00248 m_packetsToUser++;
00249 m_bytesToUser = m_bytesToUser + (UINT32)bytesReceived;
00250 #endif
00251 return 0;
00252 }
00253 }
00254 else {
00255
00256 signalConnectionError();
00257 }
00258 }
00259 }
00260 }
00261 }
00262
00263 if (m_firstChannel->channel->remainingDownstreamPackets == 1) {
00264
00265 getRandom(a_downstreamPacket->data, DATA_SIZE);
00266 a_downstreamPacket->channel = m_firstChannel->channel->channelId;
00267 a_downstreamPacket->flags = CHANNEL_CLOSE;
00268
00269 t_lastMixBChannelListEntry* currentChannel = m_firstChannel->channel;
00270 currentChannel->associatedChannelList->removeFromTable(currentChannel);
00271 delete currentChannel->firstResponseDeadline;
00272 currentChannel->firstResponseDeadline = NULL;
00273 delete currentChannel->channelCipher;
00274 currentChannel->channelCipher = NULL;
00275 delete currentChannel;
00276 currentChannel = NULL;
00277 t_channelEntry* currentChannelEntry = m_firstChannel;
00278
00279 m_firstChannel = m_firstChannel->nextChannel;
00280 delete currentChannelEntry;
00281 currentChannelEntry = NULL;
00282 #ifdef LOG_CHAIN_STATISTICS
00283
00284 m_packetsToUser++;
00285 #endif
00286 if (m_firstChannel == NULL) {
00287 if (m_downstreamClosed && m_upstreamClosed) {
00288
00289
00290
00291 return 2;
00292 }
00293 else {
00294
00295
00296
00297 timespec currentTime;
00298 getcurrentTime(currentTime);
00299 m_lastAccessTime = currentTime.tv_sec;
00300 return 0;
00301 }
00302 }
00303
00304 return 0;
00305 }
00306
00307
00308
00309 if ((m_firstChannel->channel->firstResponseDeadline->deadline.tv_sec < currentTime.tv_sec) || ((m_firstChannel->channel->firstResponseDeadline->deadline.tv_sec == currentTime.tv_sec) && (m_firstChannel->channel->firstResponseDeadline->deadline.tv_nsec <= currentTime.tv_nsec))) {
00310
00311 getRandom(a_downstreamPacket->data, DATA_SIZE);
00312 pChainCell->lengthAndFlags = 0;
00313 if (m_firstDownstreamPacket) {
00314
00315 memcpy(pChainCell->firstCell.chainId, m_chainId, CHAIN_ID_LENGTH);
00316 m_firstDownstreamPacket = false;
00317 }
00318
00319 if (m_unknownChainId) {
00320 pChainCell->lengthAndFlags = pChainCell->lengthAndFlags | CHAINFLAG_UNKNOWN_CHAIN;
00321
00322 m_unknownChainId = false;
00323 }
00324 if (m_connectionError) {
00325 pChainCell->lengthAndFlags = pChainCell->lengthAndFlags | CHAINFLAG_CONNECTION_ERROR;
00326
00327 m_connectionError = false;
00328 }
00329 if (m_downstreamClosed) {
00330 pChainCell->lengthAndFlags = pChainCell->lengthAndFlags | CHAINFLAG_STREAM_CLOSED;
00331
00332 }
00333
00334 pChainCell->lengthAndFlags = htons(pChainCell->lengthAndFlags);
00335
00336 a_downstreamPacket->channel = m_firstChannel->channel->channelId;
00337 a_downstreamPacket->flags = CHANNEL_DATA;
00338 m_firstChannel->channel->channelCipher->crypt2(a_downstreamPacket->data, a_downstreamPacket->data, DATA_SIZE);
00339
00340 m_firstChannel->channel->remainingDownstreamPackets--;
00341 t_deadlineEntry* currentDeadline = m_firstChannel->channel->firstResponseDeadline;
00342 m_firstChannel->channel->firstResponseDeadline = currentDeadline->nextDeadline;
00343 delete currentDeadline;
00344 currentDeadline = NULL;
00345 #ifdef LOG_CHAIN_STATISTICS
00346
00347 m_packetsToUser++;
00348 #endif
00349 return 0;
00350 }
00351
00352 return 1;
00353 }
00354
00355 #ifdef HAVE_EPOLL
00356 bool CAChain::isSignaledInSocketGroup(CASocketGroupEpoll* a_socketGroup) {
00357 if (m_socket != NULL) {
00358 return a_socketGroup->isSignaled(this);
00359 }
00360 return false;
00361 }
00362 #else
00363 bool CAChain::isSignaledInSocketGroup(CASocketGroup* a_socketGroup) {
00364 if (m_socket != NULL) {
00365 return a_socketGroup->isSignaled(*m_socket);
00366 }
00367 return false;
00368 }
00369 #endif
00370
00371 #ifdef HAVE_EPOLL
00372 void CAChain::addToSocketGroup(CASocketGroupEpoll* a_socketGroup) {
00373 #else
00374 void CAChain::addToSocketGroup(CASocketGroup* a_socketGroup) {
00375 #endif
00376 if (m_socket != NULL) {
00377
00378
00379 t_socketGroupEntry* currentEntry = m_firstSocketGroup;
00380 t_socketGroupEntry** previousNextEntryPointer = &m_firstSocketGroup;
00381 bool alreadyIncluded = false;
00382 while ((currentEntry != NULL) && (!alreadyIncluded)) {
00383 if (currentEntry->socketGroup == a_socketGroup) {
00384 alreadyIncluded = true;
00385 }
00386 else {
00387 previousNextEntryPointer = &(currentEntry->nextSocketGroup);
00388 currentEntry = currentEntry->nextSocketGroup;
00389 }
00390 }
00391 if (!alreadyIncluded) {
00392 #ifdef HAVE_EPOLL
00393 a_socketGroup->add(*m_socket, this);
00394 #else
00395 a_socketGroup->add(*m_socket);
00396 #endif
00397 currentEntry = new t_socketGroupEntry;
00398 currentEntry->nextSocketGroup = NULL;
00399 currentEntry->socketGroup = a_socketGroup;
00400 *previousNextEntryPointer = currentEntry;
00401 }
00402 }
00403 }
00404
00405 #ifdef HAVE_EPOLL
00406 void CAChain::removeFromSocketGroup(CASocketGroupEpoll* a_socketGroup) {
00407 #else
00408 void CAChain::removeFromSocketGroup(CASocketGroup* a_socketGroup) {
00409 #endif
00410 if (m_socket != NULL) {
00411
00412 t_socketGroupEntry* currentEntry = m_firstSocketGroup;
00413 t_socketGroupEntry** previousNextEntryPointer = &m_firstSocketGroup;
00414 while (currentEntry != NULL) {
00415 if (currentEntry->socketGroup == a_socketGroup) {
00416
00417 a_socketGroup->remove(*m_socket);
00418 *previousNextEntryPointer = currentEntry->nextSocketGroup;
00419 delete currentEntry;
00420 currentEntry = NULL;
00421 }
00422 else {
00423 previousNextEntryPointer = &(currentEntry->nextSocketGroup);
00424 currentEntry = currentEntry->nextSocketGroup;
00425 }
00426 }
00427 }
00428 }
00429
00430 #ifdef HAVE_EPOLL
00431 UINT32 CAChain::sendUpstreamData(UINT32 a_maxLength, CASocketGroupEpoll* a_removedSocketGroup) {
00432 #else
00433 UINT32 CAChain::sendUpstreamData(UINT32 a_maxLength, CASocketGroup* a_removedSocketGroup) {
00434 #endif
00435 UINT32 processedBytes = sendUpstreamDataInternal(a_maxLength);
00436 if (m_upstreamSendQueue->isEmpty()) {
00437
00438 removeFromSocketGroup(a_removedSocketGroup);
00439 }
00440 return processedBytes;
00441 }
00442
00443
00444 void CAChain::addDataToUpstreamQueue(UINT8* a_buffer, UINT32 a_size) {
00445 if (!m_upstreamClosed) {
00446
00447 m_upstreamSendQueue->add(a_buffer, a_size);
00448 #ifdef LOG_CHAIN_STATISTICS
00449 m_packetsFromUser++;
00450 m_bytesFromUser = m_bytesFromUser + a_size;
00451 #endif
00452 }
00453 }
00454
00455 void CAChain::closeUpstream() {
00456
00457 closeChainInternal();
00458 }
00459
00460 void CAChain::closeDownstream() {
00461
00462 closeChainInternal();
00463 }
00464
00465 void CAChain::signalConnectionError() {
00466 m_connectionError = true;
00467
00468 closeChainInternal();
00469 }
00470
00471 void CAChain::signalUnknownChain() {
00472 m_unknownChainId = true;
00473
00474 m_firstDownstreamPacket = false;
00475
00476 closeChainInternal();
00477 }
00478
00479 UINT8* CAChain::getPrintableChainId() {
00480 UINT8* printableChainId = bytes2hex(m_chainId, CHAIN_ID_LENGTH);
00481 strtrim(printableChainId);
00482 return printableChainId;
00483 }
00484
00485 void CAChain::addChannel(t_lastMixBChannelListEntry* a_channel, bool a_fastResponse) {
00486 t_channelEntry* lastChannel = NULL;
00487 bool invalidChannel = false;
00488 if (m_firstChannel != NULL) {
00489 if (m_firstChannel->nextChannel != NULL) {
00490
00491
00492
00493
00494
00495
00496 invalidChannel = true;
00497 signalConnectionError();
00498
00499 lastChannel = m_firstChannel->nextChannel;
00500 while (lastChannel->nextChannel != NULL) {
00501 lastChannel = lastChannel->nextChannel;
00502 }
00503 }
00504 else {
00505 lastChannel = m_firstChannel;
00506 }
00507 }
00508 t_channelEntry* newChannel = new t_channelEntry;
00509
00510 newChannel->nextChannel = NULL;
00511 newChannel->channel = a_channel;
00512 if (lastChannel != NULL) {
00513
00514 forceImmediateResponsesInternal();
00515
00516 lastChannel->nextChannel = newChannel;
00517 }
00518 else {
00519 m_firstChannel = newChannel;
00520 }
00521 timespec currentTime;
00522 getcurrentTime(currentTime);
00523 if (!invalidChannel) {
00524 if ((!(m_upstreamClosed && m_downstreamClosed)) && (m_lastAccessTime != -1)) {
00525
00526
00527
00528 m_lastAccessTime = -1;
00529 }
00530 a_channel->remainingDownstreamPackets = CHANNEL_DOWNSTREAM_PACKETS;
00531
00532 t_deadlineEntry** lastNextDeadlinePointer = &(a_channel->firstResponseDeadline);
00533 for (UINT32 i = 0; i < CHANNEL_DOWNSTREAM_PACKETS; i++) {
00534 t_deadlineEntry* currentDeadline = new t_deadlineEntry;
00535 currentDeadline->nextDeadline = NULL;
00536 if (!m_downstreamClosed) {
00537 if (a_fastResponse && (i == 0)) {
00538
00539
00540
00541 currentDeadline->deadline.tv_sec = currentTime.tv_sec;
00542 }
00543 else {
00544
00545 currentDeadline->deadline.tv_sec = currentTime.tv_sec + CHANNEL_TIMEOUT;
00546 }
00547 }
00548 else {
00549
00550 currentDeadline->deadline.tv_sec = currentTime.tv_sec;
00551 }
00552 currentDeadline->deadline.tv_nsec = currentTime.tv_nsec;
00553 *lastNextDeadlinePointer = currentDeadline;
00554 lastNextDeadlinePointer = &(currentDeadline->nextDeadline);
00555 }
00556 }
00557 else {
00558
00559 a_channel->remainingDownstreamPackets = 1;
00560 a_channel->firstResponseDeadline = new t_deadlineEntry;
00561 a_channel->firstResponseDeadline->nextDeadline = NULL;
00562 a_channel->firstResponseDeadline->deadline.tv_sec = currentTime.tv_sec;
00563 a_channel->firstResponseDeadline->deadline.tv_nsec = currentTime.tv_nsec;
00564 }
00565 }
00566
00567 void CAChain::closeChainInternal() {
00568 m_upstreamClosed = true;
00569 m_downstreamClosed = true;
00570 m_upstreamSendQueue->clean();
00571 if (m_socket != NULL) {
00572 removeFromAllSocketGroupsInternal();
00573 m_socket->close();
00574 delete m_socket;
00575 m_socket = NULL;
00576 }
00577
00578 forceImmediateResponsesInternal();
00579 }
00580
00581 void CAChain::forceImmediateResponsesInternal() {
00582
00583
00584 timespec currentTime;
00585 getcurrentTime(currentTime);
00586 t_channelEntry* currentChannel = m_firstChannel;
00587 while (currentChannel != NULL) {
00588 t_deadlineEntry* currentDeadline = currentChannel->channel->firstResponseDeadline;
00589 while (currentDeadline != NULL) {
00590 if (currentDeadline->deadline.tv_sec > currentTime.tv_sec) {
00591 currentDeadline->deadline.tv_sec = currentTime.tv_sec;
00592 currentDeadline->deadline.tv_nsec = currentTime.tv_nsec;
00593 }
00594 else {
00595 if ((currentDeadline->deadline.tv_sec == currentTime.tv_sec) && (currentDeadline->deadline.tv_nsec > currentTime.tv_nsec)) {
00596 currentDeadline->deadline.tv_nsec = currentTime.tv_nsec;
00597 }
00598 }
00599 currentDeadline = currentDeadline->nextDeadline;
00600 }
00601 currentChannel = currentChannel->nextChannel;
00602 }
00603 }
00604
00605 void CAChain::removeFromAllSocketGroupsInternal() {
00606 t_socketGroupEntry* currentEntry = m_firstSocketGroup;
00607 m_firstSocketGroup = NULL;
00608 while (currentEntry != NULL) {
00609 currentEntry->socketGroup->remove(*m_socket);
00610 t_socketGroupEntry* nextEntry = currentEntry->nextSocketGroup;
00611 delete currentEntry;
00612 currentEntry = nextEntry;
00613 }
00614 }
00615
00616 UINT32 CAChain::sendUpstreamDataInternal(UINT32 a_maxLength) {
00617 UINT32 bytesSent = 0;
00618 if ((!m_upstreamClosed) && (m_socket != NULL)) {
00619 UINT32 length = a_maxLength;
00620 UINT8* buffer = new UINT8[length];
00621 if (m_upstreamSendQueue->peek(buffer, &length) == E_SUCCESS) {
00622
00623 SINT32 errorCode = m_socket->send(buffer, length);
00624 if (errorCode >= 0) {
00625 length = (UINT32)errorCode;
00626 bytesSent = length;
00627 m_upstreamSendQueue->remove(&length);
00628 }
00629 else {
00630
00631 signalConnectionError();
00632 }
00633 }
00634 }
00635 return bytesSent;
00636 }
00637
00638 #ifdef DELAY_CHANNELS
00639 SINT32 CAChain::getDelayBucketInternal() {
00640 SINT32 delayBucket;
00641 m_pDelayBucketMutex->lock();
00642 delayBucket = *m_pDelayBucket;
00643 m_pDelayBucketMutex->unlock();
00644 return delayBucket;
00645 }
00646
00647 void CAChain::removeFromDelayBucketInternal(SINT32 a_bytesToRemove) {
00648 m_pDelayBucketMutex->lock();
00649 *m_pDelayBucket = (*m_pDelayBucket) + a_bytesToRemove;
00650 m_pDelayBucketMutex->unlock();
00651 }
00652 #endif
00653 #endif //ONLY_LOCAL_PROXY