Преглед изворни кода

Merge branch 'closedown-4.2.x' into candidate-5.0.0

Conflicts:
	esp/services/ws_workunits/ws_workunitsQuerySets.cpp
	roxie/ccd/ccddali.cpp
	roxie/ccd/ccddali.hpp
	roxie/ccd/ccdstate.cpp

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman пре 11 година
родитељ
комит
4f32f2f150

+ 3 - 1
common/workunit/pkgimpl.hpp

@@ -291,11 +291,12 @@ public:
     StringAttr packageId;
     StringAttr querySet;
     bool active;
+    bool compulsory;
     StringArray wildMatches, wildIds;
 public:
     IMPLEMENT_IINTERFACE;
     CPackageMapOf(const char *_packageId, const char *_querySet, bool _active)
-        : packageId(_packageId), querySet(_querySet), active(_active), packages(true)
+        : packageId(_packageId), querySet(_querySet), active(_active), packages(true), compulsory(false)
     {
     }
 
@@ -355,6 +356,7 @@ public:
     {
         if (!xml)
             return;
+        compulsory = xml->getPropBool("@compulsory");
         Owned<IPropertyTreeIterator> allpackages = xml->getElements("Package");
         ForEach(*allpackages)
         {

+ 17 - 10
common/workunit/referencedfilelist.cpp

@@ -123,7 +123,7 @@ class ReferencedFile : public CInterface, implements IReferencedFile
 {
 public:
     IMPLEMENT_IINTERFACE;
-    ReferencedFile(const char *lfn, const char *sourceIP, const char *srcCluster, const char *prefix, bool isSubFile, unsigned _flags, const char *_pkgid) : flags(_flags), pkgid(_pkgid)
+    ReferencedFile(const char *lfn, const char *sourceIP, const char *srcCluster, const char *prefix, bool isSubFile, unsigned _flags, const char *_pkgid, bool noDfs) : flags(_flags), pkgid(_pkgid), noDfsResolution(noDfs)
     {
         logicalName.set(skipForeign(lfn, &daliip)).toLowerCase();
         if (daliip.length())
@@ -172,6 +172,7 @@ public:
     StringBuffer filePrefix;
     StringAttr fileSrcCluster;
     unsigned flags;
+    bool noDfsResolution;
 };
 
 class ReferencedFileList : public CInterface, implements IReferencedFileList
@@ -193,7 +194,7 @@ public:
             user.set(userDesc);
     }
 
-    void ensureFile(const char *ln, unsigned flags, const char *pkgid, const char *daliip=NULL, const char *srcCluster=NULL, const char *remotePrefix=NULL);
+    void ensureFile(const char *ln, unsigned flags, const char *pkgid, bool noDfsResolution, const char *daliip=NULL, const char *srcCluster=NULL, const char *remotePrefix=NULL);
 
     virtual void addFile(const char *ln, const char *daliip=NULL, const char *srcCluster=NULL, const char *remotePrefix=NULL);
     virtual void addFiles(StringArray &files);
@@ -287,8 +288,11 @@ void ReferencedFile::processRemoteFileTree(IPropertyTree *tree, const char *srcC
 
 void ReferencedFile::resolveLocal(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, StringArray *subfiles)
 {
-    if (flags & RefFileInPackage)
+    if (noDfsResolution || (flags & RefFileInPackage))
+    {
+        flags |= RefFileNotFound;
         return;
+    }
     reset();
     Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName.str(), user);
     if(df)
@@ -330,8 +334,11 @@ void ReferencedFile::resolveRemote(IUserDescriptor *user, INode *remote, const c
 {
     if ((flags & RefFileForeign) && !resolveForeign)
         return;
-    if (flags & RefFileInPackage)
+    if (noDfsResolution || (flags & RefFileInPackage))
+    {
+        flags |= RefFileNotFound;
         return;
+    }
     reset();
     if (checkLocalFirst) //usually means we don't want to overwrite existing file info
     {
@@ -489,12 +496,12 @@ public:
     Owned<HashIterator> iter;
 };
 
-void ReferencedFileList::ensureFile(const char *ln, unsigned flags, const char *pkgid, const char *daliip, const char *srcCluster, const char *prefix)
+void ReferencedFileList::ensureFile(const char *ln, unsigned flags, const char *pkgid, bool noDfsResolution, const char *daliip, const char *srcCluster, const char *prefix)
 {
     if (!allowForeign && checkForeign(ln))
         throw MakeStringException(-1, "Foreign file not allowed%s: %s", (flags & RefFileInPackage) ? " (declared in package)" : "", ln);
 
-    Owned<ReferencedFile> file = new ReferencedFile(ln, daliip, srcCluster, prefix, false, flags, pkgid);
+    Owned<ReferencedFile> file = new ReferencedFile(ln, daliip, srcCluster, prefix, false, flags, pkgid, noDfsResolution);
     if (!file->logicalName.length())
         return;
     ReferencedFile *existing = map.getValue(file->getLogicalName());
@@ -600,14 +607,14 @@ void ReferencedFileList::addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackag
                         {
                             StringBuffer subfile;
                             ssfe->getSubFileName(count, subfile);
-                            ensureFile(subfile, RefSubFile | RefFileInPackage, pkgid);
+                            ensureFile(subfile, RefSubFile | RefFileInPackage, pkgid, pkg->isCompulsory());
                         }
                     }
                 }
-                ensureFile(logicalName, flags, pkgid);
+                ensureFile(logicalName, flags, pkgid, pkg->isCompulsory());
             }
             else
-                ensureFile(logicalName, flags, NULL);
+                ensureFile(logicalName, flags, NULL, false);
         }
     }
 }
