Преглед изворни кода

HPCC-24786 Abort WUResult if client disconnected

In https://github.com/hpcc-systems/HPCC-Platform/pull/14226, the
checkEspConnection() was added to check if an esp connection for
a context is still valid. The checkEspConnection() is used in
this PR to abort the process of retrieving WUResult.

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx пре 4 година
родитељ
комит
ac7756b9a0

+ 6 - 3
common/fileview2/fileview.hpp

@@ -160,10 +160,13 @@ extern FILEVIEW_API IResultSetFactory * getSecResultSetFactory(ISecManager *secm
 extern FILEVIEW_API int findResultSetColumn(const INewResultSet * results, const char * columnName);
 
 extern FILEVIEW_API unsigned getResultCursorXml(IStringVal & ret, IResultSetCursor * cursor, const char * name, unsigned start=0, unsigned count=0, const char * schemaName=NULL, const IProperties *xmlns=NULL);
-extern FILEVIEW_API unsigned getResultXml(IStringVal & ret, INewResultSet * cursor,  const char* name, unsigned start=0, unsigned count=0, const char * schemaName=NULL, const IProperties *xmlns=NULL);
-extern FILEVIEW_API unsigned getResultJSON(IStringVal & ret, INewResultSet * cursor,  const char* name, unsigned start=0, unsigned count=0, const char * schemaName=NULL);
+extern FILEVIEW_API unsigned getResultXml(IStringVal & ret, INewResultSet * cursor,  const char * name,
+    unsigned start=0, unsigned count=0, const char * schemaName=nullptr, const IProperties * xmlns=nullptr, IAbortRequestCallback * abortCheck=nullptr);
+extern FILEVIEW_API unsigned getResultJSON(IStringVal & ret, INewResultSet * cursor,  const char* name,
+    unsigned start=0, unsigned count=0, const char * schemaName=nullptr, IAbortRequestCallback * abortCheck=nullptr);
 extern FILEVIEW_API unsigned writeResultCursorXml(IXmlWriter & writer, IResultSetCursor * cursor, const char * name,
-    unsigned start=0, unsigned count=0, const char * schemaName=NULL, const IProperties *xmlns = NULL, bool flushContent = false);
+    unsigned start=0, unsigned count=0, const char * schemaName=nullptr, const IProperties * xmlns=nullptr, bool flushContent=false,
+    IAbortRequestCallback * abortCheck=nullptr);
 extern FILEVIEW_API unsigned writeResultXml(IXmlWriter & writer, INewResultSet * cursor,  const char* name, unsigned start=0, unsigned count=0, const char * schemaName=NULL, const IProperties *xmlns = NULL);
 
 extern FILEVIEW_API unsigned getResultCursorBin(MemoryBuffer & ret, IResultSetCursor * cursor, unsigned start=0, unsigned count=0);

+ 12 - 7
common/fileview2/fvresultset.cpp

@@ -2292,34 +2292,37 @@ int findResultSetColumn(const INewResultSet * results, const char * columnName)
 }
 
 
-extern FILEVIEW_API unsigned getResultCursorXml(IStringVal & ret, IResultSetCursor * cursor, const char * name, unsigned start, unsigned count, const char * schemaName, const IProperties *xmlns)
+extern FILEVIEW_API unsigned getResultCursorXml(IStringVal & ret, IResultSetCursor * cursor, const char * name,
+    unsigned start, unsigned count, const char * schemaName, const IProperties * xmlns, IAbortRequestCallback * abortCheck)
 {
     Owned<CommonXmlWriter> writer = CreateCommonXmlWriter(XWFexpandempty);
-    unsigned rc = writeResultCursorXml(*writer, cursor, name, start, count, schemaName, xmlns);
+    unsigned rc = writeResultCursorXml(*writer, cursor, name, start, count, schemaName, xmlns, false, abortCheck);
     ret.set(writer->str());
     return rc;
 
 }
 
-extern FILEVIEW_API unsigned getResultXml(IStringVal & ret, INewResultSet * result, const char* name,unsigned start, unsigned count, const char * schemaName, const IProperties *xmlns)
+extern FILEVIEW_API unsigned getResultXml(IStringVal & ret, INewResultSet * result, const char * name,
+    unsigned start, unsigned count, const char * schemaName, const IProperties * xmlns, IAbortRequestCallback * abortCheck)
 {
     Owned<IResultSetCursor> cursor = result->createCursor();
-    return getResultCursorXml(ret, cursor, name, start, count, schemaName, xmlns);
+    return getResultCursorXml(ret, cursor, name, start, count, schemaName, xmlns, abortCheck);
 }
 
