瀏覽代碼

Merge pull request #15837 from rpastrana/HPCC-26035-zap

HPCC-26035 Enable WU logs in container mode

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Kevin Wang <kevin.wang@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 3 年之前
父節點
當前提交
d7e38bf66f

+ 1 - 1
esp/services/ws_logaccess/WsLogAccessService.cpp

@@ -28,7 +28,7 @@ void Cws_logaccessEx::init(const IPropertyTree *cfg, const char *process, const
 
     try
     {
-        m_remoteLogAccessor.set(queryRemoteLogAccessor());
+        m_remoteLogAccessor.set(&queryRemoteLogAccessor());
 
         if (m_remoteLogAccessor == nullptr)
             LOG(MCerror,"WsLogAccessService could not load remote log access plugin!");

+ 156 - 2
esp/services/ws_workunits/ws_workunitsHelpers.cpp

@@ -206,6 +206,110 @@ WsWUExceptions::WsWUExceptions(IConstWorkUnit& wu): numerr(0), numwrn(0), numinf
     }
 }
 
+/*
+ * Fetches the log entries recorded for this workunit through the remote log
+ * access plug-in and writes them to outFile.
+ *
+ * The query is filtered by job id (the WUID) and bounded by the workunit's
+ * StWhenCreated / StWhenFinished statistics, widened on both sides by
+ * wuLogSearchTimeBuffSecs. When no finished time is recorded, no end is set on
+ * the time range and a warning is logged.
+ *
+ * outFile                 - target file name; must not be empty
+ * maxLogRecords           - record limit passed to the log query
+ * retColsMode             - which log columns are requested
+ * logFormat               - report format; json and xml output is wrapped in a "lines" container
+ * wuLogSearchTimeBuffSecs - seconds subtracted from the creation time and added to the finish time
+ *
+ * Throws (via makeStringException) when the plug-in is not available, outFile is
+ * empty, the creation time cannot be determined, or the target file cannot be created.
+ */
+void WsWuInfo::readWorkunitComponentLogs(const char* outFile, unsigned maxLogRecords,
+                                         LogAccessReturnColsMode retColsMode, LogAccessLogFormat logFormat, unsigned wuLogSearchTimeBuffSecs)
+{
+    if (!m_remoteLogAccessor)
+        throw makeStringException(ECLWATCH_LOGACCESS_UNAVAILABLE, "WsWuInfo: Remote Log Access plug-in not available!");
+
+    if (isEmptyString(outFile))
+        throw makeStringException(ECLWATCH_INVALID_FILE_NAME, "WsWuInfo: Target filename not provided!");
+
+    LogAccessConditions logFetchOptions;
+    logFetchOptions.setFilter(getJobIDLogAccessFilter(wuid.str()));
+
+    struct LogAccessTimeRange range;
+    stat_type timeStamp;
+    if (cw->getStatistic(timeStamp, "", StWhenCreated))
+    {
+        CDateTime startt;
+        startt.set(timeStamp / microSecsToSecDivisor);
+
+        //Negating the unsigned value directly wraps around to a large positive number,
+        //which would move the start time forward; convert to a signed type before negating.
+        const int startOffsetSecs = -static_cast<int>(wuLogSearchTimeBuffSecs);
+        startt.adjustTimeSecs(startOffsetSecs);
+        range.setStart(startt);
+
+        StringBuffer newstart;
+        startt.getString(newstart);
+        DBGLOG("Searching for WUID '%s' log entries starting time: '%s'", wuid.str(), newstart.str());
+    }
+    else
+        throw makeStringExceptionV(ECLWATCH_WU_START_NOT_AVAILABLE, "WsWuInfo: Could not determine WUID '%s' start time", wuid.str());
+
+    if (cw->getStatistic(timeStamp, "", StWhenFinished))
+    {
+        CDateTime endt;
+        endt.set(timeStamp / microSecsToSecDivisor);
+        endt.adjustTimeSecs(wuLogSearchTimeBuffSecs);
+
+        range.setEnd(endt);
+        StringBuffer newend;
+        endt.getString(newend);
+        DBGLOG("Searching for WUID '%s' log entries ending time: '%s'", wuid.str(), newend.str()); //end + time buffer
+    }
+    else
+        WARNLOG("WsWuInfo: Fetching log entries for '%s' without a 'finished'", wuid.str());
+
+    logFetchOptions.setReturnColsMode(retColsMode);
+    logFetchOptions.setTimeRange(range);
+    logFetchOptions.setLimit(maxLogRecords);
+
+    Owned<IFileIOStream> outIOS;
+    CWsWuFileHelper helper(nullptr);
+    outIOS.setown(helper.createIOStreamWithFileName(outFile, IFOcreate));
+
+    if(!outIOS)
+        throw makeStringException(ECLWATCH_CANNOT_OPEN_FILE, "WsWuInfo: Could not create target log file!");
+
+    Owned<IRemoteLogAccessStream> logreader = m_remoteLogAccessor->getLogReader(logFetchOptions, logFormat);
+
+    StringBuffer logcontent;
+    unsigned totalRecsRead = 0;
+    unsigned recsRead = 0;
+
+    //Open the container; the reader's chunks are written between the opening and closing markers
+    if (logFormat == LOGACCESS_LOGFORMAT_json)
+        writeStringToStream(*outIOS, "{\"lines\": [");
+    else if (logFormat == LOGACCESS_LOGFORMAT_xml)
+        writeStringToStream(*outIOS, "<lines>");
+
+    while (logreader->readLogEntries(logcontent.clear(), recsRead))
+    {
+        //Consecutive non-empty json chunks need a separator to keep the array well formed
+        if (logFormat == LOGACCESS_LOGFORMAT_json && totalRecsRead > 0 && recsRead > 0)
+        {
+            writeCharToStream(*outIOS, ',');
+        }
+        totalRecsRead += recsRead;
+        writeStringToStream(*outIOS, logcontent.str());
+    }
+
+    if (totalRecsRead >= maxLogRecords) //Warn of possible truncation (also covers a reader that overshoots the limit)
+    {
+        VStringBuffer truncationWarnmessage("Log query reached record limit (%u). Log report could be incomplete.", maxLogRecords);
+        LOG(MCuserInfo, "WsWuInfo: %s", truncationWarnmessage.str());
+
+        if (logFormat == LOGACCESS_LOGFORMAT_json)
+        {
+            VStringBuffer jsonMessage("], \"Message\": \"%s\"}", truncationWarnmessage.str()); //close lines array, append Message object
+            writeStringToStream(*outIOS, jsonMessage.str());
+        }
+        else if (logFormat == LOGACCESS_LOGFORMAT_xml)
+        {
+            VStringBuffer xmlMessage("</lines>\n<!-- %s -->", truncationWarnmessage.str()); //close lines element, append comment message
+            writeStringToStream(*outIOS, xmlMessage.str());
+        }
+        else if (logFormat == LOGACCESS_LOGFORMAT_csv)
+        {
+            writeStringToStream(*outIOS, truncationWarnmessage);
+        }
+    }
+    else //close the lines container
+    {
+        if (logFormat == LOGACCESS_LOGFORMAT_json)
+            writeStringToStream(*outIOS, "]}");
+        else if (logFormat == LOGACCESS_LOGFORMAT_xml)
+            writeStringToStream(*outIOS, "</lines>");
+    }
+}
+
 void WsWuInfo::getSourceFiles(IEspECLWorkunit &info, unsigned long flags)
 {
     if (!(flags & WUINFO_IncludeSourceFiles))
@@ -1856,6 +1960,20 @@ unsigned WsWuInfo::getResourceURLCount()
     return 0;
 }
 
