فهرست منبع

HPCC-23199 Fix port clashing between Thor slave channels.

Channels were not using their own port map range, which
spuriously caused multiple channels to use the same (1st channel)
range and run out of oom, resulting in port in use errors.

NB: this also avoid unecessarily reserving the watchdog and
debug ports on slaves within the port range, which only ever
used on the master.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 5 سال پیش
والد
کامیت
e1c61d7474

+ 3 - 3
thorlcr/activities/join/thjoinslave.cpp

@@ -176,7 +176,7 @@ public:
     ~JoinSlaveActivity()
     {
         if (portbase) 
-            freePort(portbase,NUMSLAVEPORTS);
+            queryJobChannel().freePort(portbase, NUMSLAVEPORTS);
     }
 
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
@@ -214,7 +214,7 @@ public:
             mpTagRPC = container.queryJobChannel().deserializeMPTag(data);
             mptag_t barrierTag = container.queryJobChannel().deserializeMPTag(data);
             barrier.setown(container.queryJobChannel().createBarrier(barrierTag));
-            portbase = allocPort(NUMSLAVEPORTS);
+            portbase = queryJobChannel().allocPort(NUMSLAVEPORTS);
             ActPrintLog("SortJoinSlaveActivity::init portbase = %d, mpTagRPC=%d",portbase,(int)mpTagRPC);
             server.setLocalHost(portbase); 
             sorter.setown(CreateThorSorter(this, server,&container.queryJob().queryIDiskUsage(),&queryJobChannel().queryJobComm(),mpTagRPC));
@@ -408,7 +408,7 @@ public:
         rightInput.clear();
         if (portbase)
         {
-            freePort(portbase, NUMSLAVEPORTS);
+            queryJobChannel().freePort(portbase, NUMSLAVEPORTS);
             portbase = 0;
         }
         CSlaveActivity::kill();

+ 1 - 13
thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp

@@ -550,7 +550,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
     unsigned currentMatchIdx, currentJoinGroupSize, currentAdded, currentMatched;
     Owned<CJoinGroup> djg, doneJG;
     OwnedConstThorRow defaultRight;
-    unsigned portbase, node;
+    unsigned node;
     IArrayOf<IDelayedFile> fetchFiles;
     FPosTableEntry *localFPosToNodeMap; // maps fpos->local part #
     FPosTableEntry *globalFPosToNodeMap; // maps fpos->node for all parts of file. If file is remote, localFPosToNodeMap will have all parts
@@ -1661,7 +1661,6 @@ public:
         tlkKeySet.setown(createKeyIndexSet());
         pool = NULL;
         currentMatchIdx = currentJoinGroupSize = currentAdded = currentMatched = 0;
-        portbase = 0;
         pendingGroups = 0;
         superWidth = 0;
         additionalStats = 0;
@@ -1693,8 +1692,6 @@ public:
         }
         ::Release(fetchHandler);
         ::Release(inputHelper);
-        if (portbase)
-            freePort(portbase, NUMSLAVEPORTS*3);
         ::Release(resultDistStream);
         defaultRight.clear();
         if (pool) delete pool;
@@ -2079,15 +2076,6 @@ public:
             resultDistStream = new CKeyLocalLookup(*this, helper->queryIndexRecordSize()->queryRecordAccessor(true));
         }
     }
