|
@@ -438,7 +438,9 @@ class CMPConnectThread: public Thread
|
|
CMPServer *parent;
|
|
CMPServer *parent;
|
|
int mpSoMaxConn;
|
|
int mpSoMaxConn;
|
|
unsigned mpTraceLevel;
|
|
unsigned mpTraceLevel;
|
|
|
|
+ Owned<IWhiteListHandler> whiteListCallback;
|
|
void checkSelfDestruct(void *p,size32_t sz);
|
|
void checkSelfDestruct(void *p,size32_t sz);
|
|
|
|
+
|
|
public:
|
|
public:
|
|
CMPConnectThread(CMPServer *_parent, unsigned port);
|
|
CMPConnectThread(CMPServer *_parent, unsigned port);
|
|
~CMPConnectThread()
|
|
~CMPConnectThread()
|
|
@@ -456,6 +458,14 @@ public:
|
|
printf("CMPConnectThread::stop timed out\n");
|
|
printf("CMPConnectThread::stop timed out\n");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ void installWhiteListCallback(IWhiteListHandler *_whiteListCallback)
|
|
|
|
+ {
|
|
|
|
+ whiteListCallback.set(_whiteListCallback);
|
|
|
|
+ }
|
|
|
|
+ IWhiteListHandler *queryWhiteListCallback() const
|
|
|
|
+ {
|
|
|
|
+ return whiteListCallback;
|
|
|
|
+ }
|
|
};
|
|
};
|
|
|
|
|
|
class PingPacketHandler;
|
|
class PingPacketHandler;
|
|
@@ -476,6 +486,7 @@ class CMPServer: private CMPChannelHT, implements IMPServer
|
|
CMPNotifyClosedThread *notifyclosedthread;
|
|
CMPNotifyClosedThread *notifyclosedthread;
|
|
CriticalSection sect;
|
|
CriticalSection sect;
|
|
protected:
|
|
protected:
|
|
|
|
+ unsigned __int64 role;
|
|
unsigned short port;
|
|
unsigned short port;
|
|
public:
|
|
public:
|
|
bool checkclosed;
|
|
bool checkclosed;
|
|
@@ -491,11 +502,12 @@ public:
|
|
|
|
|
|
IMPLEMENT_IINTERFACE_USING(CMPChannelHT);
|
|
IMPLEMENT_IINTERFACE_USING(CMPChannelHT);
|
|
|
|
|
|
- CMPServer(unsigned _port);
|
|
|
|
|
|
+ CMPServer(unsigned __int64 _role, unsigned _port);
|
|
~CMPServer();
|
|
~CMPServer();
|
|
void start();
|
|
void start();
|
|
virtual void stop();
|
|
virtual void stop();
|
|
- unsigned short getPort() { return port; }
|
|
|
|
|
|
+ unsigned short getPort() const { return port; }
|
|
|
|
+ unsigned __int64 getRole() const { return role; }
|
|
void setPort(unsigned short _port) { port = _port; }
|
|
void setPort(unsigned short _port) { port = _port; }
|
|
CMPChannel *lookup(const SocketEndpoint &remoteep);
|
|
CMPChannel *lookup(const SocketEndpoint &remoteep);
|
|
ISocketSelectHandler &querySelectHandler() { return *selecthandler; };
|
|
ISocketSelectHandler &querySelectHandler() { return *selecthandler; };
|
|
@@ -569,6 +581,14 @@ public:
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ virtual void installWhiteListCallback(IWhiteListHandler *whiteListCallback) override
|
|
|
|
+ {
|
|
|
|
+ connectthread->installWhiteListCallback(whiteListCallback);
|
|
|
|
+ }
|
|
|
|
+ virtual IWhiteListHandler *queryWhiteListCallback() const override
|
|
|
|
+ {
|
|
|
|
+ return connectthread->queryWhiteListCallback();
|
|
|
|
+ }
|
|
};
|
|
};
|
|
|
|
|
|
//===========================================================================
|
|
//===========================================================================
|
|
@@ -708,6 +728,42 @@ void traceSlowReadTms(const char *msg, ISocket *sock, void *dst, size32_t minSiz
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+/* Legacy header sent id[2] only.
|
|
|
|
+ * To remain backward compatible (when new MP clients are connecting to old Dali),
|
|
|
|
+ * we send a regular empty PacketHeader as well that has the 'role' embedded within it,
|
|
|
|
+ * in unused fields. TAG_SYS_BCAST is used as the message tag, because it is an
|
|
|
|
+ * unused feature that all Dali's simply receive and delete.
|
|
|
|
+ */
|
|
|
|
+struct ConnectHdr
|
|
|
|
+{
|
|
|
|
+ ConnectHdr(const SocketEndpoint &hostEp, const SocketEndpoint &remoteEp, unsigned __int64 role)
|
|
|
|
+ {
|
|
|
|
+ id[0].set(hostEp);
|
|
|
|
+ id[1].set(remoteEp);
|
|
|
|
+
|
|
|
|
+ hdr.size = sizeof(PacketHeader);
|
|
|
|
+ hdr.tag = TAG_SYS_BCAST;
|
|
|
|
+ hdr.flags = 0;
|
|
|
|
+ hdr.version = MP_PROTOCOL_VERSION;
|
|
|
|
+ setRole(role);
|
|
|
|
+ }
|
|
|
|
+ ConnectHdr()
|
|
|
|
+ {
|
|
|
|
+ }
|
|
|
|
+ SocketEndpointV4 id[2];
|
|
|
|
+ PacketHeader hdr;
|
|
|
|
+ inline void setRole(unsigned __int64 role)
|
|
|
|
+ {
|
|
|
|
+ hdr.replytag = (mptag_t) (role >> 32);
|
|
|
|
+ hdr.sequence = (unsigned) (role & 0xffffffff);
|
|
|
|
+ }
|
|
|
|
+ inline unsigned __int64 getRole() const
|
|
|
|
+ {
|
|
|
|
+ return (((unsigned __int64)hdr.replytag)<<32) | ((unsigned __int64)hdr.sequence);
|
|
|
|
+ }
|
|
|
|
+};
|
|
|
|
+
|
|
|
|
+
|
|
class CMPPacketReader;
|
|
class CMPPacketReader;
|
|
|
|
|
|
class CMPChannel: public CInterface
|
|
class CMPChannel: public CInterface
|
|
@@ -755,19 +811,23 @@ protected: friend class CMPPacketReader;
|
|
// must be called from connectsect
|
|
// must be called from connectsect
|
|
// also in sendmutex
|
|
// also in sendmutex
|
|
|
|
|
|
- ISocket *newsock=NULL;
|
|
|
|
|
|
+ Owned<ISocket> newsock;
|
|
unsigned retrycount = CONNECT_RETRYCOUNT;
|
|
unsigned retrycount = CONNECT_RETRYCOUNT;
|
|
unsigned remaining;
|
|
unsigned remaining;
|
|
|
|
+ Owned<IException> exitException;
|
|
|
|
|
|
- while (!channelsock) {
|
|
|
|
- try {
|
|
|
|
|
|
+ while (!channelsock)
|
|
|
|
+ {
|
|
|
|
+ try
|
|
|
|
+ {
|
|
StringBuffer str;
|
|
StringBuffer str;
|
|
#ifdef _TRACE
|
|
#ifdef _TRACE
|
|
LOG(MCdebugInfo(100), unknownJob, "MP: connecting to %s",remoteep.getUrlStr(str).str());
|
|
LOG(MCdebugInfo(100), unknownJob, "MP: connecting to %s",remoteep.getUrlStr(str).str());
|
|
#endif
|
|
#endif
|
|
if (((int)tm.timeout)<0)
|
|
if (((int)tm.timeout)<0)
|
|
remaining = CONNECT_TIMEOUT;
|
|
remaining = CONNECT_TIMEOUT;
|
|
- else if (tm.timedout(&remaining)) {
|
|
|
|
|
|
+ else if (tm.timedout(&remaining))
|
|
|
|
+ {
|
|
#ifdef _FULLTRACE
|
|
#ifdef _FULLTRACE
|
|
PROGLOG("MP: connect timed out 1");
|
|
PROGLOG("MP: connect timed out 1");
|
|
#endif
|
|
#endif
|
|
@@ -775,34 +835,31 @@ protected: friend class CMPPacketReader;
|
|
}
|
|
}
|
|
if (remaining<10000)
|
|
if (remaining<10000)
|
|
remaining = 10000; // 10s min granularity for MP
|
|
remaining = 10000; // 10s min granularity for MP
|
|
- newsock = ISocket::connect_timeout(remoteep,remaining);
|
|
|
|
|
|
+ newsock.setown(ISocket::connect_timeout(remoteep,remaining));
|
|
newsock->set_keep_alive(true);
|
|
newsock->set_keep_alive(true);
|
|
#ifdef _FULLTRACE
|
|
#ifdef _FULLTRACE
|
|
LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket connect, retrycount = %d", retrycount);
|
|
LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket connect, retrycount = %d", retrycount);
|
|
#endif
|
|
#endif
|
|
|
|
|
|
- SocketEndpointV4 id[2];
|
|
|
|
SocketEndpoint hostep;
|
|
SocketEndpoint hostep;
|
|
hostep.setLocalHost(parent->getPort());
|
|
hostep.setLocalHost(parent->getPort());
|
|
- id[0].set(hostep);
|
|
|
|
- id[1].set(remoteep);
|
|
|
|
|
|
+ ConnectHdr connectHdr(hostep, remoteep, parent->getRole());
|
|
|
|
|
|
- unsigned __int64 addrval = DIGIT1*id[0].ip[0] + DIGIT2*id[0].ip[1] + DIGIT3*id[0].ip[2] + DIGIT4*id[0].ip[3] + id[0].port;
|
|
|
|
|
|
+ unsigned __int64 addrval = DIGIT1*connectHdr.id[0].ip[0] + DIGIT2*connectHdr.id[0].ip[1] + DIGIT3*connectHdr.id[0].ip[2] + DIGIT4*connectHdr.id[0].ip[3] + connectHdr.id[0].port;
|
|
#ifdef _TRACE
|
|
#ifdef _TRACE
|
|
PROGLOG("MP: connect addrval = %" I64F "u", addrval);
|
|
PROGLOG("MP: connect addrval = %" I64F "u", addrval);
|
|
#endif
|
|
#endif
|
|
|
|
|
|
- newsock->write(&id[0],sizeof(id));
|
|
|
|
|
|
+ newsock->write(&connectHdr,sizeof(connectHdr));
|
|
|
|
|
|
#ifdef _FULLTRACE
|
|
#ifdef _FULLTRACE
|
|
StringBuffer tmp1;
|
|
StringBuffer tmp1;
|
|
- id[0].getUrlStr(tmp1);
|
|
|
|
|
|
+ connectHdr.id[0].getUrlStr(tmp1);
|
|
tmp1.append(' ');
|
|
tmp1.append(' ');
|
|
- id[1].getUrlStr(tmp1);
|
|
|
|
|
|
+ connectHdr.id[1].getUrlStr(tmp1);
|
|
LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket write %s",tmp1.str());
|
|
LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket write %s",tmp1.str());
|
|
#endif
|
|
#endif
|
|
|
|
|
|
- size32_t reply = 0;
|
|
|
|
size32_t rd = 0;
|
|
size32_t rd = 0;
|
|
|
|
|
|
#ifdef _TRACE
|
|
#ifdef _TRACE
|
|
@@ -841,9 +898,11 @@ protected: friend class CMPPacketReader;
|
|
|
|
|
|
rd = 0;
|
|
rd = 0;
|
|
|
|
|
|
|
|
+ MemoryBuffer replyMb;
|
|
|
|
+ void *replyMem = replyMb.ensureCapacity(0x1000); // 4K - max size to allow for serialized exception
|
|
try
|
|
try
|
|
{
|
|
{
|
|
- newsock->readtms(&reply,sizeof(reply),sizeof(reply),rd,CONNECT_TIMEOUT_INTERVAL);
|
|
|
|
|
|
+ newsock->readtms(replyMem, sizeof(rd), replyMb.capacity(), rd, CONNECT_TIMEOUT_INTERVAL);
|
|
}
|
|
}
|
|
catch (IException *e)
|
|
catch (IException *e)
|
|
{
|
|
{
|
|
@@ -853,36 +912,35 @@ protected: friend class CMPPacketReader;
|
|
if ( (e->errorCode() != JSOCKERR_timeout_expired) ||
|
|
if ( (e->errorCode() != JSOCKERR_timeout_expired) ||
|
|
((e->errorCode() == JSOCKERR_timeout_expired) && (loopCnt == 0)) )
|
|
((e->errorCode() == JSOCKERR_timeout_expired) && (loopCnt == 0)) )
|
|
{
|
|
{
|
|
- if (tm.timedout(&remaining))
|
|
|
|
- {
|
|
|
|
|
|
+ if (tm.timedout(&remaining))
|
|
|
|
+ {
|
|
#ifdef _FULLTRACE
|
|
#ifdef _FULLTRACE
|
|
- EXCLOG(e,"MP: connect timed out 3");
|
|
|
|
|
|
+ EXCLOG(e,"MP: connect timed out 3");
|
|
#endif
|
|
#endif
|
|
- e->Release();
|
|
|
|
- newsock->Release();
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
|
|
+ e->Release();
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
#ifdef _TRACE
|
|
#ifdef _TRACE
|
|
- EXCLOG(e, "MP: Failed to connect");
|
|
|
|
-#endif
|
|
|
|
- if ((retrycount--==0)||(tm.timeout==MP_ASYNC_SEND))
|
|
|
|
- { // don't bother retrying on async send
|
|
|
|
- e->Release();
|
|
|
|
- throw new CMPException(MPERR_connection_failed,remoteep);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // if other side closes, connect again
|
|
|
|
- if (e->errorCode() == JSOCKERR_graceful_close)
|
|
|
|
- {
|
|
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: Retrying (other side closed connection, probably due to clash)");
|
|
|
|
- e->Release();
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
|
|
+ EXCLOG(e, "MP: Failed to connect");
|
|
|
|
+#endif
|
|
|
|
+ if ((retrycount--==0)||(tm.timeout==MP_ASYNC_SEND))
|
|
|
|
+ { // don't bother retrying on async send
|
|
|
|
+ e->Release();
|
|
|
|
+ throw new CMPException(MPERR_connection_failed,remoteep);
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ // if other side closes, connect again
|
|
|
|
+ if (e->errorCode() == JSOCKERR_graceful_close)
|
|
|
|
+ {
|
|
|
|
+ LOG(MCdebugInfo(100), unknownJob, "MP: Retrying (other side closed connection, probably due to clash)");
|
|
e->Release();
|
|
e->Release();
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ e->Release();
|
|
|
|
|
|
#ifdef _TRACE
|
|
#ifdef _TRACE
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).str(),retrycount+1);
|
|
|
|
|
|
+ LOG(MCdebugInfo(100), unknownJob, "MP: Retrying connection to %s, %d attempts left",remoteep.getUrlStr(str).str(),retrycount+1);
|
|
#endif
|
|
#endif
|
|
}
|
|
}
|
|
else
|
|
else
|
|
@@ -900,15 +958,35 @@ protected: friend class CMPPacketReader;
|
|
#ifdef _FULLTRACE
|
|
#ifdef _FULLTRACE
|
|
PROGLOG("MP: rd = %d", rd);
|
|
PROGLOG("MP: rd = %d", rd);
|
|
#endif
|
|
#endif
|
|
- if (rd != 0)
|
|
|
|
|
|
+ /* NB: legacy clients that don't handle the exception deserialization here
|
|
|
|
+ * will see reply as success, so no clean error,
|
|
|
|
+ * but will fail shortly afterwards since server connection is closed
|
|
|
|
+ */
|
|
|
|
+ if (rd > sizeof(rd)) // legacy clients will only ever send a reply of 0 or 4, if greater, then new client is replying with an exception
|
|
|
|
+ {
|
|
|
|
+ MemoryBuffer mb;
|
|
|
|
+ mb.setBuffer(rd, replyMem, false);
|
|
|
|
+ size32_t len;
|
|
|
|
+ mb.read(len); // exception length
|
|
|
|
+ if (len)
|
|
|
|
+ {
|
|
|
|
+ exitException.setown(deserializeException(mb));
|
|
|
|
+ throw exitException.getLink();
|
|
|
|
+ }
|
|
break;
|
|
break;
|
|
|
|
+ }
|
|
|
|
+ else if (rd != 0)
|
|
|
|
+ {
|
|
|
|
+ assertex(rd == sizeof(rd));
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
#ifdef _TRACE
|
|
#ifdef _TRACE
|
|
- LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket read rd=%u, reply=%u, sizeof(id)=%lu", rd, reply, sizeof(id));
|
|
|
|
|
|
+ LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket read rd=%u, reply=%u, sizeof(connectHdr)=%lu", rd, reply, sizeof(connectHdr));
|
|
#endif
|
|
#endif
|
|
|
|
|
|
- if (reply!=0)
|
|
|
|
|
|
+ if (rd)
|
|
{
|
|
{
|
|
unsigned elapsedMs = msTick() - startMs;
|
|
unsigned elapsedMs = msTick() - startMs;
|
|
if (elapsedMs >= TRACESLOW_THRESHOLD)
|
|
if (elapsedMs >= TRACESLOW_THRESHOLD)
|
|
@@ -922,11 +1000,8 @@ protected: friend class CMPPacketReader;
|
|
WARNLOG("MP: connect to: %s, took: %d ms", epStr.str(), elapsedMs);
|
|
WARNLOG("MP: connect to: %s, took: %d ms", epStr.str(), elapsedMs);
|
|
}
|
|
}
|
|
|
|
|
|
- assertex(reply==sizeof(id)); // how can this fail?
|
|
|
|
if (attachSocket(newsock,remoteep,hostep,true,NULL,addrval))
|
|
if (attachSocket(newsock,remoteep,hostep,true,NULL,addrval))
|
|
{
|
|
{
|
|
- newsock->Release();
|
|
|
|
- newsock = NULL;
|
|
|
|
#ifdef _TRACE
|
|
#ifdef _TRACE
|
|
LOG(MCdebugInfo(100), unknownJob, "MP: connected to %s",str.str());
|
|
LOG(MCdebugInfo(100), unknownJob, "MP: connected to %s",str.str());
|
|
#endif
|
|
#endif
|
|
@@ -939,6 +1014,8 @@ protected: friend class CMPPacketReader;
|
|
}
|
|
}
|
|
catch (IException *e)
|
|
catch (IException *e)
|
|
{
|
|
{
|
|
|
|
+ if (exitException)
|
|
|
|
+ throw;
|
|
if (tm.timedout(&remaining)) {
|
|
if (tm.timedout(&remaining)) {
|
|
#ifdef _FULLTRACE
|
|
#ifdef _FULLTRACE
|
|
EXCLOG(e,"MP: connect timed out 2");
|
|
EXCLOG(e,"MP: connect timed out 2");
|
|
@@ -961,8 +1038,7 @@ protected: friend class CMPPacketReader;
|
|
#endif
|
|
#endif
|
|
}
|
|
}
|
|
|
|
|
|
- ::Release(newsock);
|
|
|
|
- newsock = NULL;
|
|
|
|
|
|
+ newsock.clear();
|
|
|
|
|
|
{
|
|
{
|
|
CriticalUnblock unblock(connectsect); // to avoid connecting philosopher problem
|
|
CriticalUnblock unblock(connectsect); // to avoid connecting philosopher problem
|
|
@@ -1706,7 +1782,6 @@ bool CMPChannel::attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,c
|
|
FLLOG(MCoperatorWarning, unknownJob, e,"MP attachsocket(2)");
|
|
FLLOG(MCoperatorWarning, unknownJob, e,"MP attachsocket(2)");
|
|
e->Release();
|
|
e->Release();
|
|
}
|
|
}
|
|
-
|
|
|
|
}
|
|
}
|
|
|
|
|
|
if (confirm)
|
|
if (confirm)
|
|
@@ -1977,14 +2052,18 @@ int CMPConnectThread::run()
|
|
#ifdef _TRACE
|
|
#ifdef _TRACE
|
|
LOG(MCdebugInfo(100), unknownJob, "MP: Connect Thread Starting - accept loop");
|
|
LOG(MCdebugInfo(100), unknownJob, "MP: Connect Thread Starting - accept loop");
|
|
#endif
|
|
#endif
|
|
- while (running) {
|
|
|
|
|
|
+ while (running)
|
|
|
|
+ {
|
|
ISocket *sock=NULL;
|
|
ISocket *sock=NULL;
|
|
- try {
|
|
|
|
- sock=listensock->accept(true);
|
|
|
|
|
|
+ SocketEndpoint peerEp;
|
|
|
|
+ try
|
|
|
|
+ {
|
|
|
|
+ sock=listensock->accept(true, &peerEp);
|
|
#ifdef _FULLTRACE
|
|
#ifdef _FULLTRACE
|
|
StringBuffer s;
|
|
StringBuffer s;
|
|
SocketEndpoint ep1;
|
|
SocketEndpoint ep1;
|
|
- if (sock) {
|
|
|
|
|
|
+ if (sock)
|
|
|
|
+ {
|
|
sock->getPeerEndpoint(ep1);
|
|
sock->getPeerEndpoint(ep1);
|
|
PROGLOG("MP: Connect Thread: socket accepted from %s",ep1.getUrlStr(s).str());
|
|
PROGLOG("MP: Connect Thread: socket accepted from %s",ep1.getUrlStr(s).str());
|
|
}
|
|
}
|
|
@@ -1995,14 +2074,19 @@ int CMPConnectThread::run()
|
|
LOG(MCdebugInfo, unknownJob, e,"MP accept failed");
|
|
LOG(MCdebugInfo, unknownJob, e,"MP accept failed");
|
|
throw; // error handling TBD
|
|
throw; // error handling TBD
|
|
}
|
|
}
|
|
- if (sock) {
|
|
|
|
- try {
|
|
|
|
|
|
+ if (sock)
|
|
|
|
+ {
|
|
|
|
+ try
|
|
|
|
+ {
|
|
sock->set_keep_alive(true);
|
|
sock->set_keep_alive(true);
|
|
size32_t rd;
|
|
size32_t rd;
|
|
SocketEndpoint _remoteep;
|
|
SocketEndpoint _remoteep;
|
|
SocketEndpoint hostep;
|
|
SocketEndpoint hostep;
|
|
- SocketEndpointV4 id[2];
|
|
|
|
- traceSlowReadTms("MP: initial accept packet from", sock, &id[0], sizeof(id), sizeof(id), rd, CONFIRM_TIMEOUT, CONFIRM_TIMEOUT_INTERVAL);
|
|
|
|
|
|
+ ConnectHdr connectHdr;
|
|
|
|
+ bool legacyClient = false;
|
|
|
|
+
|
|
|
|
+ // NB: min size is ConnectHdr.id for legacy clients, can thus distinguish old from new
|
|
|
|
+ traceSlowReadTms("MP: initial accept packet from", sock, &connectHdr, sizeof(connectHdr.id), sizeof(connectHdr), rd, CONFIRM_TIMEOUT, CONFIRM_TIMEOUT_INTERVAL);
|
|
if (0 == rd)
|
|
if (0 == rd)
|
|
{
|
|
{
|
|
if (mpTraceLevel > 1)
|
|
if (mpTraceLevel > 1)
|
|
@@ -2015,45 +2099,84 @@ int CMPConnectThread::run()
|
|
sock->Release();
|
|
sock->Release();
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
- else if (rd != sizeof(id))
|
|
|
|
|
|
+ else
|
|
{
|
|
{
|
|
- // not sure how to get here as this is not one of the possible outcomes of above: rd == 0 or rd == sizeof(id) or an exception
|
|
|
|
- SocketEndpoint ep;
|
|
|
|
- sock->getPeerEndpoint(ep);
|
|
|
|
- StringBuffer errMsg("MP Connect Thread: invalid number of connection bytes serialized from ");
|
|
|
|
- ep.getUrlStr(errMsg);
|
|
|
|
- FLLOG(MCoperatorWarning, unknownJob, "%s", errMsg.str());
|
|
|
|
- sock->close();
|
|
|
|
- sock->Release();
|
|
|
|
- continue;
|
|
|
|
|
|
+ if (rd == sizeof(connectHdr.id)) // legacy client
|
|
|
|
+ {
|
|
|
|
+ legacyClient = true;
|
|
|
|
+ connectHdr.setRole(0); // unknown
|
|
|
|
+ }
|
|
|
|
+ else if (rd < sizeof(connectHdr.id) || rd > sizeof(connectHdr))
|
|
|
|
+ {
|
|
|
|
+ // not sure how to get here as this is not one of the possible outcomes of above: rd == 0 or rd == sizeof(id) or an exception
|
|
|
|
+ StringBuffer errMsg("MP Connect Thread: invalid number of connection bytes serialized from ");
|
|
|
|
+ peerEp.getUrlStr(errMsg);
|
|
|
|
+ FLLOG(MCoperatorWarning, unknownJob, "%s", errMsg.str());
|
|
|
|
+ sock->close();
|
|
|
|
+ sock->Release();
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- id[0].get(_remoteep);
|
|
|
|
- id[1].get(hostep);
|
|
|
|
|
|
|
|
- unsigned __int64 addrval = DIGIT1*id[0].ip[0] + DIGIT2*id[0].ip[1] + DIGIT3*id[0].ip[2] + DIGIT4*id[0].ip[3] + id[0].port;
|
|
|
|
|
|
+ if (whiteListCallback)
|
|
|
|
+ {
|
|
|
|
+ StringBuffer ipStr;
|
|
|
|
+ peerEp.getIpText(ipStr);
|
|
|
|
+ StringBuffer responseText; // filled if denied
|
|
|
|
+ if (!whiteListCallback->isWhiteListed(ipStr, connectHdr.getRole(), &responseText))
|
|
|
|
+ {
|
|
|
|
+ Owned<IException> e = makeStringException(-1, responseText);
|
|
|
|
+ OWARNLOG(e, nullptr);
|
|
|
|
+
|
|
|
|
+ if (legacyClient)
|
|
|
|
+ {
|
|
|
|
+ /* NB: legacy client can't handle exception response
|
|
|
|
+ * Acknowledge legacy connection, then close socket
|
|
|
|
+ * The effect will be the client sees an MPERR_link_closed
|
|
|
|
+ */
|
|
|
|
+ size32_t reply = sizeof(connectHdr.id);
|
|
|
|
+ sock->write(&reply, sizeof(reply));
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ MemoryBuffer mb;
|
|
|
|
+ DelayedSizeMarker marker(mb);
|
|
|
|
+ serializeException(e, mb);
|
|
|
|
+ marker.write();
|
|
|
|
+ sock->write(mb.toByteArray(), mb.length());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ sock->close();
|
|
|
|
+ sock->Release();
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ connectHdr.id[0].get(_remoteep);
|
|
|
|
+ connectHdr.id[1].get(hostep);
|
|
|
|
+
|
|
|
|
+ unsigned __int64 addrval = DIGIT1*connectHdr.id[0].ip[0] + DIGIT2*connectHdr.id[0].ip[1] + DIGIT3*connectHdr.id[0].ip[2] + DIGIT4*connectHdr.id[0].ip[3] + connectHdr.id[0].port;
|
|
#ifdef _TRACE
|
|
#ifdef _TRACE
|
|
PROGLOG("MP: Connect Thread: addrval = %" I64F "u", addrval);
|
|
PROGLOG("MP: Connect Thread: addrval = %" I64F "u", addrval);
|
|
#endif
|
|
#endif
|
|
|
|
|
|
if (_remoteep.isNull() || hostep.isNull())
|
|
if (_remoteep.isNull() || hostep.isNull())
|
|
{
|
|
{
|
|
- SocketEndpoint ep;
|
|
|
|
- sock->getPeerEndpoint(ep);
|
|
|
|
StringBuffer errMsg;
|
|
StringBuffer errMsg;
|
|
SocketEndpointV4 zeroTest[2];
|
|
SocketEndpointV4 zeroTest[2];
|
|
memset(zeroTest, 0x0, sizeof(zeroTest));
|
|
memset(zeroTest, 0x0, sizeof(zeroTest));
|
|
- if (memcmp(id, zeroTest, sizeof(id)))
|
|
|
|
|
|
+ if (memcmp(connectHdr.id, zeroTest, sizeof(connectHdr.id)))
|
|
{
|
|
{
|
|
// JCSMORE, I think _remoteep really must/should match a IP of this local host
|
|
// JCSMORE, I think _remoteep really must/should match a IP of this local host
|
|
errMsg.append("MP Connect Thread: invalid remote and/or host ep serialized from ");
|
|
errMsg.append("MP Connect Thread: invalid remote and/or host ep serialized from ");
|
|
- ep.getUrlStr(errMsg);
|
|
|
|
|
|
+ peerEp.getUrlStr(errMsg);
|
|
FLLOG(MCoperatorWarning, unknownJob, "%s", errMsg.str());
|
|
FLLOG(MCoperatorWarning, unknownJob, "%s", errMsg.str());
|
|
}
|
|
}
|
|
else if (mpTraceLevel > 1)
|
|
else if (mpTraceLevel > 1)
|
|
{
|
|
{
|
|
// all zeros msg received
|
|
// all zeros msg received
|
|
errMsg.append("MP Connect Thread: connect with empty msg received, assumed port monitor check from ");
|
|
errMsg.append("MP Connect Thread: connect with empty msg received, assumed port monitor check from ");
|
|
- ep.getUrlStr(errMsg);
|
|
|
|
|
|
+ peerEp.getUrlStr(errMsg);
|
|
PROGLOG("%s", errMsg.str());
|
|
PROGLOG("%s", errMsg.str());
|
|
}
|
|
}
|
|
sock->close();
|
|
sock->close();
|
|
@@ -2067,14 +2190,16 @@ int CMPConnectThread::run()
|
|
hostep.getUrlStr(tmp1);
|
|
hostep.getUrlStr(tmp1);
|
|
PROGLOG("MP: Connect Thread: after read %s",tmp1.str());
|
|
PROGLOG("MP: Connect Thread: after read %s",tmp1.str());
|
|
#endif
|
|
#endif
|
|
- checkSelfDestruct(&id[0],sizeof(id));
|
|
|
|
|
|
+ checkSelfDestruct(&connectHdr.id[0],sizeof(connectHdr.id));
|
|
Owned<CMPChannel> channel = parent->lookup(_remoteep);
|
|
Owned<CMPChannel> channel = parent->lookup(_remoteep);
|
|
- if (!channel->attachSocket(sock,_remoteep,hostep,false,&rd,addrval)) {
|
|
|
|
|
|
+ if (!channel->attachSocket(sock,_remoteep,hostep,false,&rd,addrval))
|
|
|
|
+ {
|
|
#ifdef _FULLTRACE
|
|
#ifdef _FULLTRACE
|
|
PROGLOG("MP Connect Thread: lookup failed");
|
|
PROGLOG("MP Connect Thread: lookup failed");
|
|
#endif
|
|
#endif
|
|
}
|
|
}
|
|
- else {
|
|
|
|
|
|
+ else
|
|
|
|
+ {
|
|
#ifdef _TRACE
|
|
#ifdef _TRACE
|
|
StringBuffer str1;
|
|
StringBuffer str1;
|
|
StringBuffer str2;
|
|
StringBuffer str2;
|
|
@@ -2091,7 +2216,8 @@ int CMPConnectThread::run()
|
|
sock->close();
|
|
sock->close();
|
|
e->Release();
|
|
e->Release();
|
|
}
|
|
}
|
|
- try {
|
|
|
|
|
|
+ try
|
|
|
|
+ {
|
|
sock->Release();
|
|
sock->Release();
|
|
}
|
|
}
|
|
catch (IException *e)
|
|
catch (IException *e)
|
|
@@ -2100,7 +2226,8 @@ int CMPConnectThread::run()
|
|
e->Release();
|
|
e->Release();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- else {
|
|
|
|
|
|
+ else
|
|
|
|
+ {
|
|
if (running)
|
|
if (running)
|
|
LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread accept returned NULL");
|
|
LOG(MCdebugInfo(100), unknownJob, "MP Connect Thread accept returned NULL");
|
|
}
|
|
}
|
|
@@ -2183,9 +2310,10 @@ CMPChannel *CMPServer::lookup(const SocketEndpoint &endpoint)
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
-CMPServer::CMPServer(unsigned _port)
|
|
|
|
|
|
+CMPServer::CMPServer(unsigned __int64 _role, unsigned _port)
|
|
{
|
|
{
|
|
RTsalt=0xff;
|
|
RTsalt=0xff;
|
|
|
|
+ role = _role;
|
|
port = 0; // connectthread tells me what port it actually connected on
|
|
port = 0; // connectthread tells me what port it actually connected on
|
|
checkclosed = false;
|
|
checkclosed = false;
|
|
connectthread = new CMPConnectThread(this, _port);
|
|
connectthread = new CMPConnectThread(this, _port);
|
|
@@ -2987,7 +3115,7 @@ public:
|
|
parent->removeChannel(channel);
|
|
parent->removeChannel(channel);
|
|
}
|
|
}
|
|
|
|
|
|
- virtual const SocketEndpoint &queryChannelPeerEndpoint(const SocketEndpoint &sender) override
|
|
|
|
|
|
+ virtual const SocketEndpoint &queryChannelPeerEndpoint(const SocketEndpoint &sender) const override
|
|
{
|
|
{
|
|
Owned<CMPChannel> channel = parent->lookup(sender);
|
|
Owned<CMPChannel> channel = parent->lookup(sender);
|
|
assertex(channel);
|
|
assertex(channel);
|
|
@@ -3020,7 +3148,7 @@ ICommunicator *CMPServer::createCommunicator(IGroup *group, bool outer)
|
|
IMPServer *startNewMPServer(unsigned port)
|
|
IMPServer *startNewMPServer(unsigned port)
|
|
{
|
|
{
|
|
assertex(sizeof(PacketHeader)==32);
|
|
assertex(sizeof(PacketHeader)==32);
|
|
- CMPServer *mpServer = new CMPServer(port);
|
|
|
|
|
|
+ CMPServer *mpServer = new CMPServer(0, port);
|
|
mpServer->start();
|
|
mpServer->start();
|
|
return mpServer;
|
|
return mpServer;
|
|
}
|
|
}
|
|
@@ -3034,7 +3162,7 @@ class CGlobalMPServer : public CMPServer
|
|
public:
|
|
public:
|
|
static CriticalSection sect;
|
|
static CriticalSection sect;
|
|
|
|
|
|
- CGlobalMPServer(unsigned _port) : CMPServer(_port)
|
|
|
|
|
|
+ CGlobalMPServer(unsigned __int64 _role, unsigned _port) : CMPServer(_role, _port)
|
|
{
|
|
{
|
|
worldcomm = NULL;
|
|
worldcomm = NULL;
|
|
nestLevel = 0;
|
|
nestLevel = 0;
|
|
@@ -3068,13 +3196,13 @@ MODULE_EXIT()
|
|
::Release(globalMPServer);
|
|
::Release(globalMPServer);
|
|
}
|
|
}
|
|
|
|
|
|
-void startMPServer(unsigned port, bool paused)
|
|
|
|
|
|
+void startMPServer(unsigned __int64 role, unsigned port, bool paused)
|
|
{
|
|
{
|
|
assertex(sizeof(PacketHeader)==32);
|
|
assertex(sizeof(PacketHeader)==32);
|
|
CriticalBlock block(CGlobalMPServer::sect);
|
|
CriticalBlock block(CGlobalMPServer::sect);
|
|
if (NULL == globalMPServer)
|
|
if (NULL == globalMPServer)
|
|
{
|
|
{
|
|
- globalMPServer = new CGlobalMPServer(port);
|
|
|
|
|
|
+ globalMPServer = new CGlobalMPServer(role, port);
|
|
initMyNode(globalMPServer->getPort());
|
|
initMyNode(globalMPServer->getPort());
|
|
}
|
|
}
|
|
if (0 == globalMPServer->queryNest())
|
|
if (0 == globalMPServer->queryNest())
|
|
@@ -3091,6 +3219,11 @@ void startMPServer(unsigned port, bool paused)
|
|
globalMPServer->incNest();
|
|
globalMPServer->incNest();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+void startMPServer(unsigned port, bool paused)
|
|
|
|
+{
|
|
|
|
+ startMPServer(0, port, paused);
|
|
|
|
+}
|
|
|
|
+
|
|
void stopMPServer()
|
|
void stopMPServer()
|
|
{
|
|
{
|
|
CGlobalMPServer *_globalMPServer = NULL;
|
|
CGlobalMPServer *_globalMPServer = NULL;
|