@@ -634,7 +641,7 @@ void ReferencedFileList::resolveSubFiles(StringArray &subfiles, bool checkLocalF
         if (!allowForeign && checkForeign(lfn))
             throw MakeStringException(-1, "Foreign sub file not allowed: %s", lfn);
 
-        Owned<ReferencedFile> file = new ReferencedFile(lfn, NULL, NULL, NULL, true, 0, NULL);
+        Owned<ReferencedFile> file = new ReferencedFile(lfn, NULL, NULL, NULL, true, 0, NULL, false);
         if (file->logicalName.length() && !map.getValue(file->getLogicalName()))
         {
             file->resolve(process.get(), srcCluster, user, remote, remotePrefix, checkLocalFirst, &childSubFiles, resolveForeign);

+ 3 - 3
common/workunit/workunit.cpp

@@ -5381,7 +5381,7 @@ IConstWUQuery* CLocalWorkUnit::getQuery() const
     {
         IPropertyTree *s = p->getPropTree("Query");
         if (s)
-            query.setown(new CLocalWUQuery(s)); 
+            query.setown(new CLocalWUQuery(s)); // NB takes ownership of 's'
     }
     return query.getLink();
 }
@@ -5981,7 +5981,7 @@ IConstWUWebServicesInfo* CLocalWorkUnit::getWebServicesInfo() const
         assertex(!webServicesInfo);
         IPropertyTree *s = p->getPropTree("WebServicesInfo");
         if (s)
-            webServicesInfo.setown(new CLocalWUWebServicesInfo(s)); 
+            webServicesInfo.setown(new CLocalWUWebServicesInfo(s)); // NB takes ownership of 's'
         webServicesInfoCached = true;
     }
     return webServicesInfo.getLink();
@@ -6017,7 +6017,7 @@ IConstWURoxieQueryInfo* CLocalWorkUnit::getRoxieQueryInfo() const
         assertex(!roxieQueryInfo);
         IPropertyTree *s = p->getPropTree("RoxieQueryInfo");
         if (s)
-            roxieQueryInfo.setown(new CLocalWURoxieQueryInfo(s)); 
+            roxieQueryInfo.setown(new CLocalWURoxieQueryInfo(s)); // NB takes ownership of 's'
         roxieQueryInfoCached = true;
     }
     return roxieQueryInfo.getLink();

+ 1 - 1
esp/services/WsDeploy/WsDeployService.cpp