-    virtual void kill() override
-    {
-        if (portbase)
-        {
-            freePort(portbase, NUMSLAVEPORTS);
-            portbase = 0;
-        }
-        PARENT::kill();
-    }
     virtual void abort() override
     {
         PARENT::abort();

+ 3 - 3
thorlcr/activities/msort/thmsortslave.cpp

@@ -66,14 +66,14 @@ public:
     ~MSortSlaveActivity()
     {
         if (portbase) 
-            freePort(portbase,NUMSLAVEPORTS);
+            queryJobChannel().freePort(portbase,NUMSLAVEPORTS);
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
         mpTagRPC = container.queryJobChannel().deserializeMPTag(data);
         mptag_t barrierTag = container.queryJobChannel().deserializeMPTag(data);
         barrier.setown(container.queryJobChannel().createBarrier(barrierTag));
-        portbase = allocPort(NUMSLAVEPORTS);
+        portbase = queryJobChannel().allocPort(NUMSLAVEPORTS);
         ActPrintLog("MSortSlaveActivity::init portbase = %d, mpTagRPC = %d",portbase,(int)mpTagRPC);
         server.setLocalHost(portbase); 
         helper = (IHThorSortArg *)queryHelper();
@@ -183,7 +183,7 @@ public:
         }
         if (portbase)
         {
-            freePort(portbase, NUMSLAVEPORTS);
+            queryJobChannel().freePort(portbase, NUMSLAVEPORTS);
             portbase = 0;
         }
         PARENT::kill();

+ 3 - 3
thorlcr/activities/selfjoin/thselfjoinslave.cpp

@@ -109,7 +109,7 @@ public:
     ~SelfJoinSlaveActivity()
     {
         if(portbase) 
-            freePort(portbase,NUMSLAVEPORTS);
+            queryJobChannel().freePort(portbase, NUMSLAVEPORTS);
     }
 
 // IThorSlaveActivity
@@ -120,7 +120,7 @@ public:
             mpTagRPC = container.queryJobChannel().deserializeMPTag(data);
             mptag_t barrierTag = container.queryJobChannel().deserializeMPTag(data);
             barrier.setown(container.queryJobChannel().createBarrier(barrierTag));
-            portbase = allocPort(NUMSLAVEPORTS);
+            portbase = queryJobChannel().allocPort(NUMSLAVEPORTS);
             server.setLocalHost(portbase);
             sorter.setown(CreateThorSorter(this, server,&container.queryJob().queryIDiskUsage(),&queryJobChannel().queryJobComm(),mpTagRPC));
             server.serialize(slaveData);
@@ -146,7 +146,7 @@ public:
         sorter.clear();
         if (portbase)
         {
-            freePort(portbase, NUMSLAVEPORTS);
+            queryJobChannel().freePort(portbase, NUMSLAVEPORTS);
             portbase = 0;
         }
         PARENT::kill();

+ 48 - 0
thorlcr/graph/thgraph.cpp

@@ -27,6 +27,7 @@
 #include "thmem.hpp"
 #include "rtlformat.hpp"
 #include "thorsoapcall.hpp"
+#include "thorport.hpp"
 
 
 PointerArray createFuncs;
@@ -2934,6 +2935,8 @@ CJobChannel::CJobChannel(CJobBase &_job, IMPServer *_mpServer, unsigned _channel
     jobComm.setown(mpServer->createCommunicator(&job.queryJobGroup()));
     myrank = job.queryJobGroup().rank(queryMyNode());
     graphExecutor.setown(new CGraphExecutor(*this));
+    myBasePort = mpServer->queryMyNode()->endpoint().port;
+    portMap.setown(createBitSet());
 }
 
 CJobChannel::~CJobChannel()
@@ -3053,6 +3056,51 @@ IThorResult *CJobChannel::getOwnedResult(graph_id gid, activity_id ownerId, unsi
     return result.getClear();
 }
 
+unsigned short CJobChannel::allocPort(unsigned num)
+{
+    CriticalBlock block(portAllocCrit);
+    if (num==0)
+        num = 1;
+    unsigned sp=0;
+    unsigned p;
+    for (;;)
+    {
+        p = portMap->scan(sp, false);
+        unsigned q;
+        for (q=p+1; q<p+num; q++)
+        {
+            if (portMap->test(q))
+                break;
+        }
+        if (q == p+num)
+        {
+            while (q != p)
+                portMap->set(--q);
+            break;
+        }
+        sp = p+1;
+    }
+
+    return (unsigned short)(p+queryMyBasePort());
+}
+
+void CJobChannel::freePort(unsigned short p, unsigned num)
+{
+    CriticalBlock block(portAllocCrit);
+    if (!p)
+        return;
+    if (num == 0)
+        num = 1;
+    while (num--) 
+        portMap->set(p-queryMyBasePort()+num, false);
+}
+
+void CJobChannel::reservePortKind(ThorPortKind kind)
+{
+    CriticalBlock block(portAllocCrit);
+    portMap->set(getPortOffset(kind), true);
+}
+
 void CJobChannel::abort(IException *e)
 {
     aborted = true;

+ 9 - 1
thorlcr/graph/thgraph.hpp

@@ -47,6 +47,7 @@
 #include "workunit.hpp"
 #include "thorcommon.hpp"
 #include "thmem.hpp"
+#include "thorport.hpp"
 
 #include "thor.hpp"
 #include "eclhelper.hpp"
@@ -860,7 +861,7 @@ public:
 
     inline bool queryUsePackedAllocators() const { return usePackedAllocator; }
     unsigned queryMaxLfnBlockTimeMins() const { return maxLfnBlockTimeMins; }
-    virtual void addChannel(IMPServer *mpServer) = 0;
+    virtual CJobChannel *addChannel(IMPServer *mpServer) = 0;
     CJobChannel &queryJobChannel(unsigned c) const;
     CActivityBase &queryChannelActivity(unsigned c, graph_id gid, activity_id id) const;
     unsigned queryChannelsPerSlave() const { return channelsPerSlave; }
@@ -970,6 +971,9 @@ protected:
     Owned<CThorCodeContextBase> sharedMemCodeCtx;
     unsigned channel;
     bool cleaned = false;
+    unsigned myBasePort = 0;
+    CriticalSection portAllocCrit;
+    Owned<IBitSet> portMap;
 
     void removeAssociates(CGraphBase &graph)
     {
@@ -1025,9 +1029,13 @@ public:
     ICommunicator &queryJobComm() const { return *jobComm; }
     IMPServer &queryMPServer() const { return *mpServer; }
     const rank_t &queryMyRank() const { return myrank; }
+    unsigned queryMyBasePort() const { return myBasePort; }
     mptag_t deserializeMPTag(MemoryBuffer &mb);
     IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, activity_id activityId, roxiemem::RoxieHeapFlags flags=roxiemem::RHFnone) const;
     roxiemem::IRowManager *queryRowManager() const;
+    unsigned short allocPort(unsigned num);
+    void freePort(unsigned short p, unsigned num);
+    void reservePortKind(ThorPortKind type);
 
     virtual void abort(IException *e);
     virtual IBarrier *createBarrier(mptag_t tag) { UNIMPLEMENTED; return NULL; }

+ 9 - 3
thorlcr/graph/thgraphmaster.cpp

@@ -1349,7 +1349,11 @@ CJobMaster::CJobMaster(IConstWorkUnit &_workunit, const char *graphName, ILoaded
     }
     sharedAllocator.setown(::createThorAllocator(globalMemoryMB, 0, 1, memorySpillAtPercentage, *logctx, crcChecking, usePackedAllocator));
     Owned<IMPServer> mpServer = getMPServer();
-    addChannel(mpServer);
+    CJobChannel *channel = addChannel(mpServer);
+    channel->reservePortKind(TPORT_mp); 
+    channel->reservePortKind(TPORT_watchdog);
+    channel->reservePortKind(TPORT_debug);
+
     slavemptag = allocateMPTag();
     slaveMsgHandler.setown(new CSlaveMessageHandler(*this, slavemptag));
     tmpHandler.setown(createTempHandler(true));
@@ -1367,9 +1371,11 @@ void CJobMaster::endJob()
     PARENT::endJob();
 }
 
-void CJobMaster::addChannel(IMPServer *mpServer)
+CJobChannel *CJobMaster::addChannel(IMPServer *mpServer)
 {
-    jobChannels.append(*new CJobMasterChannel(*this, mpServer, jobChannels.ordinality()));
+    CJobChannel *channel = new CJobMasterChannel(*this, mpServer, jobChannels.ordinality());
+    jobChannels.append(*channel);
+    return channel;
 }
 
 

+ 1 - 1
thorlcr/graph/thgraphmaster.ipp

@@ -215,7 +215,7 @@ public:
     CJobMaster(IConstWorkUnit &workunit, const char *_graphName, ILoadedDllEntry *querySo, bool _sendSo, const SocketEndpoint &_agentEp);
     virtual void endJob() override;
 
-    virtual void addChannel(IMPServer *mpServer);
+    virtual CJobChannel *addChannel(IMPServer *mpServer) override;
 
     void registerFile(const char *logicalName, StringArray &clusters, unsigned usageCount=0, WUFileKind fileKind=WUFileStandard, bool temp=false);
     void deregisterFile(const char *logicalName, bool kept=false);

+ 2 - 1
thorlcr/graph/thgraphslave.cpp

@@ -1694,7 +1694,7 @@ CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, co
         actInitWaitTimeMins = queryMaxLfnBlockTimeMins()+1;
 }
 