+//Binds this WsWuInfo to the shared remote log accessor.
+//A failure to obtain the accessor is logged as a warning and otherwise ignored,
+//leaving m_remoteLogAccessor unset.
+void WsWuInfo::initWULogReader()
+{
+    LOG(MCdebugProgress,"WsWuInfo loading remote log access plug-in...");
+
+    try
+    {
+        IRemoteLogAccess & sharedAccessor = queryRemoteLogAccessor();
+        m_remoteLogAccessor.set(&sharedAccessor);
+    }
+    catch (IException * loadFailure)
+    {
+        OWARNLOG(loadFailure, "WsWuInfo could not load remote log access plug-in");
+        loadFailure->Release();
+    }
+}
+
 void WsWuInfo::copyContentFromRemoteFile(const char* sourceFileName, const char* sourceIPAddress,
     const char* sourceAlias, const char *outFileName)
 {
@@ -2542,7 +2660,7 @@ void WsWuInfo::getArchiveFile(IPropertyTree* archive, const char* moduleName, co
 
     file.set(archive->queryProp(xPath.str()));
 }
-#ifndef _CONTAINERIZED
+
 void WsWuInfo::outputALine(size32_t length, const char* content, MemoryBuffer& outputBuf, IFileIOStream* outIOS)
 {
     if (outIOS)
@@ -2550,7 +2668,6 @@ void WsWuInfo::outputALine(size32_t length, const char* content, MemoryBuffer& o
     else
         outputBuf.append(length, content);
 }
-#endif
 
 WsWuSearch::WsWuSearch(IEspContext& context,const char* owner,const char* state,const char* cluster,const char* startDate,const char* endDate,const char* jobname)
 {
@@ -3775,6 +3892,36 @@ void CWsWuFileHelper::createZAPECLQueryArchiveFiles(IConstWorkUnit* cwu, const c
     }
 }
 
+//Writes the workunit's log report to "<path>/<wuid>-log.<ext>", where <ext> follows logFormat
+//(csv, xml, json, otherwise log). Nothing is written when the WUID version is 0.
+//If fetching the logs raises an IException, the exception text is logged and written to the
+//report file instead.
+void CWsWuFileHelper::createWULogFile(IConstWorkUnit *cwu, WsWuInfo &winfo, const char *path, unsigned maxLogRecords, LogAccessReturnColsMode retColsMode, LogAccessLogFormat logFormat, unsigned wuLogSearchTimeBuffSecs)
+{
+    if (cwu->getWuidVersion() == 0)
+        return;
+
+    const char * extension = "log";
+    switch (logFormat)
+    {
+    case LOGACCESS_LOGFORMAT_csv:
+        extension = "csv";
+        break;
+    case LOGACCESS_LOGFORMAT_xml:
+        extension = "xml";
+        break;
+    case LOGACCESS_LOGFORMAT_json:
+        extension = "json";
+        break;
+    default:
+        break;
+    }
+
+    VStringBuffer reportFile("%s%c%s-log.%s", path, PATHSEPCHAR, cwu->queryWuid(), extension);
+    try
+    {
+        winfo.readWorkunitComponentLogs(reportFile.str(), maxLogRecords, retColsMode, logFormat, wuLogSearchTimeBuffSecs);
+    }
+    catch(IException* fetchError)
+    {
+        StringBuffer errorText;
+        fetchError->errorMessage(errorText);
+        IERRLOG(fetchError, "Error accessing WU logs");
+        writeToFile(reportFile.str(), errorText.length(), errorText.str());
+        fetchError->Release();
+    }
+}
+
 void CWsWuFileHelper::createZAPWUGraphProgressFile(const char* wuid, const char* pathNameStr)
 {
     Owned<IPropertyTree> graphProgress = getWUGraphProgress(wuid, true);
@@ -3833,6 +3980,13 @@ void CWsWuFileHelper::createWUZAPFile(IEspContext& context, IConstWorkUnit* cwu,
     createProcessLogfile(cwu, winfo, "Thor", folderToZIP.str());
     if (request.includeThorSlaveLog.isEmpty() || strieq(request.includeThorSlaveLog.str(), "on"))
         createThorSlaveLogfile(cwu, winfo, folderToZIP.str());
+#else
+    //These options should ultimately be drawn from req
+    unsigned maxLogRecords = defaultMaxLogRecords;
+    LogAccessReturnColsMode retColsMode = RETURNCOLS_MODE_default;
+    LogAccessLogFormat logFormat = LOGACCESS_LOGFORMAT_csv;
+    unsigned wuLogSearchTimeBuffSecs = defaultWULogSearchTimeBufferSecs;
+    createWULogFile(cwu, winfo, folderToZIP.str(), maxLogRecords, retColsMode, LOGACCESS_LOGFORMAT_csv, wuLogSearchTimeBuffSecs);
 #endif
 
     //Write out to ZIP file

+ 28 - 3
esp/services/ws_workunits/ws_workunitsHelpers.hpp

@@ -142,28 +142,48 @@ private:
 #define WUINFO_IncludeServiceNames      0x40000
 #define WUINFO_All                      0xFFFFFFFF
 
+static constexpr unsigned defaultMaxLogRecords = 10000;              //Default record limit used when building the WU log report
+static constexpr unsigned defaultWULogSearchTimeBufferSecs = 600;    //Default widening (in seconds) of the WU log search time window
+static constexpr unsigned microSecsToSecDivisor = 1000000;           //Divisor applied to WU timestamp statistics to obtain seconds
+
 class WsWuInfo
 {
     IEspWUArchiveFile* readArchiveFileAttr(IPropertyTree& fileTree, const char* path);
     IEspWUArchiveModule* readArchiveModuleAttr(IPropertyTree& moduleTree, const char* path);
     void readArchiveFiles(IPropertyTree* archiveTree, const char* path, IArrayOf<IEspWUArchiveFile>& files);
-#ifndef _CONTAINERIZED
+
     void outputALine(size32_t len, const char* content, MemoryBuffer& outputBuf, IFileIOStream* outIOS);
+#ifndef _CONTAINERIZED
     bool parseLogLine(const char* line, const char* endWUID, unsigned& processID, const unsigned columnNumPID);
     void readWorkunitThorLog(const char* processName, const char* logSpec, const char* slaveIPAddress, unsigned slaveNum, MemoryBuffer& buf, const char* outFile);
     void readWorkunitThorLogOneDay(IFile* ios, unsigned& processID, MemoryBuffer& buf, IFileIOStream* outIOS);
 #endif
-
     void readFileContent(const char* sourceFileName, const char* sourceIPAddress,
         const char* sourceAlias, MemoryBuffer &mb, bool forDownload);
     void copyContentFromRemoteFile(const char* sourceFileName, const char* sourceIPAddress,
         const char* sourceAlias, const char *outFileName);
+    void initWULogReader();
+
 public:
+    /*
+    * Fetches trace log records related to target Workunit
+    *
+    * unsigned maxLogRecords - Limits number of records fetched
+    * LogAccessReturnColsMode retColsMode - Defines the log record fields
+    * LogAccessLogFormat logFormat - Declares the log report format
+    * unsigned wuLogSearchTimeBuffSecs - Defines the query time-window before wu creation and after wu end
+    */
+    void readWorkunitComponentLogs(const char* outFile, unsigned maxLogRecords, LogAccessReturnColsMode retColsMode,
+                                   LogAccessLogFormat logFormat, unsigned wuLogSearchTimeBuffSecs);
+
     WsWuInfo(IEspContext &ctx, IConstWorkUnit *cw_) :
       context(ctx), cw(cw_)
     {
         version = context.getClientVersion();
         wuid.set(cw->queryWuid());
+#ifdef _CONTAINERIZED
+        initWULogReader();
+#endif
     }
 
     WsWuInfo(IEspContext &ctx, const char *wuid_) :
@@ -175,6 +195,10 @@ public:
         cw.setown(factory->openWorkUnit(wuid_));
         if(!cw)
             throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.", wuid_);
+
+#ifdef _CONTAINERIZED
+        initWULogReader();
+#endif
     }
 
     bool getResourceInfo(StringArray &viewnames, StringArray &urls, unsigned long flags);
@@ -246,6 +270,7 @@ public:
     void addTimerToList(SCMStringBuffer& name, const char * scope, IConstWUStatistic & stat, IArrayOf<IEspECLTimer>& timers);
 protected:
     bool hasSubGraphTimings();
+    Owned<IRemoteLogAccess> m_remoteLogAccessor;
 
 public:
     IEspContext &context;
@@ -641,6 +666,7 @@ class CWsWuFileHelper
     void createProcessLogfile(IConstWorkUnit *cwu, WsWuInfo &winfo, const char *process, const char *path);
     void createThorSlaveLogfile(IConstWorkUnit *cwu, WsWuInfo &winfo, const char *path);
 #endif
+    void createWULogFile(IConstWorkUnit *cwu, WsWuInfo &winfo, const char *path, unsigned maxLogRecords, LogAccessReturnColsMode retColsMode, LogAccessLogFormat logFormat, unsigned wuLogSearchTimeBuffSecs);
     void writeZAPWUInfoToIOStream(IFileIOStream *outFile, const char *name, SCMStringBuffer &value);
     void writeZAPWUInfoToIOStream(IFileIOStream *outFile, const char *name, const char *value);
 public:
@@ -720,7 +746,6 @@ public:
     {
         return true;
     }
-   
 private:
     Owned<CGetThorSlaveLogToFileThreadParam> param;
 };

+ 2 - 0
esp/smc/SMCLib/eclwatch_errorlist.hpp

@@ -129,6 +129,8 @@
 #define ECLWATCH_MISSING_FILETYPE           ECLWATCH_ERROR_START+108
 
 #define ECLWATCH_INVALID_QUERY_KEY          ECLWATCH_ERROR_START+109
+#define ECLWATCH_LOGACCESS_UNAVAILABLE      ECLWATCH_ERROR_START+110
+#define ECLWATCH_WU_START_NOT_AVAILABLE     ECLWATCH_ERROR_START+111
 
 #endif //_ECLWATCH_ERRORLIST_HPP__
 

+ 59 - 58
system/jlib/jlog.cpp

@@ -2612,6 +2612,8 @@ public:
 };
 
 static CNullManager nullManager;
+static Singleton<IRemoteLogAccess> logAccessor;
+static CriticalSection logAccessCrit;
 
 MODULE_INIT(INIT_PRIORITY_JLOG)
 {
@@ -2645,6 +2647,7 @@ MODULE_EXIT()
     thePassNoneFilter = nullptr;
     thePassLocalFilter = nullptr;
     thePassAllFilter = nullptr;
+    delete logAccessor.queryExisting();
 }
 
 static constexpr const char * logFieldsAtt = "@fields";
@@ -3482,69 +3485,67 @@ bool fetchLogByClass(StringBuffer & returnbuf, IRemoteLogAccess & logAccess, Log
 //logAccessPluginConfig expected to contain connectivity and log mapping information
 typedef IRemoteLogAccess * (*newLogAccessPluginMethod_t_)(IPropertyTree & logAccessPluginConfig);
 
-IRemoteLogAccess * queryRemoteLogAccessor()
-{
-    Owned<IPropertyTree> logAccessPluginConfig = getGlobalConfigSP()->getPropTree("logAccess");
-#ifdef LOGACCESSDEBUG
-    if (!logAccessPluginConfig)
-    {
-        const char *simulatedGlobalYaml = R"!!(global:
-          logAccess:
-            name: "localES"
-            type: "elasticstack"
-            connection:
-              protocol: "http"
-              host: "localhost"
-              port: 9200
-            logMaps:
-            - type: "global"
-              storeName: "filebeat-7.9.3-*"
-              searchColumn: "message"
-              timeStampColumn: "created_ts"
-            - type: "workunits"
-              storeName: "filebeat-7.9.3-*"
-              searchColumn: "hpcc.log.jobid"
-            - type: "components"
-              searchColumn: "kubernetes.container.name"
-            - type: "audience"
-              searchColumn: "hpcc.log.audience"
-            - type: "class"
-              searchColumn: "hpcc.log.class"
-        )!!";
-        Owned<IPropertyTree> testTree = createPTreeFromYAMLString(simulatedGlobalYaml, ipt_none, ptr_ignoreWhiteSpace, nullptr);
-        logAccessPluginConfig.setown(testTree->getPropTree("global/logAccess"));
-    }
-#endif
-
-    if (!logAccessPluginConfig)
-        throw makeStringException(-1, "RemoteLogAccessLoader: logaccess configuration not available!");
+IRemoteLogAccess &queryRemoteLogAccessor()
+{
+    return *logAccessor.query([]
+        {
+            Owned<IPropertyTree> logAccessPluginConfig = getGlobalConfigSP()->getPropTree("logAccess");
+    #ifdef LOGACCESSDEBUG
+            if (!logAccessPluginConfig)
+            {
+                const char * simulatedGlobalYaml = R"!!(global:
+                logAccess:
+                    name: "localES"
+                    type: "elasticstack"
+                    connection:
+                    protocol: "http"
+                    host: "elasticsearch-master.default.svc.cluster.local"
+                    port: 9200
+                    logMaps:
+                    - type: "global"
+                    storeName: "filebeat-*"
+                    searchColumn: "message"
+                    timeStampColumn: "created_ts"
+                    - type: "workunits"
+                    storeName: "filebeat-*"
+                    searchColumn: "hpcc.log.jobid"
+                    - type: "components"
+                    searchColumn: "kubernetes.container.name"
+                    - type: "audience"
+                    searchColumn: "hpcc.log.audience"
+                    - type: "class"
+                    searchColumn: "hpcc.log.class"
+                )!!";
+                Owned<IPropertyTree> testTree = createPTreeFromYAMLString(simulatedGlobalYaml, ipt_none, ptr_ignoreWhiteSpace, nullptr);
+                logAccessPluginConfig.setown(testTree->getPropTree("global/logAccess"));
+            }
+    #endif
 
-    constexpr const char * methodName = "queryRemoteLogAccessor";
-    constexpr const char * instFactoryName = "createInstance";
+            if (!logAccessPluginConfig)
+                throw makeStringException(-1, "RemoteLogAccessLoader: logaccess configuration not available!");
 
-    StringBuffer libName; //lib<type>logaccess.so
-    StringBuffer type;
-    logAccessPluginConfig->getProp("@type", type);
-    if (type.isEmpty())
-        throw makeStringExceptionV(-1, "%s RemoteLogAccess plugin kind not specified.", methodName);
-    libName.append("lib").append(type.str()).append("logaccess");
+            constexpr const char * methodName = "queryRemoteLogAccessor";
+            constexpr const char * instFactoryName = "createInstance";
 
-    //Load the DLL/SO
-    HINSTANCE logAccessPluginLib = LoadSharedObject(libName.str(), true, false);
-    if(logAccessPluginLib == nullptr)
-        throw makeStringExceptionV(-1, "%s cannot load library '%s'", methodName, libName.str());
+            StringBuffer libName; //lib<type>logaccess.so
+            StringBuffer type;
+            logAccessPluginConfig->getProp("@type", type);
+            if (type.isEmpty())
+                throw makeStringExceptionV(-1, "%s RemoteLogAccess plugin kind not specified.", methodName);
+            libName.append("lib").append(type.str()).append("logaccess");
 
-    newLogAccessPluginMethod_t_ xproc = nullptr;
-    xproc = (newLogAccessPluginMethod_t_)GetSharedProcedure(logAccessPluginLib, instFactoryName);
-    if (xproc == nullptr)
-        throw makeStringExceptionV(-1, "%s cannot locate procedure %s in library '%s'", methodName, instFactoryName, libName.str());
+            //Load the DLL/SO
+            HINSTANCE logAccessPluginLib = LoadSharedObject(libName.str(), false, true);
 
-    //Call logaccessplugin instance factory and return the new instance
-    DBGLOG("Calling '%s' in log access plugin '%s'", instFactoryName, libName.str());
-    IRemoteLogAccess * pLogAccessPlugin = dynamic_cast<IRemoteLogAccess*>(xproc(*logAccessPluginConfig));
+            newLogAccessPluginMethod_t_ xproc = (newLogAccessPluginMethod_t_)GetSharedProcedure(logAccessPluginLib, instFactoryName);
+            if (xproc == nullptr)
+                throw makeStringExceptionV(-1, "%s cannot locate procedure %s in library '%s'", methodName, instFactoryName, libName.str());
 
-    if (pLogAccessPlugin == nullptr)
-        throw makeStringExceptionV(-1, "%s Log Access Plugin '%s' failed to instantiate in call to %s", methodName, libName.str(), instFactoryName);
+            //Call logaccessplugin instance factory and return the new instance
+            DBGLOG("Calling '%s' in log access plugin '%s'", instFactoryName, libName.str());
 
-    return pLogAccessPlugin;
+            return xproc(*logAccessPluginConfig);
+        }
+    );
 }
+

+ 29 - 8
system/jlib/jlog.hpp

@@ -1449,11 +1449,20 @@ interface jlib_decl ILogAccessFilter : public IInterface
     virtual LogAccessFilterType filterType() const = 0;
 };
 
+enum LogAccessReturnColsMode      //Selects which log columns a log access query should return
+{
+    RETURNCOLS_MODE_min,          //Minimal column set, chosen by the plug-in
+    RETURNCOLS_MODE_default,      //Plug-in's default column set
+    RETURNCOLS_MODE_custom,       //Columns named explicitly by the caller (set by LogAccessConditions::appendLogFieldName)
+    RETURNCOLS_MODE_all           //Every available column
+};
+
 struct LogAccessConditions
 {
 private:
     Owned<ILogAccessFilter> filter;
     StringArray logFieldNames;
+    LogAccessReturnColsMode returnColsMode = RETURNCOLS_MODE_default;
     LogAccessTimeRange timeRange;
     unsigned limit = 100;
     offset_t startFrom = 0;
@@ -1466,10 +1475,20 @@ public:
         timeRange = l.timeRange;
         setFilter(LINK(l.filter));
         startFrom = l.startFrom;
-
+        returnColsMode = l.returnColsMode;
         return *this;
     }
 
+    LogAccessReturnColsMode getReturnColsMode() const  //Returns the return-columns mode currently held by these conditions
+    {
+        return returnColsMode;
+    }
+
+    void setReturnColsMode(LogAccessReturnColsMode retColsMode)  //Replaces the return-columns mode held by these conditions
+    {
+        returnColsMode = retColsMode;
+    }
+
     ILogAccessFilter * queryFilter() const
     {
         return filter.get();
@@ -1481,6 +1500,7 @@ public:
 
     void appendLogFieldName(const char * fieldname)
     {
+        returnColsMode = RETURNCOLS_MODE_custom;
         if (!logFieldNames.contains(fieldname))
             logFieldNames.append(fieldname);
     }
@@ -1493,11 +1513,6 @@ public:
         }
     }
 
