Browse Source

HPCC-25453 Roxie does not handle oversize continuation data well

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 4 years ago
parent
commit
31e81ddc90

+ 2 - 0
roxie/ccd/ccd.hpp

@@ -406,6 +406,8 @@ extern unsigned defaultHeapFlags;
 extern bool defaultCheckingHeap;
 extern bool defaultDisableLocalOptimizations;
 
+extern unsigned continuationCompressThreshold;
+
 extern unsigned agentQueryReleaseDelaySeconds;
 extern unsigned coresPerQuery;
 

+ 111 - 34
roxie/ccd/ccdactivities.cpp

@@ -52,7 +52,8 @@ using roxiemem::OwnedConstRoxieRow;
 using roxiemem::OwnedRoxieString;
 using roxiemem::IRowManager;
 
-#define maxContinuationSize 48000 // note - must fit in the 2-byte length field... but also needs to be possible to send back from Roxie server->agent in one packet
+#define continuationErrorThreshold 48000
+#define continuationWarnThreshold 32768
 
 size32_t serializeRow(IOutputRowSerializer * serializer, IMessagePacker *output, const void *unserialized)
 {
@@ -1327,7 +1328,7 @@ public:
         if (!aborted)
         {
             MemoryBuffer si;
-            unsigned short siLen = 0;
+            unsigned siLen = 0;
             si.append(siLen);
             si.append(processed);
             si.append(reader->isKeyed());
@@ -1424,7 +1425,7 @@ public:
                     if (totalSizeSent > indexReadChunkSize)
                     {
                         MemoryBuffer si;
-                        unsigned short siLen = 0;
+                        unsigned siLen = 0;
                         si.append(siLen);
                         si.append(processed);
                         si.append(reader->isKeyed());
@@ -1664,7 +1665,7 @@ public:
             if (totalSizeSent > indexReadChunkSize)
             {
                 MemoryBuffer si;
-                unsigned short siLen = 0;
+                unsigned siLen = 0;
                 si.append(siLen);
                 si.append(processed);
                 si.append(reader->isKeyed());
@@ -2661,29 +2662,61 @@ public:
     bool sendContinuation(IMessagePacker * output)
     {
         MemoryBuffer si;
-        unsigned short siLen = 0;
+        unsigned siLen = 0;
         si.append(siLen);
-        si.append(lastRowCompleteMatch);
+        si.append(false);
+        si.append(lastRowCompleteMatch);  // Note - this flag is also read on server side in isCompleteMatchFlag so (a) be careful about moving it and (b) don't compress it
         si.append(inputsDone);
         si.append(processed);
         si.append(keyprocessed);
         si.append(lastPartNo.partNo);
         si.append(lastPartNo.fileNo);
         tlk->serializeCursorPos(si);
-        if (si.length() <= maxContinuationSize)
+        if (si.length() <= continuationCompressThreshold || (continuationCompressThreshold==0 && si.length() <= continuationErrorThreshold))
         {
             siLen = si.length() - sizeof(siLen);
             si.writeDirect(0, sizeof(siLen), &siLen);
             output->sendMetaInfo(si.toByteArray(), si.length());
             return true;
         }
+        else if (continuationCompressThreshold)
+        {
+            MemoryBuffer compressed;
+            compressed.append(siLen);  // Leaving space to patch when size known
+            compressed.append(true);
+            compressed.append(lastRowCompleteMatch);  // This field is not compressed - see above!
+            compressToBuffer(compressed, si.length() - compressed.length(), si.toByteArray() + compressed.length());
+            bool report = logctx.queryTraceLevel() && (logctx.queryTraceLevel() > 3 || si.length() >= continuationWarnThreshold);
+            if (report)
+                DBGLOG("ERROR: continuation data size %u for %d cursor positions is large - compressed to %u", si.length(), tlk->numActiveKeys(), compressed.length());
+            siLen = compressed.length() - sizeof(siLen);
+            compressed.writeDirect(0, sizeof(siLen), &siLen);
+            output->sendMetaInfo(compressed.toByteArray(), compressed.length());
+            return true;
+        }
         else
+        {
+            DBGLOG("ERROR: continuation data size %u for %d cursor positions is too large", si.length(), tlk->numActiveKeys());
+            DBGLOG("ERROR: set continuationCompressThreshold to compress it");
+            DBGLOG("ERROR: result will be sent as single message, and performance may be degraded");
             return false;
+        }
     }
 
     void readContinuationInfo()
     {
-        resentInfo.read(lastRowCompleteMatch);
+        bool isCompressed;
+        resentInfo.read(isCompressed);
+        resentInfo.read(lastRowCompleteMatch); // This field is not compressed - see above!
+        if (isCompressed)
+        {
+            MemoryBuffer decompressed;
+            decompressToBuffer(decompressed, resentInfo);
+            if (logctx.queryTraceLevel() > 3)
+                 DBGLOG("readContinuationInfo: decompressed from %u to %u", resentInfo.length(), decompressed.length());
+            resentInfo.swapWith(decompressed);
+            resentInfo.reset(0);
+        }
         resentInfo.read(inputsDone);
         resentInfo.read(processed);
         resentInfo.read(keyprocessed);
@@ -4043,6 +4076,74 @@ class CRoxieKeyedJoinIndexActivity : public CRoxieKeyedActivity
     unsigned keepCount;
     unsigned inputDone;
 
+    bool sendContinuation(IMessagePacker * output)
+    {
+        MemoryBuffer si;
+        unsigned siLen = 0;
+        si.append(siLen);
+        si.append(false);
+        si.append(inputDone);
+        si.append(processed);
+        si.append(candidateCount);
+        si.append(keepCount);
+        si.append(lastPartNo.partNo);
+        si.append(lastPartNo.fileNo);
+        tlk->serializeCursorPos(si);
+        if (si.length() <= continuationCompressThreshold || (continuationCompressThreshold==0 && si.length() <= continuationErrorThreshold))
+        {
+            siLen = si.length() - sizeof(siLen);
+            si.writeDirect(0, sizeof(siLen), &siLen);
+            output->sendMetaInfo(si.toByteArray(), si.length());
+            return true;
+        }
+        else if (continuationCompressThreshold)
+        {
+            MemoryBuffer compressed;
+            compressed.append(siLen);  // Leaving space to patch when size known
+            compressed.append(true);
+            compressToBuffer(compressed, si.length() - compressed.length(), si.toByteArray() + compressed.length());
+            bool report = logctx.queryTraceLevel() && (logctx.queryTraceLevel() > 3 || si.length() >= continuationWarnThreshold);
+            if (report)
+                DBGLOG("ERROR: continuation data size %u for %d cursor positions is large - compressed to %u", si.length(), tlk->numActiveKeys(), compressed.length());
+            siLen = compressed.length() - sizeof(siLen);
+            compressed.writeDirect(0, sizeof(siLen), &siLen);
+            output->sendMetaInfo(compressed.toByteArray(), compressed.length());
+            return true;
+        }
+        else
+        {
+            DBGLOG("ERROR: continuation data size %u for %d cursor positions is too large", si.length(), tlk->numActiveKeys());
+            DBGLOG("ERROR: set continuationCompressThreshold to compress it");
+            DBGLOG("ERROR: result will be sent as single message, and performance may be degraded");
+            return false;
+        }
+    }
+
+    void readContinuationInfo()
+    {
+        bool isCompressed;
+        resentInfo.read(isCompressed);
+        if (isCompressed)
+        {
+            MemoryBuffer decompressed;
+            decompressToBuffer(decompressed, resentInfo);
+            if (logctx.queryTraceLevel() > 3)
+                 DBGLOG("readContinuationInfo: decompressed from %u to %u", resentInfo.length(), decompressed.length());
+            resentInfo.swapWith(decompressed);
+            resentInfo.reset(0);
+        }
+        resentInfo.read(inputDone);
+        inputData += inputDone;
+        resentInfo.read(processed);
+        resentInfo.read(candidateCount);
+        resentInfo.read(keepCount);
+        resentInfo.read(lastPartNo.partNo);
+        resentInfo.read(lastPartNo.fileNo);
+        setPartNo(true);
+        tlk->deserializeCursorPos(resentInfo);
+        assertex(resentInfo.remaining() == 0);
+    }
+
 public:
     CRoxieKeyedJoinIndexActivity(AgentContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieKeyedJoinIndexActivityFactory *_aFactory)
         : CRoxieKeyedActivity(_logctx, _packet, _hFactory, _aFactory), factory(_aFactory)
@@ -4059,18 +4160,7 @@ public:
         inputData = (char *) serializedCreate.readDirect(0);
         inputLength = (serializedCreate.length() - serializedCreate.getPos());
         if (resent)
-        {
-            resentInfo.read(inputDone);
-            inputData += inputDone;
-            resentInfo.read(processed);
-            resentInfo.read(candidateCount);
-            resentInfo.read(keepCount);
-            resentInfo.read(lastPartNo.partNo);
-            resentInfo.read(lastPartNo.fileNo);
-            setPartNo(true);
-            tlk->deserializeCursorPos(resentInfo);
-            assertex(resentInfo.remaining() == 0);
-        }
+            readContinuationInfo();
     }
 
     virtual bool hasNewSegmentMonitors() { return helper->hasNewSegmentMonitors(); }
@@ -4265,21 +4355,8 @@ IMessagePacker *CRoxieKeyedJoinIndexActivity::process()
                         totalSizeSent += KEYEDJOIN_RECORD_SIZE(totalSize);
                         if (totalSizeSent > indexReadChunkSize && !continuationFailed)
                         {
-                            MemoryBuffer si;
-                            unsigned short siLen = 0;
-                            si.append(siLen);
-                            si.append(inputDone);
-                            si.append(processed);
-                            si.append(candidateCount);
-                            si.append(keepCount);
-                            si.append(lastPartNo.partNo);
-                            si.append(lastPartNo.fileNo);
-                            tlk->serializeCursorPos(si);
-                            if (si.length() <= maxContinuationSize)
+                            if (sendContinuation(output))
                             {
-                                siLen = si.length() - sizeof(siLen);
-                                si.writeDirect(0, sizeof(siLen), &siLen);
-                                output->sendMetaInfo(si.toByteArray(), si.length());
                                 noteStats(processed-processedBefore, rejected);
                                 return output.getClear();
                             }

+ 3 - 0
roxie/ccd/ccdmain.cpp

@@ -137,6 +137,8 @@ bool defaultNoSeekBuildIndex = false;
 unsigned parallelLoadQueries = 8;
 bool alwaysFailOnLeaks = false;
 
+unsigned continuationCompressThreshold = 1024;
+
 bool useOldTopology = false;
 
 int backgroundCopyClass = 0;
@@ -1002,6 +1004,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         defaultWarnTimeLimit[1] = (unsigned) topology->getPropInt64("@defaultHighPriorityTimeWarning", 0);
         defaultWarnTimeLimit[2] = (unsigned) topology->getPropInt64("@defaultSLAPriorityTimeWarning", 0);
         defaultThorConnectTimeout = (unsigned) topology->getPropInt64("@defaultThorConnectTimeout", 60);
+        continuationCompressThreshold = (unsigned) topology->getPropInt64("@continuationCompressThreshold", 1024);
 
         defaultXmlReadFlags = topology->getPropBool("@defaultStripLeadingWhitespace", true) ? ptr_ignoreWhiteSpace : ptr_none;
         defaultParallelJoinPreload = topology->getPropInt("@defaultParallelJoinPreload", 0);

+ 13 - 13
roxie/ccd/ccdqueue.cpp

@@ -527,19 +527,19 @@ public:
         {
             if (data->continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)
             {
-                assertex(lengthRemaining >= (int) sizeof(unsigned short));
-                continuationLength = *(unsigned short *) finger;
-                continuationData = finger + sizeof(unsigned short);
+                assertex(lengthRemaining >= (int) sizeof(unsigned));
+                continuationLength = *(unsigned *) finger;
+                continuationData = finger + sizeof(unsigned);
                 finger = continuationData + continuationLength;
-                lengthRemaining -= continuationLength + sizeof(unsigned short);
+                lengthRemaining -= continuationLength + sizeof(unsigned);
             }
             if (data->continueSequence & CONTINUE_SEQUENCE_SKIPTO)
             {
-                assertex(lengthRemaining >= (int) sizeof(unsigned short));
-                smartStepInfoLength = *(unsigned short *) finger;
-                smartStepInfoData = finger + sizeof(unsigned short);
+                assertex(lengthRemaining >= (int) sizeof(unsigned));
+                smartStepInfoLength = *(unsigned *) finger;
+                smartStepInfoData = finger + sizeof(unsigned);
                 finger = smartStepInfoData + smartStepInfoLength;
-                lengthRemaining -= smartStepInfoLength + sizeof(unsigned short);
+                lengthRemaining -= smartStepInfoLength + sizeof(unsigned);
             }
         }
         assertex(lengthRemaining >= 0);
@@ -606,18 +606,18 @@ public:
     {
         assertex((data->continueSequence & CONTINUE_SEQUENCE_SKIPTO) == 0); // Should not already be any skipto info in the source packet
 
-        unsigned newDataSize = data->packetlength + sizeof(unsigned short) + skipDataLen;
+        unsigned newDataSize = data->packetlength + sizeof(unsigned) + skipDataLen;
         char *newdata = (char *) malloc(newDataSize);
         unsigned headSize = sizeof(RoxiePacketHeader);
         if (traceLength)
             headSize += traceLength;
         if (data->continueSequence & ~CONTINUE_SEQUENCE_SKIPTO)
-            headSize += sizeof(unsigned short) + continuationLength;
+            headSize += sizeof(unsigned) + continuationLength;
         memcpy(newdata, data, headSize); // copy in leading part of old data
         ((RoxiePacketHeader *) newdata)->continueSequence |= CONTINUE_SEQUENCE_SKIPTO; // set flag indicating new data is present
-        *(unsigned short *) (newdata + headSize) = skipDataLen; // add length field for new data
-        memcpy(newdata + headSize + sizeof(unsigned short), skipData, skipDataLen); // copy in new data
-        memcpy(newdata + headSize + sizeof(unsigned short) + skipDataLen, ((char *) data) + headSize, data->packetlength - headSize); // copy in remaining old data
+        *(unsigned *) (newdata + headSize) = skipDataLen; // add length field for new data
+        memcpy(newdata + headSize + sizeof(unsigned), skipData, skipDataLen); // copy in new data
+        memcpy(newdata + headSize + sizeof(unsigned) + skipDataLen, ((char *) data) + headSize, data->packetlength - headSize); // copy in remaining old data
         return createRoxiePacket(newdata, newDataSize);
     }
 

+ 5 - 5
roxie/ccd/ccdserver.cpp

@@ -3399,11 +3399,11 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
             const byte *metaInfo = (const byte *) result->getMessageMetadata(metaLen);
             if (metaLen)
             {
-                unsigned short continuationLen = *(unsigned short *) metaInfo;
-                if (continuationLen >= sizeof(bool))
+                unsigned continuationLen = *(unsigned *) metaInfo;
+                if (continuationLen >= sizeof(bool)*2)
                 {
-                    metaInfo += sizeof(unsigned short);
-                    return *(bool *) metaInfo;
+                    metaInfo += sizeof(unsigned)+sizeof(bool);
+                    return *(bool *) metaInfo;                     // The field we want is the second bool field in the continuation data, and is always stored uncompressed
                 }
             }
             return true; // if no continuation info, last row was complete.
@@ -4822,7 +4822,7 @@ public:
                                 StringBuffer s;
                                 activity.queryLogCtx().CTXLOG("Additional data size %d on query %s mergeOrder %p", metaLen, header.toString(s).str(), mergeOrder);
                             }
-                            if (*((unsigned short *) metaData) + sizeof(unsigned short) != metaLen)
+                            if (*((unsigned *) metaData) + sizeof(unsigned) != metaLen)
                             {
                                 StringBuffer s;
                                 activity.queryLogCtx().CTXLOG("Additional data size %d on query %s mergeOrder %p", metaLen, header.toString(s).str(), mergeOrder);

+ 10 - 0
system/jhtree/jhtree.cpp

@@ -364,6 +364,11 @@ public:
         releaseBlobs();
     }
 
+    virtual unsigned numActiveKeys() const override
+    {
+        return keyCursor ? 1 : 0;
+    }
+
     virtual unsigned querySeeks() const
     {
         return stats.seeks;
@@ -2604,6 +2609,11 @@ public:
         sortFromSeg = 0;
     }
 
+    virtual unsigned numActiveKeys() const override
+    {
+        return activekeys;
+    }
+
     virtual unsigned getPartition() override
     {
         return 0;   // If all keys share partition info (is that required?) then we can do better

+ 1 - 0
system/jhtree/jhtree.hpp

@@ -282,6 +282,7 @@ interface IKeyManager : public IInterface, extends IIndexReadContext
     virtual bool lookupSkip(const void *seek, size32_t seekGEOffset, size32_t seeklen) = 0;
     virtual unsigned getPartition() = 0;  // Use PARTITION() to retrieve partno, if possible, or zero to mean read all
 
+    virtual unsigned numActiveKeys() const = 0;
 };
 
 inline offset_t extractFpos(IKeyManager * manager)