-extern FILEVIEW_API unsigned getResultJSON(IStringVal & ret, INewResultSet * result, const char* name,unsigned start, unsigned count, const char * schemaName)
+extern FILEVIEW_API unsigned getResultJSON(IStringVal & ret, INewResultSet * result, const char * name,
+    unsigned start, unsigned count, const char * schemaName, IAbortRequestCallback * abortCheck)
 {
     Owned<IResultSetCursor> cursor = result->createCursor();
     Owned<CommonJsonWriter> writer = new CommonJsonWriter(0);
     writer->outputBeginRoot();
-    unsigned rc = writeResultCursorXml(*writer, cursor, name, start, count, schemaName);
+    unsigned rc = writeResultCursorXml(*writer, cursor, name, start, count, schemaName, nullptr, false, abortCheck);
     writer->outputEndRoot();
     ret.set(writer->str());
     return rc;
 }
 
 extern FILEVIEW_API unsigned writeResultCursorXml(IXmlWriter & writer, IResultSetCursor * cursor, const char * name,
-    unsigned start, unsigned count, const char * schemaName, const IProperties *xmlns, bool flushContent)
+    unsigned start, unsigned count, const char * schemaName, const IProperties * xmlns, bool flushContent, IAbortRequestCallback * abortCheck)
 {
     if (schemaName)
     {
@@ -2350,6 +2353,8 @@ extern FILEVIEW_API unsigned writeResultCursorXml(IXmlWriter & writer, IResultSe
     unsigned c=0;
     for(bool ok=cursor->absolute(start);ok;ok=cursor->next())
     {
+        if (abortCheck && abortCheck->abortRequested())
+            break;
         cursor->writeXmlRow(writer);
         if (flushContent)
             writer.flushContent(false);

+ 1 - 0
common/workunit/workunit.hpp

@@ -35,6 +35,7 @@
 #include "jstats.h"
 #include "jutil.hpp"
 #include "jprop.hpp"
+#include "jmisc.hpp"
 #include "wuattr.hpp"
 #include <vector>
 #include <list>

+ 9 - 0
esp/platform/espprotocol.hpp

@@ -217,4 +217,13 @@ public:
 
 esp_http_decl bool checkEspConnection(IEspContext& ctx);
 
+class CESPAbortRequestCallback : implements IAbortRequestCallback
+{
+    IEspContext* context = nullptr;
+public:
+    CESPAbortRequestCallback(IEspContext* _context) : context(_context){ };
+
+    virtual bool abortRequested() override { return !checkEspConnection(*context); }
+};
+
 #endif

+ 11 - 10
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -2715,7 +2715,7 @@ void getCSVHeaders(const IResultSetMetaData& metaIn, CommonCSVWriter* writer, un
     }
 }
 
-unsigned getResultCSV(IStringVal& ret, INewResultSet* result, const char* name, __int64 start, unsigned& count)
+unsigned getResultCSV(IStringVal& ret, INewResultSet* result, const char* name, __int64 start, unsigned& count, IAbortRequestCallback* abortCheck)
 {
     unsigned headerLayer = 0;
     CSVOptions csvOptions;
@@ -2728,12 +2728,12 @@ unsigned getResultCSV(IStringVal& ret, INewResultSet* result, const char* name,
     writer->finishCSVHeaders();
 
     Owned<IResultSetCursor> cursor = result->createCursor();
-    count = writeResultCursorXml(*writer, cursor, name, start, count, NULL);
+    count = writeResultCursorXml(*writer, cursor, name, start, count, nullptr, nullptr, false, abortCheck);
     ret.set(writer->str());
     return count;
 }
 
-void appendResultSet(MemoryBuffer& mb, INewResultSet* result, const char *name, __int64 start, unsigned& count, __int64& total, bool bin, bool xsd, ESPSerializationFormat fmt, const IProperties *xmlns)
+void appendResultSet(IEspContext &context, MemoryBuffer& mb, INewResultSet* result, const char *name, __int64 start, unsigned& count, __int64& total, bool bin, bool xsd, ESPSerializationFormat fmt, const IProperties *xmlns)
 {
     if (!result)
         return;
@@ -2757,12 +2757,13 @@ void appendResultSet(MemoryBuffer& mb, INewResultSet* result, const char *name,
             MemoryBuffer & buffer;
         } adaptor(mb);
 
+        CESPAbortRequestCallback abortCallback(&context);
         if (fmt==ESPSerializationCSV)
-            count = getResultCSV(adaptor, result, name, (unsigned) start, count);
+            count = getResultCSV(adaptor, result, name, (unsigned) start, count, &abortCallback);
         else if (fmt==ESPSerializationJSON)
-            count = getResultJSON(adaptor, result, name, (unsigned) start, count, (xsd) ? "myschema" : NULL);
+            count = getResultJSON(adaptor, result, name, (unsigned) start, count, (xsd) ? "myschema" : NULL, &abortCallback);
         else
-            count = getResultXml(adaptor, result, name, (unsigned) start, count, (xsd) ? "myschema" : NULL, xmlns);
+            count = getResultXml(adaptor, result, name, (unsigned) start, count, (xsd) ? "myschema" : NULL, xmlns, &abortCallback);
     }
 }
 
@@ -2878,12 +2879,12 @@ void CWsWorkunitsEx::getWsWuResult(IEspContext &context, const char *wuid, const
                                        wuid, wuResultMaxSize/0x100000);
         }
 
-        appendResultSet(mb, rs, name, start, count, total, bin, xsd, context.getResponseFormat(), result->queryResultXmlns());
+        appendResultSet(context, mb, rs, name, start, count, total, bin, xsd, context.getResponseFormat(), result->queryResultXmlns());
     }
     else
     {
         Owned<INewResultSet> filteredResult = createFilteredResultSet(rs, filterBy);
-        appendResultSet(mb, filteredResult, name, start, count, total, bin, xsd, context.getResponseFormat(), result->queryResultXmlns());
+        appendResultSet(context, mb, filteredResult, name, start, count, total, bin, xsd, context.getResponseFormat(), result->queryResultXmlns());
     }
 
     wuState = cw->getState();
@@ -3222,13 +3223,13 @@ void CWsWorkunitsEx::getFileResults(IEspContext &context, const char *logicalNam
                                        logicalName, wuResultMaxSize/0x100000);
         }
     