-void CJobSlave::addChannel(IMPServer *mpServer)
+CJobChannel *CJobSlave::addChannel(IMPServer *mpServer)
 {
     unsigned nextChannelNum = jobChannels.ordinality();
     CJobSlaveChannel *channel = new CJobSlaveChannel(*this, mpServer, nextChannelNum);
@@ -1702,6 +1702,7 @@ void CJobSlave::addChannel(IMPServer *mpServer)
     unsigned slaveNum = channel->queryMyRank();
     jobChannelSlaveNumbers[nextChannelNum] = slaveNum;
     jobSlaveChannelNum[slaveNum-1] = nextChannelNum;
+    return channel;
 }
 
 void CJobSlave::startJob()

+ 1 - 1
thorlcr/graph/thgraphslave.hpp

@@ -483,7 +483,7 @@ public:
 
     CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *workUnitInfo, const char *graphName, ILoadedDllEntry *querySo, mptag_t _slavemptag);
 
-    virtual void addChannel(IMPServer *mpServer);
+    virtual CJobChannel *addChannel(IMPServer *mpServer) override;
     virtual void startJob() override;
     virtual void endJob() override;
     const char *queryFindString() const { return key.get(); } // for string HT

+ 4 - 1
thorlcr/slave/slavmain.cpp

