|
Mixe for Privacy and Anonymity in the Internet
|
#include "doxygen.h"#include "CAMix.hpp"#include "CAMuxSocket.hpp"#include "CAASymCipher.hpp"#include "CAFirstMixChannelList.hpp"#include "CAIPList.hpp"#include "CASocketGroup.hpp"#include "CAQueue.hpp"#include "CAUtil.hpp"#include "CAThread.hpp"#include "CAThreadPool.hpp"#include "TermsAndConditions.hpp"#include "CALogPacketStats.hpp"#include "CAConditionVariable.hpp"#include "CATempIPBlockList.hpp"#include "CASocketGroupEpoll.hpp"#include "CAMixWithReplayDB.hpp"#include "CAMutex.hpp"Go to the source code of this file.
Classes | |
| class | tUINT32withLock |
| class | CAFirstMix |
Defines | |
| #define | TNC_SREQUEST "TermsAndConditionsRequest" |
| #define | TNC_RESPONSE "TermsAndConditionsResponse" |
| #define | TNC_SINTERRUPT "TermsAndConditionsInterrupt" |
| #define | TNC_REQ_TRANSLATION "Translation" |
| #define | TNC_RESOURCES "Resources" |
| #define | TNC_RESOURCE_TEMPLATE "Template" |
| #define | TNC_TEMPLATE_ROOT_ELEMENT "TermsAndConditionsTemplate" |
| #define | TNC_RESOURCE_CUSTOMIZED_SECT "CustomizedSections" |
| #define | TNC_RESPONSE_INVALID_REQUEST "InvalidTermsAndConditionsRequest" |
| #define | TNC_CONFIRM_REQ "TermsAndConditionsConfirm" |
Functions | |
| THREAD_RETURN | fm_loopSendToMix (void *) |
| How to end this thread: 0. | |
| THREAD_RETURN | fm_loopReadFromMix (void *) |
| THREAD_RETURN | fm_loopAcceptUsers (void *) |
| THREAD_RETURN | fm_loopReadFromUsers (void *) |
| THREAD_RETURN | fm_loopDoUserLogin (void *param) |
| THREAD_RETURN | fm_loopLog (void *) |
| THREAD_RETURN | iplist_loopDoLogCountries (void *param) |
| #define TNC_CONFIRM_REQ "TermsAndConditionsConfirm" |
Definition at line 69 of file CAFirstMix.hpp.
Referenced by CAFirstMix::CAFirstMix().
| #define TNC_REQ_TRANSLATION "Translation" |
Definition at line 62 of file CAFirstMix.hpp.
Referenced by CAFirstMix::handleTermsAndConditionsLogin().
| #define TNC_RESOURCE_CUSTOMIZED_SECT "CustomizedSections" |
Definition at line 66 of file CAFirstMix.hpp.
Referenced by CAFirstMix::handleTermsAndConditionsLogin().
| #define TNC_RESOURCE_TEMPLATE "Template" |
Definition at line 64 of file CAFirstMix.hpp.
Referenced by CAFirstMix::handleTermsAndConditionsLogin().
| #define TNC_RESOURCES "Resources" |
Definition at line 63 of file CAFirstMix.hpp.
Referenced by CAFirstMix::handleTermsAndConditionsLogin().
| #define TNC_RESPONSE "TermsAndConditionsResponse" |
Definition at line 60 of file CAFirstMix.hpp.
Referenced by CAFirstMix::handleTermsAndConditionsLogin().
| #define TNC_RESPONSE_INVALID_REQUEST "InvalidTermsAndConditionsRequest" |
Definition at line 68 of file CAFirstMix.hpp.
Referenced by CAFirstMix::handleTermsAndConditionsLogin().
| #define TNC_SINTERRUPT "TermsAndConditionsInterrupt" |
Definition at line 61 of file CAFirstMix.hpp.
Referenced by CAFirstMix::CAFirstMix().
| #define TNC_SREQUEST "TermsAndConditionsRequest" |
Definition at line 59 of file CAFirstMix.hpp.
Referenced by CAFirstMix::CAFirstMix().
| #define TNC_TEMPLATE_ROOT_ELEMENT "TermsAndConditionsTemplate" |
Definition at line 65 of file CAFirstMix.hpp.
Referenced by CAFirstMix::handleTermsAndConditionsExtension().
| THREAD_RETURN fm_loopAcceptUsers | ( | void * | ) |
Definition at line 1212 of file CAFirstMix.cpp.
References CASocket::accept(), CASocketGroup::add(), CAThreadPool::addRequest(), BEGIN_STACK, CATempIPBlockList::checkIP(), CATempIPBlockList::count(), CASocket::countOpenSockets(), CAThreadPool::countRequests(), CAFirstMix::decNewConnections(), E_SOCKETCLOSED, E_SUCCESS, E_TIMEDOUT, E_UNKNOWN, FINISH_STACK, fm_loopDoUserLogin(), GET_NET_ERROR, CAMuxSocket::getCASocket(), CACmdLnOptions::getMaxNrOfUsers(), CAFirstMix::getNrOfUsers(), CALibProxytest::getOptions(), CASocket::getPeerIP(), CAFirstMix::incNewConnections(), INIT_STACK, CAIPList::insertIP(), CATempIPBlockList::insertIP(), isAllowedToPassRestrictions(), CASocketGroup::isSignaled(), CAFirstMix::m_arMixParameters, CAFirstMix::m_arrSocketsIn, CAFirstMix::m_bRestart, CAFirstMix::m_newConnections, CAFirstMix::m_nSocketsIn, CAFirstMix::m_pIPBlockList, CAFirstMix::m_pIPList, CAFirstMix::m_pthreadsLogin, t_mix_parameters::m_u32ReplayOffset, CAFirstMix::MAX_CONCURRENT_NEW_CONNECTIONS, msSleep(), T_UserLoginData::peerIP, T_UserLoginData::pMix, T_UserLoginData::pNewUser, CAMsg::printMsg(), CASocketGroup::remove(), CAIPList::removeIP(), CASocketGroup::select(), sSleep(), and THREAD_RETURN_SUCCESS.
{
INIT_STACK;
BEGIN_STACK("CAFirstMix::fm_loopAcceptUsers");
CAFirstMix* pFirstMix=(CAFirstMix*)param;
CASocket** socketsIn=pFirstMix->m_arrSocketsIn;
CAIPList* pIPList=pFirstMix->m_pIPList;
CATempIPBlockList* pIPBlockList = pFirstMix->m_pIPBlockList;
CAThreadPool* pthreadsLogin=pFirstMix->m_pthreadsLogin;
UINT32 nSocketsIn=pFirstMix->m_nSocketsIn;
CASocketGroup* psocketgroupAccept=new CASocketGroup(false);
CAMuxSocket* pNewMuxSocket;
UINT8* peerIP=new UINT8[4];
UINT32 i=0;
SINT32 countRead;
SINT32 ret;
SINT32 retPeerIP = E_SUCCESS;
pFirstMix->m_newConnections = 0;
// kick out users that already have connected
for(i=0;i<nSocketsIn;i++)
{
while (socketsIn[i]->close() != E_SUCCESS)
{
sSleep(1);
}
}
if (CALibProxytest::getOptions()->createSockets(false,pFirstMix-> m_arrSocketsIn, pFirstMix->m_nSocketsIn) != E_SUCCESS)
{
goto END_THREAD;
}
for(i=0;i<nSocketsIn;i++)
{
psocketgroupAccept->add(*socketsIn[i]);
}
#ifdef REPLAY_DETECTION //before we can start to accept users we have to ensure that we received the replay timestamps form the over mixes
CAMsg::printMsg(LOG_DEBUG,"Waiting for Replay Timestamp from next mixes\n");
i=0;
while(!pFirstMix->m_bRestart && i < pFirstMix->m_u32MixCount-1)
{
if(pFirstMix->m_arMixParameters[i].m_u32ReplayOffset==0)//not set yet
{
msSleep(100);//wait a little bit and try again
continue;
}
i++;
}
CAMsg::printMsg(LOG_DEBUG,"All Replay Timestamp received\n");
#endif
while(!pFirstMix->m_bRestart)
{
if (pIPBlockList->count()>40)
{
CAMsg::printMsg(LOG_DEBUG,"UserAcceptLoop: login timeout list counts %d. We have %d users, %d open sockets and %d new connections. Restarting server sockets...\n",pIPBlockList->count(), pFirstMix->getNrOfUsers() ,CASocket::countOpenSockets(), pFirstMix->m_newConnections);
for(i=0;i<nSocketsIn;i++)
{
psocketgroupAccept->remove(*socketsIn[i]);
while (socketsIn[i]->close() != E_SUCCESS)
{
sSleep(1);
}
}
if (CALibProxytest::getOptions()->createSockets(false,pFirstMix-> m_arrSocketsIn, pFirstMix->m_nSocketsIn) != E_SUCCESS)
{
// could not listen
goto END_THREAD;
}
for(i=0;i<nSocketsIn;i++)
{
psocketgroupAccept->add(*socketsIn[i]);
}
sSleep(1);
}
countRead=psocketgroupAccept->select(10000);
if(countRead<0)
{ //check for Error - are we restarting ?
if(pFirstMix->m_bRestart ||countRead!=E_TIMEDOUT)
goto END_THREAD;
}
i=0;
#ifdef _DEBUG
CAMsg::printMsg(LOG_DEBUG,"UserAcceptLoop: countRead=%i\n",countRead);
#endif
while(countRead>0&&i<nSocketsIn)
{
if(psocketgroupAccept->isSignaled(*socketsIn[i]))
{
countRead--;
#ifdef _DEBUG
CAMsg::printMsg(LOG_DEBUG,"New direct Connection from Client!\n");
#endif
pNewMuxSocket=new CAMuxSocket;
ret=socketsIn[i]->accept(*(pNewMuxSocket->getCASocket()));
pFirstMix->incNewConnections();
if(ret!=E_SUCCESS)
{
// may return E_SOCKETCLOSED or E_SOCKET_LIMIT
CAMsg::printMsg(LOG_ERR,"Accept Error %u - direct Connection from Client!\n",GET_NET_ERROR);
}
else if( (CALibProxytest::getOptions()->getMaxNrOfUsers() > 0 &&
pFirstMix->getNrOfUsers() >= CALibProxytest::getOptions()->getMaxNrOfUsers())
&& (isAllowedToPassRestrictions(pNewMuxSocket->getCASocket()) != E_SUCCESS)
)
{
CAMsg::printMsg(LOG_DEBUG,"CAFirstMix User control: Too many users (Maximum:%d)! Rejecting user...\n", pFirstMix->getNrOfUsers(),CALibProxytest::getOptions()->getMaxNrOfUsers());
ret = E_UNKNOWN;
}
else if ((pFirstMix->m_newConnections > CAFirstMix::MAX_CONCURRENT_NEW_CONNECTIONS)
&& (isAllowedToPassRestrictions(pNewMuxSocket->getCASocket()) != E_SUCCESS)
)
{
/* This should protect the mix from flooding attacks
* No more than MAX_CONCURRENT_NEW_CONNECTIONS are allowed.
*/
#ifdef _DEBUG
CAMsg::printMsg(LOG_DEBUG,"CAFirstMix Flooding protection: Too many concurrent new connections (Maximum:%d)! Rejecting user...\n", CAFirstMix::MAX_CONCURRENT_NEW_CONNECTIONS);
#endif
ret = E_UNKNOWN;
}
//#ifndef PAYMENT
else if ((ret = pNewMuxSocket->getCASocket()->getPeerIP(peerIP)) != E_SUCCESS ||
(retPeerIP = pIPList->insertIP(peerIP)) < 0)
// || (pIPBlockList->checkIP(peerIP) == E_UNKNOWN && isAllowedToPassRestrictions(pNewMuxSocket->getCASocket()) != E_SUCCESS))
{
if (ret != E_SUCCESS)
{
CAMsg::printMsg(LOG_DEBUG,"Could not insert IP address as IP could not be retrieved! We have %d login threads currently running.\n", pthreadsLogin->countRequests());
}
else if (retPeerIP < 0)
{
CAMsg::printMsg(LOG_DEBUG,"CAFirstMix Flooding protection: Could not insert IP address! We have %d login threads currently running.\n", pthreadsLogin->countRequests());
pIPBlockList->insertIP(peerIP);
}
else if (pIPBlockList->checkIP(peerIP) == E_UNKNOWN)
{
CAMsg::printMsg(LOG_DEBUG, "Client IP address %u.%u.x.x is temporarily blocked! User login denied. We have %d open sockets and %d new connections.\n", peerIP[0],peerIP[1], CASocket::countOpenSockets(), pFirstMix->m_newConnections);
pIPList->removeIP(peerIP);
}
ret = E_UNKNOWN;
}
//#endif
else
{
t_UserLoginData* d=new t_UserLoginData;
d->pNewUser=pNewMuxSocket;
d->pMix=pFirstMix;
memcpy(d->peerIP,peerIP,4);
#ifdef DEBUG
CAMsg::printMsg(LOG_DEBUG,"%d concurrent client connections.\n", pFirstMix->m_newConnections);
#endif
if(pthreadsLogin->addRequest(fm_loopDoUserLogin,d)!=E_SUCCESS)
{
CAMsg::printMsg(LOG_ERR,"Could not add an login request to the login thread pool!\n");
ret=E_UNKNOWN;
}
}
if (ret != E_SUCCESS)
{
delete pNewMuxSocket;
pNewMuxSocket = NULL;
pFirstMix->decNewConnections();
if(ret==E_SOCKETCLOSED&&pFirstMix->m_bRestart) //Hm, should we restart ??
{
goto END_THREAD;
}
else //if(ret==E_SOCKET_LIMIT) // Hm no free sockets - wait some time to hope to get a free one...
{
msSleep(400);
}
}
}
i++;
}
}
END_THREAD:
FINISH_STACK("CAFirstMix::fm_loopAcceptUsers");
delete[] peerIP;
peerIP = NULL;
delete psocketgroupAccept;
psocketgroupAccept = NULL;
CAMsg::printMsg(LOG_DEBUG,"Exiting Thread AcceptUser\n");
THREAD_RETURN_SUCCESS;
}
| THREAD_RETURN fm_loopDoUserLogin | ( | void * | param | ) |
Definition at line 1462 of file CAFirstMix.cpp.
References BEGIN_STACK, CAFirstMix::decNewConnections(), CAFirstMix::doUserLogin(), FINISH_STACK, INIT_STACK, T_UserLoginData::peerIP, T_UserLoginData::pMix, T_UserLoginData::pNewUser, SAVE_STACK, and THREAD_RETURN_SUCCESS.
Referenced by fm_loopAcceptUsers().
{
INIT_STACK;
BEGIN_STACK("CAFirstMix::fm_loopDoUserLogin");
#ifdef COUNTRY_STATS
my_thread_init();
#endif
t_UserLoginData* d=(t_UserLoginData*)param;
d->pMix->doUserLogin(d->pNewUser,d->peerIP);
SAVE_STACK("CAFirstMix::fm_loopDoUserLogin", "after user login");
d->pMix->decNewConnections();
delete d;
d = NULL;
#ifdef COUNTRY_STATS
my_thread_end();
#endif
FINISH_STACK("CAFirstMix::fm_loopDoUserLogin");
THREAD_RETURN_SUCCESS;
}
| THREAD_RETURN fm_loopLog | ( | void * | ) |
Definition at line 1407 of file CAFirstMix.cpp.
References logMemoryUsage(), CAFirstMix::m_bRunLog, sSleep(), and THREAD_RETURN_SUCCESS.
{
CAFirstMix* pFirstMix=(CAFirstMix*)param;
pFirstMix->m_bRunLog=true;
UINT32 countLog=0;
while(pFirstMix->m_bRunLog)
{
if(countLog==0)
{
logMemoryUsage();
countLog=10;
}
sSleep(30);
countLog--;
}
THREAD_RETURN_SUCCESS;
}
| THREAD_RETURN fm_loopReadFromMix | ( | void * | ) |
Definition at line 1053 of file CAFirstMix.cpp.
References CASocketGroup::add(), CAQueue::add(), BEGIN_STACK, t_MixPacket::channel, CHANNEL_DUMMY, t_MixPacket::data, DATA_SIZE, diff64(), DUMMY_CHANNEL, E_TIMEDOUT, ev_net_nextConnectionClosed, FINISH_STACK, t_MixPacket::flags, getcurrentTimeMicros(), getcurrentTimeMillis(), getRandom(), CAQueue::getSize(), INIT_STACK, CAFirstMix::m_bRestart, CAFirstMix::m_pMuxOut, CAMix::m_pMuxOutControlChannelDispatcher, CAFirstMix::m_pQueueReadFromMix, CAMix::m_u32KeepAliveRecvInterval, MAX_READ_FROM_NEXT_MIX_QUEUE_SIZE, MIX_POOL_TIMEOUT, MIXPACKET_SIZE, MONITORING_FIRE_NET_EVENT, msSleep(), t_queue_entry::packet, CAPool::pool(), CAMsg::printMsg(), CAControlChannelDispatcher::proccessMixPacket(), CAMuxSocket::receive(), CASocketGroup::select(), setZero64(), and THREAD_RETURN_SUCCESS.
{
INIT_STACK;
BEGIN_STACK("CAFirstMix::fm_loopReadFromMix");
CAFirstMix* pFirstMix=(CAFirstMix*)pParam;
CAMuxSocket* pMuxSocket=pFirstMix->m_pMuxOut;
CAQueue* pQueue=pFirstMix->m_pQueueReadFromMix;
tQueueEntry* pQueueEntry=new tQueueEntry;
MIXPACKET* pMixPacket=&pQueueEntry->packet;
CASingleSocketGroup* pSocketGroup=new CASingleSocketGroup(false);
pSocketGroup->add(*pMuxSocket);
#ifdef USE_POOL
CAPool* pPool=new CAPool(MIX_POOL_SIZE);
#endif
UINT64 keepaliveNow,keepaliveLast;
UINT32 u32KeepAliveRecvInterval=pFirstMix->m_u32KeepAliveRecvInterval;
getcurrentTimeMillis(keepaliveLast);
CAControlChannelDispatcher* pControlChannelDispatcher=pFirstMix->m_pMuxOutControlChannelDispatcher;
while(!pFirstMix->m_bRestart)
{
if(pQueue->getSize()>MAX_READ_FROM_NEXT_MIX_QUEUE_SIZE)
{
#ifdef DEBUG
CAMsg::printMsg(LOG_DEBUG,"CAFirstMix::Queue is full!\n");
#endif
msSleep(200);
getcurrentTimeMillis(keepaliveLast);
continue;
}
//check if the connection is broken because we did not received a Keep_alive-Message
getcurrentTimeMillis(keepaliveNow);
UINT32 keepaliveDiff=diff64(keepaliveNow,keepaliveLast);
if(keepaliveDiff>u32KeepAliveRecvInterval)
{
CAMsg::printMsg(LOG_DEBUG,"CAFirstMix::loopReadFromMix() -- restart because of KeepAlive-Traffic Timeout!\n");
pFirstMix->m_bRestart=true;
MONITORING_FIRE_NET_EVENT(ev_net_nextConnectionClosed);
break;
}
SINT32 ret=pSocketGroup->select(MIX_POOL_TIMEOUT);
if(ret==E_TIMEDOUT)
{
#ifdef USE_POOL
pMixPacket->flags=CHANNEL_DUMMY;
pMixPacket->channel=DUMMY_CHANNEL;
getRandom(pMixPacket->data,DATA_SIZE);
#ifdef LOG_PACKET_TIMES
setZero64(pQueueEntry->timestamp_proccessing_start);
#endif
#else
continue;
#endif
}
else if(ret>0)
{
ret=pMuxSocket->receive(pMixPacket);
#ifdef LOG_PACKET_TIMES
getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start);
#endif
if(ret!=MIXPACKET_SIZE)
{
pFirstMix->m_bRestart=true;
CAMsg::printMsg(LOG_ERR,"CAFirstMix::lm_loopReadFromMix - received returned: %i -- restarting!\n",ret);
MONITORING_FIRE_NET_EVENT(ev_net_nextConnectionClosed);
break;
}
}
if(pMixPacket->channel>0&&pMixPacket->channel<256)
{
#ifdef DEBUG
CAMsg::printMsg(LOG_DEBUG,"CAFirstMix - sent a packet from the next mix to the ControlChanelDispatcher... \n");
#endif
pControlChannelDispatcher->proccessMixPacket(pMixPacket);
getcurrentTimeMillis(keepaliveLast);
continue;
}
#ifdef USE_POOL
#ifdef LOG_PACKET_TIMES
getcurrentTimeMicros(pQueueEntry->pool_timestamp_in);
#endif
pPool->pool((tPoolEntry*)pQueueEntry);
#ifdef LOG_PACKET_TIMES
getcurrentTimeMicros(pQueueEntry->pool_timestamp_out);
#endif
#endif
pQueue->add(pQueueEntry, sizeof(tQueueEntry));
getcurrentTimeMillis(keepaliveLast);
}
delete pQueueEntry;
pQueueEntry = NULL;
delete pSocketGroup;
pSocketGroup = NULL;
#ifdef USE_POOL
delete pPool;
pPool = NULL;
#endif
FINISH_STACK("CAFirstMix::fm_loopReadFromMix");
THREAD_RETURN_SUCCESS;
}
| THREAD_RETURN fm_loopReadFromUsers | ( | void * | ) |
| THREAD_RETURN fm_loopSendToMix | ( | void * | param | ) |
How to end this thread: 0.
set bRestart=true; 1. Close connection to next mix 2. put some bytes (len>MIX_PACKET_SIZE) in the Mix-Output-Queue
Definition at line 939 of file CAFirstMix.cpp.
References BEGIN_STACK, t_MixPacket::channel, CHANNEL_DUMMY, CHANNEL_OPEN, t_MixPacket::data, DATA_SIZE, DUMMY_CHANNEL, E_SUCCESS, E_TIMEDOUT, FINISH_STACK, t_MixPacket::flags, getcurrentTimeMicros(), CAQueue::getOrWait(), getRandom(), INIT_STACK, isZero64(), len, CAFirstMix::m_bRestart, CAFirstMix::m_pMuxOut, CAMix::m_u32KeepAliveSendInterval, MIX_POOL_TIMEOUT, MIXPACKET_SIZE, t_queue_entry::packet, CAPool::pool(), CAMsg::printMsg(), CAMuxSocket::send(), setZero64(), and THREAD_RETURN_SUCCESS.
{
INIT_STACK;
BEGIN_STACK("CAFirstMix::fm_loopSendToMix");
CAFirstMix* pFirstMix=(CAFirstMix*)param;
CAQueue* pQueue=((CAFirstMix*)param)->m_pQueueSendToMix;
CAMuxSocket* pMuxSocket=pFirstMix->m_pMuxOut;
UINT32 len;
SINT32 ret;
/*#ifdef DATA_RETENTION_LOG
t_dataretentionLogEntry* pDataRetentionLogEntry=new t_dataretentionLogEntry;
#endif
*/
#ifndef USE_POOL
tQueueEntry* pQueueEntry=new tQueueEntry;
MIXPACKET* pMixPacket=&pQueueEntry->packet;
UINT32 u32KeepAliveSendInterval=pFirstMix->m_u32KeepAliveSendInterval;
while(!pFirstMix->m_bRestart)
{
len=sizeof(tQueueEntry);
ret=pQueue->getOrWait((UINT8*)pQueueEntry,&len,u32KeepAliveSendInterval);
if(ret==E_TIMEDOUT)
{//send a dummy as keep-alvie-traffic
pMixPacket->flags=CHANNEL_DUMMY;
pMixPacket->channel=DUMMY_CHANNEL;
getRandom(pMixPacket->data,DATA_SIZE);
}
else if(ret!=E_SUCCESS||len!=sizeof(tQueueEntry))
{
CAMsg::printMsg(LOG_ERR,"CAFirstMix::lm_loopSendToMix - Error in dequeueing MixPaket\n");
CAMsg::printMsg(LOG_ERR,"ret=%i len=%i\n",ret,len);
break;
}
if(pMuxSocket->send(pMixPacket)!=MIXPACKET_SIZE)
{
CAMsg::printMsg(LOG_ERR,"CAFirstMix::lm_loopSendToMix - Error in sending MixPaket\n");
break;
}
#if defined (LOG_PACKET_TIMES)
if(!isZero64(pQueueEntry->timestamp_proccessing_start))
{
getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end);
pFirstMix->m_pLogPacketStats->addToTimeingStats(*pQueueEntry,pMixPacket->flags,true);
}
#endif
#ifdef DATA_RETENTION_LOG
if((pQueueEntry->packet.flags&CHANNEL_OPEN)!=0)
{
pQueueEntry->dataRetentionLogEntry.t_out=htonl(time(NULL));
pFirstMix->m_pDataRetentionLog->log(&(pQueueEntry->dataRetentionLogEntry));
}
#endif
}
delete pQueueEntry;
pQueueEntry = NULL;
#else
CAPool* pPool=new CAPool(MIX_POOL_SIZE);
tPoolEntry* pPoolEntry=new tPoolEntry;
MIXPACKET* pMixPacket=&pPoolEntry->packet;
while(!pFirstMix->m_bRestart)
{
len=sizeof(tQueueEntry);
ret=pQueue->getOrWait((UINT8*)pPoolEntry,&len,MIX_POOL_TIMEOUT);
if(ret==E_TIMEDOUT)
{
pMixPacket->flags=CHANNEL_DUMMY;
pMixPacket->channel=DUMMY_CHANNEL;
getRandom(pMixPacket->data,DATA_SIZE);
#ifdef LOG_PACKET_TIMES
setZero64(pPoolEntry->timestamp_proccessing_start);
#endif
}
else if(ret!=E_SUCCESS||len!=sizeof(tQueueEntry))
break;
#ifdef LOG_PACKET_TIMES
getcurrentTimeMicros(pPoolEntry->pool_timestamp_in);
#endif
pPool->pool(pPoolEntry);
#ifdef LOG_PACKET_TIMES
getcurrentTimeMicros(pPoolEntry->pool_timestamp_out);
#endif
if(pMuxSocket->send(pMixPacket)!=MIXPACKET_SIZE)
break;
#ifdef LOG_PACKET_TIMES
if(!isZero64(pPoolEntry->timestamp_proccessing_start))
{
getcurrentTimeMicros(pPoolEntry->timestamp_proccessing_end);
pFirstMix->m_pLogPacketStats->addToTimeingStats(*pPoolEntry,pMixPacket->flags,true);
}
#endif
}
delete pPoolEntry;
pPoolEntry = NULL;
delete pPool;
pPool = NULL;
#endif
/*#ifdef DATA_RETENTION_LOG
delete pDataRetentionLogEntry;
#endif
*/
FINISH_STACK("CAFirstMix::fm_loopSendToMix");
CAMsg::printMsg(LOG_DEBUG,"Exiting Thread SendToMix\n");
THREAD_RETURN_SUCCESS;
}
| THREAD_RETURN iplist_loopDoLogCountries | ( | void * | param | ) |
Definition at line 2914 of file CAFirstMix.cpp.
References tUINT32withLock::getAndzero(), CACmdLnOptions::getCascadeName(), CALibProxytest::getOptions(), CAMutex::lock(), LOG_COUNTRIES_INTERVALL, CAFirstMix::m_bRunLogCountries, CAFirstMix::m_CountryStats, CAFirstMix::m_mysqlCon, CAFirstMix::m_PacketsPerCountryIN, CAFirstMix::m_PacketsPerCountryOUT, CAFirstMix::m_pmutexUser, mysqlEscapeTableName(), NR_OF_COUNTRIES, CAMsg::printMsg(), sSleep(), THREAD_RETURN_SUCCESS, and CAMutex::unlock().
{
mysql_thread_init();
CAMsg::printMsg(LOG_DEBUG,"Starting iplist_loopDoLogCountries\n");
CAFirstMix* pFirstMix=(CAFirstMix*)param;
UINT32 s=0;
UINT8 buff[255];
memset(buff,0,255);
CALibProxytest::getOptions()->getCascadeName(buff,255);
mysqlEscapeTableName(buff);
while(pFirstMix->m_bRunLogCountries)
{
if(s==LOG_COUNTRIES_INTERVALL)
{
UINT8 aktDate[255];
time_t aktTime=time(NULL);
strftime((char*)aktDate,255,"%Y%m%d%H%M%S",gmtime(&aktTime));
char query[1024];
sprintf(query,"INSERT into `stats_%s` (date,id,count,packets_in,packets_out) VALUES (\"%s\",\"%%u\",\"%%u\",\"%%u\",\"%%u\")",buff,aktDate);
pFirstMix->m_pmutexUser->lock();
for(UINT32 i=0;i<NR_OF_COUNTRIES+1;i++)
{
if(pFirstMix->m_CountryStats[i]>0)
{
char aktQuery[1024];
sprintf(aktQuery,query,i,pFirstMix->m_CountryStats[i],pFirstMix->m_PacketsPerCountryIN[i].getAndzero(),pFirstMix->m_PacketsPerCountryOUT[i].getAndzero());
SINT32 ret=mysql_query(pFirstMix->m_mysqlCon,aktQuery);
if(ret!=0)
{
CAMsg::printMsg(LOG_INFO,"CountryStats DB - failed to update CountryStats DB with new values - error %i\n",ret);
}
}
}
pFirstMix->m_pmutexUser->unlock();
s=0;
}
sSleep(10);
s++;
}
CAMsg::printMsg(LOG_DEBUG,"Exiting iplist_loopDoLogCountries\n");
mysql_thread_end();
THREAD_RETURN_SUCCESS;
}
1.7.6.1