Browse Source

HPCC-12012 Cache PackageMaps/PackageSets info in WsPackageProcess

The PackageMaps/PackageSets info is cached into WsPackageProcess
in order to avoid unecessary calls to dali. Subscribe/notify is
used for dali to notify the ESP service about possible changes
of the PackageMaps/PackageSets info.

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx 11 năm trước cách đây
mục cha
commit
7d00f7ae74

+ 65 - 47
esp/services/ws_packageprocess/ws_packageprocessService.cpp

@@ -32,6 +32,7 @@
 
 void CWsPackageProcessEx::init(IPropertyTree *cfg, const char *process, const char *service)
 {
+    packageMapAndSet.subscribe();
 }
 
 bool CWsPackageProcessEx::onEcho(IEspContext &context, IEspEchoRequest &req, IEspEchoResponse &resp)
@@ -297,12 +298,10 @@ void getAllPackageListInfo(IPropertyTree *mapTree, StringBuffer &info)
     info.append("</PackageMap>");
 }
 
-void listPkgInfo(double version, const char *target, const char *process, IPropertyTree* pkgSetRegistry, IArrayOf<IConstPackageListMapData>* results)
+void listPkgInfo(double version, const char *target, const char *process, const IPropertyTree* packageMaps, IPropertyTree* pkgSetRegistry, IArrayOf<IConstPackageListMapData>* results)
 {
-    Owned<IRemoteConnection> globalLock = querySDS().connect("/PackageMaps/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
-    if (!globalLock)
-        throw MakeStringException(PKG_DALI_LOOKUP_ERROR, "Unable to retrieve package information from dali /PackageMaps");
-    IPropertyTree *root = globalLock->queryRoot();
+    if (!packageMaps)
+        throw MakeStringException(PKG_DALI_LOOKUP_ERROR, "Unable to retrieve package information from dali for process");
     StringBuffer xpath("PackageMap");
     if (target && *target)
         xpath.appendf("[@querySet='%s']", target);
@@ -311,42 +310,41 @@ void listPkgInfo(double version, const char *target, const char *process, IPrope
     {
         IPropertyTree &item = iter->query();
         const char *id = item.queryProp("@id");
-        if (id)
-        {
-            StringBuffer xpath;
-            xpath.append("PackageMap[@id='").append(id).append("']");
-            IPropertyTree *mapTree = root->queryPropTree(xpath);
-            if (!mapTree)
-                continue;
-            Owned<IEspPackageListMapData> res = createPackageListMapData("", "");
-            res->setActive(item.getPropBool("@active"));
-            if (process && *process && (version >= 1.01))
-                res->setProcess(process);
-            getPackageListInfo(mapTree, res);
-            if (target && *target)
-                res->setTarget(target);
-            else
-                res->setTarget(item.queryProp("@querySet"));
-            results->append(*res.getClear());
-        }
+        if (!id || !*id)
+            continue;
+
+        StringBuffer xpath;
+        xpath.append("PackageMap[@id='").append(id).append("']");
+        IPropertyTree *mapTree = packageMaps->queryPropTree(xpath);
+        if (!mapTree)
+            continue;
+
+        Owned<IEspPackageListMapData> res = createPackageListMapData("", "");
+        res->setActive(item.getPropBool("@active"));
+        if (process && *process && (version >= 1.01))
+            res->setProcess(process);
+        getPackageListInfo(mapTree, res);
+        if (target && *target)
+            res->setTarget(target);
+        else
+            res->setTarget(item.queryProp("@querySet"));
+        results->append(*res.getClear());
     }
 }
 
-void listPkgInfo(double version, const char *target, const char *process, IArrayOf<IConstPackageListMapData>* results)
+void listPkgInfo(double version, const char *target, const char *process, const IPropertyTree* packageMaps, IArrayOf<IConstPackageListMapData>* results)
 {
     Owned<IPropertyTree> pkgSetRegistry = getPkgSetRegistry((process && *process) ? process : "*", true);
     if (pkgSetRegistry) //will be NULL if no package map
     {
-        listPkgInfo(version, target, process, pkgSetRegistry, results);
+        listPkgInfo(version, target, process, packageMaps, pkgSetRegistry, results);
     }
 }
 
-void getPkgInfo(const char *target, const char *process, StringBuffer &info)
+void getPkgInfo(const IPropertyTree *packageMaps, const char *target, const char *process, StringBuffer &info)
 {
-    Owned<IRemoteConnection> globalLock = querySDS().connect("/PackageMaps/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
-    if (!globalLock)
-        throw MakeStringException(PKG_DALI_LOOKUP_ERROR, "Unable to retrieve package information from dali /PackageMaps");
-    IPropertyTree *root = globalLock->queryRoot();
+    if (!packageMaps)
+        throw MakeStringException(PKG_DALI_LOOKUP_ERROR, "Unable to retrieve package information from dali for process");
     Owned<IPropertyTree> tree = createPTree("PackageMaps");
     Owned<IPropertyTree> pkgSetRegistry = getPkgSetRegistry(process, true);
     if (!pkgSetRegistry)
@@ -366,7 +364,7 @@ void getPkgInfo(const char *target, const char *process, StringBuffer &info)
         {
             StringBuffer xpath;
             xpath.append("PackageMap[@id='").append(id).append("']");
-            IPropertyTree *mapTree = root->queryPropTree(xpath);
+            IPropertyTree *mapTree = packageMaps->queryPropTree(xpath);
             if (mapTree)
                 mergePTree(tree, mapTree);
         }
@@ -474,6 +472,29 @@ void activatePackageMapInfo(const char *target, const char *name, const char *pr
     }
 }
 
+void PackageMapAndSet::load(const char* path, IPropertyTree* t)
+{
+    Owned<IRemoteConnection> globalLock = querySDS().connect(path, myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
+    if (!globalLock)
+        throw MakeStringException(PKG_DALI_LOOKUP_ERROR, "Unable to retrieve %s information from dali /%s", path, path);
+
+    t->removeProp(path);
+
+    IPropertyTree *root = globalLock->queryRoot();
+    if (root)
+        t->addPropTree(path, LINK(root));
+}
+
+void PackageMapAndSet::load(unsigned flags)
+{
+    Owned<IPropertyTree> t = createPTreeFromIPT(tree);
+    if (flags & PMAS_RELOAD_PACKAGE_SET)
+        load("PackageSets", t);
+    if (flags & PMAS_RELOAD_PACKAGE_MAP)
+        load("PackageMaps", t);
+    tree.setown(t.getClear());
+}
+
 bool CWsPackageProcessEx::readPackageMapString(const char *packageMapString, StringBuffer &target, StringBuffer &process, StringBuffer &packageMap)
 {
     if (!packageMapString || !*packageMapString)
@@ -497,14 +518,13 @@ void CWsPackageProcessEx::getPkgInfoById(const char *packageMapId, IPropertyTree
     if (!packageMapId || !*packageMapId)
         return;
 
-    Owned<IRemoteConnection> globalLock = querySDS().connect("/PackageMaps/", myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
-    if (!globalLock)
-        throw MakeStringException(PKG_DALI_LOOKUP_ERROR, "Unable to retrieve package information from dali /PackageMaps");
+    IPropertyTree *packageMaps = packageMapAndSet.getPackageMaps();
+    if (!packageMaps)
+        throw MakeStringException(PKG_DALI_LOOKUP_ERROR, "Unable to retrieve package information from dali for process");
 
     StringBuffer xpath;
     xpath.append("PackageMap[@id='").append(packageMapId).append("']");
-    IPropertyTree *root = globalLock->queryRoot();
-    IPropertyTree *mapTree = root->queryPropTree(xpath);
+    IPropertyTree *mapTree = packageMaps->queryPropTree(xpath);
     if (mapTree)
         mergePTree(tree, mapTree);
 }
@@ -632,7 +652,7 @@ bool CWsPackageProcessEx::onListPackage(IEspContext &context, IEspListPackageReq
     resp.updateStatus().setCode(0);
     IArrayOf<IConstPackageListMapData> results;
     StringAttr process(req.getProcess());
-    listPkgInfo(context.getClientVersion(), req.getTarget(), process.length() ? process.get() : "*", &results);
+    listPkgInfo(context.getClientVersion(), req.getTarget(), process.length() ? process.get() : "*", packageMapAndSet.getPackageMaps(), &results);
     resp.setPkgListMapData(results);
     return true;
 }
@@ -643,15 +663,13 @@ bool CWsPackageProcessEx::onListPackages(IEspContext &context, IEspListPackagesR
     const char* targetReq = req.getTarget();
     const char* processReq = req.getProcess();
     const char* processFilterReq = req.getProcessFilter();
+    IPropertyTree* packageMaps = packageMapAndSet.getPackageMaps();
     IArrayOf<IConstPackageListMapData> results;
     if ((!processReq || !*processReq) && (processFilterReq && *processFilterReq))
-        listPkgInfo(version, targetReq, processFilterReq, &results);
+        listPkgInfo(version, targetReq, processFilterReq, packageMaps, &results);
     else
     {
-        Owned<IRemoteConnection> conn = querySDS().connect("/PackageSets", myProcessSession(), RTM_LOCK_READ, SDS_LOCK_TIMEOUT);
-        if (!conn)
-            throw MakeStringException(PKG_DALI_LOOKUP_ERROR, "Unable to retrieve package information from dali for process");
-        Owned<IPropertyTree> pkgSetRegistryRoot = conn->getRoot();
+        IPropertyTree* pkgSetRegistryRoot = packageMapAndSet.getPackageSets();
         if (!pkgSetRegistryRoot)
             throw MakeStringException(PKG_DALI_LOOKUP_ERROR, "Unable to retrieve package information from dali for process");
         Owned<IPropertyTreeIterator> iter = pkgSetRegistryRoot->getElements("PackageSet");
@@ -663,7 +681,7 @@ bool CWsPackageProcessEx::onListPackages(IEspContext &context, IEspListPackagesR
                 StringBuffer process;
                 pkgSetRegistry->getProp("@process", process);
                 if (process.length() && (streq(process.str(), "*") || WildMatch(process.str(), processReq, true)))
-                    listPkgInfo(version, targetReq, process.str(), pkgSetRegistry, &results);
+                    listPkgInfo(version, targetReq, process.str(), packageMaps, pkgSetRegistry, &results);
             }
             catch(IException* e)
             {
@@ -707,7 +725,7 @@ bool CWsPackageProcessEx::onGetPackage(IEspContext &context, IEspGetPackageReque
     resp.updateStatus().setCode(0);
     StringAttr process(req.getProcess());
     StringBuffer info;
-    getPkgInfo(req.getTarget(), process.length() ? process.get() : "*", info);
+    getPkgInfo(packageMapAndSet.getPackageMaps(), req.getTarget(), process.length() ? process.get() : "*", info);
     resp.setInfo(info);
     return true;
 }
@@ -898,10 +916,10 @@ bool CWsPackageProcessEx::onGetPackageMapSelectOptions(IEspContext &context, IEs
         {
             StringArray processFilters;
             processFilters.append("*");
-            Owned<IRemoteConnection> pkgSet = querySDS().connect("/PackageSets/", myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
-            if (pkgSet)
+            IPropertyTree* pkgSets = packageMapAndSet.getPackageSets();
+            if (pkgSets)
             {
-                Owned<IPropertyTreeIterator> iter = pkgSet->queryRoot()->getElements("PackageSet");
+                Owned<IPropertyTreeIterator> iter = pkgSets->getElements("PackageSet");
                 ForEach(*iter)
                 {
                     StringBuffer process;

+ 95 - 1
esp/services/ws_packageprocess/ws_packageprocessService.hpp

@@ -19,11 +19,101 @@
 #define _ESPWIZ_ws_packageprocess_HPP__
 
 #include "ws_packageprocess_esp.ipp"
+#include "dasds.hpp"
 
 #define THORCLUSTER "thor"
 #define HTHORCLUSTER "hthor"
 #define ROXIECLUSTER "roxie"
 
+#define PMAS_RELOAD_PACKAGE_SET 0x01
+#define PMAS_RELOAD_PACKAGE_MAP 0x02
+
+class PackageMapAndSet : public CInterface, implements ISDSSubscription
+{
+    Owned<IPropertyTree> tree;
+    SubscriptionId pmChange;
+    SubscriptionId psChange;
+    mutable CriticalSection crit;
+    mutable CriticalSection dirtyCrit; //if there were an atomic_or I would just use atomic
+    unsigned dirty;
+
+    void load(unsigned flags);
+    void load(const char* path, IPropertyTree* t);
+
+public:
+    IMPLEMENT_IINTERFACE;
+    PackageMapAndSet() : pmChange(0), psChange(0), dirty(PMAS_RELOAD_PACKAGE_SET | PMAS_RELOAD_PACKAGE_MAP)
+    {
+        tree.setown(createPTree("PackageMapAndSet"));
+    }
+
+    virtual void notify(SubscriptionId subid, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
+    {
+        Linked<PackageMapAndSet> me = this;  // Ensure that I am not released by the notify call (which would then access freed memory to release the critsec)
+        CriticalBlock b(dirtyCrit);
+        if (subid == pmChange)
+            dirty |= PMAS_RELOAD_PACKAGE_MAP;
+        else if (subid == psChange)
+            dirty |= PMAS_RELOAD_PACKAGE_SET;
+    }
+
+    virtual void subscribe()
+    {
+        CriticalBlock b(crit);
+        pmChange = querySDS().subscribe("PackageMaps", *this, true);
+        psChange = querySDS().subscribe("PackageSets", *this, true);
+    }
+
+    virtual void unsubscribe()
+    {
+        CriticalBlock b(crit);
+        try
+        {
+            if (pmChange)
+                querySDS().unsubscribe(pmChange);
+            if (psChange)
+                querySDS().unsubscribe(psChange);
+        }
+        catch (IException *E)
+        {
+            E->Release();
+        }
+        pmChange = 0;
+        psChange = 0;
+    }
+
+    IPropertyTree *getTree()
+    {
+        CriticalBlock b(crit);
+        unsigned flags;
+        {
+            CriticalBlock b(dirtyCrit);
+            flags = dirty;
+            dirty = 0;
+        }
+        if (flags)
+            load(flags);
+        return LINK(tree);
+    }
+
+    IPropertyTree *getPackageMaps()
+    {
+        Owned<IPropertyTree> root = getTree();
+        return root->queryPropTree("PackageMaps");
+    }
+
+    IPropertyTree *getPackageSets()
+    {
+        Owned<IPropertyTree> root = getTree();
+        return root->queryPropTree("PackageSets");
+    }
+
+    StringBuffer &toStr(StringBuffer &s)
+    {
+        Owned<IPropertyTree> t = getTree();
+        return toXML(t, s);
+    }
+};
 
 class CWsPackageProcessSoapBindingEx : public CWsPackageProcessSoapBinding
 {
@@ -50,7 +140,10 @@ class CWsPackageProcessEx : public CWsPackageProcess
     void deletePackage(const char *packageMap, const char *target, const char *process, bool globalScope, StringBuffer &returnMsg, int &returnCode);
 public:
     IMPLEMENT_IINTERFACE;
-    virtual ~CWsPackageProcessEx(){};
+    virtual ~CWsPackageProcessEx()
+    {
+        packageMapAndSet.unsubscribe();
+    };
 
     virtual void init(IPropertyTree *cfg, const char *process, const char *service);
 
@@ -66,6 +159,7 @@ public:
     virtual bool onListPackages(IEspContext &context, IEspListPackagesRequest &req, IEspListPackagesResponse &resp);
     virtual bool onGetPackageMapSelectOptions(IEspContext &context, IEspGetPackageMapSelectOptionsRequest &req, IEspGetPackageMapSelectOptionsResponse &resp);
     virtual bool onGetPackageMapById(IEspContext &context, IEspGetPackageMapByIdRequest &req, IEspGetPackageMapByIdResponse &resp);
+    PackageMapAndSet packageMapAndSet;
 };
 
 #endif //_ESPWIZ_ws_packageprocess_HPP__