Explorar el Código

Merge branch 'closedown-4.2.x' into candidate-4.2.8

Conflicts:
	version.cmake

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman hace 11 años
padre
commit
64c66e4596

+ 7 - 0
common/workunit/pkgimpl.hpp

@@ -106,6 +106,13 @@ protected:
         return (node) ? node->getPropBool("@compulsory", false) : false;
     }
 
+    virtual bool resolveLocally() const
+    {
+        if (isCompulsory())
+            return false;
+        return (node) ? node->getPropBool("@resolveLocally", false) : true;  // default is false for explicit package files, but true for the default empty package
+    }
+
     virtual bool getSysFieldTranslationEnabled() const {return false;}
     virtual bool getEnableFieldTranslation() const
     {

+ 10 - 1
dali/base/dacsds.ipp

@@ -87,7 +87,16 @@ public:
         if (connected)
         {
             bool deleteRoot = false;
-            manager.commit(*this, &deleteRoot);
+            try
+            {
+                manager.commit(*this, &deleteRoot);
+            }
+            catch (IException *e)
+            {
+                VStringBuffer errMsg("beforeDispose commit connectionid=%"I64F"x, xpath=%s", connectionId, xpath.get());
+                EXCLOG(e, errMsg.str());
+                e->Release();
+            }
         }
         root.clear();
     }

+ 11 - 6
dali/base/dadfs.cpp

@@ -575,7 +575,7 @@ class CClustersLockedSection
 {
     Owned<IRemoteConnection> conn;
 public:
-    CClustersLockedSection(CDfsLogicalFileName &dlfn)
+    CClustersLockedSection(CDfsLogicalFileName &dlfn, bool exclusive)
     {
         StringBuffer xpath;
         dlfn.makeFullnameQuery(xpath,DXB_File,true).append("/ClusterLock");
@@ -583,9 +583,14 @@ public:
         /* Avoid RTM_CREATE_QUERY connect() if possible by making 1st call without. This is to avoid write contention caused by RTM_CREATE*
          * NB: RTM_CREATE_QUERY should probably only gain exclusive access in Dali if node is missing.
          */
-        conn.setown(querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE, SDS_CONNECT_TIMEOUT));
+        conn.setown(querySDS().connect(xpath.str(), myProcessSession(), exclusive ? RTM_LOCK_WRITE : RTM_LOCK_READ, SDS_CONNECT_TIMEOUT));
         if (!conn.get()) // NB: ClusterLock is now created at File create time, so this can only be true for pre-existing File's
+        {
             conn.setown(querySDS().connect(xpath.str(), myProcessSession(), RTM_CREATE_QUERY | RTM_LOCK_WRITE, SDS_CONNECT_TIMEOUT));
+            assertex(conn.get());
+            if (!exclusive)
+                conn->changeMode(RTM_LOCK_READ, SDS_CONNECT_TIMEOUT);
+        }
     }
 };
 
@@ -3170,7 +3175,7 @@ public:
         logicalName.set(lname);
         parent = _parent;
         conn.setown(_conn);
-        CClustersLockedSection sect(logicalName);
+        CClustersLockedSection sect(logicalName, false);
         root.setown(conn->getRoot());
         root->queryBranch(".");     // load branch
 #ifdef EXTRA_LOGGING
