Selaa lähdekoodia

HPCC-10696 Roxie should limit size of workunit outputs

Adds both a default limit (10 MB) and a per-output override via the MAXSIZE attribute.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 vuotta sitten
vanhempi
commit
ee0d3b9760

+ 2 - 0
common/thorhelper/thorcommon.hpp

@@ -26,6 +26,8 @@
 #include "thorxmlwrite.hpp"
 
 #define DALI_RESULT_OUTPUTMAX 2000 // MB
+#define DALI_RESULT_LIMIT_DEFAULT 10 // MB
+
 class THORHELPER_API CSizingSerializer : implements IRowSerializerTarget
 {
     size32_t totalsize;

+ 3 - 4
ecl/hthor/hthor.cpp

@@ -51,7 +51,6 @@
 static unsigned const hthorReadBufferSize = 0x10000;
 static offset_t const defaultHThorDiskWriteSizeLimit = I64C(10*1024*1024*1024); //10 GB, per Nigel
 static size32_t const spillStreamBufferSize = 0x10000;
-static int const defaultWorkUnitWriteLimit = 10; //10MB as thor
 static unsigned const hthorPipeWaitTimeout = 100; //100ms - fairly arbitrary choice
 
 using roxiemem::IRowManager;
@@ -5922,11 +5921,11 @@ void CHThorWorkUnitWriteActivity::execute()
 {
     unsigned flags = helper.getFlags();
     grouped = (POFgrouped & flags) != 0;
-    size32_t outputLimit = agent.queryWorkUnit()->getDebugValueInt("outputLimit", defaultWorkUnitWriteLimit);
+    size32_t outputLimit = agent.queryWorkUnit()->getDebugValueInt("outputLimit", DALI_RESULT_LIMIT_DEFAULT);
     if (flags & POFmaxsize)
         outputLimit = helper.getMaxSize();
     if (outputLimit>DALI_RESULT_OUTPUTMAX)
-        throw MakeStringException(0, "Dali result outputs are restricted to a maximum of %d MB, the current limit is %d MB. A huge dali result usually indicates the ECL needs altering.", DALI_RESULT_OUTPUTMAX, defaultWorkUnitWriteLimit);
+        throw MakeStringException(0, "Dali result outputs are restricted to a maximum of %d MB, the current limit is %d MB. A huge dali result usually indicates the ECL needs altering.", DALI_RESULT_OUTPUTMAX, DALI_RESULT_LIMIT_DEFAULT);
     assertex(outputLimit<=0x1000); // 32bit limit because MemoryBuffer/CMessageBuffers involved etc.
     outputLimit *= 0x100000;
     MemoryBuffer rowdata;
@@ -6067,7 +6066,7 @@ void CHThorDictionaryWorkUnitWriteActivity::execute()
     }
     size32_t usedCount = rtlDictionaryCount(builder.getcount(), builder.queryrows());
 
-    size32_t outputLimit = agent.queryWorkUnit()->getDebugValueInt("outputLimit", defaultWorkUnitWriteLimit) * 0x100000;
+    size32_t outputLimit = agent.queryWorkUnit()->getDebugValueInt("outputLimit", DALI_RESULT_LIMIT_DEFAULT) * 0x100000;
     MemoryBuffer rowdata;
     CThorDemoRowSerializer out(rowdata);
     Owned<IOutputRowSerializer> serializer = input->queryOutputMeta()->createDiskSerializer(agent.queryCodeContext(), activityId);

+ 1 - 1
roxie/ccd/ccdcontext.cpp

@@ -3686,7 +3686,7 @@ public:
     virtual IDistributedFileTransaction *querySuperFileTransaction() { UNIMPLEMENTED; }
     virtual void flush(unsigned seqNo) { throwUnexpected(); }
     virtual unsigned getPriority() const { return priority; }
-    virtual bool outputResultsToWorkUnit() const { return workUnit != NULL; }
+    virtual IConstWorkUnit *queryWorkUnit() const { return workUnit; }
     virtual bool outputResultsToSocket() const { return client != NULL; }
 
     virtual void selectCluster(const char * cluster) { throwUnexpected(); }

+ 1 - 1
roxie/ccd/ccdcontext.hpp

@@ -92,7 +92,7 @@ interface IRoxieServerContext : extends IInterface
     virtual unsigned getSlavesReplyLen() = 0;
 
     virtual unsigned getXmlFlags() const = 0;
-    virtual bool outputResultsToWorkUnit() const = 0;
+    virtual IConstWorkUnit *queryWorkUnit() const = 0;
     virtual bool outputResultsToSocket() const = 0;
 
     virtual IRoxieDaliHelper *checkDaliConnection() = 0;

+ 30 - 4
roxie/ccd/ccdserver.cpp

@@ -19512,7 +19512,21 @@ public:
             }
 
         }