-        appendResultSet(buf, result, resname.str(), start, count, total, bin, xsd, context.getResponseFormat(), NULL);
+        appendResultSet(context, buf, result, resname.str(), start, count, total, bin, xsd, context.getResponseFormat(), nullptr);
     }
     else
     {
         // NB: this could be still be very big, appendResultSet should be changed to ensure filtered result doesn't grow bigger than wuResultMaxSize
         Owned<INewResultSet> filteredResult = createFilteredResultSet(result, filterBy);
-        appendResultSet(buf, filteredResult, resname.str(), start, count, total, bin, xsd, context.getResponseFormat(), NULL);
+        appendResultSet(context, buf, filteredResult, resname.str(), start, count, total, bin, xsd, context.getResponseFormat(), nullptr);
     }
 }
 

+ 6 - 3
esp/services/ws_workunits/ws_wuresult.cpp

@@ -416,8 +416,9 @@ unsigned CWsWuResultOutHelper::getResultCSVStreaming(INewResultSet* result, cons
     writer->finishCSVHeaders();
     writer->flushContent(false);
 
+    CESPAbortRequestCallback abortCallback(context);
     Owned<IResultSetCursor> cursor = result->createCursor();
-    return writeResultCursorXml(*writer, cursor, resultName, start, count, nullptr, nullptr, true);
+    return writeResultCursorXml(*writer, cursor, resultName, start, count, nullptr, nullptr, true, &abortCallback);
 }
 
 //Similar to the getResultJSON in fileview2
@@ -427,17 +428,19 @@ unsigned CWsWuResultOutHelper::getResultJSONStreaming(INewResultSet* result, con
     writer->outputBeginRoot();
     writer->flushContent(false);
 
+    CESPAbortRequestCallback abortCallback(context);
     Owned<IResultSetCursor> cursor = result->createCursor();
-    unsigned rc = writeResultCursorXml(*writer, cursor, resultName, start, count, schemaName, nullptr, true);
+    unsigned rc = writeResultCursorXml(*writer, cursor, resultName, start, count, schemaName, nullptr, true, &abortCallback);
     writer->outputEndRoot();
     return rc;
 }
 
 unsigned CWsWuResultOutHelper::getResultXmlStreaming(INewResultSet* result, const char* resultName, const char* schemaName, const IProperties* xmlns, IXmlStreamFlusher* flusher)
 {
+    CESPAbortRequestCallback abortCallback(context);
     Owned<IResultSetCursor> cursor = result->createCursor();
     Owned<CommonXmlWriter> writer = CreateCommonXmlWriter(XWFexpandempty, 0, flusher);
-    return writeResultCursorXml(*writer, cursor, resultName, start, count, schemaName, xmlns, true);
+    return writeResultCursorXml(*writer, cursor, resultName, start, count, schemaName, xmlns, true, &abortCallback);
 }