@@ -1843,7 +1843,10 @@ public:
                         Owned<CJobSlave> job = new CJobSlave(watchdog, workUnitInfo, graphName, querySo, slaveMsgTag);
                         job->setXGMML(deps);
                         for (unsigned sc=0; sc<channelsPerSlave; sc++)
-                            job->addChannel(&mpServers.item(sc));
+                        {
+                            CJobChannel *channel = job->addChannel(&mpServers.item(sc));
+                            channel->reservePortKind(TPORT_mp);
+                        }
                         jobs.replace(*job.getLink());
                         job->startJob();
 

+ 18 - 66
thorlcr/thorutil/thorport.cpp

@@ -35,27 +35,11 @@
 #include "portlist.h"
 #include "thorport.hpp"
 
+// NB: these are offsets from slave/channel start port
 #define MPPORT       0
 #define WATCHDOGPORT 1
 #define DEBUGPORT 2
 
-static CriticalSection *portallocsection;
-static IBitSet *portmap;
-MODULE_INIT(INIT_PRIORITY_STANDARD)
-{
-    portallocsection = new CriticalSection;
-    portmap = createThreadSafeBitSet();
-    portmap->set(MPPORT, true);
-    portmap->set(WATCHDOGPORT, true);
-    portmap->set(DEBUGPORT, true);
-    return true;
-}
-MODULE_EXIT()
-{
-    portmap->Release();
-    delete portallocsection;
-}
-
 static unsigned short masterportbase=0;
 static unsigned short machineportbase=0;
 
