|
@@ -460,7 +460,7 @@ public:
|
|
|
void stop();
|
|
|
unsigned short getPort() { return port; }
|
|
|
void setPort(unsigned short _port) { port = _port; }
|
|
|
- CMPChannel &lookup(const SocketEndpoint &remoteep);
|
|
|
+ CMPChannel *lookup(const SocketEndpoint &remoteep);
|
|
|
ISocketSelectHandler &querySelectHandler() { return *selecthandler; };
|
|
|
CBufferQueue &getReceiveQ() { return receiveq; }
|
|
|
bool recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag, CTimeMon &tm);
|
|
@@ -1932,7 +1932,8 @@ int CMPConnectThread::run()
|
|
|
PROGLOG("MP: Connect Thread: after read %s",tmp1.str());
|
|
|
#endif
|
|
|
checkSelfDestruct(&id[0],sizeof(id));
|
|
|
- if (!parent->lookup(_remoteep).attachSocket(sock,_remoteep,hostep,false,&rd,addrval)) {
|
|
|
+ Linked<CMPChannel> channel = parent->lookup(_remoteep);
|
|
|
+ if (!channel->attachSocket(sock,_remoteep,hostep,false,&rd,addrval)) {
|
|
|
#ifdef _FULLTRACE
|
|
|
PROGLOG("MP Connect Thread: lookup failed");
|
|
|
#endif
|
|
@@ -2014,7 +2015,7 @@ public:
|
|
|
|
|
|
//-----------------------------------------------------------------------------------
|
|
|
|
|
|
-CMPChannel &CMPServer::lookup(const SocketEndpoint &endpoint)
|
|
|
+CMPChannel *CMPServer::lookup(const SocketEndpoint &endpoint)
|
|
|
{
|
|
|
// there is an assumption here that no removes will be done within this loop
|
|
|
CriticalBlock block(serversect);
|
|
@@ -2042,7 +2043,7 @@ CMPChannel &CMPServer::lookup(const SocketEndpoint &endpoint)
|
|
|
e = new CMPChannel(this,ep);
|
|
|
add(*e);
|
|
|
}
|
|
|
- return *e;
|
|
|
+ return LINK(e);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -2338,11 +2339,11 @@ public:
|
|
|
}
|
|
|
|
|
|
CTimeMon tm(timeout);
|
|
|
- CMPChannel &channel = parent->lookup(dst->endpoint());
|
|
|
+ Linked<CMPChannel> channel = parent->lookup(dst->endpoint());
|
|
|
unsigned remaining;
|
|
|
if (tm.timedout(&remaining))
|
|
|
return false;
|
|
|
- if (!channel.send(mbuf,tag,mbuf.getReplyTag(),tm,false))
|
|
|
+ if (!channel->send(mbuf,tag,mbuf.getReplyTag(),tm,false))
|
|
|
return false;
|
|
|
mbuf.clear(); // for consistent semantics
|
|
|
return true;
|
|
@@ -2352,11 +2353,11 @@ public:
|
|
|
{
|
|
|
CriticalBlock block(verifysect);
|
|
|
CTimeMon tm(timeout);
|
|
|
- CMPChannel &channel = parent->lookup(node->endpoint());
|
|
|
+ Linked<CMPChannel> channel = parent->lookup(node->endpoint());
|
|
|
unsigned remaining;
|
|
|
if (tm.timedout(&remaining))
|
|
|
return false;
|
|
|
- return channel.verifyConnection(tm,true);
|
|
|
+ return channel->verifyConnection(tm,true);
|
|
|
}
|
|
|
|
|
|
void verifyAll(StringBuffer &log)
|
|
@@ -2395,12 +2396,12 @@ public:
|
|
|
else
|
|
|
doverify = (myrank<rank);
|
|
|
if (doverify) {
|
|
|
- CMPChannel &channel = parent->lookup(group->queryNode(rank).endpoint());
|
|
|
+ Linked<CMPChannel> channel = parent->lookup(group->queryNode(rank).endpoint());
|
|
|
unsigned remaining;
|
|
|
if (tm.timedout(&remaining)) {
|
|
|
return false;
|
|
|
}
|
|
|
- if (!channel.verifyConnection(tm,true)) {
|
|
|
+ if (!channel->verifyConnection(tm,true)) {
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
@@ -2410,8 +2411,8 @@ public:
|
|
|
ForEachNodeInGroup(rank,*group) {
|
|
|
bool doverify = ((rank&1)==(myrank&1))?(myrank<rank):(myrank>rank);
|
|
|
if (doverify) {
|
|
|
- CMPChannel &channel = parent->lookup(group->queryNode(rank).endpoint());
|
|
|
- while (!channel.verifyConnection(tm,false)) {
|
|
|
+ Linked<CMPChannel> channel = parent->lookup(group->queryNode(rank).endpoint());
|
|
|
+ while (!channel->verifyConnection(tm,false)) {
|
|
|
unsigned remaining;
|
|
|
if (tm.timedout(&remaining))
|
|
|
return false;
|
|
@@ -2508,9 +2509,9 @@ public:
|
|
|
void disconnect(INode *node)
|
|
|
{
|
|
|
CriticalBlock block(verifysect);
|
|
|
- CMPChannel &channel = parent->lookup(node->endpoint());
|
|
|
- channel.closeSocket();
|
|
|
- parent->removeChannel(&channel);
|
|
|
+ Linked<CMPChannel> channel = parent->lookup(node->endpoint());
|
|
|
+ channel->closeSocket();
|
|
|
+ parent->removeChannel(channel);
|
|
|
}
|
|
|
|
|
|
CInterCommunicator(CMPServer *_parent)
|
|
@@ -2537,7 +2538,7 @@ class CCommunicator: public CInterface, public ICommunicator
|
|
|
return group->queryNode(rank).endpoint();
|
|
|
}
|
|
|
|
|
|
- CMPChannel &queryChannel(rank_t rank)
|
|
|
+ CMPChannel *getChannel(rank_t rank)
|
|
|
{
|
|
|
return parent->lookup(queryEndpoint(rank));
|
|
|
}
|
|
@@ -2585,11 +2586,11 @@ public:
|
|
|
endrank = dstrank;
|
|
|
for (;dstrank<=endrank;dstrank++) {
|
|
|
if (dstrank!=myrank) {
|
|
|
- CMPChannel &channel = queryChannel(dstrank);
|
|
|
+ Linked<CMPChannel> channel = getChannel(dstrank);
|
|
|
unsigned remaining;
|
|
|
if (tm.timedout(&remaining))
|
|
|
return false;
|
|
|
- if (!channel.send(mbuf,tag,mbuf.getReplyTag(),tm,false))
|
|
|
+ if (!channel->send(mbuf,tag,mbuf.getReplyTag(),tm,false))
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
@@ -2654,11 +2655,11 @@ public:
|
|
|
assertex(rank!=RANK_RANDOM);
|
|
|
assertex(rank!=RANK_ALL);
|
|
|
CTimeMon tm(timeout);
|
|
|
- CMPChannel &channel = queryChannel(rank);
|
|
|
+ Linked<CMPChannel> channel = getChannel(rank);
|
|
|
unsigned remaining;
|
|
|
if (tm.timedout(&remaining))
|
|
|
return false;
|
|
|
- return channel.verifyConnection(tm,true);
|
|
|
+ return channel->verifyConnection(tm,true);
|
|
|
}
|
|
|
|
|
|
bool verifyAll(bool duplex, unsigned timeout)
|
|
@@ -2676,12 +2677,12 @@ public:
|
|
|
else
|
|
|
doverify = (myrank<rank);
|
|
|
if (doverify) {
|
|
|
- CMPChannel &channel = queryChannel(rank);
|
|
|
+ Linked<CMPChannel> channel = getChannel(rank);
|
|
|
unsigned remaining;
|
|
|
if (tm.timedout(&remaining)) {
|
|
|
return false;
|
|
|
}
|
|
|
- if (!channel.verifyConnection(tm,true))
|
|
|
+ if (!channel->verifyConnection(tm,true))
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
@@ -2690,8 +2691,8 @@ public:
|
|
|
ForEachNodeInGroup(rank,*group) {
|
|
|
bool doverify = ((rank&1)==(myrank&1))?(myrank<rank):(myrank>rank);
|
|
|
if (doverify) {
|
|
|
- CMPChannel &channel = queryChannel(rank);
|
|
|
- while (!channel.verifyConnection(tm,false)) {
|
|
|
+ Linked<CMPChannel> channel = getChannel(rank);
|
|
|
+ while (!channel->verifyConnection(tm,false)) {
|
|
|
unsigned remaining;
|
|
|
if (tm.timedout(&remaining))
|
|
|
return false;
|
|
@@ -2806,12 +2807,12 @@ public:
|
|
|
}
|
|
|
|
|
|
CTimeMon tm(timeout);
|
|
|
- CMPChannel &channel = parent->lookup(mbuf.getSender());
|
|
|
+ Linked<CMPChannel> channel = parent->lookup(mbuf.getSender());
|
|
|
unsigned remaining;
|
|
|
if (tm.timedout(&remaining)) {
|
|
|
return false;
|
|
|
}
|
|
|
- if (channel.send(mbuf,replytag,TAG_NULL,tm, true)) {
|
|
|
+ if (channel->send(mbuf,replytag,TAG_NULL,tm, true)) {
|
|
|
mbuf.setReplyTag(TAG_NULL);
|
|
|
return true;
|
|
|
}
|
|
@@ -2827,9 +2828,9 @@ public:
|
|
|
void disconnect(INode *node)
|
|
|
{
|
|
|
CriticalBlock block(verifysect);
|
|
|
- CMPChannel &channel = parent->lookup(node->endpoint());
|
|
|
- channel.closeSocket();
|
|
|
- parent->removeChannel(&channel);
|
|
|
+ Linked<CMPChannel> channel = parent->lookup(node->endpoint());
|
|
|
+ channel->closeSocket();
|
|
|
+ parent->removeChannel(channel);
|
|
|
}
|
|
|
|
|
|
CCommunicator(CMPServer *_parent,IGroup *_group, bool _outer)
|