-        if (serverContext->outputResultsToWorkUnit()||(response && response->isRaw))
+        size32_t outputLimitBytes = 0;
+        IConstWorkUnit *workunit = serverContext->queryWorkUnit();
+        if (workunit)
+        {
+            size32_t outputLimit;
+            if (helper.getFlags() & POFmaxsize)
+                outputLimit = helper.getMaxSize();
+            else
+                outputLimit = workunit->getDebugValueInt("outputLimit", DALI_RESULT_LIMIT_DEFAULT);
+            if (outputLimit>DALI_RESULT_OUTPUTMAX)
+                throw MakeStringException(0, "Dali result outputs are restricted to a maximum of %d MB, the current limit is %d MB. A huge dali result usually indicates the ECL needs altering.", DALI_RESULT_OUTPUTMAX, DALI_RESULT_LIMIT_DEFAULT);
+            assertex(outputLimit<=0x1000); // 32bit limit because MemoryBuffer/CMessageBuffers involved etc.
+            outputLimitBytes = outputLimit * 0x100000;
+        }
+        if (workunit != NULL || (response && response->isRaw))
         {
             createRowAllocator();
             rowSerializer.setown(rowAllocator->createDiskSerializer(ctx->queryCodeContext()));
@@ -19529,7 +19543,7 @@ public:
             }
             if (grouped && (processed != initialProcessed))
             {
-                if (serverContext->outputResultsToWorkUnit())
+                if (workunit)
                     result.append(row == NULL);
                 if (response)
                 {
@@ -19550,7 +19564,7 @@ public:
                     builder.append(row);
             }
             processed++;
-            if (serverContext->outputResultsToWorkUnit())
+            if (workunit)
             {
                 CThorDemoRowSerializer serializerTarget(result);
                 rowSerializer->serialize(serializerTarget, (const byte *) row);
@@ -19582,12 +19596,24 @@ public:
                 response->flush(false);
             }
             ReleaseRoxieRow(row);
+            if (outputLimitBytes && result.length() > outputLimitBytes)
+            {
+                StringBuffer errMsg("Dataset too large to output to workunit (limit ");
+                errMsg.append(outputLimitBytes/0x100000).append(") megabytes, in result (");
+                const char *name = helper.queryName();
+                if (name)
+                    errMsg.append("name=").append(name);
+                else
+                    errMsg.append("sequence=").append(helper.getSequence());
+                errMsg.append(")");
+                throw MakeStringExceptionDirect(0, errMsg.str());
+            }
         }
         if (writer)
             writer->outputEndArray("Row");
         if (saveInContext)
             serverContext->appendResultDeserialized(storedName, sequence, builder.getcount(), builder.linkrows(), (helper.getFlags() & POFextend) != 0, LINK(meta.queryOriginal()));
-        if (serverContext->outputResultsToWorkUnit())
+        if (workunit)
             serverContext->appendResultRawContext(storedName, sequence, result.length(), result.toByteArray(), processed, (helper.getFlags() & POFextend) != 0, false); // MORE - shame to do extra copy...
     }
 };