/*##############################################################################
Copyright (C) 2011 HPCC Systems.
All rights reserved. This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see .
############################################################################## */
// LogThread.cpp: implementation of the CLogThread class.
//
//////////////////////////////////////////////////////////////////////
#include "LogThread.h"
#include "jmisc.hpp"
#include "soapbind.hpp"
#include "esploggingservice_esp.ipp"
#include "espcontext.hpp"
#define MaxLogQueueLength 500000
#define QueueSizeSignal 10000
static int DefaultThreadPoolSize = 50;
static int LogThreadWaitTime = 90;
#ifndef _WIN32
#include
#include
#endif
IClientLogThread * createLogClient(IPropertyTree *cfg, const char *process, const char *service,bool bFlatten)
{
return createLogClient2(cfg, process, service, "loggingserver", bFlatten);
}
IClientLogThread * createLogClient3(IPropertyTree *logcfg, const char *service, bool bFlatten)
{
if(!logcfg)
return NULL;
DBGLOG("Creating recovery logging client");
IClientLogThread* pLoggingThread = 0;
pLoggingThread = new CLogThread(logcfg,service, bFlatten, false);
pLoggingThread->start();
return pLoggingThread;
}
IClientLogThread * createLogClient2(IPropertyTree *cfg, const char *process, const char *service, const char* name, bool bFlatten, bool bModelRequest)
{
DBGLOG("Creating recovery logging client %s", name);
IClientLogThread* pLoggingThread = 0;
StringBuffer xpath,loggingServer;
xpath.appendf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/%s", process, service, name);
IPropertyTree* pServerInfo = cfg->queryPropTree(xpath.str());
if (pServerInfo == 0)
return 0;
pLoggingThread = new CLogThread(pServerInfo,service, bFlatten, bModelRequest);
pLoggingThread->start();
return pLoggingThread;
}
IClientLogThread * createModelLogClient(IPropertyTree *cfg, const char *process, const char *service, bool bFlatten )
{
return createLogClient2( cfg, process, service, "loggingserver", bFlatten, true );
}
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
CLogThread::CLogThread(IPropertyTree* pServerConfig , const char* Service, bool bFlatten, bool bModelRequest) : m_bRun(true),m_Logcount(0) ,
m_LogTreshhold(0), m_bModelRequest(bModelRequest), m_logResponse(false)
{
if(pServerConfig==NULL)
throw MakeStringException(500,"No Logging Configuration");
if(pServerConfig->hasProp("url")==false)
throw MakeStringException(500,"No Logging Server URL");
m_ServiceURL.appendf("%s",pServerConfig->queryProp("url"));
m_bFailSafeLogging = false;
if(pServerConfig->hasProp("failsafe")==true)
{
const char* failsafe = pServerConfig->queryProp("failsafe");
if(failsafe != 0 && strcmp(failsafe,"true") == 0)
m_bFailSafeLogging = true;
}
StringBuffer poolsizebuf;
if(pServerConfig->hasProp("MaxLoggingThreads"))
pServerConfig->getProp("MaxLoggingThreads", poolsizebuf);
if(poolsizebuf.length() > 0)
{
m_ThreadPoolSize = atoi(poolsizebuf.str());
}
else
{
// If thread pool size not even specified, use default size
m_ThreadPoolSize = DefaultThreadPoolSize;
}
pServerConfig->hasProp("MaxLogQueueLength")==true ? m_MaxLogQueueLength = pServerConfig->getPropInt("MaxLogQueueLength") : m_MaxLogQueueLength = MaxLogQueueLength;
pServerConfig->hasProp("QueueSizeSignal")==true ? m_SignalGrowingQueueAt = pServerConfig->getPropInt("QueueSizeSignal") : m_SignalGrowingQueueAt = QueueSizeSignal;
pServerConfig->hasProp("Throttle")==true ? m_bThrottle = pServerConfig->getPropBool("Throttle") : m_bThrottle = true;
m_logResponse = pServerConfig->getPropBool("LogResponseXml",false);
if(pServerConfig->hasProp("BurstWaitInterval"))
m_BurstWaitInterval = pServerConfig->getPropInt("BurstWaitInterval");
else
m_BurstWaitInterval = 0;
if(pServerConfig->hasProp("LinearWaitInterval"))
m_LinearWaitInterval = pServerConfig->getPropInt("LinearWaitInterval");
else
m_LinearWaitInterval = 0;
if(pServerConfig->hasProp("NiceLevel"))
m_NiceLevel = pServerConfig->getPropInt("NiceLevel");
else
m_NiceLevel = 0;
//by default flatten any trees passes into the service
m_bFlattenTree = bFlatten;
m_LogSendDelta = 0;
m_LogSend = 0;
if(m_ThreadPoolSize > 0)
m_pLoggingService.setown(new CPooledClientWsLogService(m_ThreadPoolSize));
else
m_pLoggingService.setown(createWsLogServiceClient());
m_pLoggingService->addServiceUrl(m_ServiceURL.str());
//temporary fix for authentication on logging service....
const char* loguser = pServerConfig->queryProp("LoggingUser");
const char* logpasswd = pServerConfig->queryProp("LoggingPassword");
m_pLoggingService->setUsernameToken((loguser && *loguser)?loguser:"loggingclient", (logpasswd&&*logpasswd)?logpasswd:"loggingpassword","");
if(m_bFailSafeLogging == true)
{
if(pServerConfig->hasProp("LogsDir"))
m_LogFailSafe.setown(createFailsafelogger(Service, pServerConfig->queryProp("LogsDir")));
else
m_LogFailSafe.setown(createFailsafelogger(Service));
}
bSeedAvailable = false;
bMadeSeedRequest = false;
id_counter=0;
time_t tNow;
time(&tNow);
localtime_r(&tNow, &m_startTime);
}
CLogThread::~CLogThread()
{
DBGLOG("CLogThread::~CLogThread()");
}
bool CLogThread::FetchTransactionSeed(StringBuffer& TransactionSeedID)
{
try{
Owned pSeedReq = m_pLoggingService->createTransactionSeedRequest();
Owned pSeedResp = m_pLoggingService->TransactionSeed(pSeedReq.get());
//if we get to here then we have made the request but no seeds are available
bMadeSeedRequest = true;
if(pSeedResp->getSeedAvailable()==true)
{
TransactionSeedID.appendf("%s",pSeedResp->getSeedId());
return true;
}
else
return false;
}
catch(IException* ex)
{
StringBuffer errorStr;
ex->errorMessage(errorStr);
ERRLOG("Exception caught generating transaction seed (%d) %s",ex->errorCode(),errorStr.str());
ex->Release();
}
catch(...)
{
ERRLOG("Unknown exception caught generating transaction seed");
}
return false;
}
bool CLogThread::GenerateTransactionSeed(StringBuffer& UniqueID, char backendType)
{
CriticalBlock b(seed_gen_crit);
if(bSeedAvailable == false && bMadeSeedRequest==true)
{
//we have checked for a seed but none are available
return false;
}
else if (bSeedAvailable == false)
{
//we have not checked for a seed or we failed when making the request..
bSeedAvailable = FetchTransactionSeed(m_InitialTransactionSeedID);
if(bSeedAvailable==false)
return false;
DBGLOG("Fetched Transaction Seed %s\n", m_InitialTransactionSeedID.str());
}
UniqueID.appendf("%s%c%u",m_InitialTransactionSeedID.str(),backendType,++id_counter);
if(UniqueID.length() > 16)
{
//Sybase limits transaction_id to 16 bytes. If longer, need to get another seed ID and reset id_counter.
m_InitialTransactionSeedID.clear();
bSeedAvailable = FetchTransactionSeed(m_InitialTransactionSeedID);
if(bSeedAvailable==false)
return false;
DBGLOG("TransactionID length exceeded 16 bytes. So re-fetched a Transaction Seed %s.\n", m_InitialTransactionSeedID.str());
id_counter = 0;
UniqueID.clear().appendf("%s-%u",m_InitialTransactionSeedID.str(),++id_counter);
}
return true;
}
IClientLogInfo& CLogThread::addLogInfoElement(IArrayOf& LogArray)
{
IClientLogInfo* logInfo = new CLogInfo("");
LogArray.append(*(dynamic_cast(logInfo)));
return *logInfo;
}
IClientModelLogInformation& CLogThread::getModelLogInformation()
{
return *(new CModelLogInformation(""));
}
IClientModelLogInfo& CLogThread::getModelLogInfo(IClientModelLogInformation* pModelLogInformation)
{
IClientModelLogInfo* pMLogInfo = new CModelLogInfo("");
if(pModelLogInformation!=0)
{
pModelLogInformation->getModels().append(*(dynamic_cast(pMLogInfo)));
}
return *pMLogInfo;
}
IClientAttributeGroupLogInfo& CLogThread::getAttributeGroupLogInfo(IClientModelLogInformation* pModelLogInformation)
{
IClientAttributeGroupLogInfo* pAttribGrpLogInfo = new CAttributeGroupLogInfo("");
if(pModelLogInformation!=0)
{
pModelLogInformation->getAttributeGroups().append(*(dynamic_cast(pAttribGrpLogInfo)));
}
return *pAttribGrpLogInfo;
}
IClientAttributeLogInfo& CLogThread::getAttributeLogInfo(IClientAttributeGroupLogInfo* pAttributeGroupLogInfo)
{
IClientAttributeLogInfo* pAttribLogInfo = new CAttributeLogInfo("");
if(pAttributeGroupLogInfo!=0)
{
pAttributeGroupLogInfo->getAttributes().append(*(dynamic_cast(pAttribLogInfo)));
}
return *pAttribLogInfo;
}
IClientScoreLogInfo& CLogThread::getScoreLogInfo(IClientModelLogInfo* pModelLogInfo)
{
IClientScoreLogInfo* pScoreLogInfo = new CScoreLogInfo("");
if(pModelLogInfo!=0)
{
pModelLogInfo->getScores().append(*(dynamic_cast(pScoreLogInfo)));
}
return *pScoreLogInfo;
}
IClientReasonCodeLogInfo& CLogThread::getReasonCodeLogInfo(IClientScoreLogInfo* pScoreLogInfo)
{
IClientReasonCodeLogInfo* pReasonCodeLogInfo = (IClientReasonCodeLogInfo*) new CReasonCodeLogInfo("");
if(pScoreLogInfo!=0)
{
pScoreLogInfo->getReasonCodes().append(*(dynamic_cast(pReasonCodeLogInfo)));
}
return *pReasonCodeLogInfo;
}
bool CLogThread::IsModelLogging()
{
return m_bModelRequest;
}
bool CLogThread::queueLog(IEspContext & context,const char* serviceName,int RecordsReturned, IArrayOf& LogArray, IInterface& logInfo)
{
StringBuffer dataStr;
serializeRequest(context,logInfo,dataStr);
Owned pLogTreeInfo = createPTreeFromXMLString(dataStr.str(), ipt_none, xr_none);
return queueLog(context,serviceName,RecordsReturned,LogArray, *pLogTreeInfo);
}
bool CLogThread::queueLog(IEspContext & context,const char* serviceName,int RecordsReturned,bool bBlind,bool bEncrypt, IArrayOf& LogArray, IInterface& logInfo, IConstModelLogInformation* pModelLogInfo)
{
LOG_INFO _LogStruct(serviceName,RecordsReturned,bBlind);
_LogStruct.Encrypt = bEncrypt;
serializeRequest(context,logInfo,_LogStruct.RequestStr);
Owned pLogTreeInfo = createPTreeFromXMLString(_LogStruct.RequestStr.str(), ipt_none, xr_none);
addLogInfo(LogArray,*pLogTreeInfo.get());
return queueLog(context,_LogStruct, LogArray,pModelLogInfo);
}
bool CLogThread::queueLog(IEspContext & context,const char* serviceName,int RecordsReturned, IInterface& logInfo)
{
StringBuffer dataStr;
serializeRequest(context,logInfo,dataStr);
return queueLog(context,serviceName,RecordsReturned,dataStr);
}
StringBuffer& CLogThread::serializeRequest(IEspContext& context,IInterface& logInfo, StringBuffer& returnStr)
{
IRpcSerializable* rpcreq = dynamic_cast(&logInfo);
if(rpcreq==NULL)
throw MakeStringException(500,"Issue serializing log information");
// We want to serialize anything here for logging purpose: e.g., internal user fields: CompanyId
// rpcreq->serialize(&context,returnStr, "LogData");
// rpcreq->serialize(NULL,returnStr, "LogData");
//BUG#26047
//logInfo function parameter is instance of the incoming request object of the service.
//instance objects of context and request are dependent upon the protocol binding.
//Request parameters are relevent for HTTP protocol but are not relevent for protocolX.
//Since request parameters pointer is not initilized in processing protocolX request it remains NULL
//and causing this crash.
IProperties* params = context.queryRequestParameters();
if(params!=NULL)
{
bool notInternal = !params->hasProp("internal");
if (notInternal)
params->setProp("internal","1");
rpcreq->serialize(&context,returnStr, "LogData");
if (notInternal)
params->removeProp("internal");
}else{
rpcreq->serialize(NULL,returnStr, "LogData");
}
return returnStr;
}
bool CLogThread::queueLog(const char *user, const char *realm, const char *peer, const char* serviceName,const char* GUID, int RecordsReturned, IPropertyTree& logInfo)
{
IArrayOf LogArray;
addLogInfo(LogArray, logInfo);
LOG_INFO _LogInfo(serviceName,GUID, RecordsReturned,false);
return queueLog(user, realm, peer, _LogInfo, LogArray);
}
bool CLogThread::queueLog(const char *user, const char *realm, const char *peer, LOG_INFO& _LogStruct, IArrayOf& LogArray)
{
if(!m_pLoggingService.get())
return false;
IClientLOGServiceUpdateRequest* tptrRequest;
if( m_bModelRequest )
{
tptrRequest = dynamic_cast(m_pLoggingService->createUpdateModelLogServiceRequest());
} else {
tptrRequest = m_pLoggingService->createUpdateLogServiceRequest();
}
Owned pRequest( tptrRequest );
if (pRequest == 0)
return false;
pRequest->setUserName(user);
pRequest->setDomainName(realm);
pRequest->setRecordCount(_LogStruct.recordsReturned);
pRequest->setServiceName(_LogStruct.serviceName);
pRequest->setIP(peer);
//This appends the tree structure into the correct format...
pRequest->setLogInformation(LogArray);
return queueLog(pRequest,_LogStruct);
}
bool CLogThread::queueLog(IEspContext & context,const char* serviceName,int RecordsReturned, IPropertyTree& logInfo)
{
LOG_INFO _LogInfo(serviceName,RecordsReturned,false);
IArrayOf LogArray;
return queueLog(context,_LogInfo, LogArray, logInfo);
}
bool CLogThread::queueLog(IEspContext & context,const char* serviceName,const char* GUID, int RecordsReturned, IPropertyTree& logInfo)
{
LOG_INFO _LogInfo(serviceName,GUID, RecordsReturned,false);
IArrayOf LogArray;
return queueLog(context,_LogInfo, LogArray, logInfo);
}
bool CLogThread::queueLog(IEspContext & context,const char* serviceName,int RecordsReturned,IArrayOf& LogArray, IPropertyTree& logInfo)
{
LOG_INFO _LogStruct(serviceName,RecordsReturned,0);
return queueLog(context,_LogStruct,LogArray, logInfo);
}
bool CLogThread::queueLog(IEspContext & context,const char* serviceName,int RecordsReturned, IArrayOf& LogArray)
{
LOG_INFO _LogStruct(serviceName,RecordsReturned,false);
return queueLog(context,_LogStruct, LogArray);
}
bool CLogThread::queueLog(IEspContext & context,LOG_INFO& _LogStruct, IArrayOf& LogArray, IPropertyTree& logInfo)
{
//This appends the tree structure into the correct format...
addLogInfo(LogArray,logInfo);
return queueLog(context,_LogStruct, LogArray); ;
}
bool CLogThread::queueLog(IEspContext & context,LOG_INFO& _LogStruct, IArrayOf& LogArray, IConstModelLogInformation* pModelLogInfo)
{
if(!m_pLoggingService.get())
return false;
//Owned pRequest = m_pLoggingService->createUpdateLogServiceRequest();
IClientLOGServiceUpdateRequest* tptrRequest;
if( m_bModelRequest )
{
IClientLOGServiceUpdateModelRequest* pUpdateModelRequest = m_pLoggingService->createUpdateModelLogServiceRequest();
if(pModelLogInfo!=0)
{
pUpdateModelRequest->setModelLogInformation(*pModelLogInfo);
}
tptrRequest = dynamic_cast(pUpdateModelRequest);
} else {
tptrRequest = m_pLoggingService->createUpdateLogServiceRequest();
}
Owned pRequest( tptrRequest );
if (pRequest == 0)
return false;
StringBuffer UserID,realm,peer;
pRequest->setUserName(context.getUserID(UserID).str());
pRequest->setDomainName(context.getRealm(realm).str());
pRequest->setRecordCount(_LogStruct.recordsReturned);
pRequest->setServiceName(_LogStruct.serviceName);
pRequest->setIP(context.getPeer(peer).str());
bool bBlind = _LogStruct.Blind;
bool bEncrypt = _LogStruct.Encrypt;
ISecPropertyList* properties = context.querySecuritySettings();
if( properties !=NULL)
{
if(bBlind==false)
{
if(properties->findProperty("blind")!=NULL)
strncmp(properties->findProperty("blind")->getValue(),"1",1) == 0 ? bBlind=true : bBlind=false;
}
if(bEncrypt==false && properties->findProperty("encryptedlogging")!=NULL)
{
if(strncmp(properties->findProperty("encryptedlogging")->getValue(),"1",1) == 0)
bEncrypt=true;
}
}
if(bEncrypt==true)
{
//need to do encrpyted logging
pRequest->setEncryptedLogging(true);
pRequest->setRawLogInformation(_LogStruct.RequestStr.str());
}
pRequest->setBlindLogging(bBlind);
pRequest->setLogInformation(LogArray);
return queueLog(pRequest,_LogStruct);
}
bool CLogThread::queueLog(IEspContext & context,const char* serviceName,int RecordsReturned, StringBuffer& logInfo)
{
return queueLog(context,serviceName,RecordsReturned,logInfo.str());
}
bool CLogThread::queueLog(IEspContext & context,const char* serviceName,int RecordsReturned,IArrayOf& LogArray, StringBuffer& logInfo)
{
Owned pLogTreeInfo = createPTreeFromXMLString(logInfo, ipt_none, xr_none);
return queueLog(context,serviceName,RecordsReturned,LogArray, *pLogTreeInfo.get());
}
bool CLogThread::queueLog(IEspContext & context,const char* serviceName, const char* request, const char* response)
{
IProperties* pProperties = context.queryRequestParameters();
StringBuffer UserID, UserRealm, UserReference, peer;
if(pProperties != NULL && pProperties->hasProp("userid_"))
UserID.appendf("%s",pProperties->queryProp("userid_"));
else
context.getUserID(UserID);
if(pProperties != NULL && pProperties->hasProp("fqdn_"))
UserRealm.appendf("%s",pProperties->queryProp("fqdn_"));
else
context.getRealm(UserRealm);
Owned pLogTreeInfo = createPTreeFromXMLString(request, ipt_none, xr_none);
IArrayOf LogArray;
addLogInfo(LogArray, *pLogTreeInfo.get());
if(pProperties != NULL && pProperties->hasProp("referencecode_"))
{
//lets manually add the reference number....
IClientLogInfo& LogInfoTransaction = addLogInfoElement(LogArray);
LogInfoTransaction.setName("referencenumber");
LogInfoTransaction.setValue(pProperties->queryProp("referencecode_"));
}
LOG_INFO _LogStruct(serviceName,-1,false);
return queueLog(UserID.str(), UserRealm.str() , context.getPeer(peer).str(),_LogStruct, LogArray );
}
bool CLogThread::queueLog(IEspContext & context,const char* serviceName,int RecordsReturned, const char* logInfo)
{
try {
Owned pLogTreeInfo = createPTreeFromXMLString(logInfo, ipt_none, xr_none);
return queueLog(context,serviceName,RecordsReturned,*pLogTreeInfo);
} catch (IException* e) {
StringBuffer msg;
e->errorMessage(msg);
DBGLOG("Exception caught in CLogThread::queueLog: %s", msg.str());
return false;
} catch (...) {
DBGLOG("Unknown exception caught in CLogThread::queueLog()");
return false;
}
}
bool CLogThread::queueLog(IClientLOGServiceUpdateRequest * pRequest,LOG_INFO& _LogStruct)
{
int QueueSize = m_pServiceLog.ordinality();
//YMA: this is bad. Transaction records will be lost because of this exception. Better to let the queue keep growing so that transactions will be
// written to the tank file. Plus, most deployments configure the MaxLogQueueLength to be 10000, which is way too small.
//if(QueueSize > m_MaxLogQueueLength)
// throw MakeStringException(503,"Service Unavailable (Audit)");
if(QueueSize > m_MaxLogQueueLength)
ERRLOG("SOS!!! Logging queue size %d execeeded MaxLogQueueLength %d, check the logging server at %s!!!",QueueSize, m_MaxLogQueueLength, m_ServiceURL.str());
if(QueueSize!=0 && QueueSize % m_SignalGrowingQueueAt == 0)
ERRLOG("Logging Queue at %d records. Check the logging server at %s.",QueueSize, m_ServiceURL.str());
if(m_bFailSafeLogging == true && m_LogFailSafe.get())
{
//generate a GUID for the query. The cache it within the failsafe object
StringBuffer _GUID;
if(_LogStruct.GUID !=NULL && *_LogStruct.GUID!='\0')
{
_GUID.appendf("%s",_LogStruct.GUID.str());
}
else
{
m_LogFailSafe->GenerateGUID(_GUID,_LogStruct.GUIDSeed);
}
pRequest->setGUID(_GUID.str());
m_LogFailSafe->Add(_GUID,*pRequest);
}
m_pServiceLog.enqueue(LINK(pRequest));
if (++m_Logcount >= m_LogTreshhold && m_LogSendDelta == 0)
{
m_sem.signal();
m_Logcount = 0;
}
return true;
}
void CLogThread::addLogInfo(IArrayOf& valueArray,IPropertyTree& logInfo)
{
StringBuffer dataStr,nameStr,valueStr;
Owned itr = logInfo.getElements("*");
itr->first();
while(itr->isValid())
{
IPropertyTree &node = itr->query();
const char* name = node.queryName();
if (getTreeFlattening()==true && node.hasChildren() == true)
{
if(IsArray(node)==true)
{
FlattenArray(valueArray,node,nameStr);
}
else
{
FlattenTree(valueArray,node,nameStr);
}
// logElement.setName(node.queryName());
// dataStr.clear();
/*toXML(&node,dataStr);
//DOM temporary work about for the lack of XML decoding in esp arrays
StringBuffer encodedData;
JBASE64_Encode(dataStr.str(), dataStr.length() , encodedData);
logElement.setData(encodedData.str());
*/
}
else if (getTreeFlattening()==false && node.hasChildren() == true)
{
IClientLogInfo& logElement = addLogInfoElement(valueArray);
logElement.setName(node.queryName());
dataStr.clear();
toXML(&node,dataStr);
//DOM temporary work about for the lack of XML decoding in esp arrays
StringBuffer encodedData;
JBASE64_Encode(dataStr.str(), dataStr.length() , encodedData);
logElement.setData(encodedData.str());
}
else if (node.queryProp("") != 0 && ( strcmp(node.queryProp(""),"0") != 0 ))
{
IClientLogInfo& logElement = addLogInfoElement(valueArray);
logElement.setName(node.queryName());
logElement.setValue(node.queryProp(""));
}
itr->next();
}
}
bool CLogThread::IsArray(IPropertyTree& tree)
{
// If the node have more than one children, and all have the same name,
// then it is an array.
StringBuffer name, temp;
Owned itr = tree.getElements("*");
int count = 0;
for (itr->first(); itr->isValid(); itr->next())
{
if (count==0)
itr->query().getName(name);
else
{
itr->query().getName(temp);
if (stricmp(name,temp)!=0)
return false;
temp.clear();
}
count++;
}
//Loophole in code above if there is only 1 item in the array
if(count==1)
{
if (name!=NULL && stricmp(name,"Item")==0)
return true;
}
return count>1;
}
bool CLogThread::FlattenArray(IArrayOf& valueArray,IPropertyTree& tree,StringBuffer& RootName)
{
StringBuffer Value,Name;
if (tree.hasChildren() == true)
{
Name.appendf("%s",tree.queryName());
Owned itrItem = tree.getElements("./*");
itrItem->first();
while(itrItem->isValid()==true)
{
IPropertyTree &node = itrItem->query();
if(Value.length()!=0)
Value.append(",");
Value.appendf("%s",node.queryProp(""));
itrItem->next();
}
IClientLogInfo& logElement = addLogInfoElement(valueArray);
logElement.setName(Name.str());
logElement.setValue(Value.str());
}
return true;
}
bool CLogThread::FlattenTree(IArrayOf& valueArray,IPropertyTree& tree,StringBuffer& RootName)
{
StringBuffer Value,Name;
if (tree.hasChildren() == true)
{
Owned itr = tree.getElements("*");
itr->first();
while(itr->isValid())
{
IPropertyTree &node = itr->query();
if(RootName.length() > 0)
Name.appendf("%s_",RootName.str());
Name.appendf("%s",node.queryName());
if (node.hasChildren() == true)
{
if(IsArray(node)==true)
FlattenArray(valueArray,node,Name);
else
FlattenTree(valueArray,node,Name);
}
else
{
const char* _value = tree.queryProp(node.queryName());
if(tree.hasProp(node.queryName())==true && _value!=0 && _value!='\0')
{
Value.appendf("%s",tree.queryProp(node.queryName()));
IClientLogInfo& logElement = addLogInfoElement(valueArray);
logElement.setName(Name.str());
logElement.setValue(Value.str());
//DBGLOG("Add log element: %s, %s", Name.str(), Value.str());
Value.clear();
}
}
Name.clear();
itr->next();
}
}
else
{
return false;
}
return true;
}
void CLogThread::deserializeLogInfo(IArrayOf& valueArray,IPropertyTree& logInfo)
{
Owned itr = logInfo.getElements("LogInfo");
itr->first();
while(itr->isValid())
{
IPropertyTree &node = itr->query();
IClientLogInfo& logElement = addLogInfoElement(valueArray);
logElement.setName(node.queryProp("Name"));
logElement.setValue(node.queryProp("Value"));
logElement.setData(node.queryProp("Data"));
itr->next();
}
}
int CLogThread::run()
{
Link();
CheckErrorLogs();
while(m_bRun)
{
m_sem.wait(10000);
SendLog();
checkRollOver();
}
Release();
return 0;
}
void CLogThread::checkRollOver()
{
if(!m_LogFailSafe.get())
return;
try
{
time_t tNow;
time(&tNow);
struct tm ltNow;
localtime_r(&tNow, <Now);
if(ltNow.tm_year != m_startTime.tm_year || ltNow.tm_yday != m_startTime.tm_yday)
{
localtime_r(&tNow, &m_startTime); // reset the start time for next rollover check
int numNewArrivals = m_pServiceLog.ordinality();
{
MTimeSection mt(NULL, "Tank file rollover");
m_LogFailSafe->SafeRollover();
}
if(numNewArrivals > 0)
{
DBGLOG("writing %d requests in the queue to the rolled over tank file.", numNewArrivals);
for(int i = 0; i < numNewArrivals; i++)
{
IClientLOGServiceUpdateRequest* pRequest = m_pServiceLog.item(i);
if (pRequest)
{
IEspLOGServiceUpdateRequest* pEspRequest = dynamic_cast(pRequest);
if(pEspRequest)
{
const char* guid = pEspRequest->getGUID();
if(guid)
m_LogFailSafe->Add(guid,*pRequest);
}
}
}
}
}
}
catch(IException* Ex)
{
StringBuffer str;
Ex->errorMessage(str);
ERRLOG("Exception thrown during tank file rollover: %s",str.str());
Ex->Release();
}
catch(...)
{
ERRLOG("Unknown exception thrown during tank file rollover.");
}
}
void CLogThread::CheckErrorLogs()
{
if(!m_LogFailSafe.get())
return;
try{
bool bRelogError = false;
if(m_LogFailSafe->FindOldLogs() == true)
{
DBGLOG("We have old logs!!!!!!");
DBGLOG("Will now try and recover the lost log messages");
StringArray LostLogMessages;
m_LogFailSafe->LoadOldLogs(LostLogMessages);
ForEachItemIn(aidx, LostLogMessages)
{
LOG_INFO _Info;
Owned pRequest = DeserializeRequest(LostLogMessages.item(aidx),_Info);
if (pRequest==0 || queueLog(pRequest,_Info) == false)
bRelogError=true;
}
}
//if everything went ok then we should be able to rollover the old logs.
if(bRelogError == false)
m_LogFailSafe->RollOldLogs();
}
catch(IException* ex)
{
StringBuffer errorStr;
ex->errorMessage(errorStr);
errorStr.appendf("%s",errorStr.str());
ERRLOG("CheckErrorLogs: %s:" ,errorStr.str());
ex->Release();
}
catch(...)
{
ERRLOG("Unknown exception thrown in CheckErrorLogs");
}
}
IClientLOGServiceUpdateRequest* CLogThread::DeserializeRequest(const char* requestStr,LOG_INFO& _Info)
{
if(!requestStr)
return 0;
//
//
IClientLOGServiceUpdateRequest* pRequest = 0;
if( m_bModelRequest )
{
pRequest = dynamic_cast(m_pLoggingService->createUpdateModelLogServiceRequest());
} else {
pRequest = m_pLoggingService->createUpdateLogServiceRequest();
}
if (pRequest == 0)
return 0;
StringBuffer xPath,GUID,Cache;
//request should be in the form GUID\tCache
m_LogFailSafe->SplitLogRecord(requestStr,GUID,Cache);
_Info.GUID = GUID.str();
Owned pLogTree = createPTreeFromXMLString(Cache.str(), ipt_none, xr_none);
pRequest->setUserName(pLogTree->queryProp("UserName"));
pRequest->setDomainName(pLogTree->queryProp("DomainName"));
pRequest->setRecordCount(pLogTree->getPropInt("RecordCount"));
pRequest->setServiceName(pLogTree->queryProp("ServiceName"));
pRequest->setIP(pLogTree->queryProp("IP"));
pRequest->setRawLogInformation(pLogTree->queryProp("RawLogInformation"));
if(pLogTree->hasProp("EncryptedLogging")==true)
pRequest->setEncryptedLogging(pLogTree->getPropBool("EncryptedLogging"));
if(pLogTree->hasProp("BlindLogging")==true)
pRequest->setBlindLogging(pLogTree->getPropBool("BlindLogging"));
IArrayOf logArray;
IPropertyTree* logInfo = pLogTree->queryBranch("LogInformation");
if(logInfo)
deserializeLogInfo(logArray,*logInfo);
pRequest->setLogInformation(logArray);
Owned modelLogInformation;
if(m_bModelRequest)
{
IPropertyTree* pModelLogTree = pLogTree->queryBranch("ModelLogInformation");
if(pModelLogTree!=0)
{
IClientModelLogInformation* pClientModelLogInfo = &getModelLogInformation();
UnserializeModelLogInfo(pModelLogTree,pClientModelLogInfo);
modelLogInformation.setown(dynamic_cast(pClientModelLogInfo));
IClientLOGServiceUpdateModelRequest* pModelRequest = dynamic_cast(pRequest);
pModelRequest->setModelLogInformation(dynamic_cast(*pClientModelLogInfo));
}
}
return pRequest;
}
void CLogThread::UnserializeModelLogInfo(IPropertyTree* pModelTreeInfo,IClientModelLogInformation* pModelLogInformation)
{
try
{
Owned modelItr = pModelTreeInfo->getElements("Models/Model");
if (modelItr->first())
{
while(modelItr->isValid()==true)
{
IPropertyTree& modelSrc = modelItr->query();
IClientModelLogInfo& model = getModelLogInfo(pModelLogInformation);
model.setName(modelSrc.queryProp("Name"));
int scoreSequence = 0;
Owned ScoreItr = modelSrc.getElements("Scores/Score");
if(ScoreItr->first())
{
while(ScoreItr->isValid()==true)
{
IPropertyTree& ScoreSrc = ScoreItr->query();
IClientScoreLogInfo& Score = getScoreLogInfo(&model);
Score.setName(ScoreSrc.queryProp("Name"));
Score.setLogIdentifier(ScoreSrc.getPropInt("LogIdentifier"));
Score.setValue( ScoreSrc.getPropInt("Value") );
Score.setSequence(ScoreSrc.getPropInt("Sequence"));
int reasonSequence = 0;
Owned ReasonItr = ScoreSrc.getElements("ReasonCodes/ReasonCode");
if(ReasonItr->first())
{
while(ReasonItr->isValid()==true)
{
IPropertyTree& ReasonSrc = ReasonItr->query();
IClientReasonCodeLogInfo& ReasonCode = getReasonCodeLogInfo(&Score);
ReasonCode.setValue( ReasonSrc.queryProp("Value") );
ReasonCode.setSequence( ReasonSrc.getPropInt("Sequence") );
// leave off the description as it is not logged
//if(ReasonSrc.hasProp("Description")==true)
// ReasonCode.setDescription( ReasonSrc.queryProp("Description") );
ReasonItr->next();
reasonSequence++;
}
}
scoreSequence++;
ScoreItr->next();
} // ScoreIter
} // ScoreIter
modelItr->next();
} // ModelIter
} // if any models
Owned attribGrpItr = pModelTreeInfo->getElements("AttributeGroups/AttributeGroup");
if (attribGrpItr->first())
{
while(attribGrpItr->isValid()==true)
{
IPropertyTree& attribGrpSrc = attribGrpItr->query();
IClientAttributeGroupLogInfo& attribGroup = getAttributeGroupLogInfo(pModelLogInformation);
attribGroup.setName(attribGrpSrc.queryProp("Name"));
if(attribGrpSrc.hasProp("index")==true)
attribGroup.setLogIdentifier( attribGrpSrc.getPropInt("index") );
Owned attribItr = attribGrpSrc.getElements("Attributes/Attribute");
if(attribItr->first())
{
while(attribItr->isValid()==true)
{
IPropertyTree& attribSrc = attribItr->query();
IClientAttributeLogInfo& attrib = getAttributeLogInfo(&attribGroup);
attrib.setName(attribSrc.queryProp("Name"));
attrib.setValue(attribSrc.queryProp("Value"));
attribItr->next();
}
}
attribGrpItr->next();
}
}
}
catch (IException* e)
{
StringBuffer msg;
ERRLOG("Fail to parse Model Log information: %s", e->errorMessage(msg).str());
e->Release();
}
catch (...)
{
ERRLOG("Fail to parse Model Log information");
}
}
void CLogThread::SendLog()
{
//TODO: returning here might cause SendDelta to be greater than 0, causing tank files not be able to rollover.
if(m_bRun == false)
return;
int recSend = 0;
IClientLOGServiceUpdateRequest* pRequest = 0;
try{
ForEachQueueItemIn(i,m_pServiceLog)
{
m_LogSend++;
if(m_bThrottle)
{
if(m_BurstWaitInterval > 0 && m_LogSend % 10 == 0 && m_LogSendDelta > 5)
{
m_SenderSem.wait(m_BurstWaitInterval);
}
else if(m_LinearWaitInterval > 0)
{
m_SenderSem.wait(m_LinearWaitInterval);
}
}
pRequest = (IClientLOGServiceUpdateRequest*)m_pServiceLog.dequeue();
if (pRequest != 0)
{
IEspLOGServiceUpdateRequest* pEspRequest = dynamic_cast(pRequest);
try
{
//need to link once.....
//ESPLOG(LogNormal+1,"Sending ACK %s",pEspRequest->getGUID());
//DBGLOG("Sending log %s",pEspRequest->getGUID()?pEspRequest->getGUID():"");
m_pLoggingService->async_UpdateLogService(pRequest, this, pRequest);
pRequest->Release();
m_LogSendDelta++;
}
catch(IException* Ex)
{
StringBuffer str;
Ex->errorMessage(str);
ERRLOG("Exception %s thrown within logging client",str.str());
m_pServiceLog.enqueue(pRequest);
Ex->Release();
}
catch(...)
{
ERRLOG("Unknown Error thrown within logging client");
m_pServiceLog.enqueue(pRequest);
}
}
}
}
catch(IException* e)
{
StringBuffer errorStr;
e->errorMessage(errorStr);
}
catch(...)
{
int i = 0;
}
return;
}
void CLogThread::HandleLoggingServerResponse(IClientLOGServiceUpdateRequest* Request,IClientLOGServiceUpdateResponse *Response)
{
m_LogSendDelta--;
if(Response==0 || Response->getUpdateLogStatus() == 0 || strlen(Response->getGUID())<10)
{
StringBuffer reasonbuf;
if(Response == 0)
reasonbuf.append("response is NULL");
else if(Response->getUpdateLogStatus() == 0)
reasonbuf.appendf("Log status is 0 for %s", Response->getGUID()?Response->getGUID():"");
else
reasonbuf.appendf("GUID(%s) is not correct", Response->getGUID()?Response->getGUID():"");
DBGLOG("Failed at the server so adding back to the queue. Error: %s", reasonbuf.str());
//means we failed..... so we will have to try again
m_pServiceLog.enqueue(LINK(Request));
return;
}
if(m_bFailSafeLogging == true && m_LogFailSafe.get())
{
//ESPLOG(LogNormal+1,"Adding ACK %s",Response->getGUID());
//DBGLOG("Adding ACK %s",Response->getGUID());
m_LogFailSafe->AddACK(Response->getGUID());
}
// m_SenderSem.signal();
return;
}
int CLogThread::onUpdateLogServiceComplete(IClientLOGServiceUpdateResponse *Response,IInterface* state)
{
m_LogSend--;
if(Response==0 && state==0)
{
DBGLOG("NULL LogRequest passed into onUpdateLogServiceComplete");
return 0;
}
IClientLOGServiceUpdateRequest* Request = dynamic_cast(state);
if(Request==0)
{
DBGLOG("Could not cast state to IClientLOGServiceUpdateRequest");
}
HandleLoggingServerResponse(Request,Response);
return 0;
}
int CLogThread::onUpdateModelLogServiceComplete(IClientLOGServiceUpdateResponse *Response,IInterface* state)
{
if(Response==0 && state==0)
{
DBGLOG("NULL LogRequest passed into onUpdateModelLogServiceComplete");
return 0;
}
IClientLOGServiceUpdateModelRequest* Request = dynamic_cast(state);
if(Request==0)
{
DBGLOG("Could not cast state to IClientLOGServiceUpdateModelRequest");
}
// For a UpdateModelRequest, first cast to it's parent class so
// we can use the same code in HandleLoggingServerResponse
IClientLOGServiceUpdateRequest* base_request = dynamic_cast(state);
if( base_request == 0 )
{
DBGLOG("Could not cast state to IClientLOGServiceUpdateRequest to pass to HandleLoggingServerResponse");
return 0;
}
HandleLoggingServerResponse(base_request,Response);
return 0;
}
int CLogThread::onUpdateLogServiceError(IClientLOGServiceUpdateResponse *Response,IInterface* state)
{
DBGLOG("Error Log");
m_LogSend--;
IClientLOGServiceUpdateRequest* Request = dynamic_cast(state);
if(Request==0)
{
DBGLOG("Could not cast state to IClientLOGServiceUpdateRequest");
}
HandleLoggingServerResponse(Request,Response);
return 0;
}
int CLogThread::onUpdateModelLogServiceError(IClientLOGServiceUpdateResponse *Response,IInterface* state)
{
DBGLOG("Error Log");
m_LogSend--;
IClientLOGServiceUpdateModelRequest* Request = dynamic_cast(state);
if(Request==0)
{
DBGLOG("Could not cast state to IClientLOGServiceUpdateModelRequest");
}
// For a UpdateModelRequest, first cast to it's parent class so
// we can use the same code in HandleLoggingServerResponse
IClientLOGServiceUpdateRequest* base_request = dynamic_cast(state);
if( base_request == 0 )
{
DBGLOG("Could not cast state to IClientLOGServiceUpdateRequest to pass to HandleLoggingServerResponse");
return 0;
}
HandleLoggingServerResponse(base_request,Response);
return 0;
}
void CLogThread::start()
{
if(m_NiceLevel > 0)
setNice(m_NiceLevel);
Thread::start();
}
void CLogThread::finish()
{
try{
DBGLOG("Log delta of %d",m_LogSendDelta);
if (m_pServiceLog.ordinality() == 0 && m_LogFailSafe.get() && m_LogSendDelta == 0)
m_LogFailSafe->RollCurrentLog();
}
catch(...){DBGLOG("Exception");}
m_bRun = false;
m_sem.signal();
join();
}
void CLogThread::CleanQueue()
{
ForEachQueueItemIn(i,m_pServiceLog)
{
IClientLOGServiceUpdateRequest* pRequest = (IClientLOGServiceUpdateRequest*)m_pServiceLog.dequeue();
if (pRequest != 0)
pRequest->Release();
}
}
void CPooledClientWsLogService::async_UpdateLogService(IClientLOGServiceUpdateRequest *request, IClientWsLogServiceEvents *events,IInterface* state)
{
if(m_url.length()==0){ throw MakeStringExceptionDirect(-1, "url not set"); }
CLOGServiceUpdateModelRequest* espModelRequest = dynamic_cast(request);
CLOGServiceUpdateRequest* esprequest = dynamic_cast(request);
if(espModelRequest!=0)
{
espModelRequest->setMethod("UpdateModelLogService");
espModelRequest->setReqId(m_reqId++);
espModelRequest->setEventSink(events);
espModelRequest->setState(state);
espModelRequest->setUserId(m_userid.str());
espModelRequest->setPassword(m_password.str());
espModelRequest->setRealm(m_realm.str());
espModelRequest->Link();
if(state!=NULL)
state->Link();
m_thread_pool->start((void *)(IRpcRequestBinding *)(espModelRequest), "Model Logging Thread", LogThreadWaitTime * 1000);
}else if(esprequest!=0)
{
esprequest->setMethod("UpdateLogService");
esprequest->setReqId(m_reqId++);
esprequest->setEventSink(events);
esprequest->setState(state);
esprequest->setUserId(m_userid.str());
esprequest->setPassword(m_password.str());
esprequest->setRealm(m_realm.str());
esprequest->Link();
if(state!=NULL)
state->Link();
m_thread_pool->start((void *)(IRpcRequestBinding *)(esprequest), "Logging Thread", LogThreadWaitTime * 1000);
}else{
throw MakeStringExceptionDirect(-1, "LogServiceUpdateRequest is null.");
}
}