@@ -3952,7 +3952,7 @@ bool CWsDeployFileInfo::handleAttributeAdd(IEspContext &context, IEspHandleAttri
   if (attribName.length() == 0)
     throw MakeStringException(-1,"Attribute name can't be empty!");
 
-  IPropertyTree* pComp =  pEnvRoot->getPropTree(xpath.str());
+  IPropertyTree* pComp =  pEnvRoot->queryPropTree(xpath.str());
 
   if (pComp != NULL)
     pComp->addProp(attribName.str(), "");

+ 2 - 2
esp/services/ws_dfu/ws_dfuService.cpp

@@ -5106,14 +5106,14 @@ void CWsDfuEx::mergeDataRow(StringBuffer& newRow, StringBuffer dataRow1, StringB
     newRow.append("<Row>");
 
     StringArray columnLabels;
-    IPropertyTreeIterator* it = data1->getElements("*");
+    Owned<IPropertyTreeIterator> it = data1->getElements("*");
     if (it)
     {
         StringArray columnLabels0;
         mergeDataRow(newRow, 0, it, columnLabels0, columnLabels);   
     }
 
-    IPropertyTreeIterator* it2 = data2->getElements("*");
+    Owned<IPropertyTreeIterator> it2 = data2->getElements("*");
     if (it2)
     {   
         mergeDataRow(newRow, 1, it2, columnsHide, columnLabels);

+ 1 - 1
esp/services/ws_ecl/ws_ecl_service.cpp

@@ -1393,7 +1393,7 @@ void appendEclInputXsds(StringBuffer &content, IPropertyTree *xsd, BoolHash &add
                 {
 #if 0
                     content.appendf("<xsd:complexType name=\"%s\"><xsd:sequence>", schema_name);
-                    IPropertyTreeIterator *children = item.getElements("xs:complexType/xs:sequence/*");
+                    Owned<IPropertyTreeIterator> children = item.getElements("xs:complexType/xs:sequence/*");
                     ForEach(*children)
                     {
                         IPropertyTree &child = children->query();

+ 1 - 1
esp/services/ws_packageprocess/ws_packageprocessService.cpp

@@ -422,7 +422,7 @@ bool deletePkgInfo(const char *name, const char *target, const char *process, bo
         IPropertyTree *pkgMaps = pkgMapsConn->queryRoot();
         if (!pkgMaps)
             throw MakeStringException(PKG_DALI_LOOKUP_ERROR, "Unable to retrieve PackageMaps information from dali [/PackageMaps]");
-        IPropertyTree *mapTree = pkgMaps->getPropTree(xpath.clear().appendf("PackageMap[@id='%s']", pmid.get()).str());
+        IPropertyTree *mapTree = pkgMaps->queryPropTree(xpath.clear().appendf("PackageMap[@id='%s']", pmid.get()).str());
         if (mapTree)
             pkgMaps->removeTree(mapTree);
     }

+ 3 - 1
esp/services/ws_workunits/ws_workunitsQuerySets.cpp

@@ -365,6 +365,8 @@ void QueryFilesInUse::loadTarget(IPropertyTree *t, const char *target, unsigned
                 fileTree->setProp("@lfn", lfn);
                 if (rf.getFlags() & RefFileSuper)
                     fileTree->setPropBool("@super", true);
+                if (rf.getFlags() & RefFileNotFound)
+                    fileTree->setPropBool("@notFound", true);
                 const char *fpkgid = rf.queryPackageId();
                 if (fpkgid && *fpkgid)
                     fileTree->setProp("@pkgid", fpkgid);
@@ -404,7 +406,7 @@ IPropertyTreeIterator *QueryFilesInUse::findQueriesUsingFile(const char *target,
         return NULL;
     if (!target || !*target)
         return findAllQueriesUsingFile(lfn);
-    IPropertyTree *targetTree = tree->getPropTree(target);
+    IPropertyTree *targetTree = tree->queryPropTree(target);
     if (!targetTree)
         return NULL;
 

+ 6 - 0
initfiles/componentfiles/configxml/RoxieTopology.xsl

@@ -146,6 +146,12 @@
                     <xsl:copy-of select="@regex"/>
                 </xsl:copy>
             </xsl:for-each>
+            <xsl:for-each select="PreferredCluster">
+                <xsl:copy>
+                    <xsl:copy-of select="@name"/>
+                    <xsl:copy-of select="@priority"/>
+                </xsl:copy>
+            </xsl:for-each>
             <xsl:for-each select="RoxieFarmProcess">
                 <xsl:element name="RoxieFarmProcess">
                     <xsl:copy-of select="@*[name()!='name' and name()!='level']"/>

+ 32 - 0
initfiles/componentfiles/configxml/roxie.xsd.in

@@ -212,6 +212,31 @@
             </xs:attribute>
           </xs:complexType>
         </xs:element>
+        <xs:element name="PreferredCluster" maxOccurs="unbounded">
+          <xs:annotation>
+            <xs:appinfo>
+              <title>Preferred Clusters</title>
+            </xs:appinfo>
+          </xs:annotation>
+          <xs:complexType>
+            <xs:attribute name="name" type="xs:string" use="required" >
+              <xs:annotation>
+                <xs:appinfo>
+                  <tooltip>Name of the cluster</tooltip>
+                                    <colIndex>1</colIndex>
+                </xs:appinfo>
+              </xs:annotation>
+            </xs:attribute>
+            <xs:attribute name="priority" type="xs:integer" use="required">
+              <xs:annotation>
+                <xs:appinfo>
+                  <tooltip>Priority (negative to disable)</tooltip>
+                                    <colIndex>2</colIndex>
+                </xs:appinfo>
+              </xs:annotation>
+            </xs:attribute>
+          </xs:complexType>
+        </xs:element>
         <xs:element name="UserMetric" maxOccurs="unbounded">
           <xs:annotation>
             <xs:appinfo>
@@ -690,6 +715,13 @@
         </xs:appinfo>
       </xs:annotation>
     </xs:attribute>
+    <xs:attribute name="slaveQueryReleaseDelaySeconds" type="xs:nonNegativeInteger" use="optional" default="60">
+      <xs:annotation>
+        <xs:appinfo>
+          <tooltip>Delay before unregistering slave queries to allow in-flight to complete</tooltip>
+        </xs:appinfo>
+      </xs:annotation>
+    </xs:attribute>
 
    <xs:attribute name="slaveThreads" type="xs:nonNegativeInteger" use="optional" default="30">
       <xs:annotation>

+ 1 - 0
initfiles/etc/DIR_NAME/environment.xml.in

@@ -885,6 +885,7 @@
                 siteCertificate=""
                 slaTimeout="2000"
                 slaveConfig="simple"
+                slaveQueryReleaseDelaySeconds="60"
                 slaveThreads="30"
                 soapTraceLevel="1"
                 socketCheckInterval="5000"

+ 3 - 2
roxie/ccd/ccd.hpp

@@ -324,8 +324,9 @@ extern unsigned slaTimeout;
 extern unsigned headRegionSize;
 extern unsigned ccdMulticastPort;
 extern CriticalSection ccdChannelsCrit;
-extern IPropertyTree* ccdChannels;
-extern IPropertyTree* topology;
+extern IPropertyTree *ccdChannels;
+extern IPropertyTree *topology;
+extern MapStringTo<int> *preferredClusters;
 extern StringArray allQuerySetNames;
 
 extern bool allFilesDynamic;

+ 6 - 9
roxie/ccd/ccddali.cpp

@@ -173,6 +173,7 @@ private:
                     try
                     {
                         owner->disconnectSem.wait();
+                        Sleep(5000);   // Don't retry immediately, give Dali a chance to recover.
                     }
                     catch (IException *E)
                     {
@@ -434,12 +435,6 @@ public:
         return ret.getClear();
     }
 
-    static const char *getPackageMapPath(StringBuffer &buf, const char *id)
-    {
-        buf.appendf("PackageMaps/PackageMap[@id='%s']", id);
-        return buf.str();
-    }
-
     static const char *getSuperFilePath(StringBuffer &buf, const char *lfn)
     {
         CDfsLogicalFileName lfnParser;
@@ -676,10 +671,10 @@ public:
         return getSubscription("PackageSets", "PackageSets", notifier);
     }
 
-    virtual IDaliPackageWatcher *getPackageMapSubscription(const char *id, ISDSSubscription *notifier)
+    virtual IDaliPackageWatcher *getPackageMapsSubscription(ISDSSubscription *notifier)
     {
         StringBuffer xpath;
-        return getSubscription(id, getPackageMapPath(xpath, id), notifier);
+        return getSubscription("PackageMaps", "PackageMaps", notifier);
     }
 
     virtual IDaliPackageWatcher *getSuperFileSubscription(const char *lfn, ISDSSubscription *notifier)
@@ -721,14 +716,16 @@ public:
                     serverStatus->queryProperties()->setProp("@cluster", roxieName.str());
                     serverStatus->commitProperties();
                     initCache();
-                    isConnected = true;
                     ForEachItemIn(idx, watchers)
                     {
                         watchers.item(idx).onReconnect();
                     }
+                    isConnected = true;
                 }
                 catch(IException *e)
                 {
+                    delete serverStatus;
+                    serverStatus = NULL;
                     ::closedownClientProcess(); // undo any partial initialization
                     StringBuffer text;
                     e->errorMessage(text);

+ 1 - 1
roxie/ccd/ccddali.hpp

@@ -47,8 +47,8 @@ interface IRoxieDaliHelper : extends IInterface
     virtual IDaliPackageWatcher *getQuerySetSubscription(const char *id, ISDSSubscription *notifier) = 0;
     virtual IPropertyTree *getPackageSets() = 0;
     virtual IDaliPackageWatcher *getPackageSetsSubscription(ISDSSubscription *notifier) = 0;
+    virtual IDaliPackageWatcher *getPackageMapsSubscription(ISDSSubscription *notifier) = 0;
     virtual IPropertyTree *getPackageMap(const char *id) = 0;
-    virtual IDaliPackageWatcher *getPackageMapSubscription(const char *id, ISDSSubscription *notifier) = 0;
     virtual IDaliPackageWatcher *getSuperFileSubscription(const char *lfn, ISDSSubscription *notifier) = 0;
     virtual void releaseSubscription(IDaliPackageWatcher *subscription) = 0;
     virtual bool connect(unsigned timeout) = 0;

+ 44 - 34
roxie/ccd/ccdfile.cpp

@@ -485,32 +485,32 @@ static IPartDescriptor *queryMatchingRemotePart(IPartDescriptor *pdesc, IFileDes
     return NULL;
 }
 
-static bool checkClusterCount(UnsignedArray &counts, unsigned clusterNo, unsigned max)
+static int getClusterPriority(const char *clusterName)
 {
-    while (!counts.isItem(clusterNo))
-        counts.append(0);
-    unsigned count = counts.item(clusterNo);
-    if (count>=max)
-        return false;
-    counts.replace(++count, clusterNo);
-    return true;
-}
-
-static bool isCopyFromCluster(IPartDescriptor *pdesc, unsigned clusterNo, const char *name)
-{
-    StringBuffer s;
-    return strieq(name, pdesc->queryOwner().getClusterGroupName(clusterNo, s));
+    assertex(preferredClusters);
+    int *priority = preferredClusters->getValue(clusterName);
+    return priority ? *priority : 100;
 }
 
 static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName, const char *fromCluster, bool includeFromCluster)
 {
-    UnsignedArray clusterCounts;
+    IFileDescriptor &fdesc = pdesc->queryOwner();
     unsigned numCopies = pdesc->numCopies();
+    unsigned lastClusterNo = (unsigned) -1;
+    unsigned numThisCluster = 0;
+    int priority = 0;
+    IntArray priorities;
     for (unsigned copy = 0; copy < numCopies; copy++)
     {
         unsigned clusterNo = pdesc->copyClusterNum(copy);
-        if (fromCluster && *fromCluster && isCopyFromCluster(pdesc, clusterNo, fromCluster)!=includeFromCluster)
-            continue;
+        StringBuffer clusterName;
+        fdesc.getClusterGroupName(clusterNo, clusterName);
+        if (fromCluster && *fromCluster)
+        {
+            bool matches = strieq(clusterName.str(), fromCluster);
+            if (matches!=includeFromCluster)
+                continue;
+        }
         RemoteFilename r;
         pdesc->getFilename(copy,r);
         StringBuffer path;
@@ -522,26 +522,36 @@ static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations
             if (streq(l, localFileName))
                 continue; // don't add ourself
         }
-        if (!checkClusterCount(clusterCounts, clusterNo, 2))  // Don't add more than 2 from one cluster
-            continue;
-        locations.append(path.str());
-    }
-}
-
-static void appendPeerLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName)
-{
-    const char *peerCluster = pdesc->queryOwner().queryProperties().queryProp("@cloneFromPeerCluster");
-    if (peerCluster)
-    {
-        if (*peerCluster=='-') //a remote cluster was specified explicitly
-            return;
-        if (streq(peerCluster, roxieName))
-            peerCluster=NULL;
+        if (clusterNo == lastClusterNo)
+        {
+            numThisCluster++;
+            if (numThisCluster > 2)  // Don't add more than 2 from one cluster
+                continue;
+        }
+        else
+        {
+            numThisCluster = 1;
+            lastClusterNo = clusterNo;
+            if (preferredClusters)
+            {
+                priority = getClusterPriority(clusterName);
+            }
+            else
+                priority = copy;
+        }
+        if (priority >= 0)
+        {
+            ForEachItemIn(idx, priorities)
+            {
+                if (priorities.item(idx) < priority)
+                    break;
+            }
+            priorities.add(priority, idx);
+            locations.add(path.str(), idx);
+        }
     }
-    appendRemoteLocations(pdesc, locations, localFileName, peerCluster, true);
 }
 
-
 //----------------------------------------------------------------------------------------------
 
 typedef StringArray *StringArrayPtr;

+ 1 - 1
roxie/ccd/ccdlistener.cpp

@@ -627,7 +627,7 @@ public:
         afor.For(activeChildren.ordinality()+(isMaster ? 0 : 1), 10);
         activeChildren.kill();
         if (mergedReply)
-            toXML(mergedReply, reply, 0, (mergeType == CascadeMergeQueries) ? XML_Format | XML_NewlinesOnly : XML_Format);
+            toXML(mergedReply, reply, 0, (mergeType == CascadeMergeQueries) ? XML_Embed|XML_LineBreak|XML_SortTags : XML_Format);
         if (logctx.queryTraceLevel() > 5)
             logctx.CTXLOG("doControlQuery (%d) finished: %.80s", isMaster, queryText);
     }

+ 15 - 2
roxie/ccd/ccdmain.cpp

@@ -86,7 +86,8 @@ bool runOnce = false;
 unsigned udpMulticastBufferSize = 262142;
 bool roxieMulticastEnabled = true;
 
-IPropertyTree* topology;
+IPropertyTree *topology;
+MapStringTo<int> *preferredClusters;
 StringBuffer topologyFile;
 CriticalSection ccdChannelsCrit;
 IPropertyTree* ccdChannels;
@@ -512,7 +513,19 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
             topology->setProp("@traceLevel", globals->queryProp("--traceLevel"));
             topology->setProp("@memTraceLevel", globals->queryProp("--memTraceLevel"));
         }
-
+        if (topology->hasProp("PreferredCluster"))
+        {
+            preferredClusters = new MapStringTo<int>(true);
+            Owned<IPropertyTreeIterator> clusters = topology->getElements("PreferredCluster");
+            ForEach(*clusters)
+            {
+                IPropertyTree &child = clusters->query();
+                const char *name = child.queryProp("@name");
+                int priority = child.getPropInt("@priority", 100);
+                if (name && *name)
+                    preferredClusters->setValue(name, priority);
+            }
+        }
         topology->getProp("@name", roxieName);
         Owned<const IQueryDll> standAloneDll;
         if (globals->hasProp("--loadWorkunit"))

+ 1 - 1
roxie/ccd/ccdquery.cpp

@@ -1186,7 +1186,7 @@ public:
                 }
             }
         }