-    inline const StringArray & queryLogFieldNames() const
-    {
-        return logFieldNames;
-    }
-
     unsigned getLimit() const
     {
         return limit;
@@ -1562,6 +1577,11 @@ inline LogAccessLogFormat logAccessFormatFromName(const char * name)
         throw makeStringExceptionV(-1, "Encountered unknown Log Access Format name: '%s'", name);
 }
 
+interface IRemoteLogAccessStream : extends IInterface  //Reader handed out by IRemoteLogAccess::getLogReader for fetching log entries in chunks
+{
+    virtual bool readLogEntries(StringBuffer & record, unsigned & recsRead) = 0;  //Callers loop while this returns true; recsRead is used as the number of records placed in record (NOTE(review): confirm exact end-of-stream contract with implementations)
+};
+
 // Log Access Interface - Provides filtered access to persistent logging - independent of the log storage mechanism
 //                      -- Declares method to retrieve log entries based on options set
 //                      -- Declares method to retrieve remote log access type (eg elasticstack, etc)
@@ -1570,7 +1590,8 @@ inline LogAccessLogFormat logAccessFormatFromName(const char * name)
 interface IRemoteLogAccess : extends IInterface
 {
     virtual bool fetchLog(const LogAccessConditions & options, StringBuffer & returnbuf, LogAccessLogFormat format) = 0;
-
+    virtual IRemoteLogAccessStream * getLogReader(const LogAccessConditions & options, LogAccessLogFormat format) = 0;
+    virtual IRemoteLogAccessStream * getLogReader(const LogAccessConditions & options, LogAccessLogFormat format, unsigned int pageSize) = 0;
     virtual const char * getRemoteLogAccessType() const = 0;
     virtual IPropertyTree * queryLogMap() const = 0;
     virtual const char * fetchConnectionStr() const = 0;
@@ -1592,6 +1613,6 @@ extern jlib_decl bool fetchJobIDLog(StringBuffer & returnbuf, IRemoteLogAccess &
 extern jlib_decl bool fetchComponentLog(StringBuffer & returnbuf, IRemoteLogAccess & logAccess, const char * component, LogAccessTimeRange timeRange, StringArray & cols, LogAccessLogFormat format);
 extern jlib_decl bool fetchLogByAudience(StringBuffer & returnbuf, IRemoteLogAccess & logAccess, MessageAudience audience, LogAccessTimeRange timeRange, StringArray & cols, LogAccessLogFormat format);
 extern jlib_decl bool fetchLogByClass(StringBuffer & returnbuf, IRemoteLogAccess & logAccess, LogMsgClass logclass, LogAccessTimeRange timeRange, StringArray & cols, LogAccessLogFormat format);
-extern jlib_decl IRemoteLogAccess * queryRemoteLogAccessor();
+extern jlib_decl IRemoteLogAccess & queryRemoteLogAccessor();
 
 #endif

+ 1 - 0
system/logaccess/ElasticStack/CMakeLists.txt

@@ -55,6 +55,7 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fpermissive")
 set(srcs
   ${CMAKE_CURRENT_SOURCE_DIR}/ElasticStackLogAccess.cpp
   ${CMAKE_CURRENT_SOURCE_DIR}/elasticlient/include/elasticlient/client.h
+  ${CMAKE_CURRENT_SOURCE_DIR}/elasticlient/include/elasticlient/scroll.h
 )
 
 include_directories(

+ 198 - 53
system/logaccess/ElasticStack/ElasticStackLogAccess.cpp

@@ -19,6 +19,7 @@
 #include <vector>
 #include <iostream>
 #include <json/json.h>
+#include <json/writer.h>
 
 
 #ifdef _CONTAINERIZED
@@ -53,6 +54,26 @@ static constexpr const char * LOGMAP_INDEXPATTERN_ATT = "@storename";
 static constexpr const char * LOGMAP_SEARCHCOL_ATT = "@searchcolumn";
 static constexpr const char * LOGMAP_TIMESTAMPCOL_ATT = "@timestampcolumn";
 
+static constexpr const char * DEFAULT_SCROLL_TIMEOUT = "1m"; //Elastic Time Units (i.e. 1m = 1 minute).
+static constexpr std::size_t  DEFAULT_MAX_RECORDS_PER_FETCH = 100;
+
+//Appends the minimal field list to columns: timestamp, source component, message
+void ElasticStackLogAccess::getMinReturnColumns(std::string & columns)
+{
+    columns.append(" \"");
+    columns.append(DEFAULT_HPCC_LOG_TIMESTAMP_COL);
+    columns.append("\", \"");
+    columns.append(m_componentsSearchColName.str());
+    columns.append("\", \"");
+    columns.append(m_globalSearchColName);
+    columns.append("\" ");
+}
+
+//Appends the default field list to columns: timestamp, source component, all hpcc.log fields
+void ElasticStackLogAccess::getDefaultReturnColumns(std::string & columns)
+{
+    columns.append(" \"");
+    columns.append(DEFAULT_HPCC_LOG_TIMESTAMP_COL);
+    columns.append("\", \"");
+    columns.append(m_componentsSearchColName.str());
+    columns.append("\", \"hpcc.log.*\" ");
+}
+
+//Appends the wildcard field selector, requesting every field
+void ElasticStackLogAccess::getAllColumns(std::string & columns)
+{
+    static constexpr const char * allFieldsSelector = " \"*\" ";
+    columns += allFieldsSelector;
+}
+
 ElasticStackLogAccess::ElasticStackLogAccess(const std::vector<std::string> &hostUrlList, IPropertyTree & logAccessPluginConfig) : m_esClient(hostUrlList)
 {
     if (!hostUrlList.at(0).empty())
@@ -181,33 +202,20 @@ const IPropertyTree * ElasticStackLogAccess::getESStatus()
 }
 
 /*
- * Transform ES query response to back-end agnostic response
+ * Transform iterator of hits/fields to back-end agnostic response
  *
  */
-void processESJsonResp(const cpr::Response & retrievedDocument, StringBuffer & returnbuf, LogAccessLogFormat format)
+void processHitsJsonResp(IPropertyTreeIterator * iter, StringBuffer & returnbuf, LogAccessLogFormat format, bool wrapped)
 {
-    if (retrievedDocument.status_code != 200)
-        throw makeStringExceptionV(-1, "ElasticSearch request failed: %s", retrievedDocument.status_line.c_str());
-
-    DBGLOG("Retrieved ES JSON DOC: %s", retrievedDocument.text.c_str());
+    if (!iter)
+        throw makeStringExceptionV(-1, "%s: Detected null 'hits' ElasticSearch response", COMPONENT_NAME);
 
-    Owned<IPropertyTree> tree = createPTreeFromJSONString(retrievedDocument.text.c_str());
-    if (!tree)
-        throw makeStringExceptionV(-1, "%s: Could not parse ElasticSearch query response", COMPONENT_NAME);
-
-    if (tree->getPropBool("timed_out", 0))
-        LOG(MCuserProgress,"ES Log Access: timeout reported");
-    if (tree->getPropInt("_shards/failed",0) > 0)
-        LOG(MCuserProgress,"ES Log Access: failed _shards reported");
-
-    DBGLOG("ES Log Access: hit count: '%d'", tree->getPropInt("hits/total/value"));
-
-    Owned<IPropertyTreeIterator> iter = tree->getElements("hits/hits/fields");
     switch (format)
     {
         case LOGACCESS_LOGFORMAT_xml:
         {
-            returnbuf.append("<lines>");
+            if (wrapped)
+                returnbuf.append("<lines>");
 
             ForEach(*iter)
             {
@@ -216,12 +224,15 @@ void processESJsonResp(const cpr::Response & retrievedDocument, StringBuffer & r
                 toXML(&cur,returnbuf);
                 returnbuf.append("</line>");
             }
-            returnbuf.append("</lines>");
+            if (wrapped)
+                returnbuf.append("</lines>");
             break;
         }
         case LOGACCESS_LOGFORMAT_json:
         {
-            returnbuf.append("{\"lines\": [");
+            if (wrapped)
+                returnbuf.append("{\"lines\": [");
+
             StringBuffer hitchildjson;
             bool first = true;
             ForEach(*iter)
@@ -234,7 +245,8 @@ void processESJsonResp(const cpr::Response & retrievedDocument, StringBuffer & r
                 first = false;
                 returnbuf.appendf("{\"fields\": [ %s ]}", hitchildjson.str());
             }
-            returnbuf.append("]}");
+            if (wrapped)
+                returnbuf.append("]}");
             break;
         }
         case LOGACCESS_LOGFORMAT_csv:
@@ -262,6 +274,48 @@ void processESJsonResp(const cpr::Response & retrievedDocument, StringBuffer & r
     }
 }
 
+/*
+ * Converts an ElasticSearch search response into the back-end agnostic report,
+ * appending the wrapped result to returnbuf.
+ *
+ * Throws if the HTTP status is not 200 or the response body cannot be parsed.
+ */
+void processESSearchJsonResp(const cpr::Response & retrievedDocument, StringBuffer & returnbuf, LogAccessLogFormat format)
+{
+    const char * responseText = retrievedDocument.text.c_str();
+
+    const bool requestSucceeded = (retrievedDocument.status_code == 200);
+    if (!requestSucceeded)
+        throw makeStringExceptionV(-1, "ElasticSearch request failed: %s", responseText);
+
+#ifdef _DEBUG
+    DBGLOG("Retrieved ES JSON DOC: %s", responseText);
+#endif
+
+    Owned<IPropertyTree> responseTree = createPTreeFromJSONString(responseText);
+    if (!responseTree)
+        throw makeStringExceptionV(-1, "%s: Could not parse ElasticSearch query response", COMPONENT_NAME);
+
+    //Problems reported inside the response are logged but do not stop processing
+    const bool timedOut = responseTree->getPropBool("timed_out", false);
+    if (timedOut)
+        LOG(MCuserProgress,"ES Log Access: timeout reported");
+
+    const bool shardsFailed = responseTree->getPropInt("_shards/failed",0) > 0;
+    if (shardsFailed)
+        LOG(MCuserProgress,"ES Log Access: failed _shards reported");
+
+    DBGLOG("ES Log Access: hit count: '%d'", responseTree->getPropInt("hits/total/value"));
+
+    Owned<IPropertyTreeIterator> hitFields = responseTree->getElements("hits/hits/fields");
+    processHitsJsonResp(hitFields, returnbuf, format, true);
+}
+
+/*
+ * Converts one ElasticSearch scroll response (retValue, JSON text) into the
+ * back-end agnostic report, appending it to returnbuf. The wrapped flag is
+ * forwarded to processHitsJsonResp unchanged.
+ *
+ * Throws if the JSON text cannot be parsed.
+ */
+void processESScrollJsonResp(const char * retValue, StringBuffer & returnbuf, LogAccessLogFormat format, bool wrapped)
+{
+    Owned<IPropertyTree> scrollTree = createPTreeFromJSONString(retValue);
+    if (scrollTree == nullptr)
+        throw makeStringExceptionV(-1, "%s: Could not parse ElasticSearch query response", COMPONENT_NAME);
+
+    Owned<IPropertyTreeIterator> hitFields = scrollTree->getElements("hits/fields");
+    processHitsJsonResp(hitFields, returnbuf, format, wrapped);
+}
+
 void esTimestampQueryRangeString(std::string & range, const char * timestampfield, std::time_t from, std::time_t to)
 {
     if (isEmptyString(timestampfield))
@@ -331,7 +385,7 @@ void esMatchQueryString(std::string & search, const char *searchval, const char
 /*
  * Construct Elasticsearch query directives string
  */
-void esSearchMetaData(std::string & search, const  StringArray & selectcols, unsigned size = DEFAULT_ES_DOC_LIMIT, offset_t from = DEFAULT_ES_DOC_START)
+void ElasticStackLogAccess::esSearchMetaData(std::string & search, const LogAccessReturnColsMode retcolmode, const  StringArray & selectcols, unsigned size = DEFAULT_ES_DOC_LIMIT, offset_t from = DEFAULT_ES_DOC_START)
 {
     //Query parameters:
     //https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-body.html
@@ -339,25 +393,40 @@ void esSearchMetaData(std::string & search, const  StringArray & selectcols, uns
     //_source: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-source-filtering.html
     search += "\"_source\": false, \"fields\": [" ;
 
-    if (selectcols.length() > 0)
+    switch (retcolmode)
     {
-        StringBuffer sourcecols;
-        ForEachItemIn(idx, selectcols)
+    case RETURNCOLS_MODE_all:
+        getAllColumns(search);
+        break;
+    case RETURNCOLS_MODE_min:
+        getMinReturnColumns(search);
+        break;
+    case RETURNCOLS_MODE_default:
+        getDefaultReturnColumns(search);
+        break;
+    case RETURNCOLS_MODE_custom:
+    {
+        if (selectcols.length() > 0)
         {
-            sourcecols.appendf("\"%s\"", selectcols.item(idx));
-            if (idx < selectcols.length() -1)
-                sourcecols.append(",");
-        }
+            StringBuffer sourcecols;
+            ForEachItemIn(idx, selectcols)
+            {
+                sourcecols.appendf("\"%s\"", selectcols.item(idx));
+                if (idx < selectcols.length() -1)
+                    sourcecols.append(",");
+            }
 
-        if (!sourcecols.isEmpty())
-            search += sourcecols.str() ;
+            search += sourcecols.str();
+        }
         else
-            search += "\"*\"";
-            //search += "!*.keyword"; //all fields ignoring the .keyword postfixed fields
+        {
+            throw makeStringExceptionV(-1, "%s: Custom return columns specified, but no columns provided", COMPONENT_NAME);
+        }
+        break;
+    }
+    default:
+        throw makeStringExceptionV(-1, "%s: Could not determine return colums mode", COMPONENT_NAME);
     }
-    else
-        //search += "!*.keyword"; //all fields ignoring the .keyword postfixed fields
-        search += "\"*\"";
 
     search += "],";
     search += "\"from\": ";
@@ -367,16 +436,13 @@ void esSearchMetaData(std::string & search, const  StringArray & selectcols, uns
     search += ", ";
 }
 
-/*
- * Construct ES query string, execute query
- */
-cpr::Response ElasticStackLogAccess::performESQuery(const LogAccessConditions & options)
+void ElasticStackLogAccess::populateQueryStringAndQueryIndex(std::string & queryString, std::string & queryIndex, const LogAccessConditions & options)
 {
     try
     {
         StringBuffer queryValue;
         std::string queryField = m_globalSearchColName.str();
-        std::string queryIndex = m_globalIndexSearchPattern.str();
+        queryIndex = m_globalIndexSearchPattern.str();
 
         bool fullTextSearch = true;
         bool wildCardSearch = false;
@@ -464,14 +530,14 @@ cpr::Response ElasticStackLogAccess::performESQuery(const LogAccessConditions &
             throw makeStringExceptionV(-1, "%s: Unknown query criteria type encountered: '%s'", COMPONENT_NAME, queryValue.str());
         }
 
-        std::string fullRequest = "{";
-        esSearchMetaData(fullRequest, options.getLogFieldNames(), options.getLimit(), options.getStartFrom());
+        queryString = "{";
+        esSearchMetaData(queryString, options.getReturnColsMode(), options.getLogFieldNames(), options.getLimit(), options.getStartFrom());
 
-        fullRequest += "\"query\": { \"bool\": {";
+        queryString += "\"query\": { \"bool\": {";
 
         if(!wildCardSearch)
         {
-            fullRequest += " \"must\": { ";
+            queryString += " \"must\": { ";
 
             std::string criteria;
             if (fullTextSearch) //are we performing a query on a blob, or exact term match?
@@ -479,8 +545,8 @@ cpr::Response ElasticStackLogAccess::performESQuery(const LogAccessConditions &
             else
                 esTermQueryString(criteria, queryValue.str(), queryField.c_str());
 
-            fullRequest += criteria;
-            fullRequest += "}, "; //end must, expect filter to follow
+            queryString += criteria;
+            queryString += "}, "; //end must, expect filter to follow
         }
 
         std::string filter = "\"filter\": {";
@@ -496,12 +562,37 @@ cpr::Response ElasticStackLogAccess::performESQuery(const LogAccessConditions &
         filter += range;
         filter += "}"; //end filter
 
-        fullRequest += filter;
-        fullRequest += "}}}"; //end bool and query
+        queryString += filter;
+        queryString += "}}}"; //end bool and query
+
+        DBGLOG("%s: Search string '%s'", COMPONENT_NAME, queryString.c_str());
+    }
+    catch (std::runtime_error &e)
+    {
+        const char * wha = e.what();
+        throw makeStringExceptionV(-1, "%s: fetchLog: Error searching doc: %s", COMPONENT_NAME, wha);
+    }
+    catch (IException * e)
+    {
+        StringBuffer mess;
+        e->errorMessage(mess);
+        e->Release();
+        throw makeStringExceptionV(-1, "%s: fetchLog: Error searching doc: %s", COMPONENT_NAME, mess.str());
+    }
+}
 
-        DBGLOG("%s: Search string '%s'", COMPONENT_NAME, fullRequest.c_str());
+/*
+ * Construct ES query string, execute query
+ */
+cpr::Response ElasticStackLogAccess::performESQuery(const LogAccessConditions & options)
+{
+    try
+    {
+        std::string queryString;
+        std::string queryIndex;
+        populateQueryStringAndQueryIndex(queryString, queryIndex, options);
 
-        return m_esClient.search(queryIndex.c_str(), DEFAULT_ES_DOC_TYPE, fullRequest);
+        return m_esClient.search(queryIndex.c_str(), DEFAULT_ES_DOC_TYPE, queryString);
     }
     catch (std::runtime_error &e)
     {
@@ -520,11 +611,65 @@ cpr::Response ElasticStackLogAccess::performESQuery(const LogAccessConditions &
 bool ElasticStackLogAccess::fetchLog(const LogAccessConditions & options, StringBuffer & returnbuf, LogAccessLogFormat format) // one-shot fetch: run a single ES search and fill 'returnbuf' from its response
 {
     cpr::Response esresp = performESQuery(options); // constructs the ES query from 'options' and executes it
-    processESJsonResp(esresp, returnbuf, format);
+    processESSearchJsonResp(esresp, returnbuf, format); // presumably renders the search response into 'returnbuf' using 'format' - confirm against the helper
 
     return true; // unconditional: no failure is reported through the return value on this path
 }
 
+/*
+ * Log stream backed by an elasticlient scroll.
+ * Each readLogEntries() call pulls the next page of hits from the scroll and hands it to
+ * processESScrollJsonResp for conversion to the output format chosen at construction.
+ */
+class ELASTICSTACKLOGACCESS_API ElasticStackLogStream : public CInterfaceOf<IRemoteLogAccessStream>
+{
+public:
+    /*
+     * Fetch the next page of log entries.
+     *   record   - receives the page as processed by processESScrollJsonResp
+     *   recsRead - set to the number of hits in the page; 0 when no page was processed
+     * Returns true when a non-empty page was processed, false when the scroll reported
+     * failure or the page contained no hits.
+     */
+    virtual bool readLogEntries(StringBuffer & record, unsigned & recsRead) override
+    {
+        recsRead = 0;
+
+        Json::Value page;
+        if (!m_esScroller.next(page))
+            return false;
+
+        const Json::Value & hits = page["hits"];
+        if (hits.empty())
+            return false; // nothing (more) to report
+
+        recsRead = hits.size();
+
+        std::ostringstream serialized;
+        m_jsonWriter->write(page, &serialized); // serialize Json object to string for processing
+        // convert Json string to target format
+        // NOTE(review): the meaning of the trailing 'false' flag is defined by processESScrollJsonResp - confirm there
+        processESScrollJsonResp(serialized.str().c_str(), record, m_outputFormat, false);
+        return true;
+    }
+
+    /*
+     * Set up the scroll against the given connection and start it with the search body.
+     *   queryString        - ES search request body (read-only)
+     *   connstr            - ES connection string used to create the elasticlient client
+     *   indexsearchpattern - index (pattern) the scroll is initialised against
+     *   format             - output format applied to every page returned by readLogEntries
+     *   pageSize           - scroll size handed to elasticlient::Scroll
+     *   scrollTo           - scroll timeout handed to elasticlient::Scroll
+     */
+    ElasticStackLogStream(const std::string & queryString, const char * connstr, const char * indexsearchpattern, LogAccessLogFormat format, std::size_t pageSize, const std::string & scrollTo)
+     : m_esScroller(std::make_shared<elasticlient::Client>(std::vector<std::string>({connstr})), pageSize, scrollTo),
+       m_outputFormat(format)
+    {
+        m_esScroller.init(indexsearchpattern, DEFAULT_ES_DOC_TYPE, queryString);
+        m_jsonWriter.reset(m_jsonStreamBuilder.newStreamWriter());
+    }
+
+    virtual ~ElasticStackLogStream() override = default;
+
+private:
+    elasticlient::Scroll m_esScroller;
+
+    LogAccessLogFormat m_outputFormat;
+    Json::StreamWriterBuilder m_jsonStreamBuilder;
+    std::unique_ptr<Json::StreamWriter> m_jsonWriter;
+};
+
+IRemoteLogAccessStream * ElasticStackLogAccess::getLogReader(const LogAccessConditions & options, LogAccessLogFormat format)
+{
+    // Convenience overload: no page size supplied, so delegate with the plugin default
+    const unsigned int defaultPageSize = DEFAULT_MAX_RECORDS_PER_FETCH;
+    return getLogReader(options, format, defaultPageSize);
+}
+
+IRemoteLogAccessStream * ElasticStackLogAccess::getLogReader(const LogAccessConditions & options, LogAccessLogFormat format, unsigned int pageSize)
+{
+    // Derive the ES request body and the index to search from the caller's conditions
+    std::string esRequestBody;
+    std::string esIndexPattern;
+    populateQueryStringAndQueryIndex(esRequestBody, esIndexPattern, options);
+
+    // A newly allocated scroll-backed stream is handed back to the caller
+    const char * esConnection = m_esConnectionStr.str();
+    return new ElasticStackLogStream(esRequestBody, esConnection, esIndexPattern.c_str(), format, pageSize, DEFAULT_SCROLL_TIMEOUT);
+}
+
 extern "C" IRemoteLogAccess * createInstance(IPropertyTree & logAccessPluginConfig)
 {
     //constructing ES Connection string(s) here b/c ES Client explicit ctr requires conn string array

+ 9 - 0
system/logaccess/ElasticStack/ElasticStackLogAccess.hpp

@@ -35,6 +35,7 @@
 
 #include <cpr/response.h>
 #include <elasticlient/client.h>
+#include <elasticlient/scroll.h>
 
 using namespace elasticlient;
 
@@ -73,13 +74,21 @@ private:
     const IPropertyTree * getTimestampTypeFormat(const char * indexpattern, const char * fieldname);
     const IPropertyTree * performAndLogESRequest(Client::HTTPMethod httpmethod, const char * url, const char * reqbody, const char * logmessageprefix, LogMsgCategory reqloglevel, LogMsgCategory resploglevel);
 
+    void esSearchMetaData(std::string & search, const LogAccessReturnColsMode retcolmode, const  StringArray & selectcols, unsigned size, offset_t from); // appends the "_source"/"fields"/"from" search metadata to 'search'; retcolmode selects the returned columns, selectcols is used in custom mode
+    void getMinReturnColumns(std::string & columns); // invoked by esSearchMetaData for RETURNCOLS_MODE_min
+    void getDefaultReturnColumns(std::string & columns); // invoked by esSearchMetaData for RETURNCOLS_MODE_default
+    void getAllColumns(std::string & columns); // invoked by esSearchMetaData for RETURNCOLS_MODE_all
 public:
     ElasticStackLogAccess(const std::vector<std::string> &hostUrlList, IPropertyTree & logAccessPluginConfig);
     virtual ~ElasticStackLogAccess() override = default;
 
+    void populateQueryStringAndQueryIndex(std::string & queryString, std::string & queryIndex, const LogAccessConditions & options); // fills queryString with the ES JSON search body built from 'options' and queryIndex with the index pattern to query
+
     // IRemoteLogAccess methods
     virtual bool fetchLog(const LogAccessConditions & options, StringBuffer & returnbuf, LogAccessLogFormat format) override; // single search request; result is written to returnbuf
     virtual const char * getRemoteLogAccessType() const override { return type; }
     virtual IPropertyTree * queryLogMap() const override { return m_pluginCfg->queryPropTree("logmap");} // "logmap" subtree of the plugin configuration
     virtual const char * fetchConnectionStr() const override { return m_esConnectionStr.str();}
+    virtual IRemoteLogAccessStream * getLogReader(const LogAccessConditions & options, LogAccessLogFormat format) override; // same as the overload below, using DEFAULT_MAX_RECORDS_PER_FETCH as the page size
+    virtual IRemoteLogAccessStream * getLogReader(const LogAccessConditions & options, LogAccessLogFormat format, unsigned int pageSize) override; // returns a new scroll-backed stream; pageSize is forwarded to the ES scroll
 };