Jelajahi Sumber

Merge pull request #11426 from jakesmith/hpcc-20100

HPCC-20100 Limit reply size by configurable option in request

Reviewed-By: Rodrigo Pastrana <rodrigo.pastrana@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 tahun lalu
induk
melakukan
f3ad686827
1 mengubah file dengan 41 tambahan dan 15 penghapusan
  1. 41 15
      common/remote/sockfile.cpp

+ 41 - 15
common/remote/sockfile.cpp

@@ -68,7 +68,7 @@
 
 static const unsigned __int64 defaultFileStreamChooseNLimit = I64C(0x7fffffffffffffff); // constant should be move to common place (see eclhelper.hpp)
 static const unsigned __int64 defaultFileStreamSkipN = 0;
-static const unsigned defaultDaFSNumRecs = 100;
+static const unsigned defaultDaFSReplyLimitKB = 1024; // 1MB
 enum OutputFormat:byte { outFmt_Binary, outFmt_Xml, outFmt_Json };
 
 
@@ -3919,19 +3919,33 @@ interface IRemoteActivity : extends IInterface
     virtual bool isGrouped() const = 0;
 };
 
+class CRemoteRequest : public CSimpleInterfaceOf<IInterface>
+{
+    OutputFormat format;
+    unsigned __int64 replyLimit;
+    Linked<IRemoteActivity> activity;
+public:
+    CRemoteRequest(OutputFormat _format, unsigned __int64 _replyLimit, IRemoteActivity *_activity)
+        : format(_format), replyLimit(_replyLimit), activity(_activity)
+    {
+    }
+    OutputFormat queryFormat() const { return format; }
+    unsigned __int64 queryReplyLimit() const { return replyLimit; }
+    IRemoteActivity *queryActivity() const { return activity; }
+};
+
 enum OpenFileFlag { of_null=0x0, of_key=0x01 };
 struct OpenFileInfo
 {
     OpenFileInfo() { }
     OpenFileInfo(int _handle, IFileIO *_fileIO, StringAttrItem *_filename) : handle(_handle), fileIO(_fileIO), filename(_filename) { }
-    OpenFileInfo(int _handle, IRemoteActivity *_activity, StringAttrItem *_filename, OutputFormat _format)
-        : handle(_handle), activity(_activity), filename(_filename), format(_format) { }
+    OpenFileInfo(int _handle, CRemoteRequest *_remoteRequest, StringAttrItem *_filename)
+        : handle(_handle), remoteRequest(_remoteRequest), filename(_filename) { }
     Linked<IFileIO> fileIO;
-    Linked<IRemoteActivity> activity;
+    Linked<CRemoteRequest> remoteRequest;
     Linked<StringAttrItem> filename; // for debug
     int handle = 0;
     unsigned flags = 0;
-    OutputFormat format = outFmt_Xml;
 };
 
 
@@ -3984,7 +3998,7 @@ protected:
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterfaceOf<IRemoteActivity>)
 
-    CRemoteDiskBaseActivity()
+    CRemoteDiskBaseActivity(IPropertyTree &config)
     {
     }
 // IRemoteActivity impl.
@@ -4110,7 +4124,7 @@ class CRemoteDiskReadActivity : public CRemoteDiskBaseActivity
             return true;
     }
 public:
-    CRemoteDiskReadActivity(IPropertyTree &config) : prefetchBuffer(nullptr)
+    CRemoteDiskReadActivity(IPropertyTree &config) : PARENT(config), prefetchBuffer(nullptr)
     {
         compressed = config.getPropBool("compressed");
         inputGrouped = config.getPropBool("inputGrouped", false);
@@ -4269,7 +4283,7 @@ protected:
         eofSeen = true;
     }
 public:
-    CRemoteIndexBaseActivity(IPropertyTree &config)
+    CRemoteIndexBaseActivity(IPropertyTree &config) : PARENT(config)
     {
         isTlk = config.getPropBool("isTlk");
         allowPreload = config.getPropBool("allowPreload");
@@ -6173,6 +6187,7 @@ public:
          * {
          *  "format" : "xml",
          *  "handle" : "1234",
+         *  "replyLimit" : "64",
          *  "node" : {
          *   "kind" : "diskread",
          *   "fileName": "examplefilename",
@@ -6227,6 +6242,7 @@ public:
 
         int cursorHandle = requestTree->getPropInt("handle");
         OutputFormat outputFormat = outFmt_Xml;
+        unsigned __int64 replyLimit = 0;
 
         Owned<IRemoteActivity> outputActivity;
         OpenFileInfo fileInfo;
@@ -6243,9 +6259,14 @@ public:
                 outputFormat = outFmt_Binary;
             else
                 throw MakeStringException(0, "Unrecognised output format: %s", outputFmtStr);
+
+            replyLimit = requestTree->getPropInt64("replyLimit", defaultDaFSReplyLimitKB) * 1024;
+
             // In future this may be passed the request and build a chain of activities and return sink.
             outputActivity.setown(createOutputActivity(*requestTree));
 
+            Owned<CRemoteRequest> remoteRequest = new CRemoteRequest(outputFormat, replyLimit, outputActivity);
+
             StringBuffer requestStr("jsonrequest:");
             outputActivity->getInfoStr(requestStr);
             Owned<StringAttrItem> name = new StringAttrItem(requestStr);
@@ -6253,14 +6274,15 @@ public:
             CriticalBlock block(sect);
             cursorHandle = getNextHandle();
             client.previdx = client.openFiles.ordinality();
-            client.openFiles.append(OpenFileInfo(cursorHandle, outputActivity, name, outputFormat));
+            client.openFiles.append(OpenFileInfo(cursorHandle, remoteRequest, name));
         }
         else if (!lookupFileIOHandle(cursorHandle, fileInfo))
             cursorHandle = 0; // challenge response ..
         else // known handle, continuation
         {
-            outputActivity.set(fileInfo.activity);
-            outputFormat = fileInfo.format;
+            outputActivity.set(fileInfo.remoteRequest->queryActivity());
+            outputFormat = fileInfo.remoteRequest->queryFormat();
+            replyLimit = fileInfo.remoteRequest->queryReplyLimit();
         }
 
         if (outputActivity && requestTree->hasProp("cursorBin")) // use handle if one provided
@@ -6296,7 +6318,7 @@ public:
                 if (grouped)
                 {
                     bool pastFirstRow = outputActivity->queryProcessed()>0;
-                    for (unsigned i=0; i<defaultDaFSNumRecs; i++)
+                    do
                     {
                         size32_t eogPos = 0;
                         if (pastFirstRow)
@@ -6330,10 +6352,11 @@ public:
                         }
                         pastFirstRow = true;
                     }
+                    while (reply.length() < replyLimit);
                 }
                 else
                 {
-                    for (unsigned i=0; i<defaultDaFSNumRecs; i++)
+                    do
                     {
                         size32_t rowSz;
                         const void *row = outputActivity->nextRow(outBuilder, rowSz);
@@ -6343,6 +6366,7 @@ public:
                             break;
                         }
                     }
+                    while (reply.length() < replyLimit);
                 }
                 dataLenMarker.write();
                 DelayedSizeMarker cursorLenMarker(reply); // cursor length
@@ -6355,7 +6379,7 @@ public:
                 if (grouped)
                 {
                     bool pastFirstRow = outputActivity->queryProcessed()>0;
-                    for (unsigned __int64 i=0; i<defaultDaFSNumRecs; i++)
+                    do
                     {
                         size32_t rowSz;
                         const void *row = outputActivity->nextRow(outBuilder, rowSz);
@@ -6384,10 +6408,11 @@ public:
                         resultBuffer.clear();
                         pastFirstRow = true;
                     }
+                    while (responseWriter->length() < replyLimit);
                 }
                 else
                 {
-                    for (unsigned __int64 i=0; i<defaultDaFSNumRecs; i++)
+                    do
                     {
                         size32_t rowSz;
                         const void *row = outputActivity->nextRow(outBuilder, rowSz);
@@ -6401,6 +6426,7 @@ public:
                         responseWriter->outputEndNested("Row");
                         resultBuffer.clear();
                     }
+                    while (responseWriter->length() < replyLimit);
                 }
                 if (!eoi)
                 {