@@ -3421,7 +3426,7 @@ public:
     {
         if (!clustername&&!*clustername)
             return;
-        CClustersLockedSection cls(CDistributedFileBase<IDistributedFile>::logicalName);
+        CClustersLockedSection cls(CDistributedFileBase<IDistributedFile>::logicalName, true);
         reloadClusters();
         if (findCluster(clustername)!=NotFound) {
             IDFS_Exception *e = new CDFS_Exception(DFSERR_ClusterAlreadyExists,clustername);
@@ -3440,7 +3445,7 @@ public:
 
     bool removeCluster(const char *clustername)
     {
-        CClustersLockedSection cls(CDistributedFileBase<IDistributedFile>::logicalName);
+        CClustersLockedSection cls(CDistributedFileBase<IDistributedFile>::logicalName, true);
         reloadClusters();
         unsigned i = findCluster(clustername);
         if (i!=NotFound) {
@@ -3526,7 +3531,7 @@ public:
 
     void updatePartDiskMapping(const char *clustername,const ClusterPartDiskMapSpec &spec)
     {
-        CClustersLockedSection cls(CDistributedFileBase<IDistributedFile>::logicalName);
+        CClustersLockedSection cls(CDistributedFileBase<IDistributedFile>::logicalName, true);
         reloadClusters();
         unsigned i = findCluster(clustername);
         if (i!=NotFound) {

+ 4 - 4
dali/base/dasds.cpp

@@ -7377,7 +7377,7 @@ void CCovenSDSManager::createConnection(SessionId sessionId, unsigned mode, unsi
                 if (!queryConnection(connectionId)) // aborted
                 {
                     connectionId = 0;
-                    return;
+                    throw MakeSDSException(SDSExcpt_AbortDuringConnection, " during connect");
                 }
             }
             freeExistingLocks.setConnectionId(connectionId);
@@ -7407,7 +7407,7 @@ void CCovenSDSManager::createConnection(SessionId sessionId, unsigned mode, unsi
                                 if (!queryConnection(connectionId)) // aborted
                                 {
                                     connectionId = 0;
-                                    return;
+                                    throw MakeSDSException(SDSExcpt_AbortDuringConnection, " during connect");
                                 }
                                 iter.setown(root->getElements(xpath+1));
                                 iter->first();
@@ -7521,7 +7521,7 @@ void CCovenSDSManager::createConnection(SessionId sessionId, unsigned mode, unsi
             if (!queryConnection(connectionId))
             {
                 connectionId = 0;
-                return;
+                throw MakeSDSException(SDSExcpt_AbortDuringConnection, " during connect");
             }
         }
     }
@@ -7538,7 +7538,7 @@ void CCovenSDSManager::createConnection(SessionId sessionId, unsigned mode, unsi
     {
         unlock(_tree->queryServerId(), connectionId);
         connectionId = 0;
-        return;
+        throw MakeSDSException(SDSExcpt_AbortDuringConnection, " during connect");
     }
     connection->setEstablished();
 

+ 21 - 11
dali/base/dasess.cpp

@@ -1320,12 +1320,13 @@ public:
         {
             /* There's existing ip:port match (client) in process table..
              * Old may be in process of closing or about to, but new has beaten the onClose() to it..
-             * Track old sessions in new CProcessSessionState instance, so that upcoming onClose() can find them
+             * Track old sessions in new CProcessSessionState instance, so that in-process or upcoming onClose()/stopSession() can find them
              */
             CProcessSessionState *previousState = processlookup.query(client);
             dbgassertex(previousState); // Must be there, it's reason add() failed
+            SessionId oldSessionId = previousState->getId();
             s->addSessionIds(*previousState, false); // merges sessions from previous process state into new one that replaces it
-            WARNLOG("Dali session manager: registerClient process session already registered, old replaced");
+            WARNLOG("Dali session manager: registerClient process session already registered, old (%"I64F"x) replaced", oldSessionId);
             processlookup.remove(previousState, this);
         }
     }
@@ -1697,28 +1698,37 @@ protected:
                     SessionId prevId = cState->dequeuePreviousSessionId();
                     if (prevId)
                     {
+                        PROGLOG("Session (%"I64F"x) in stopSession, detected %d pending previous states, reinstating session (%"I64F"x) as current", id, cState->previousSessionIdCount(), prevId);
                         CSessionState *prevSessionState = sessionstates.query(prevId);
                         dbgassertex(prevSessionState); // must be there
                         CProcessSessionState *prevProcessState = QUERYINTERFACE(prevSessionState, CProcessSessionState);
-                        dbgassertex(prevSessionState);
+                        dbgassertex(prevProcessState);
                         /* NB: prevProcessState's have 0 entries in their previousSessionIds, since they were merged at replacement time
                          * in addProcessSession()
                          */
-                        prevProcessState->addSessionIds(*cState, true); // add in any remaining
+
+                        /* add in any remaining to-be-stopped process sessions from current that's stopping into this previous state
+                         * that's being reinstated, so will be picked up on next onClose()/stopSession()
+                         */
+                        prevProcessState->addSessionIds(*cState, true);
                         processlookup.replace(prevProcessState);
                     }
                     else
                         processlookup.remove(pState, this);
                 }
-                else
+                else // Here because in stopSession for an previous process state, that has been replaced (in addProcessSession)
                 {
-                    if (processlookup.remove(pState, this)) // old may have been removed when replaced
+                    if (processlookup.remove(pState, this))
                     {
-                        if (cState)
-                        {
-                            PROGLOG("Session (%"I64F"x) was replaced, ensuring removed from new process state", id);
-                            cState->removeOldSessionId(id); // If already replaced, then must ensure no longer tracked by new
-                        }
+                        // Don't think possible to be here, if not current must have replaced afaics
+                        PROGLOG("Session (%"I64F"x) in stopSession, old process session removed", id);
+                    }
+                    else
+                        PROGLOG("Session (%"I64F"x) in stopSession, old process session was already removed", id); // because replaced
+                    if (cState)
+                    {
+                        PROGLOG("Session (%"I64F"x) was replaced, ensuring removed from new process state", id);
+                        cState->removeOldSessionId(id); // If already replaced, then must ensure no longer tracked by new
                     }
                 }
             }

+ 7 - 6
dali/dfu/dfuutil.cpp

@@ -765,6 +765,7 @@ public:
             else
                 superfile->addSubFile(subfiles[i],false,NULL,false,transaction);
         }
+        superfile.clear();
         transaction->commit();
     }
 
@@ -812,15 +813,15 @@ public:
         // Do we have something to delete?
         if (toremove.ordinality()) {
             transaction->start();
+            if (removesuperfile && toremove.ordinality()!=superfile->numSubFiles())
+                removesuperfile = false;
             ForEachItemIn(i2,toremove)
                 superfile->removeSubFile(toremove.item(i2).text.get(),delsub,false,transaction);
-            transaction->commit();
-        }
-        // Delete superfile if empty
-        if (removesuperfile && (superfile->numSubFiles() == 0)) {
+            // Delete superfile if empty
+            if (removesuperfile)
+                queryDistributedFileDirectory().removeEntry(superfname, user, transaction);
             superfile.clear();
-            // MORE - add file deletion to transaction
-            queryDistributedFileDirectory().removeEntry(superfname,user);
+            transaction->commit();
         }
     }
 

+ 18 - 8
esp/services/ws_dfu/ws_dfuXRefService.cpp

@@ -535,16 +535,26 @@ void addUsedFilesFromActivePackageMaps(MapStringTo<bool> &usedFileMap, const cha
     Owned<IPropertyTree> packageSet = resolvePackageSetRegistry(process, true);
     if (!packageSet)
         throw MakeStringException(ECLWATCH_PACKAGEMAP_NOTRESOLVED, "Unable to retrieve package information from dali /PackageMaps");
-    Owned<IPropertyTreeIterator> activeMaps = packageSet->getElements("PackageMap[@active='1']");
-    //Add files referenced in all active maps, for all targets configured for this process cluster
-    ForEach(*activeMaps)
+    StringArray pmids;
+    Owned<IStringIterator> targets = getTargetClusters("RoxieCluster", process);
+    ForEach(*targets)
     {
-        Owned<IPropertyTree> packageMap = getPackageMapById(activeMaps->query().queryProp("@id"), true);
-        if (packageMap)
+        SCMStringBuffer target;
+        VStringBuffer xpath("PackageMap[@querySet='%s'][@active='1']", targets->str(target).str());
+        Owned<IPropertyTreeIterator> activeMaps = packageSet->getElements(xpath);
+        //Add files referenced in all active maps, for all targets configured for this process cluster
+        ForEach(*activeMaps)
         {
-            Owned<IPropertyTreeIterator> subFiles = packageMap->getElements("//SubFile");
-            ForEach(*subFiles)
-                addLfnToUsedFileMap(usedFileMap, subFiles->query().queryProp("@value"));
+            const char *pmid = activeMaps->query().queryProp("@id");
+            if (!pmids.appendUniq(pmid))
+                continue;
+            Owned<IPropertyTree> packageMap = getPackageMapById(pmid, true);
+            if (packageMap)
+            {
+                Owned<IPropertyTreeIterator> subFiles = packageMap->getElements("//SubFile");
+                ForEach(*subFiles)
+                    addLfnToUsedFileMap(usedFileMap, subFiles->query().queryProp("@value"));
+            }
         }
     }
 }

+ 16 - 4
esp/services/ws_ecl/ws_ecl_service.cpp

@@ -286,9 +286,21 @@ bool CWsEclService::init(const char * name, const char * type, IPropertyTree * c
     Owned<IConstEnvironment> environment = factory->openEnvironmentByFile();
     Owned<IPropertyTree> pRoot = &environment->getPTree();
 
-    xpath.clear().appendf("EspService[@name='%s']/VIPS", name);
-    IPropertyTree *vips = prc->queryPropTree(xpath.str());
+    xpath.clear().appendf("EspService[@name='%s']", name);
+    IPropertyTree *serviceTree = prc->queryPropTree(xpath);
+    if (!serviceTree)
+        throw MakeStringException(-1, "ESP Service %s not configured", name);
+
+    roxieTimeout = serviceTree->getPropInt("RoxieTimeout", 10 * 60);
+    if (!roxieTimeout)
+        roxieTimeout = WAIT_FOREVER;
+    workunitTimeout = serviceTree->getPropInt("WorkunitTimeout", 10 * 60);
+    if (workunitTimeout)
+        workunitTimeout *= 1000;
+    else
+        workunitTimeout = WAIT_FOREVER;
 
+    IPropertyTree *vips = serviceTree->queryPropTree(xpath.str());
     Owned<IStringIterator> roxieTargets = getTargetClusters("RoxieCluster", NULL);
     ForEach(*roxieTargets)
     {
@@ -2180,8 +2192,7 @@ int CWsEclBinding::submitWsEclWorkunit(IEspContext & context, WsEclWuInfo &wsinf
     bool async = context.queryRequestParameters()->hasProp("_async");
 
     //don't wait indefinately, in case submitted to an inactive queue wait max + 5 mins
-    int wutimeout = 300000;
-    if (!async && waitForWorkUnitToComplete(wuid.str(), wutimeout))
+    if (!async && waitForWorkUnitToComplete(wuid.str(), wsecl->workunitTimeout))
     {
         Owned<IWuWebView> web = createWuWebView(wuid.str(), wsinfo.queryname.get(), getCFD(), true);
         if (!web)
@@ -2309,6 +2320,7 @@ void CWsEclBinding::sendRoxieRequest(const char *target, StringBuffer &req, Stri
         ep.getIpText(url).append(':').append(ep.port);
 
         Owned<IHttpClient> httpclient = httpctx->createHttpClient(NULL, url);
+        httpclient->setTimeOut(wsecl->roxieTimeout);
         if (0 > httpclient->sendRequest("POST", contentType, req, resp, status))
             throw MakeStringException(-1, "Roxie cluster communication error: %s", target);
     }

+ 2 - 0
esp/services/ws_ecl/ws_ecl_service.hpp

@@ -94,6 +94,8 @@ public:
     MapStringToMyClass<ISmartSocketFactory> connMap;
     StringAttr auth_method;
     StringAttr portal_URL;
+    unsigned roxieTimeout;
+    unsigned workunitTimeout;
 
 public:
     IMPLEMENT_IINTERFACE;

+ 40 - 25
esp/services/ws_workunits/ws_workunitsQuerySets.cpp

@@ -812,42 +812,31 @@ bool CWsWorkunitsEx::onWUQuerysets(IEspContext &context, IEspWUQuerysetsRequest
 
 void addClusterQueryStates(IPropertyTree* queriesOnCluster, const char *target, const char *id, IArrayOf<IEspClusterQueryState>& clusterStates, double version)
 {
+    queriesOnCluster = queriesOnCluster->queryPropTree("Endpoint[1]/Queries[1]");
+    if (!queriesOnCluster)
+        return;
+
+    int reporting = queriesOnCluster->getPropInt("@reporting");
+
     Owned<IEspClusterQueryState> clusterState = createClusterQueryState();
     clusterState->setCluster(target);
 
-    VStringBuffer xpath("Endpoint/Queries/Query[@id='%s']", id);
-    Owned<IPropertyTreeIterator> iter = queriesOnCluster->getElements(xpath.str());
-    bool found = false;
-    bool suspended = false;
-    bool available = false;
-    StringBuffer errors;
-    ForEach (*iter)
-    {
-        found = true;
-        if (iter->query().getPropBool("@suspended", false))
-            suspended = true;
-        else
-            available = true;
-        const char* error = iter->query().queryProp("@error");
-        if (error && *error && (version >=1.46))
-        {
-            if (errors.length())
-                errors.append(";");
-            errors.append(error);
-        }
-    }
-    if (!found)
+    VStringBuffer xpath("Query[@id='%s']", id);
+    IPropertyTree *query = queriesOnCluster->getPropTree(xpath.str());
+    int suspended = query->getPropInt("@suspended");
+    const char* error = query->queryProp("@error");
+    if (!query)
         clusterState->setState("Not Found");
     else if (suspended)
     {
         clusterState->setState("Suspended");
-        if (available)
+        if (suspended<reporting)
             clusterState->setMixedNodeStates(true);
     }
     else
         clusterState->setState("Available");
-    if ((version >=1.46) && errors.length())
-        clusterState->setErrors(errors.str());
+    if ((version >=1.46) && error && *error)
+        clusterState->setErrors(error);
 
     clusterStates.append(*clusterState.getClear());
 }
@@ -1465,6 +1454,28 @@ bool CWsWorkunitsEx::onWUQueryDetails(IEspContext &context, IEspWUQueryDetailsRe
     return true;
 }
 
+int StringArrayCompareFunc(const char **s1, const char **s2)
+{
+    if (!s1 || !*s1 || !s2 || !*s2)
+        return 0;
+    return strcmp(*s1, *s2);
+}
+
+int EspQuerySuperFileCompareFunc(IInterface **i1, IInterface **i2)
+{
+    if (!i1 || !*i1 || !i2 || !*i2)
+        return 0;
+    IEspQuerySuperFile *sf1 = QUERYINTERFACE(*i1, IEspQuerySuperFile);
+    IEspQuerySuperFile *sf2 = QUERYINTERFACE(*i2, IEspQuerySuperFile);
+    if (!sf1 || !sf2)
+        return 0;
+    const char *name1 = sf1->getName();
+    const char *name2 = sf2->getName();
+    if (!name1 || !name2)
+        return 0;
+    return strcmp(name1, name2);
+}
+
 bool CWsWorkunitsEx::getQueryFiles(const char* query, const char* target, StringArray& logicalFiles, IArrayOf<IEspQuerySuperFile> *respSuperFiles)
 {
     try
@@ -1496,6 +1507,7 @@ bool CWsWorkunitsEx::getQueryFiles(const char* query, const char* target, String
             if (fileName && *fileName)
                 logicalFiles.append(fileName);
         }
+        logicalFiles.sort(StringArrayCompareFunc);
 
         if (respSuperFiles)
         {
@@ -1514,9 +1526,12 @@ bool CWsWorkunitsEx::getQueryFiles(const char* query, const char* target, String
                     if (fileName && *fileName)
                         respSubFiles.append(fileName);
                 }
+                respSubFiles.sort(StringArrayCompareFunc);
+
                 respSuperFile->setSubFiles(respSubFiles);
                 respSuperFiles->append(*respSuperFile.getClear());
             }
+            respSuperFiles->sort(EspQuerySuperFileCompareFunc);
         }
         return true;
     }

+ 7 - 0
initfiles/componentfiles/configxml/@temp/esp_service.xsl

@@ -974,6 +974,13 @@ xmlns:seisint="http://seisint.com"  xmlns:set="http://exslt.org/sets" exclude-re
                 <LanguageDirectory><xsl:value-of select="@LanguageDirectory"/></LanguageDirectory>
             </xsl:when>
             <xsl:when test="$serviceType='ws_ecl'">
+                <xsl:if test="string(@roxieTimeout)!=''">
+                    <RoxieTimeout><xsl:value-of select="@roxieTimeout"/></RoxieTimeout>
+                </xsl:if>
+                <xsl:if test="string(@workunitTimout)!=''">
+                    <WorkunitTimeout><xsl:value-of select="@workunitTimeout"/></WorkunitTimeout>
+                </xsl:if>
+                <WorkunitTimeout><xsl:value-of select="@workunitTimeout"/></WorkunitTimeout>
                 <VIPS>
                     <xsl:for-each select="ProcessCluster">
                         <xsl:if test="string(@roxie) != '' and string(@vip) != ''">

+ 14 - 0
initfiles/componentfiles/configxml/esp_service_wsecl2.xsd

@@ -85,6 +85,20 @@
                     </xs:appinfo>
                 </xs:annotation>
             </xs:attribute>
+            <xs:attribute name="roxieTimeout" type="xs:unsignedInt" use="optional" default="300">
+                <xs:annotation>
+                    <xs:appinfo>
+                        <tooltip>Timeout (in seconds) for WsEcl connections to roxie (0 == wait forever)</tooltip>
+                    </xs:appinfo>
+                </xs:annotation>
+            </xs:attribute>
+            <xs:attribute name="workunitTimeout" type="xs:unsignedInt" use="optional" default="600">
+                <xs:annotation>
+                    <xs:appinfo>
+                        <tooltip>Timeout (in seconds), for WsEcl to wait for workunit to complete (0 == wait forever)</tooltip>
+                    </xs:appinfo>
+                </xs:annotation>
+            </xs:attribute>
         </xs:complexType>
     </xs:element>
 </xs:schema>

+ 43 - 4
plugins/fileservices/fileservices.cpp

@@ -1156,6 +1156,27 @@ FILESERVICES_API void FILESERVICES_CALL fsAddSuperFile(IGlobalCodeContext *gctx,
 }
 
 
+class CImplicitSuperTransaction
+{
+    IDistributedFileTransaction *transaction;
+public:
+    CImplicitSuperTransaction(IDistributedFileTransaction *_transaction)
+    {
+        if (!_transaction->active()) // then created implicitly
+        {
+            transaction = _transaction;
+            transaction->start();
+        }
+        else
+            transaction = NULL;
+    }
+    ~CImplicitSuperTransaction()
+    {
+        if (transaction)
+            transaction->commit();
+    }
+};
+
 FILESERVICES_API void FILESERVICES_CALL fslAddSuperFile(ICodeContext *ctx, const char *lsuperfn,const char *_lfn,unsigned atpos,bool addcontents, bool strict)
 {
     Owned<IDistributedSuperFile> file;
@@ -1185,7 +1206,11 @@ FILESERVICES_API void FILESERVICES_CALL fslAddSuperFile(ICodeContext *ctx, const
     StringBuffer other;
     if (atpos>1)
         other.append("#").append(atpos);
-    file->addSubFile(lfn.str(),atpos>0,(atpos>1)?other.str():NULL,addcontents,transaction);
+    {
+        CImplicitSuperTransaction implicitTransaction(transaction);
+        file->addSubFile(lfn.str(),atpos>0,(atpos>1)?other.str():NULL,addcontents,transaction);
+        file.clear(); // Must clear file before implicit transaction executed in destructor
+    }
     StringBuffer s("AddSuperFile ('");
     s.append(lsfn).append("', '");
     s.append(lfn).append('\'');
@@ -1217,7 +1242,11 @@ FILESERVICES_API void FILESERVICES_CALL fslRemoveSuperFile(ICodeContext *ctx, co
     lookupSuperFile(ctx, lsuperfn, file, true, lsfn, false, true);
     IDistributedFileTransaction *transaction = ctx->querySuperFileTransaction();
     assertex(transaction);
-    file->removeSubFile(_lfn?lfn.str():NULL,del,remcontents,transaction);
+    {
+        CImplicitSuperTransaction implicitTransaction(transaction);
+        file->removeSubFile(_lfn?lfn.str():NULL,del,remcontents,transaction);
+        file.clear(); // Must clear file before implicit transaction executed in destructor
+    }
     StringBuffer s;
     if (_lfn)
         s.append("RemoveSuperFile ('");
@@ -1261,7 +1290,11 @@ FILESERVICES_API void FILESERVICES_CALL fslRemoveOwnedSubFiles(ICodeContext *ctx
     lookupSuperFile(ctx, lsuperfn, file, true, lsfn, false, true);
     IDistributedFileTransaction *transaction = ctx->querySuperFileTransaction();
     assertex(transaction);
-    file->removeOwnedSubFiles(del,transaction);
+    {
+        CImplicitSuperTransaction implicitTransaction(transaction);
+        file->removeOwnedSubFiles(del,transaction);
+        file.clear(); // Must clear file before implicit transaction executed in destructor
+    }
     VStringBuffer s("RemoveOwnedSubFiles ('%s'", lsfn.str());
     if (del)
         s.append(", del");
@@ -1295,7 +1328,13 @@ FILESERVICES_API void FILESERVICES_CALL fslSwapSuperFile(ICodeContext *ctx, cons
 
     IDistributedFileTransaction *transaction = ctx->querySuperFileTransaction();
     assertex(transaction);
-    file1->swapSuperFile(file2,transaction);
+    {
+        CImplicitSuperTransaction implicitTransaction(transaction);
+        file1->swapSuperFile(file2,transaction);
+        // Must clear files before implicit transaction executed in destructor
+        file1.clear();
+        file2.clear();
+    }
     StringBuffer s("SwapSuperFile ('");
     s.append(lsfn1).append("', '");
     s.append(lsfn2).append("') '");

+ 1 - 0
roxie/ccd/ccd.hpp

@@ -873,6 +873,7 @@ public:
     virtual void CTXLOGl(LogItem *logItem) const
     {
         // NOTE - we don't actually print anything to logfile here - was already printed on slave
+        CriticalBlock b(crit);
         log.append(*logItem);
         flush(false, false);
     }

+ 2 - 2
roxie/ccd/ccddali.cpp

@@ -721,15 +721,15 @@ public:
             CriticalBlock b(daliConnectionCrit);
             if (isConnected)
             {
+                isConnected = false;
                 delete serverStatus;
                 serverStatus = NULL;
                 closeDllServer();
                 closeEnvironment();
                 clientShutdownWorkUnit();
+                disconnectRoxieQueues();
                 ::closedownClientProcess(); // dali client closedown
-                isConnected = false;
                 disconnectSem.signal();
-                disconnectRoxieQueues();
             }
         }
     }

+ 27 - 12
roxie/ccd/ccdlistener.cpp

@@ -520,21 +520,29 @@ public:
             return locksGot > getNumNodes()/2;
     }
 
+    enum CascadeMergeType { CascadeMergeNone, CascadeMergeStats, CascadeMergeQueries };
+
     void doControlQuery(SocketEndpoint &ep, const char *queryText, StringBuffer &reply)
     {
         if (logctx.queryTraceLevel() > 5)
             logctx.CTXLOG("doControlQuery (%d): %.80s", isMaster, queryText);
         // By this point we should have cascade-connected thanks to a prior <control:lock>
         // So do the query ourselves and in all child threads;
-        Owned<IPropertyTree> mergedStats;
+        CascadeMergeType mergeType=CascadeMergeNone;
         if (strstr(queryText, "querystats"))
-            mergedStats.setown(createPTree("Endpoint"));
+            mergeType=CascadeMergeStats;
+        else if (strstr(queryText, ":queries"))
+            mergeType=CascadeMergeQueries;
+        Owned<IPropertyTree> mergedReply;
+        if (mergeType!=CascadeMergeNone)
+            mergedReply.setown(createPTree("Endpoint"));
 
         class casyncfor: public CAsyncFor
         {
             const char *queryText;
             CascadeManager *parent;
-            IPropertyTree *mergedStats;
+            IPropertyTree *mergedReply;
+            CascadeMergeType mergeType;
             StringBuffer &reply;
             CriticalSection crit;
             SocketEndpoint &ep;
@@ -542,9 +550,9 @@ public:
             const IRoxieContextLogger &logctx;
 
         public:
-            casyncfor(const char *_queryText, CascadeManager *_parent, IPropertyTree *_mergedStats,
+            casyncfor(const char *_queryText, CascadeManager *_parent, IPropertyTree *_mergedReply, CascadeMergeType _mergeType,
                       StringBuffer &_reply, SocketEndpoint &_ep, unsigned _numChildren, const IRoxieContextLogger &_logctx)
-                : queryText(_queryText), parent(_parent), mergedStats(_mergedStats), reply(_reply), ep(_ep), numChildren(_numChildren), logctx(_logctx)
+                : queryText(_queryText), parent(_parent), mergedReply(_mergedReply), mergeType(_mergeType), reply(_reply), ep(_ep), numChildren(_numChildren), logctx(_logctx)
             {
             }
             void Do(unsigned i)
@@ -569,9 +577,12 @@ public:
                     ForEach(*meat)
                     {
                         CriticalBlock cb(crit);
-                        if (mergedStats)
+                        if (mergedReply)
                         {
-                            mergeStats(mergedStats, &meat->query());
+                            if (mergeType == CascadeMergeStats)
+                                mergeStats(mergedReply, &meat->query());
+                            else if (mergeType == CascadeMergeQueries)
+                                mergeQueries(mergedReply, &meat->query());
                         }
                         else
                             toXML(&meat->query(), reply);
@@ -599,19 +610,22 @@ public:
                 }
                 myReply.append("</Endpoint>\n");
                 CriticalBlock cb(crit);
-                if (mergedStats)
+                if (mergedReply)
                 {
                     Owned<IPropertyTree> xml = createPTreeFromXMLString(myReply);
-                    mergeStats(mergedStats, xml);
+                    if (mergeType == CascadeMergeStats)
+                        mergeStats(mergedReply, xml);
+                    else if (mergeType == CascadeMergeQueries)
+                        mergeQueries(mergedReply, xml);
                 }
                 else
                     reply.append(myReply);
             }
-        } afor(queryText, this, mergedStats, reply, ep, activeChildren.ordinality(), logctx);
+        } afor(queryText, this, mergedReply, mergeType, reply, ep, activeChildren.ordinality(), logctx);
         afor.For(activeChildren.ordinality()+(isMaster ? 0 : 1), 10);
         activeChildren.kill();
-        if (mergedStats)
-            toXML(mergedStats, reply);
+        if (mergedReply)
+            toXML(mergedReply, reply);
         if (logctx.queryTraceLevel() > 5)
             logctx.CTXLOG("doControlQuery (%d) finished: %.80s", isMaster, queryText);
     }
@@ -863,6 +877,7 @@ public:
         {
             DBGLOG("RoxieWorkUnitListener::disconnectQueue");
             queue->cancelAcceptConversation();
+            queue.clear();
         }
     }
 

+ 38 - 5
roxie/ccd/ccdstate.cpp

@@ -409,7 +409,7 @@ protected:
     }
 
     // Use dali to resolve subfile into physical file info
-    static IResolvedFile *resolveLFNusingDaliOrLocal(const char *fileName, bool useCache, bool cacheResult, bool writeAccess, bool alwaysCreate)
+    static IResolvedFile *resolveLFNusingDaliOrLocal(const char *fileName, bool useCache, bool cacheResult, bool writeAccess, bool alwaysCreate, bool resolveLocal)
     {
         // MORE - look at alwaysCreate... This may be useful to implement earlier locking semantics.
         if (traceLevel > 9)
@@ -449,7 +449,7 @@ protected:
                     }
                 }
             }
-            if (!result)
+            if (!result && resolveLocal)
             {
                 StringBuffer useName;
                 if (strstr(fileName,"::"))
@@ -486,7 +486,7 @@ protected:
     {
         IResolvedFile *result = lookupFile(fileName, useCache, cacheResult, writeAccess, alwaysCreate);
         if (!result && (!checkCompulsory || !isCompulsory()))
-            result = resolveLFNusingDaliOrLocal(fileName, useCache, cacheResult, writeAccess, alwaysCreate);
+            result = resolveLFNusingDaliOrLocal(fileName, useCache, cacheResult, writeAccess, alwaysCreate, resolveLocally());
         return result;
     }
 
@@ -611,7 +611,7 @@ public:
         expandLogicalFilename(fileName, _fileName, wu, false);
         Owned<IResolvedFile> resolved = lookupFile(fileName, false, false, true, true);
         if (!resolved)
-            resolved.setown(resolveLFNusingDaliOrLocal(fileName, false, false, true, true));
+            resolved.setown(resolveLFNusingDaliOrLocal(fileName, false, false, true, true, resolveLocally()));
         if (resolved)
         {
             if (resolved->exists())
@@ -666,6 +666,10 @@ public:
     {
         return CPackageNode::isCompulsory();
     }
+    virtual bool resolveLocally() const
+    {
+        return CPackageNode::resolveLocally();
+    }
 };
 
 CResolvedFileCache CRoxiePackageNode::daliFiles;
@@ -1867,7 +1871,7 @@ private:
     void getQueryInfo(IPropertyTree *control, StringBuffer &reply, bool full, const IRoxieContextLogger &logctx) const
     {
         Owned<IPropertyTreeIterator> ids = control->getElements("Query");
-        reply.append("<Queries>\n");
+        reply.append("<Queries reporting='1'>\n");
         if (ids->first())
         {
             ForEach(*ids)
@@ -2912,6 +2916,35 @@ void mergeStats(IPropertyTree *s1, IPropertyTree *s2)
     mergeStats(s1, s2, 0);
 }
 
+void mergeQueries(IPropertyTree *dest, IPropertyTree *src)
+{
+    IPropertyTree *destQueries = ensurePTree(dest, "Queries");
+    IPropertyTree *srcQueries = src->queryPropTree("Queries");
+    if (!srcQueries)
+        return;
+    destQueries->setPropInt("@reporting", destQueries->getPropInt("@reporting") + srcQueries->getPropInt("@reporting"));
+
+    Owned<IPropertyTreeIterator> it = srcQueries->getElements("Query");
+    ForEach(*it)
+    {
+        IPropertyTree *srcQuery = &it->query();
+        const char *id = srcQuery->queryProp("@id");
+        if (!id || !*id)
+            continue;
+        VStringBuffer xpath("Query[@id='%s']", id);
+        IPropertyTree *destQuery = destQueries->queryPropTree(xpath);
+        if (!destQuery)
+        {
+            destQueries->addPropTree("Query", LINK(srcQuery));
+            continue;
+        }
+        int suspended = destQuery->getPropInt("@suspended") + srcQuery->getPropInt("@suspended"); //keep count to recognize "partially suspended" queries
+        mergePTree(destQuery, srcQuery);
+        if (suspended)
+            destQuery->setPropInt("@suspended", suspended);
+    }
+}
+
 #ifdef _USE_CPPUNIT
 #include <cppunit/extensions/HelperMacros.h>
 

+ 1 - 0
roxie/ccd/ccdstate.hpp

@@ -140,6 +140,7 @@ extern void cleanupPlugins();
 
 extern void mergeStats(IPropertyTree *s1, IPropertyTree *s2, unsigned level);
 extern void mergeStats(IPropertyTree *s1, IPropertyTree *s2);
+extern void mergeQueries(IPropertyTree *s1, IPropertyTree *s2);
 
 extern const char *queryNodeFileName(const IPropertyTree &graphNode);
 extern const char *queryNodeIndexName(const IPropertyTree &graphNode);