|
@@ -825,7 +825,7 @@ protected: friend class CMPPacketReader;
|
|
|
{
|
|
|
StringBuffer str;
|
|
|
#ifdef _TRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: connecting to %s role: %" I64F "u", remoteep.getUrlStr(str).str(), parent->getRole());
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: connecting to %s role: %" I64F "u", remoteep.getUrlStr(str).str(), parent->getRole());
|
|
|
#endif
|
|
|
if (((int)tm.timeout)<0)
|
|
|
remaining = CONNECT_TIMEOUT;
|
|
@@ -841,7 +841,7 @@ protected: friend class CMPPacketReader;
|
|
|
newsock.setown(ISocket::connect_timeout(remoteep,remaining));
|
|
|
newsock->set_keep_alive(true);
|
|
|
#ifdef _FULLTRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket connect, retrycount = %d", retrycount);
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: connect after socket connect, retrycount = %d", retrycount);
|
|
|
#endif
|
|
|
|
|
|
SocketEndpoint hostep;
|
|
@@ -860,13 +860,13 @@ protected: friend class CMPPacketReader;
|
|
|
connectHdr.id[0].getUrlStr(tmp1);
|
|
|
tmp1.append(' ');
|
|
|
connectHdr.id[1].getUrlStr(tmp1);
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket write %s",tmp1.str());
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: connect after socket write %s",tmp1.str());
|
|
|
#endif
|
|
|
|
|
|
size32_t rd = 0;
|
|
|
|
|
|
#ifdef _TRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket write, waiting for read");
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: connect after socket write, waiting for read");
|
|
|
#endif
|
|
|
|
|
|
// Wait for connection reply but also check for A<->B deadlock (where both processes are here
|
|
@@ -935,7 +935,7 @@ protected: friend class CMPPacketReader;
|
|
|
// if other side closes, connect again
|
|
|
if (e->errorCode() == JSOCKERR_graceful_close)
|
|
|
{
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: Retrying (other side closed connection, probably due to clash)");
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: Retrying (other side closed connection, probably due to clash)");
|
|
|
e->Release();
|
|
|
break;
|
|
|
}
|
|
@@ -943,7 +943,7 @@ protected: friend class CMPPacketReader;
|
|
|
e->Release();
|
|
|
|
|
|
#ifdef _TRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).str(),retrycount+1);
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).str(),retrycount+1);
|
|
|
#endif
|
|
|
}
|
|
|
else
|
|
@@ -986,7 +986,7 @@ protected: friend class CMPPacketReader;
|
|
|
}
|
|
|
|
|
|
#ifdef _TRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket read rd=%u, sizeof(connectHdr)=%lu", rd, sizeof(connectHdr));
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: connect after socket read rd=%u, sizeof(connectHdr)=%lu", rd, sizeof(connectHdr));
|
|
|
#endif
|
|
|
|
|
|
if (rd)
|
|
@@ -1006,7 +1006,7 @@ protected: friend class CMPPacketReader;
|
|
|
if (attachSocket(newsock,remoteep,hostep,true,NULL,addrval))
|
|
|
{
|
|
|
#ifdef _TRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: connected to %s",str.str());
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: connected to %s",str.str());
|
|
|
#endif
|
|
|
lastxfer = msTick();
|
|
|
closed = false;
|
|
@@ -1037,7 +1037,7 @@ protected: friend class CMPPacketReader;
|
|
|
#ifdef _TRACE
|
|
|
StringBuffer str;
|
|
|
str.clear();
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).str(),retrycount+1);
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).str(),retrycount+1);
|
|
|
#endif
|
|
|
}
|
|
|
|
|
@@ -1091,7 +1091,7 @@ public:
|
|
|
CriticalBlock block(connectsect);
|
|
|
if (closed) {
|
|
|
#ifdef _TRACELINKCLOSED
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "WritePacket closed on entry");
|
|
|
+ LOG(MCdebugInfo, unknownJob, "WritePacket closed on entry");
|
|
|
PrintStackReport();
|
|
|
#endif
|
|
|
if (!checkReconnect(tm))
|
|
@@ -1100,7 +1100,7 @@ public:
|
|
|
if (!channelsock) {
|
|
|
if (!connect(tm)) {
|
|
|
#ifdef _FULLTRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "WritePacket connect failed");
|
|
|
+ LOG(MCdebugInfo, unknownJob, "WritePacket connect failed");
|
|
|
#endif
|
|
|
return false;
|
|
|
}
|
|
@@ -1126,7 +1126,7 @@ public:
|
|
|
#ifdef _FULLTRACE
|
|
|
StringBuffer ep1;
|
|
|
StringBuffer ep2;
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "WritePacket(target=%s,(%d,%d,%d))",remoteep.getUrlStr(ep1).str(),hdrsize,hdr2size,bodysize);
|
|
|
+ LOG(MCdebugInfo, unknownJob, "WritePacket(target=%s,(%d,%d,%d))",remoteep.getUrlStr(ep1).str(),hdrsize,hdr2size,bodysize);
|
|
|
unsigned t2 = msTick();
|
|
|
#endif
|
|
|
unsigned n = 0;
|
|
@@ -1145,13 +1145,13 @@ public:
|
|
|
sizes[n++] = bodysize;
|
|
|
}
|
|
|
if (!dest) {
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP Warning: WritePacket unexpected NULL socket");
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP Warning: WritePacket unexpected NULL socket");
|
|
|
return false;
|
|
|
}
|
|
|
dest->write_multiple(n,bufs,sizes);
|
|
|
lastxfer = msTick();
|
|
|
#ifdef _FULLTRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "WritePacket(timewaiting=%d,timesending=%d)",t2-t1,lastxfer-t2);
|
|
|
+ LOG(MCdebugInfo, unknownJob, "WritePacket(timewaiting=%d,timesending=%d)",t2-t1,lastxfer-t2);
|
|
|
#endif
|
|
|
}
|
|
|
catch (IException *e) {
|
|
@@ -1198,11 +1198,11 @@ public:
|
|
|
return false;
|
|
|
}
|
|
|
if (tm.timedout()) {
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: verify, ping failed to %s",ep.str());
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: verify, ping failed to %s",ep.str());
|
|
|
closeSocket();
|
|
|
return false;
|
|
|
}
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: verify, ping failed to %s, retrying",ep.str());
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: verify, ping failed to %s, retrying",ep.str());
|
|
|
unsigned remaining;
|
|
|
if (!pingtm.timedout(&remaining)&&remaining)
|
|
|
Sleep(remaining);
|
|
@@ -1316,7 +1316,7 @@ public:
|
|
|
#ifdef _FULLTRACE
|
|
|
StringBuffer ep1;
|
|
|
StringBuffer ep2;
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: send(target=%s,sender=%s,tag=%d,replytag=%d,size=%d)",hdr.target.getUrlStr(ep1).str(),hdr.sender.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: send(target=%s,sender=%s,tag=%d,replytag=%d,size=%d)",hdr.target.getUrlStr(ep1).str(),hdr.sender.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
|
|
|
#endif
|
|
|
return channel->writepacket(&hdr,sizeof(hdr),mb.toByteArray(),mb.length(),tm);
|
|
|
}
|
|
@@ -1448,7 +1448,7 @@ public:
|
|
|
#ifdef _FULLTRACE
|
|
|
StringBuffer ep1;
|
|
|
StringBuffer ep2;
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: multi-send(target=%s,sender=%s,tag=%d,replytag=%d,size=%d)",hdr.target.getUrlStr(ep1).str(),hdr.sender.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: multi-send(target=%s,sender=%s,tag=%d,replytag=%d,size=%d)",hdr.target.getUrlStr(ep1).str(),hdr.sender.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
|
|
|
#endif
|
|
|
PacketHeader outhdr;
|
|
|
outhdr = hdr;
|
|
@@ -1466,13 +1466,13 @@ public:
|
|
|
if (i+1==mhdr.numparts)
|
|
|
mhdr.size = mhdr.total-mhdr.ofs;
|
|
|
#ifdef _FULLTRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: multi-send block=%d, num blocks=%d, ofs=%d, size=%d",i,mhdr.numparts,mhdr.ofs,mhdr.size);
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: multi-send block=%d, num blocks=%d, ofs=%d, size=%d",i,mhdr.numparts,mhdr.ofs,mhdr.size);
|
|
|
#endif
|
|
|
outhdr.initseq();
|
|
|
outhdr.size = sizeof(outhdr)+sizeof(mhdr)+mhdr.size;
|
|
|
if (!channel->writepacket(&outhdr,sizeof(outhdr),&mhdr,sizeof(mhdr),p,mhdr.size,tm)) {
|
|
|
#ifdef _FULLTRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: multi-send failed");
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: multi-send failed");
|
|
|
#endif
|
|
|
return false;
|
|
|
}
|
|
@@ -1561,7 +1561,7 @@ public:
|
|
|
if (pc)
|
|
|
{
|
|
|
#ifdef _TRACELINKCLOSED
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "CMPPacketReader::notifySelected() about to close socket, mode = 0x%x", selected);
|
|
|
+ LOG(MCdebugInfo, unknownJob, "CMPPacketReader::notifySelected() about to close socket, mode = 0x%x", selected);
|
|
|
#endif
|
|
|
pc->closeSocket(false, true);
|
|
|
}
|
|
@@ -1581,7 +1581,7 @@ public:
|
|
|
// assumes packet header will arrive in one go
|
|
|
if (sizeavail<sizeof(hdr)) {
|
|
|
#ifdef _FULLTRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "Selected stalled on header %u %lu",sizeavail,sizeavail-sizeof(hdr));
|
|
|
+ LOG(MCdebugInfo, unknownJob, "Selected stalled on header %u %lu",sizeavail,sizeavail-sizeof(hdr));
|
|
|
#endif
|
|
|
size32_t szread;
|
|
|
sock->read(&hdr,sizeof(hdr),sizeof(hdr),szread,60); // I don't *really* want to block here but not much else can do
|
|
@@ -1602,7 +1602,7 @@ public:
|
|
|
#ifdef _FULLTRACE
|
|
|
StringBuffer ep1;
|
|
|
StringBuffer ep2;
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: ReadPacket(sender=%s,target=%s,tag=%d,replytag=%d,size=%d)",hdr.sender.getUrlStr(ep1).str(),hdr.target.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: ReadPacket(sender=%s,target=%s,tag=%d,replytag=%d,size=%d)",hdr.sender.getUrlStr(ep1).str(),hdr.target.getUrlStr(ep2).str(),hdr.tag,hdr.replytag,hdr.size);
|
|
|
#endif
|
|
|
remaining = hdr.size-sizeof(hdr);
|
|
|
activemsg = new CMessageBuffer(remaining); // will get from low level IO at some stage
|
|
@@ -1622,7 +1622,7 @@ public:
|
|
|
if (remaining==0) { // we have the packet so process
|
|
|
|
|
|
#ifdef _FULLTRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: ReadPacket(timetaken = %d,select iterations=%d)",msTick()-parent->startxfer,parent->numiter);
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: ReadPacket(timetaken = %d,select iterations=%d)",msTick()-parent->startxfer,parent->numiter);
|
|
|
#endif
|
|
|
do {
|
|
|
switch (activemsg->getTag()) {
|
|
@@ -1749,19 +1749,19 @@ bool CMPChannel::attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,c
|
|
|
StringBuffer ep2;
|
|
|
_localep.getUrlStr(ep1);
|
|
|
_remoteep.getUrlStr(ep2);
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: Possible clash between %s->%s %d(%d)",ep1.str(),ep2.str(),(int)ismaster,(int)master);
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: Possible clash between %s->%s %d(%d)",ep1.str(),ep2.str(),(int)ismaster,(int)master);
|
|
|
|
|
|
try {
|
|
|
if (ismaster!=master) {
|
|
|
if (ismaster) {
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: resolving socket attach clash (master)");
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: resolving socket attach clash (master)");
|
|
|
return false;
|
|
|
}
|
|
|
else {
|
|
|
Sleep(50); // give the other side some time to close
|
|
|
CTimeMon tm(10000);
|
|
|
if (verifyConnection(tm,false)) {
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: resolving socket attach clash (verified)");
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: resolving socket attach clash (verified)");
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
@@ -1772,7 +1772,7 @@ bool CMPChannel::attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,c
|
|
|
e->Release();
|
|
|
}
|
|
|
try {
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "Message Passing - removing stale socket to %s",ep2.str());
|
|
|
+ LOG(MCdebugInfo, unknownJob, "Message Passing - removing stale socket to %s",ep2.str());
|
|
|
CriticalUnblock unblock(connectsect);
|
|
|
closeSocket(true, true);
|
|
|
#ifdef REFUSE_STALE_CONNECTION
|
|
@@ -1820,7 +1820,7 @@ bool CMPChannel::send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon
|
|
|
if (closed||(reply&&!isConnected())) // flag error if has been disconnected
|
|
|
{
|
|
|
#ifdef _TRACELINKCLOSED
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "CMPChannel::send closed on entry %d",(int)closed);
|
|
|
+ LOG(MCdebugInfo, unknownJob, "CMPChannel::send closed on entry %d",(int)closed);
|
|
|
PrintStackReport();
|
|
|
#endif
|
|
|
if (!checkReconnect(tm))
|
|
@@ -2008,7 +2008,7 @@ CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port, bool _list
|
|
|
listensock = NULL; // delay create till running
|
|
|
parent->setPort(port);
|
|
|
#ifdef _TRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread Init Port = %d", port);
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP Connect Thread Init Port = %d", port);
|
|
|
#endif
|
|
|
running = false;
|
|
|
}
|
|
@@ -2061,7 +2061,7 @@ void CMPConnectThread::startPort(unsigned short port)
|
|
|
int CMPConnectThread::run()
|
|
|
{
|
|
|
#ifdef _TRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: Connect Thread Starting - accept loop");
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: Connect Thread Starting - accept loop");
|
|
|
#endif
|
|
|
while (running)
|
|
|
{
|
|
@@ -2214,7 +2214,7 @@ int CMPConnectThread::run()
|
|
|
#ifdef _TRACE
|
|
|
StringBuffer str1;
|
|
|
StringBuffer str2;
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread: connected to %s",_remoteep.getUrlStr(str1).str());
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP Connect Thread: connected to %s",_remoteep.getUrlStr(str1).str());
|
|
|
#endif
|
|
|
}
|
|
|
#ifdef _FULLTRACE
|
|
@@ -2240,11 +2240,11 @@ int CMPConnectThread::run()
|
|
|
else
|
|
|
{
|
|
|
if (running)
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread accept returned NULL");
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP Connect Thread accept returned NULL");
|
|
|
}
|
|
|
}
|
|
|
#ifdef _TRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread Stopping");
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP Connect Thread Stopping");
|
|
|
#endif
|
|
|
return 0;
|
|
|
}
|
|
@@ -2355,7 +2355,7 @@ CMPServer::~CMPServer()
|
|
|
StringBuffer buf;
|
|
|
getReceiveQueueDetails(buf);
|
|
|
if (buf.length())
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: Orphan check\n%s",buf.str());
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: Orphan check\n%s",buf.str());
|
|
|
#endif
|
|
|
_releaseAll();
|
|
|
selecthandler->stop(true);
|
|
@@ -2433,7 +2433,7 @@ bool CMPServer::recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag
|
|
|
}
|
|
|
if (nfy.aborted) {
|
|
|
#ifdef _TRACELINKCLOSED
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "CMPserver::recv closed on notify");
|
|
|
+ LOG(MCdebugInfo, unknownJob, "CMPserver::recv closed on notify");
|
|
|
PrintStackReport();
|
|
|
#endif
|
|
|
IMP_Exception *e=new CMPException(MPERR_link_closed,*nfy.ep);
|
|
@@ -2519,7 +2519,7 @@ unsigned CMPServer::probe(const SocketEndpoint *ep, mptag_t tag,CTimeMon &tm,Soc
|
|
|
}
|
|
|
if (nfy.aborted) {
|
|
|
#ifdef _TRACELINKCLOSED
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "CMPserver::probe closed on notify");
|
|
|
+ LOG(MCdebugInfo, unknownJob, "CMPserver::probe closed on notify");
|
|
|
PrintStackReport();
|
|
|
#endif
|
|
|
IMP_Exception *e=new CMPException(MPERR_link_closed,*ep);
|
|
@@ -2607,7 +2607,7 @@ void CMPServer::notifyClosed(SocketEndpoint &ep, bool trace)
|
|
|
if (trace)
|
|
|
{
|
|
|
StringBuffer url;
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: CMPServer::notifyClosed %s",ep.getUrlStr(url).str());
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: CMPServer::notifyClosed %s",ep.getUrlStr(url).str());
|
|
|
PrintStackReport();
|
|
|
}
|
|
|
#endif
|
|
@@ -3305,7 +3305,7 @@ void stopMPServer()
|
|
|
{
|
|
|
stopLogMsgReceivers();
|
|
|
#ifdef _TRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: Stopping MP Server");
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: Stopping MP Server");
|
|
|
#endif
|
|
|
_globalMPServer = globalMPServer;
|
|
|
globalMPServer = NULL;
|
|
@@ -3316,7 +3316,7 @@ void stopMPServer()
|
|
|
_globalMPServer->stop();
|
|
|
_globalMPServer->Release();
|
|
|
#ifdef _TRACE
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: Stopped MP Server");
|
|
|
+ LOG(MCdebugInfo, unknownJob, "MP: Stopped MP Server");
|
|
|
#endif
|
|
|
CriticalBlock block(CGlobalMPServer::sect);
|
|
|
initMyNode(0);
|