Browse Source

Merge pull request #7290 from wangkx/h10472

HPCC-10472 Add WsWorkunits methods to retrieve WU archive Info

Reviewed-By: Anthony Fishbeck <anthony.fishbeck@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 years ago
parent
commit
6db5985b1d

+ 50 - 0
esp/scm/ws_workunits.ecm

@@ -1609,6 +1609,54 @@ ESPresponse [exceptions_inline] WUGetStatsResponse
     ESParray<ESPstruct WUStatisticItem> Statistics;
 };
 
+ESPStruct [nil_remove] WUArchiveFile
+{
+    string Name;
+    string Key;
+    string SourcePath;
+    string Path;        //nested (parent) module names for this <Attribute>
+};
+
+ESPStruct [nil_remove] WUArchiveModule
+{
+    string Name;
+    string FullName;
+    unsigned Flags;
+    string Key;
+    string Plugin;
+    string SourcePath;
+    string Version;
+    string Path;        //nested (parent) module names for this <Module>
+    ESParray<ESPstruct WUArchiveModule, ArchiveModule> ArchiveModules;
+    ESParray<ESPstruct WUArchiveFile, File> Files;
+};
+
+ESPrequest [nil_remove] WUListArchiveFilesRequest
+{
+    string WUID;
+};
+
+ESPresponse [exceptions_inline, nil_remove] WUListArchiveFilesResponse
+{
+    ESParray<ESPstruct WUArchiveModule, ArchiveModule> ArchiveModules;
+    ESParray<ESPstruct WUArchiveFile, File> Files;
+    string Message;
+};
+
+ESPrequest [nil_remove] WUGetArchiveFileRequest
+{
+    string WUID;
+    string ModuleName; //<Module @name> or
+    string FileName;   //<Attribute @name>
+    string Path;       //(nested) parent module name(s) for the <Module @name> or <Attribute @name>
+};
+
+ESPresponse [exceptions_inline, nil_remove] WUGetArchiveFileResponse
+{
+    string File;    //ECL text
+    string Message;
+};
+
 ESPservice [
     version("1.56"), default_client_version("1.56"),
     noforms,exceptions_inline("./smc_xslt/exceptions.xslt"),use_method_name] WsWorkunits
@@ -1686,6 +1734,8 @@ ESPservice [
     ESPmethod [resp_xsl_default("/esp/xslt/WUZAPInfoForm.xslt")] WUGetZAPInfo(WUGetZAPInfoRequest, WUGetZAPInfoResponse);
     ESPmethod WUCheckFeatures(WUCheckFeaturesRequest, WUCheckFeaturesResponse);
     ESPmethod WUGetStats(WUGetStatsRequest, WUGetStatsResponse);
+    ESPmethod WUListArchiveFiles(WUListArchiveFilesRequest, WUListArchiveFilesResponse);
+    ESPmethod WUGetArchiveFile(WUGetArchiveFileRequest, WUGetArchiveFileResponse);
 };
 
 

+ 150 - 8
esp/services/ws_workunits/ws_workunitsHelpers.cpp

