瀏覽代碼

Merge branch 'candidate-7.12.x'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 4 年之前
父節點
當前提交
fbac153160

+ 4 - 0
dali/base/dadfs.cpp

@@ -3421,6 +3421,10 @@ protected:
             minSkew = (unsigned)(10000.0 * ((avgPartSz-(double)minPartSz)/avgPartSz));
         }
 
+        // +1 because published part number references are 1 based.
+        maxSkewPart++;
+        minSkewPart++;
+
         return true;
     }
 

+ 3 - 3
esp/src/eclwatch/TargetClustersQueryWidget.js

@@ -86,7 +86,7 @@ define([
                 title: this.i18n.TargetClustersLegacy,
                 style: "border: 0; width: 100%; height: 100%"
             });
-            this.legacyTargetClustersIframeWidget.placeAt(this._tabContainer, "last");
+            this.legacyTargetClustersIframeWidget.placeAt(this._tabContainer, "first");
             this.machineFilter.disable();
         },
 
@@ -107,7 +107,7 @@ define([
                         renderHeaderCell: function (node) {
                             node.innerHTML = Utility.getImageHTML("configuration.png", context.i18n.Configuration);
                         },
-                        width: 8,
+                        width: 12,
                         sortable: false,
                         formatter: function (configuration) {
                             if (configuration === true) {
@@ -121,7 +121,7 @@ define([
                         renderHeaderCell: function (node) {
                             node.innerHTML = Utility.getImageHTML("server.png", context.i18n.Dali);
                         },
-                        width: 8,
+                        width: 12,
                         sortable: false,
                         formatter: function (dali) {
                             if (dali === true) {

+ 95 - 30
roxie/udplib/udpmsgpk.cpp

@@ -60,13 +60,21 @@ class PackageSequencer : public CInterface, implements IInterface
 {
     DataBuffer *firstPacket;
     DataBuffer *lastContiguousPacket;
+    DataBuffer *tail = nullptr;
     unsigned metaSize;
     unsigned headerSize;
     const void *header;
+#ifdef _DEBUG
+    unsigned numPackets = 0;
+    unsigned maxSeqSeen = 0;
+    unsigned scans = 0;
+    unsigned overscans = 0;
+#endif
 
     MemoryBuffer metadata;
     InterruptableSemaphore dataAvailable; // MORE - need to work out when to interrupt it!
 
+
 public:
     IMPLEMENT_IINTERFACE;
 
@@ -122,7 +130,11 @@ public:
         bool res = false;
         assert(dataBuff->msgNext == NULL);
         UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
-
+        unsigned pktseq = pktHdr->pktSeq;
+#ifdef _DEBUG
+        if ((pktseq & UDP_PACKET_SEQUENCE_MASK) > maxSeqSeen)
+            maxSeqSeen = pktseq & UDP_PACKET_SEQUENCE_MASK;
+#endif
         if (checkTraceLevel(TRACE_MSGPACK, 5))
         {
             StringBuffer s;
@@ -130,46 +142,74 @@ public:
                     pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->node.getTraceText(s).str(), dataBuff, this);
         }
 
+        // Optimize the (very) common case where I need to add to the end
         DataBuffer *finger;
         DataBuffer *prev;
-        if (lastContiguousPacket)
+        if (tail && (pktseq > ((UdpPacketHeader*) tail->data)->pktSeq))
         {
-            UdpPacketHeader *oldHdr = (UdpPacketHeader*) lastContiguousPacket->data;
-            if (pktHdr->pktSeq <= oldHdr->pktSeq)
-            {
-                // discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
-                if (checkTraceLevel(TRACE_MSGPACK, 5))
-                    DBGLOG("UdpCollator: Discarding duplicate incoming packet");
-                dataBuff->Release();
-                return false;
-            }
-            finger = lastContiguousPacket->msgNext;
-            prev = lastContiguousPacket;
+            assert(tail->msgNext==nullptr);
+            finger = nullptr;
+            prev = tail;
+            tail = dataBuff;
         }
         else
         {
-            finger = firstPacket; 
-            prev = NULL;
-        }
-        while (finger)
-        {
-            UdpPacketHeader *oldHdr = (UdpPacketHeader*) finger->data;
-            if (pktHdr->pktSeq == oldHdr->pktSeq)
+            // This is an insertion sort - YUK!
+            if (lastContiguousPacket)
             {
-                // discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
-                if (checkTraceLevel(TRACE_MSGPACK, 5))
-                    DBGLOG("UdpCollator: Discarding duplicate incoming packet");
-                dataBuff->Release();
-                return false;
+                UdpPacketHeader *oldHdr = (UdpPacketHeader*) lastContiguousPacket->data;
+                if (pktHdr->pktSeq <= oldHdr->pktSeq)
+                {
+                    // discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
+                    if (checkTraceLevel(TRACE_MSGPACK, 5))
+                        DBGLOG("UdpCollator: Discarding duplicate incoming packet");
+                    dataBuff->Release();
+                    return false;
+                }
+                finger = lastContiguousPacket->msgNext;
+                prev = lastContiguousPacket;
             }
-            else if (pktHdr->pktSeq < oldHdr->pktSeq)
+            else
             {
-                break;
+                finger = firstPacket;
+                prev = NULL;
             }
-            else
+            while (finger)
             {
-                prev = finger;
-                finger = finger->msgNext;
+    #ifdef _DEBUG
+                scans++;
+                if (scans==1000000)
+                {
+                    overscans++;
+                    DBGLOG("%u million scans in UdpCollator insert(DataBuffer *dataBuff", overscans);
+                    if (lastContiguousPacket)
+                    {
+                        UdpPacketHeader *oldHdr = (UdpPacketHeader*) lastContiguousPacket->data;
+                        DBGLOG("lastContiguousPacket is at %u , last packet seen is %u", oldHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK, pktHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK);
+                    }
+                    else
+                        DBGLOG("lastContiguousPacket is NULL , last packet seen is %u", pktHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK);
+                    scans = 0;
+                }
+    #endif
+                UdpPacketHeader *oldHdr = (UdpPacketHeader*) finger->data;
+                if (pktHdr->pktSeq == oldHdr->pktSeq)
+                {
+                    // discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
+                    if (checkTraceLevel(TRACE_MSGPACK, 5))
+                        DBGLOG("UdpCollator: Discarding duplicate incoming packet");
+                    dataBuff->Release();
+                    return false;
+                }
+                else if (pktHdr->pktSeq < oldHdr->pktSeq)
+                {
+                    break;
+                }
+                else
+                {
+                    prev = finger;
+                    finger = finger->msgNext;
+                }
             }
         }
         if (prev)
@@ -178,8 +218,15 @@ public:
             prev->msgNext = dataBuff;
         }
         else
+        {
             firstPacket = dataBuff;
+            if (!tail)
+                tail = dataBuff;
+        }
         dataBuff->msgNext = finger;
+#ifdef _DEBUG
+        numPackets++;
+#endif
         if (prev == lastContiguousPacket)
         {
             unsigned prevseq;
@@ -250,6 +297,12 @@ public:
         return headerSize;
     }
 
+#ifdef _DEBUG
+    void dump()
+    {
+        DBGLOG("Contains %u packets, lastSeq = %u", numPackets, maxSeqSeen);
+    }
+#endif
 };
 
 // MessageResult ====================================================================================
@@ -562,6 +615,18 @@ IMessageResult *CMessageCollator::getNextResult(unsigned time_out, bool &anyActi
     activity = false;
     if (!anyActivity && ruid>=RUID_FIRST && checkTraceLevel(TRACE_MSGPACK, 1)) // suppress the tracing for pings where we expect the timeout...
     {
+#ifdef _DEBUG
+        DBGLOG("GetNextResult timeout: mapping has %d partial results", mapping.ordinality());
+        HashIterator h(mapping);
+        ForEach(h)
+        {
+            auto *r = mapping.mapToValue(&h.query());
+            PUID puid = *(PUID *) h.query().getKey();
+            DBGLOG("puid=%" I64F "x:", puid);
+            PackageSequencer *pkSqncr = mapping.getValue(puid);
+            pkSqncr->dump();
+        }
+#endif
         DBGLOG("UdpCollator: CMessageCollator::GetNextResult timeout");
     }
     return 0;

+ 14 - 9
roxie/udplib/udptrr.cpp

@@ -177,7 +177,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                 if (udpTraceLevel > 1)
                 {
                     StringBuffer ipStr;
-                    DBGLOG("UdpReceiver: sending ok_to_send %d msg to node=%s", maxTransfer, returnAddress.getIpText(ipStr).str());
+                    DBGLOG("UdpReceiver: sending ok_to_send %d msg to node=%s", maxTransfer, dest.getIpText(ipStr).str());
                 }
                 flowSocket->write(&msg, sizeof(UdpPermitToSendMsg));
             }
@@ -354,6 +354,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             unsigned timeout = ((max_transfer * DATA_PAYLOAD) / 100) + 10; // in ms assuming mtu package size with 100x margin on 100 Mbit network // MORE - hideous!
             currentRequester = requester;
             requester->requestToSend(max_transfer, myNode.getIpAddress());
+            assert(timeout >= 10 && timeout <= 20000);
             return timeout;
         }
 
@@ -375,6 +376,8 @@ class CReceiveManager : implements IReceiveManager, public CInterface
         unsigned timedOut(UdpSenderEntry *requester)
         {
             // MORE - this will retry indefinitely if agent in question is dead
+            // As coded, this rescinds the permission to send that just timed out and tells the next person to have a go
+            // Thus leading to "Received completed message is not from current sender" messages the the send was in flight
             currentRequester = nullptr;
             if (requester->retryOnTimeout())
                 enqueueRequest(requester);
@@ -490,7 +493,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                     else
                     {
                         flow_socket->readtms(&msg, l, l, res, timeoutExpires-now);
-                        now = msTick();
+                        unsigned newTimeout = 0;
                         assert(res==l);
                         if (udpTraceLevel > 5)
                         {
@@ -502,17 +505,17 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                         {
                         case flowType::request_to_send:
                             if (pendingRequests || currentRequester)
-                                enqueueRequest(sender);   // timeoutExpires does not change - there's still an active request
+                                enqueueRequest(sender);   // timeoutExpires does not change - there's still an active request. We have not given a new permission
                             else
-                                timeoutExpires = now + okToSend(sender);
+                                newTimeout = okToSend(sender);
                             break;
 
                         case flowType::send_completed:
                             parent.inflight += msg.packets;
-                            if (noteDone(sender) && pendingRequests)
-                                timeoutExpires = now + sendNextOk();
+                            if (noteDone(sender) && pendingRequests)   // This && looks wrong - noteDone returning false should mean we haven't seen the completed we wanted - so current timeout still applies. Or the one below is wrong...
+                                newTimeout = sendNextOk();
                             else
-                                timeoutExpires = now + 5000;
+                                newTimeout = 5000;
                             break;
 
                         case flowType::request_to_send_more:
@@ -522,16 +525,18 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                                 if (pendingRequests)
                                 {
                                     enqueueRequest(sender);
-                                    timeoutExpires = now + sendNextOk();
+                                    newTimeout = sendNextOk();
                                 }
                                 else
-                                    timeoutExpires = now + okToSend(sender);
+                                    newTimeout = okToSend(sender);
                             }
                             break;
 
                         default:
                             DBGLOG("UdpReceiver: received unrecognized flow control message cmd=%i", msg.cmd);
                         }
+                        if (newTimeout)
+                            timeoutExpires = msTick() + newTimeout;
                     }
                 }
                 catch (IException *e)

+ 34 - 13
thorlcr/activities/keyedjoin/thkeyedjoin.cpp

@@ -39,12 +39,15 @@ class CKeyedJoinMaster : public CMasterActivity
     bool local = false;
     bool remoteKeyedLookup = false;
     bool remoteKeyedFetch = false;
+    bool assumePrimary = false;
     unsigned totalIndexParts = 0;
 
     // CMap contains mappings and lists of parts for each slave
     class CMap
     {
+        CKeyedJoinMaster &activity;
     public:
+        CMap(CKeyedJoinMaster &_activity) : activity(_activity) {}
         std::vector<unsigned> allParts;
         std::vector<std::vector<unsigned>> slavePartMap; // vector of slave parts (IPartDescriptor's slavePartMap[<slave>] serialized to each slave)
         std::vector<unsigned> partToSlave; // vector mapping part index to slave (sent to all slaves)
@@ -146,21 +149,37 @@ class CKeyedJoinMaster : public CMasterActivity
                          * Add them to local parts list if found.
                          */
                         unsigned mappedPos = NotFound;
-                        for (unsigned c=0; c<part->numCopies(); c++)
+                        unsigned copies = part->numCopies();
+                        bool filePartExists = false;
+                        if (activity.assumePrimary)
+                        {
+                            /* If the index is big (e.g. large super-index), then it can be expensive
+                             * to walk over all part copies, checking their existence.
+                             * This option provides a workaround in those cases, to avoid that check,
+                             * by assuming the primary copy will exist and be used.
+                             */
+                            copies = 1;
+                            filePartExists = true;
+                        }
+                        for (unsigned c = 0; c < copies; c++)
                         {
                             INode *partNode = part->queryNode(c);
                             unsigned partCopy = p | (c << partBits);
                             unsigned start=nextGroupStartPos;
                             unsigned gn=start;
-                            do
+                            if (!activity.assumePrimary)
                             {
-                                INode &groupNode = dfsGroup.queryNode(gn);
-                                if ((partNode->equals(&groupNode)))
+                                RemoteFilename rfn;
+                                part->getFilename(c, rfn);
+                                Owned<IFile> file = createIFile(rfn);
+                                filePartExists = file->exists(); // skip if copy doesn't exist
+                            }
+                            if (filePartExists)
+                            {
+                                do
                                 {
-                                    RemoteFilename rfn;
-                                    part->getFilename(c, rfn);
-                                    Owned<IFile> file = createIFile(rfn);
-                                    if (file->exists()) // skip if copy doesn't exist
+                                    INode &groupNode = dfsGroup.queryNode(gn);
+                                    if (partNode->equals(&groupNode))
                                     {
                                         /* NB: If there's >1 slave per node (e.g. slavesPerNode>1) then there are multiple matching node's in the dfsGroup
                                         * Which means a copy of a part may already be assigned to a cluster slave map. This check avoid handling it again if it has.
@@ -195,12 +214,12 @@ class CKeyedJoinMaster : public CMasterActivity
                                                 slaveParts.push_back(partCopy);
                                         }
                                     }
+                                    gn++;
+                                    if (gn == groupSize)
+                                        gn = 0;
                                 }
-                                gn++;
-                                if (gn == groupSize)
-                                    gn = 0;
+                                while (gn != start);
                             }
-                            while (gn != start);
                         }
                         if (NotFound == mappedPos)
                         {
@@ -247,7 +266,7 @@ class CKeyedJoinMaster : public CMasterActivity
 
 
 public:
-    CKeyedJoinMaster(CMasterGraphElement *info) : CMasterActivity(info, keyedJoinActivityStatistics)
+    CKeyedJoinMaster(CMasterGraphElement *info) : CMasterActivity(info, keyedJoinActivityStatistics), indexMap(*this), dataMap(*this)
     {
         helper = (IHThorKeyedJoinArg *) queryHelper();
         reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename);
@@ -259,6 +278,8 @@ public:
         if (getOptBool(THOROPT_FORCE_REMOTE_KEYED_FETCH))
             remoteKeyedFetch = true;
 
+        assumePrimary = getOptBool(THOROPT_KJ_ASSUME_PRIMARY);
+
         if (helper->diskAccessRequired())
             numTags += 2;
         for (unsigned t=0; t<numTags; t++)

+ 1 - 0
thorlcr/thorutil/thormisc.hpp

@@ -103,6 +103,7 @@
 #define THOROPT_MAXLFN_BLOCKTIME_MINS "maxLfnBlockTimeMins"     // max time permitted to be blocked on a DFS logical file operation.
 #define THOROPT_VALIDATE_FILE_TYPE    "validateFileType"        // validate file type compatibility, e.g. if on fire error if XML reading CSV    (default = true)
 #define THOROPT_MIN_REMOTE_CQ_INDEX_SIZE_MB "minRemoteCQIndexSizeMb" // minimum size of index file to enable server side handling                (default = 0, meaning use heuristic to determin)
+#define THOROPT_KJ_ASSUME_PRIMARY "keyedJoinAssumePrimary"      // assume primary part exists (don't check when mapping, which can be slow)
 
 #define INITIAL_SELFJOIN_MATCH_WARNING_LEVEL 20000  // max of row matches before selfjoin emits warning