-        toXML(xref, reply, 1, XML_NewlinesOnly|XML_Format|XML_SortTags);
+        toXML(xref, reply, 1, XML_Embed|XML_LineBreak|XML_SortTags);
     }
     virtual void resetQueryTimings()
     {

+ 1 - 1
roxie/ccd/ccdqueue.cpp

@@ -443,7 +443,7 @@ extern IRoxieQueryPacket *createRoxiePacket(void *_data, unsigned _len)
 extern IRoxieQueryPacket *createRoxiePacket(MemoryBuffer &m)
 {
     unsigned length = m.length(); // don't make assumptions about evaluation order of parameters...
-    return createRoxiePacket(m.detach(), length);
+    return createRoxiePacket(m.detachOwn(), length);
 }
 
 //=================================================================================

+ 75 - 48
roxie/ccd/ccdstate.cpp

@@ -1201,13 +1201,20 @@ public:
 * look up queries that are received - this corresponds to the currently active package.
 *-----------------------------------------------------------------------------------------------*/
 
+static hash64_t hashXML(const IPropertyTree *tree)
+{
+    StringBuffer xml;
+    toXML(tree, xml, 0, XML_SortTags);
+    return rtlHash64Data(xml.length(), xml.str(), 877029);
+}
+
 class CRoxieQueryPackageManager : public CInterface
 {
 public:
     IMPLEMENT_IINTERFACE;
 
-    CRoxieQueryPackageManager(unsigned _numChannels, const char *_querySet, const IRoxiePackageMap *_packages)
-        : numChannels(_numChannels), packages(_packages), querySet(_querySet)
+    CRoxieQueryPackageManager(unsigned _numChannels, const char *_querySet, const IRoxiePackageMap *_packages, hash64_t _xmlHash)
+        : numChannels(_numChannels), packages(_packages), querySet(_querySet), xmlHash(_xmlHash)
     {
         queryHash = 0;
     }
@@ -1228,6 +1235,11 @@ public:
 
     virtual void load(bool forceReload) = 0;
 
+    bool matches(hash64_t _xmlHash, bool _active) const
+    {
+        return _xmlHash == xmlHash && _active==packages->isActive();
+    }
+
     virtual hash64_t getHash()
     {
         CriticalBlock b2(updateCrit);
@@ -1315,6 +1327,7 @@ public:
         serverManager->getAllQueryInfo(reply, full, slaveManagers, logctx);
     }
 protected:
+
     void reloadQueryManagers(CRoxieSlaveQuerySetManagerSet *newSlaveManagers, IRoxieQuerySetManager *newServerManager, hash64_t newHash)
     {
         Owned<CRoxieSlaveQuerySetManagerSet> oldSlaveManagers;
@@ -1339,6 +1352,7 @@ protected:
     Owned<const IRoxiePackageMap> packages;
     unsigned numChannels;
     hash64_t queryHash;
+    hash64_t xmlHash;
     StringAttr querySet;
 };
 
@@ -1369,8 +1383,8 @@ class CRoxieDaliQueryPackageManager : public CRoxieQueryPackageManager, implemen
 
 public:
     IMPLEMENT_IINTERFACE;
-    CRoxieDaliQueryPackageManager(unsigned _numChannels, const IRoxiePackageMap *_packages, const char *_querySet)
-        : CRoxieQueryPackageManager(_numChannels, _querySet, _packages)
+    CRoxieDaliQueryPackageManager(unsigned _numChannels, const IRoxiePackageMap *_packages, const char *_querySet, hash64_t _xmlHash)
+        : CRoxieQueryPackageManager(_numChannels, _querySet, _packages, _xmlHash)
     {
         daliHelper.setown(connectToDali());
     }
@@ -1415,7 +1429,7 @@ public:
     IMPLEMENT_IINTERFACE;
 
     CStandaloneQueryPackageManager(unsigned _numChannels, const char *_querySet, const IRoxiePackageMap *_packages, IPropertyTree *_standaloneDll)
-        : CRoxieQueryPackageManager(_numChannels, _querySet, _packages), standaloneDll(_standaloneDll)
+        : CRoxieQueryPackageManager(_numChannels, _querySet, _packages, 0), standaloneDll(_standaloneDll)
     {
         assertex(standaloneDll);
     }
@@ -1447,21 +1461,21 @@ extern IRoxieDebugSessionManager &queryRoxieDebugSessionManager()
     return *debugSessionManager;
 }
 
-class CRoxiePackageSetWatcher : public CInterface, implements ISDSSubscription
+class CRoxiePackageSetWatcher : public CInterface
 {
 public:
     IMPLEMENT_IINTERFACE;
-    CRoxiePackageSetWatcher(IRoxieDaliHelper *_daliHelper, ISDSSubscription *_owner, unsigned numChannels, bool forceReload)
-    : stateHash(0), daliHelper(_daliHelper), owner(_owner)
+    CRoxiePackageSetWatcher(IRoxieDaliHelper *_daliHelper, unsigned numChannels, CRoxiePackageSetWatcher *oldPackages, bool forceReload)
+    : stateHash(0), daliHelper(_daliHelper)
     {
         ForEachItemIn(idx, allQuerySetNames)
         {
-            createQueryPackageManagers(numChannels, allQuerySetNames.item(idx), forceReload);
+            createQueryPackageManagers(numChannels, allQuerySetNames.item(idx), oldPackages, forceReload);
         }
     }
 
-    CRoxiePackageSetWatcher(IRoxieDaliHelper *_daliHelper, ISDSSubscription *_owner, const IQueryDll *standAloneDll, unsigned numChannels, const char *querySet, bool forceReload)
-    : stateHash(0), daliHelper(_daliHelper), owner(_owner)
+    CRoxiePackageSetWatcher(IRoxieDaliHelper *_daliHelper, const IQueryDll *standAloneDll, unsigned numChannels, const char *querySet, bool forceReload)
+    : stateHash(0), daliHelper(_daliHelper)
     {
         Owned<IPropertyTree> standAloneDllTree;
         standAloneDllTree.setown(createPTree("Query"));
@@ -1473,16 +1487,6 @@ public:
         allQueryPackages.append(*qpm.getClear());
     }
 
-    ~CRoxiePackageSetWatcher()
-    {
-        unsubscribe();
-    }
-
-    virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
-    {
-        owner->notify(id, xpath, flags, valueLen, valueData);
-    }
-
     IQueryFactory *lookupLibrary(const char *libraryName, unsigned expectedInterfaceHash, const IRoxieContextLogger &logctx) const
     {
         ForEachItemIn(idx, allQueryPackages)
@@ -1554,8 +1558,11 @@ public:
         ForEachItemIn(idx, allQueryPackages)
         {
             Owned<IRoxieQuerySetManager> serverManager = allQueryPackages.item(idx).getRoxieServerManager();
-            Owned<IRoxieQuerySetManagerSet> slaveManagers = allQueryPackages.item(idx).getRoxieSlaveManagers();
-            serverManager->getAllQueryInfo(reply, full, slaveManagers, logctx);
+            if (serverManager->isActive())
+            {
+                Owned<IRoxieQuerySetManagerSet> slaveManagers = allQueryPackages.item(idx).getRoxieSlaveManagers();
+                serverManager->getAllQueryInfo(reply, full, slaveManagers, logctx);
+            }
         }
     }
 
@@ -1595,21 +1602,30 @@ public:
     }
 
 private:
-    ISDSSubscription *owner;
     CIArrayOf<CRoxieQueryPackageManager> allQueryPackages;
-    IArrayOf<IDaliPackageWatcher> notifiers;
     Linked<IRoxieDaliHelper> daliHelper;
     hash64_t stateHash;
 
-    void createQueryPackageManager(unsigned numChannels, const IRoxiePackageMap *packageMap, const char *querySet, bool forceReload)
+    CRoxieQueryPackageManager *getPackageManager(const char *id)
+    {
+        ForEachItemIn(idx, allQueryPackages)
+        {
+            CRoxieQueryPackageManager &pm = allQueryPackages.item(idx);
+            if (strcmp(pm.queryPackageId(), id)==0)
+                return LINK(&pm);
+        }
+        return NULL;
+    }
+
+    void createQueryPackageManager(unsigned numChannels, const IRoxiePackageMap *packageMap, const char *querySet, hash64_t xmlHash, bool forceReload)
     {
-        Owned<CRoxieQueryPackageManager> qpm = new CRoxieDaliQueryPackageManager(numChannels, packageMap, querySet);
+        Owned<CRoxieQueryPackageManager> qpm = new CRoxieDaliQueryPackageManager(numChannels, packageMap, querySet, xmlHash);
         qpm->load(forceReload);
         stateHash = rtlHash64Data(sizeof(stateHash), &stateHash, qpm->getHash());
         allQueryPackages.append(*qpm.getClear());
     }
 
-    void createQueryPackageManagers(unsigned numChannels, const char *querySet, bool forceReload)
+    void createQueryPackageManagers(unsigned numChannels, const char *querySet, CRoxiePackageSetWatcher *oldPackages, bool forceReload)
     {
         int loadedPackages = 0;
         int activePackages = 0;
@@ -1634,19 +1650,33 @@ private:
                     const char *packageMapFilter = pm.queryProp("@querySet");
                     if (packageMapId && *packageMapId && (!packageMapFilter || WildMatch(querySet, packageMapFilter, false)))
                     {
-                        bool isActive = pm.getPropBool("@active", true);
-                        if (traceLevel)
-                            DBGLOG("Loading package map %s, active %s", packageMapId, isActive ? "true" : "false");
                         try
                         {
-                            Owned<CRoxiePackageMap> packageMap = new CRoxiePackageMap(packageMapId, packageMapFilter, isActive);
+                            bool isActive = pm.getPropBool("@active", true);
                             Owned<IPropertyTree> xml = daliHelper->getPackageMap(packageMapId);
-                            packageMap->load(xml);
-                            createQueryPackageManager(numChannels, packageMap.getLink(), querySet, forceReload);
+                            hash64_t xmlHash = hashXML(xml);
+                            Owned<CRoxieQueryPackageManager> oldPackageManager;
+                            if (oldPackages)
+                            {
+                                oldPackageManager.setown(oldPackages->getPackageManager(packageMapId));
+                            }
+                            if (oldPackageManager && oldPackageManager->matches(xmlHash, isActive))
+                            {
+                                if (traceLevel)
+                                    DBGLOG("Package map %s, active %s already loaded", packageMapId, isActive ? "true" : "false");
+                                allQueryPackages.append(*oldPackageManager.getClear());
+                            }
+                            else
+                            {
+                                if (traceLevel)
+                                    DBGLOG("Loading package map %s, active %s", packageMapId, isActive ? "true" : "false");
+                                Owned<CRoxiePackageMap> packageMap = new CRoxiePackageMap(packageMapId, packageMapFilter, isActive);
+                                packageMap->load(xml);
+                                createQueryPackageManager(numChannels, packageMap.getLink(), querySet, xmlHash, forceReload);
+                            }
                             loadedPackages++;
                             if (isActive)
                                 activePackages++;
-                            notifiers.append(*daliHelper->getPackageMapSubscription(packageMapId, this));
                         }
                         catch (IException *E)
                         {
@@ -1663,25 +1693,18 @@ private:
         {
             if (traceLevel)
                 DBGLOG("Loading empty package for QuerySet %s", querySet);
-            createQueryPackageManager(numChannels, LINK(&queryEmptyRoxiePackageMap()), querySet, forceReload);
+            createQueryPackageManager(numChannels, LINK(&queryEmptyRoxiePackageMap()), querySet, 0, forceReload);
         }
         else if (traceLevel)
             DBGLOG("Loaded %d packages (%d active)", loadedPackages, activePackages);
     }
 
-    void unsubscribe()
-    {
-        ForEachItemIn(idx, notifiers)
-        {
-            daliHelper->releaseSubscription(&notifiers.item(idx));
-        }
-        notifiers.kill();
-    }
 };
 
 class CRoxiePackageSetManager : public CInterface, implements IRoxieQueryPackageManagerSet, implements ISDSSubscription
 {
-    Owned<IDaliPackageWatcher> notifier;
+    Owned<IDaliPackageWatcher> pSetsNotifier;
+    Owned<IDaliPackageWatcher> pMapsNotifier;
 public:
     IMPLEMENT_IINTERFACE;
     CRoxiePackageSetManager(const IQueryDll *_standAloneDll) :
@@ -1690,7 +1713,8 @@ public:
         daliHelper.setown(connectToDali(ROXIE_DALI_CONNECT_TIMEOUT));
         atomic_set(&autoPending, 0);
         autoReloadThread.start();
-        notifier.setown(daliHelper->getPackageSetsSubscription(this));
+        pSetsNotifier.setown(daliHelper->getPackageSetsSubscription(this));
+        pMapsNotifier.setown(daliHelper->getPackageMapsSubscription(this));
     }
 
     ~CRoxiePackageSetManager()
@@ -1796,6 +1820,9 @@ private:
             while (!closing)
             {
                 owner.autoReloadTrigger.wait();
+                if (closing)
+                    break;
+                Sleep(500); // Typically notifications come in clumps - this avoids reloading too often
                 if (atomic_read(&owner.autoPending))
                 {
                     atomic_set(&owner.autoPending, 0);
@@ -1834,9 +1861,9 @@ private:
         // So that the query/dll caching will work for anything that is not affected by the changes
         Owned<CRoxiePackageSetWatcher> newPackages;
         if (standAloneDll)
-            newPackages.setown(new CRoxiePackageSetWatcher(daliHelper, this, standAloneDll, numChannels, "roxie", forceRetry));
+            newPackages.setown(new CRoxiePackageSetWatcher(daliHelper, standAloneDll, numChannels, "roxie", forceRetry));
         else
-            newPackages.setown(new CRoxiePackageSetWatcher(daliHelper, this, numChannels, forceRetry));
+            newPackages.setown(new CRoxiePackageSetWatcher(daliHelper, numChannels, allQueryPackages, forceRetry));
         // Hold the lock for as little time as we can
         // Note that we must NOT hold the lock during the delete of the old object - or we deadlock.
         // Hence the slightly convoluted code below

+ 3 - 0
roxie/ccd/ccdstate.hpp

@@ -149,4 +149,7 @@ extern const char *queryNodeIndexName(const IPropertyTree &graphNode, ThorActivi
 extern void createDelayedReleaser();
 extern void stopDelayedReleaser();
 
+extern void createDelayedReleaser();
+extern void stopDelayedReleaser();
+
 #endif

+ 8 - 0
system/jlib/jbuff.cpp

@@ -376,6 +376,14 @@ void *MemoryBuffer::detach()
     return ret;
 }
 
+void *MemoryBuffer::detachOwn()
+{
+    assertex(ownBuffer);
+    void *ret = buffer;
+    init();
+    return ret;
+}
+
 void MemoryBuffer::setLength(unsigned len)
 {
     if (len > curLen)

+ 1 - 0
system/jlib/jbuff.hpp

@@ -173,6 +173,7 @@ public:
     void            setLength(unsigned len);
     void            setWritePos(unsigned len);      // only use for back patching data
     void *          detach();
+    void *          detachOwn();  // Never reallocates
     //Non-standard functions:
     void *          reserve(unsigned size);
     void            truncate();                     // truncates (i.e. minimizes allocation) to current size

+ 14 - 14
system/jlib/jptree.cpp

@@ -5220,13 +5220,13 @@ IPropertyTree *createPTreeFromXMLString(unsigned len, const char *xml, byte flag
 //////////////////////////
 /////////////////////////
 
-static void _toXML(const IPropertyTree *tree, IIOStream &out, unsigned indent, byte flags)
+static void _toXML(const IPropertyTree *tree, IIOStream &out, unsigned indent, unsigned flags)
 {
     const char *name = tree->queryName();
     if (!name) name = "__unnamed__";
     bool isBinary = tree->isBinary(NULL);
     bool inlinebody = true;
-    if (flags & XML_Format) writeCharsNToStream(out, ' ', indent);
+    if (flags & XML_Embed) writeCharsNToStream(out, ' ', indent);
     writeCharToStream(out, '<');
     writeStringToStream(out, name);
     Owned<IAttributeIterator> it = tree->getAttributes(true);
@@ -5243,11 +5243,11 @@ static void _toXML(const IPropertyTree *tree, IIOStream &out, unsigned indent, b
             {
                 if (first)
                 {
-                    if (flags & (XML_Format|XML_NewlinesOnly)) inlinebody = false;
+                    if (flags & XML_LineBreak) inlinebody = false;
                     first = false;
                     writeCharToStream(out, ' ');
                 }
-                else if ((flags & XML_Format) && it->count() > 3)
+                else if ((flags & XML_LineBreakAttributes) && it->count() > 3)
                 {
                     writeStringToStream(out, "\n");
                     writeCharsNToStream(out, ' ', attributeindent);
@@ -5291,7 +5291,7 @@ static void _toXML(const IPropertyTree *tree, IIOStream &out, unsigned indent, b
     bool empty;
     if (isBinary)
     {
-        if (flags & (XML_Format|XML_NewlinesOnly)) inlinebody = false;
+        if (flags & XML_LineBreak) inlinebody = false;
         writeStringToStream(out, " xsi:type=\"SOAP-ENC:base64\"");
         empty = (!tree->getPropBin(NULL, thislevelbin))||(thislevelbin.length()==0);
     }
@@ -5308,11 +5308,11 @@ static void _toXML(const IPropertyTree *tree, IIOStream &out, unsigned indent, b
     }
     if (sub->first())
     {
-        if (flags & (XML_Format|XML_NewlinesOnly)) inlinebody = false;
+        if (flags & XML_LineBreak) inlinebody = false;
     }
     else if (empty && !(flags & XML_Sanitize))
     {
-        if (flags & (XML_Format|XML_NewlinesOnly))
+        if (flags & XML_LineBreak)
             writeStringToStream(out, "/>\n");
         else
             writeStringToStream(out, "/>");
@@ -5399,13 +5399,13 @@ static void _toXML(const IPropertyTree *tree, IIOStream &out, unsigned indent, b
 
     writeStringToStream(out, "</");
     writeStringToStream(out, name);
-    if (flags & (XML_Format|XML_NewlinesOnly))
+    if (flags & XML_LineBreak)
         writeStringToStream(out, ">\n");
     else
         writeCharToStream(out, '>');
 }
 
-jlib_decl StringBuffer &toXML(const IPropertyTree *tree, StringBuffer &ret, unsigned indent, byte flags)
+jlib_decl StringBuffer &toXML(const IPropertyTree *tree, StringBuffer &ret, unsigned indent, unsigned flags)
 {
     class CAdapter : public CInterface, implements IIOStream
     {
@@ -5421,18 +5421,18 @@ jlib_decl StringBuffer &toXML(const IPropertyTree *tree, StringBuffer &ret, unsi
     return ret;
 }
 
-void toXML(const IPropertyTree *tree, IIOStream &out, unsigned indent, byte flags)
+void toXML(const IPropertyTree *tree, IIOStream &out, unsigned indent, unsigned flags)
 {
     _toXML(tree, out, indent, flags);
 }
 
-void saveXML(const char *filename, const IPropertyTree *tree, unsigned indent, byte flags)
+void saveXML(const char *filename, const IPropertyTree *tree, unsigned indent, unsigned flags)
 {
     OwnedIFile ifile = createIFile(filename);
     saveXML(*ifile, tree, indent, flags);
 }
 
-void saveXML(IFile &ifile, const IPropertyTree *tree, unsigned indent, byte flags)
+void saveXML(IFile &ifile, const IPropertyTree *tree, unsigned indent, unsigned flags)
 {
     OwnedIFileIO ifileio = ifile.open(IFOcreate);
     if (!ifileio)
@@ -5440,14 +5440,14 @@ void saveXML(IFile &ifile, const IPropertyTree *tree, unsigned indent, byte flag
     saveXML(*ifileio, tree, indent, flags);
 }
 
-void saveXML(IFileIO &ifileio, const IPropertyTree *tree, unsigned indent, byte flags)
+void saveXML(IFileIO &ifileio, const IPropertyTree *tree, unsigned indent, unsigned flags)
 {
     Owned<IIOStream> stream = createIOStream(&ifileio);
     stream.setown(createBufferedIOStream(stream));
     saveXML(*stream, tree, indent, flags);
 }
 
-void saveXML(IIOStream &stream, const IPropertyTree *tree, unsigned indent, byte flags)
+void saveXML(IIOStream &stream, const IPropertyTree *tree, unsigned indent, unsigned flags)
 {
     toXML(tree, stream, indent, flags);
 }

+ 11 - 9
system/jlib/jptree.hpp

@@ -207,20 +207,22 @@ jlib_decl IPropertyTree *createPTreeFromJSONString(const char *json, byte flags=
 jlib_decl IPropertyTree *createPTreeFromJSONString(unsigned len, const char *json, byte flags=ipt_none, PTreeReaderOptions readFlags=ptr_ignoreWhiteSpace, IPTreeMaker *iMaker=NULL);
 
 #define XML_SortTags 0x01
-#define XML_Format   0x02
+#define XML_Embed    0x02
 #define XML_NoEncode 0x04
 #define XML_Sanitize 0x08
 #define XML_SanitizeAttributeValues 0x10
 #define XML_SingleQuoteAttributeValues 0x20
 #define XML_NoBinaryEncode64 0x40
-#define XML_NewlinesOnly 0x80
-
-jlib_decl StringBuffer &toXML(const IPropertyTree *tree, StringBuffer &ret, unsigned indent = 0, byte flags=XML_Format);
-jlib_decl void toXML(const IPropertyTree *tree, IIOStream &out, unsigned indent = 0, byte flags=XML_Format);
-jlib_decl void saveXML(const char *filename, const IPropertyTree *tree, unsigned indent = 0, byte flags=XML_Format);
-jlib_decl void saveXML(IFile &ifile, const IPropertyTree *tree, unsigned indent = 0, byte flags=XML_Format);
-jlib_decl void saveXML(IFileIO &ifileio, const IPropertyTree *tree, unsigned indent = 0, byte flags=XML_Format);
-jlib_decl void saveXML(IIOStream &stream, const IPropertyTree *tree, unsigned indent = 0, byte flags=XML_Format);
+#define XML_LineBreak 0x100
+#define XML_LineBreakAttributes 0x80
+#define XML_Format (XML_Embed|XML_LineBreakAttributes|XML_LineBreak)
+
+jlib_decl StringBuffer &toXML(const IPropertyTree *tree, StringBuffer &ret, unsigned indent = 0, unsigned flags=XML_Format);
+jlib_decl void toXML(const IPropertyTree *tree, IIOStream &out, unsigned indent = 0, unsigned flags=XML_Format);
+jlib_decl void saveXML(const char *filename, const IPropertyTree *tree, unsigned indent = 0, unsigned flags=XML_Format);
+jlib_decl void saveXML(IFile &ifile, const IPropertyTree *tree, unsigned indent = 0, unsigned=XML_Format);
+jlib_decl void saveXML(IFileIO &ifileio, const IPropertyTree *tree, unsigned indent = 0, unsigned flags=XML_Format);
+jlib_decl void saveXML(IIOStream &stream, const IPropertyTree *tree, unsigned indent = 0, unsigned flags=XML_Format);
 
 #define JSON_SortTags 0x01
 #define JSON_Format   0x02