@@ -2161,7 +2161,7 @@ void WsWuInfo::getWorkunitCpp(const char *cppname, const char* description, cons
 }
 
 void WsWuInfo::getWorkunitAssociatedXml(const char* name, const char* ipAddress, const char* plainText,
-                                        const char* description, bool forDownload, MemoryBuffer& buf)
+    const char* description, bool forDownload, bool addXMLDeclaration, MemoryBuffer& buf)
 {
     if (isEmpty(description)) //'File Name' as shown in WU Details page
         throw MakeStringException(ECLWATCH_INVALID_INPUT, "File not specified.");
@@ -2185,18 +2185,161 @@ void WsWuInfo::getWorkunitAssociatedXml(const char* name, const char* ipAddress,
     if (!ios)
         throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read %s.", description);
 
-    const char* header;
-    if (plainText && (!stricmp(plainText, "yes")))
-        header = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
-    else
-        header = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><?xml-stylesheet href=\"../esp/xslt/xmlformatter.xsl\" type=\"text/xsl\"?>";
+    if (addXMLDeclaration)
+    {
+        const char* header;
+        if (plainText && (!stricmp(plainText, "yes")))
+            header = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
+        else
+            header = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><?xml-stylesheet href=\"../esp/xslt/xmlformatter.xsl\" type=\"text/xsl\"?>";
+        buf.append(strlen(header), header);
+    }
 
-    buf.append(strlen(header), header);
     appendIOStreamContent(buf, ios.get(), forDownload);
 }
 
+IPropertyTree* WsWuInfo::getWorkunitArchive()
+{
+    Owned <IConstWUQuery> query = cw->getQuery();
+    if(!query)
+        return NULL;
+
+    SCMStringBuffer name, ip;
+    Owned<IConstWUAssociatedFileIterator> iter = &query->getAssociatedFiles();
+    ForEach(*iter)
+    {
+        IConstWUAssociatedFile& cur = iter->query();
+        if (cur.getType() != FileTypeXml)
+            continue;
+
+        cur.getName(name);
+        if (name.length() < 15)
+            continue;
+        const char* pStr = name.str() + name.length() - 15;
+        if (strieq(pStr, ".archive.eclxml"))
+        {
+            cur.getIp(ip);
+            break;
+        }
+    }
+    if (!ip.length())
+        return NULL;
+
+    MemoryBuffer content;
+    getWorkunitAssociatedXml(name.str(), ip.str(), "", "WU archive eclxml", true, false, content);
+    if (!content.length())
+        return NULL;
+    return createPTreeFromXMLString(content.length(), content.toByteArray());
+}
+
 
+IEspWUArchiveFile* WsWuInfo::readArchiveFileAttr(IPropertyTree& fileTree, const char* path)
+{
+    const char* fileName = fileTree.queryProp("@name");
+    if (isEmpty(fileName))
+        return NULL;
 
+    Owned<IEspWUArchiveFile> file= createWUArchiveFile();
+    file->setName(fileName);
+    if (!isEmpty(path))
+        file->setPath(path);
+    if (fileTree.hasProp("@key"))
+        file->setKey(fileTree.queryProp("@key"));
+    if (fileTree.hasProp("@sourcePath"))
+        file->setSourcePath(fileTree.queryProp("@sourcePath"));
+    return file.getClear();
+}
+
+IEspWUArchiveModule* WsWuInfo::readArchiveModuleAttr(IPropertyTree& moduleTree, const char* path)
+{
+    const char* moduleName = moduleTree.queryProp("@name");
+    if (isEmpty(moduleName))
+        return NULL;
+
+    Owned<IEspWUArchiveModule> module= createWUArchiveModule();
+    module->setName(moduleName);
+    if (!isEmpty(path))
+        module->setPath(path);
+    if (moduleTree.hasProp("@fullName"))
+        module->setFullName(moduleTree.queryProp("@fullName"));
+    if (moduleTree.hasProp("@key"))
+        module->setKey(moduleTree.queryProp("@key"));
+    if (moduleTree.hasProp("@plugin"))
+        module->setPlugin(moduleTree.queryProp("@plugin"));
+    if (moduleTree.hasProp("@version"))
+        module->setVersion(moduleTree.queryProp("@version"));
+    if (moduleTree.hasProp("@sourcePath"))
+        module->setSourcePath(moduleTree.queryProp("@sourcePath"));
+    if (moduleTree.hasProp("@flags"))
+        module->setFlags(moduleTree.getPropInt("@flags", 0));
+    return module.getClear();
+}
+
+void WsWuInfo::readArchiveFiles(IPropertyTree* archiveTree, const char* path, IArrayOf<IEspWUArchiveFile>& files)
+{
+    Owned<IPropertyTreeIterator> iter = archiveTree->getElements("Attribute");
+    ForEach(*iter)
+    {
+        IPropertyTree& item = iter->query();
+        Owned<IEspWUArchiveFile> file = readArchiveFileAttr(item, path);
+        if (file)
+            files.append(*file.getClear());
+    }
+}
+
+void WsWuInfo::listArchiveFiles(IPropertyTree* archiveTree, const char* path, IArrayOf<IEspWUArchiveModule>& modules, IArrayOf<IEspWUArchiveFile>& files)
+{
+    if (!archiveTree)
+        return;
+
+    Owned<IPropertyTreeIterator> iter = archiveTree->getElements("Module");
+    ForEach(*iter)
+    {
+        IPropertyTree& item = iter->query();
+        Owned<IEspWUArchiveModule> module = readArchiveModuleAttr(item, path);
+        if (!module)
+            continue;
+
+        StringBuffer newPath;
+        if (isEmpty(path))
+            newPath.set(module->getName());
+        else
+            newPath.setf("%s/%s", path, module->getName());
+        IArrayOf<IEspWUArchiveModule> modulesInModule;
+        IArrayOf<IEspWUArchiveFile> filesInModule;
+        listArchiveFiles(&item, newPath.str(), modulesInModule, filesInModule);
+        if (modulesInModule.length())
+            module->setArchiveModules(modulesInModule);
+        if (filesInModule.length())
+            module->setFiles(filesInModule);
+
+        modules.append(*module.getClear());
+    }
+
+    readArchiveFiles(archiveTree, path, files);
+}
+
+void WsWuInfo::getArchiveFile(IPropertyTree* archive, const char* moduleName, const char* attrName, const char* path, StringBuffer& file)
+{
+    StringBuffer xPath;
+    if (!isEmpty(path))
+    {
+        StringArray list;
+        list.appendListUniq(path, "/");
+        ForEachItemIn(m, list)
+        {
+            const char* module = list.item(m);
+            if (!isEmpty(module))
+                xPath.appendf("Module[@name=\"%s\"]/", module);
+        }
+    }
+    if (isEmpty(moduleName))
+        xPath.appendf("Attribute[@name=\"%s\"]", attrName);
+    else
+        xPath.appendf("Module[@name=\"%s\"]/Text", moduleName);
+
+    file.set(archive->queryProp(xPath.str()));
+}
 
 WsWuSearch::WsWuSearch(IEspContext& context,const char* owner,const char* state,const char* cluster,const char* startDate,const char* endDate,const char* jobname)
 {
@@ -3056,5 +3199,4 @@ void WsWuHelpers::checkAndTrimWorkunit(const char* methodName, StringBuffer& inp
 
     return;
 }
-
 }

+ 102 - 1
esp/services/ws_workunits/ws_workunitsHelpers.hpp

@@ -52,6 +52,8 @@ namespace ws_workunits {
 
 static const long MAXXLSTRANSFER = 5000000;
 const unsigned DATA_SIZE = 16;
+const unsigned WUARCHIVE_CACHE_SIZE = 8;
+const unsigned WUARCHIVE_CACHE_MINITES = 5;
 const unsigned AWUS_CACHE_SIZE = 16;
 const unsigned AWUS_CACHE_MIN_DEFAULT = 15;
 
@@ -127,6 +129,9 @@ private:
 
 class WsWuInfo
 {
+    IEspWUArchiveFile* readArchiveFileAttr(IPropertyTree& fileTree, const char* path);
+    IEspWUArchiveModule* readArchiveModuleAttr(IPropertyTree& moduleTree, const char* path);
+    void readArchiveFiles(IPropertyTree* archiveTree, const char* path, IArrayOf<IEspWUArchiveFile>& files);
 public:
     WsWuInfo(IEspContext &ctx, IConstWorkUnit *cw_) :
       context(ctx), cw(cw_)
@@ -184,12 +189,17 @@ public:
     void getWorkunitDll(StringBuffer &name, MemoryBuffer& buf);
     void getWorkunitXml(const char* plainText, MemoryBuffer& buf);
     void getWorkunitQueryShortText(MemoryBuffer& buf);
-    void getWorkunitAssociatedXml(const char* name, const char* IPAddress, const char* plainText, const char* description, bool forDownload, MemoryBuffer& buf);
+    void getWorkunitAssociatedXml(const char* name, const char* IPAddress, const char* plainText, const char* description,
+        bool forDownload, bool addXMLDeclaration, MemoryBuffer& buf);
     void getWorkunitCpp(const char* cppname, const char* description, const char* ipAddress, MemoryBuffer& buf, bool forDownload);
     void getEventScheduleFlag(IEspECLWorkunit &info);
     unsigned getWorkunitThorLogInfo(IArrayOf<IEspECLHelpFile>& helpers, IEspECLWorkunit &info);
     IDistributedFile* getLogicalFileData(IEspContext& context, const char* logicalName, bool& showFileContent);
 
+    IPropertyTree* getWorkunitArchive();
+    void listArchiveFiles(IPropertyTree* archive, const char* path, IArrayOf<IEspWUArchiveModule>& modules, IArrayOf<IEspWUArchiveFile>& files);
+    void getArchiveFile(IPropertyTree* archive, const char* moduleName, const char* attrName, const char* path, StringBuffer& file);
+
 protected:
     void addTimerToList(SCMStringBuffer& name, const char * scope, IConstWUStatistic & stat, IArrayOf<IEspECLTimer>& timers);
     unsigned getTotalThorTime();
@@ -323,6 +333,97 @@ struct ArchivedWuCache: public CInterface, implements IInterface
     size32_t cacheSize;
 };
 
+struct WUArchiveCacheElement: public CInterface, implements IInterface
+{
+    IMPLEMENT_IINTERFACE;
+    WUArchiveCacheElement(const char* _wuid, IPropertyTree* _archive) : wuid(_wuid)
+    {
+        archive.setown(_archive);
+        timeCached.setNow();
+    }
+
+    CDateTime timeCached;
+    std::string wuid;
+    Owned<IPropertyTree> archive;
+};
+
+struct CompareWUArchive
+{
+    CompareWUArchive(const char* _wuid): wuid(_wuid) {}
+    bool operator()(const Linked<WUArchiveCacheElement>& e) const
+    {
+        return streq(e->wuid.c_str(), wuid.c_str());
+    }
+    std::string wuid;
+};
+
+struct WUArchiveCache: public CInterface, implements IInterface
+{
+    IMPLEMENT_IINTERFACE;
+
+    WUArchiveCache(size32_t _cacheSize=0): cacheSize(_cacheSize){}
+
+    WUArchiveCacheElement* lookup(IEspContext &context, const char* wuid, unsigned timeOutMin)
+    {
+        CriticalBlock block(crit);
+
+        if (cache.size() < 1)
+            return NULL;
+
+        //erase data if it should be
+        CDateTime timeNow;
+        int timeout = timeOutMin;
+        timeNow.setNow();
+        timeNow.adjustTime(-timeout);
+        while (true)
+        {
+            std::list<Linked<WUArchiveCacheElement> >::iterator list_iter = cache.begin();
+            if (list_iter == cache.end())
+                break;
+
+            WUArchiveCacheElement* wuArchive = list_iter->get();
+            if (!wuArchive || (wuArchive->timeCached > timeNow))
+                break;
+
+            cache.pop_front();
+        }
+
+        if (cache.size() < 1)
+            return NULL;
+
+        //Check whether we have the WUArchive cache for this WU.
+        std::list<Linked<WUArchiveCacheElement> >::iterator it = std::find_if(cache.begin(), cache.end(), CompareWUArchive(wuid));
+        if(it!=cache.end())
+        {
+            return it->getLink();
+        }
+
+        return NULL;
+    }
+
+    void add(const char* _wuid, IPropertyTree* _archive)
+    {
+        Owned<IPropertyTree> archive = _archive;
+        CriticalBlock block(crit);
+
+        //Save new data
+        Owned<WUArchiveCacheElement> e = new WUArchiveCacheElement(_wuid, archive.getClear());
+        if (cacheSize > 0)
+        {
+            if (cache.size() >= cacheSize)
+                cache.pop_front();
+
+            cache.push_back(e.get());
+        }
+
+        return;
+    }
+
+    std::list<Linked<WUArchiveCacheElement> > cache;
+    CriticalSection crit;
+    size32_t cacheSize;
+};
+
 class WsWuJobQueueAuditInfo
 {
 public:

+ 77 - 1
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -466,6 +466,7 @@ void CWsWorkunitsEx::init(IPropertyTree *cfg, const char *process, const char *s
 
     dataCache.setown(new DataCache(DATA_SIZE));
     archivedWuCache.setown(new ArchivedWuCache(AWUS_CACHE_SIZE));
+    wuArchiveCache.setown(new WUArchiveCache(WUARCHIVE_CACHE_SIZE));
 
     //Create a folder for temporarily holding gzip files by WUResultBin()
     Owned<IFile> tmpdir = createIFile(TEMPZIPDIR);
@@ -2536,7 +2537,7 @@ bool CWsWorkunitsEx::onWUFile(IEspContext &context,IEspWULogFileRequest &req, IE
                 else
                     ptr = name;
 
-                winfo.getWorkunitAssociatedXml(name, req.getIPAddress(), req.getPlainText(), req.getDescription(), opt > 0, mb);
+                winfo.getWorkunitAssociatedXml(name, req.getIPAddress(), req.getPlainText(), req.getDescription(), opt > 0, true, mb);
                 openSaveFile(context, opt, req.getSizeLimit(), ptr, HTTP_TYPE_APPLICATION_XML, mb, resp);
             }
             else if (strieq(File_XML,req.getType()) || strieq(File_WUECL,req.getType()))
@@ -4278,3 +4279,78 @@ bool CWsWorkunitsEx::onWUGetStats(IEspContext &context, IEspWUGetStatsRequest &r
     }
     return true;
 }
+
+IPropertyTree* CWsWorkunitsEx::getWorkunitArchive(IEspContext &context, WsWuInfo& winfo, const char* wuid, unsigned cacheMinutes)
+{
+    Owned<WUArchiveCacheElement> wuArchive = wuArchiveCache->lookup(context, wuid, cacheMinutes);
+    if (wuArchive)
+        return wuArchive->archive.getLink();
+
+    Owned<IPropertyTree> archive = winfo.getWorkunitArchive();
+    if (!archive)
+        return NULL;
+
+    wuArchiveCache->add(wuid, archive.getLink());
+    return archive.getClear();
+}
+
+bool CWsWorkunitsEx::onWUListArchiveFiles(IEspContext &context, IEspWUListArchiveFilesRequest &req, IEspWUListArchiveFilesResponse &resp)
+{
+    try
+    {
+        const char* wuid = req.getWUID();
+        if (isEmpty(wuid))
+            throw MakeStringException(ECLWATCH_NO_WUID_SPECIFIED, "No workunit defined.");
+
+        WsWuInfo winfo(context, wuid);
+        Owned<IPropertyTree> archive = getWorkunitArchive(context, winfo, wuid, WUARCHIVE_CACHE_MINITES);
+        if (!archive)
+            throw MakeStringException(ECLWATCH_INVALID_INPUT,"No workunit archive found for %s.", wuid);
+
+        IArrayOf<IEspWUArchiveModule> modules;
+        IArrayOf<IEspWUArchiveFile> files;
+        winfo.listArchiveFiles(archive, "", modules, files);
+        if (modules.length())
+            resp.setArchiveModules(modules);
+        if (files.length())
+            resp.setFiles(files);
+        if (!modules.length() && !files.length())
+            resp.setMessage("Files not found");
+    }
+    catch(IException* e)
+    {
+        FORWARDEXCEPTION(context, e,  ECLWATCH_INTERNAL_ERROR);
+    }
+    return true;
+}
+
+bool CWsWorkunitsEx::onWUGetArchiveFile(IEspContext &context, IEspWUGetArchiveFileRequest &req, IEspWUGetArchiveFileResponse &resp)
+{
+    try
+    {
+        const char* wuid = req.getWUID();
+        const char* moduleName = req.getModuleName();
+        const char* attrName = req.getFileName();
+        if (isEmpty(wuid))
+            throw MakeStringException(ECLWATCH_NO_WUID_SPECIFIED, "No workunit defined.");
+        if (isEmpty(moduleName) && isEmpty(attrName))
+            throw MakeStringException(ECLWATCH_INVALID_INPUT, "No file name defined.");
+
+        WsWuInfo winfo(context, wuid);
+        Owned<IPropertyTree> archive = getWorkunitArchive(context, winfo, wuid, WUARCHIVE_CACHE_MINITES);
+        if (!archive)
+            throw MakeStringException(ECLWATCH_INVALID_INPUT,"No workunit archive found for %s.", wuid);
+
+        StringBuffer file;
+        winfo.getArchiveFile(archive, moduleName, attrName, req.getPath(), file);
+        if (file.length())
+            resp.setFile(file.str());
+        else
+            resp.setMessage("File not found");
+    }
+    catch(IException* e)
+    {
+        FORWARDEXCEPTION(context, e,  ECLWATCH_INTERNAL_ERROR);
+    }
+    return true;
+}

+ 4 - 0
esp/services/ws_workunits/ws_workunitsService.hpp

@@ -256,6 +256,8 @@ public:
     bool onWUCheckFeatures(IEspContext &context, IEspWUCheckFeaturesRequest &req, IEspWUCheckFeaturesResponse &resp);
     bool onWUGetStats(IEspContext &context, IEspWUGetStatsRequest &req, IEspWUGetStatsResponse &resp);
 
+    bool onWUListArchiveFiles(IEspContext &context, IEspWUListArchiveFilesRequest &req, IEspWUListArchiveFilesResponse &resp);
+    bool onWUGetArchiveFile(IEspContext &context, IEspWUGetArchiveFileRequest &req, IEspWUGetArchiveFileResponse &resp);
 private:
     void addProcessLogfile(Owned<IConstWorkUnit> &cwu, WsWuInfo &winfo, const char * process, const char* path);
     void createZAPWUInfoFile(IEspWUCreateZAPInfoRequest &req, Owned<IConstWorkUnit>& cwu, const char* pathNameStr);
@@ -267,6 +269,7 @@ private:
     bool resetQueryStats(IEspContext &context, const char* target, IProperties* queryIds, IEspWUQuerySetQueryActionResponse& resp);
     void readGraph(IEspContext& context, const char* subGraphId, WUGraphIDType& id, bool running,
         IConstWUGraph* graph, IArrayOf<IEspECLGraphEx>& graphs);
+    IPropertyTree* getWorkunitArchive(IEspContext &context, WsWuInfo& winfo, const char* wuid, unsigned cacheMinutes);
 
     unsigned awusCacheMinutes;
     StringBuffer queryDirectory;
@@ -274,6 +277,7 @@ private:
     StringAttr daliServers;
     Owned<DataCache> dataCache;
     Owned<ArchivedWuCache> archivedWuCache;
+    Owned<WUArchiveCache> wuArchiveCache;
     StringAttr sashaServerIp;
     unsigned short sashaServerPort;
     BoolHash validClusters;