@@ -74,56 +58,9 @@ unsigned short getExternalFixedPort(unsigned short masterBase, unsigned short ma
 {
     if (!masterBase) masterBase = THOR_BASE_PORT;
     if (!machineBase) machineBase = THOR_BASESLAVE_PORT;
-    switch (category) {
-    case TPORT_watchdog:
-        return machineBase+WATCHDOGPORT;
-    case TPORT_mp:
-        return machineBase+MPPORT; 
-    case TPORT_debug:
-        return machineBase+DEBUGPORT;
-    }
-    LOG(MCerror,unknownJob,"getFixedPort: Unknown Port Kind!");
-    return 0;
-}
-
-unsigned short allocPort(unsigned num)
-{
-    CriticalBlock proc(*portallocsection);
-    if (num==0)
-        num = 1;
-    unsigned sp=0;
-    unsigned p;
-    for (;;) {
-        p = portmap->scan(sp,false);
-        unsigned q;
-        for (q=p+1;q<p+num;q++) {
-            if (portmap->test(q))
-                break;
-        }
-        if (q==p+num) {
-            while (q!=p)
-                portmap->set(--q);
-            break;
-        }
-        sp=p+1;
-    }
-
-    return (unsigned short)(p+machineportbase);
-}
-
-void freePort(unsigned short p,unsigned num)
-{
-    CriticalBlock proc(*portallocsection);
-    if (!p)
-        return;
-    if (!portmap) 
-        return;
-    if (num==0)
-        num = 1;
-    while (num--) 
-        portmap->set(p-machineportbase+num,false);
+    return machineBase + getPortOffset(category);
 }
-        
+  
 void setMachinePortBase(unsigned short base)
 {
     machineportbase = base?base:THOR_BASESLAVE_PORT;
@@ -143,3 +80,18 @@ unsigned short getMachinePortBase()
 {
     return machineportbase?machineportbase:THOR_BASESLAVE_PORT;
 }
+
+unsigned getPortOffset(ThorPortKind category)
+{
+    switch (category)
+    {
+        case TPORT_watchdog:
+            return WATCHDOGPORT;
+        case TPORT_mp:
+            return MPPORT; 
+        case TPORT_debug:
+            return DEBUGPORT;
+        default:
+            throwUnexpected();
+    }
+}

+ 2 - 32
thorlcr/thorutil/thorport.hpp

@@ -35,44 +35,14 @@ enum ThorPortKind
     TPORT_debug
 };
 
+// NB: these helpers are all based on the slave or master base port and do not relate to channels
 graph_decl unsigned short getFixedPort(ThorPortKind category);
 graph_decl unsigned short getFixedPort(unsigned short base, ThorPortKind category);
 graph_decl unsigned short getExternalFixedPort(unsigned short masterbase, unsigned short machinebase, ThorPortKind category);
-graph_decl unsigned short allocPort(unsigned num=1);
-graph_decl void           freePort(unsigned short,unsigned num=1);
 graph_decl void           setMachinePortBase(unsigned short base);
 graph_decl void           setMasterPortBase(unsigned short base);
 graph_decl unsigned short         getMasterPortBase();
 graph_decl unsigned short         getMachinePortBase();
-
-typedef UnsignedShortArray PortArray;
-
-class CPortGroup
-{
-public:
-    unsigned short allocPort(unsigned n=1)
-    {
-        unsigned short p=::allocPort(n);
-        while (n--)
-            portsinuse.append(p+n);
-        return p;
-    }
-    void freePort(unsigned short p,unsigned n=1)
-    {
-        unsigned i;
-        for (i=0;i<n;i++)
-            portsinuse.zap(p+i);
-        ::freePort(p,n);
-    }
-    virtual ~CPortGroup()
-    {
-        ForEachItemIn(i,portsinuse) {
-            freePort(portsinuse.item(i));
-        }
-    }
-protected:
-    PortArray portsinuse;
-};
-
+graph_decl unsigned getPortOffset(ThorPortKind category);
 
 #endif