浏览代码

Merge branch 'closedown-4.2.x' into candidate-5.0.0

Conflicts:
	roxie/ccd/ccdcontext.cpp
	roxie/ccd/ccdlistener.cpp
	roxie/ccd/ccdstate.cpp
	version.cmake

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 年之前
父节点
当前提交
2fdddb50c8

+ 7 - 0
common/workunit/pkgimpl.hpp

@@ -106,6 +106,13 @@ protected:
         return (node) ? node->getPropBool("@compulsory", false) : false;
     }
 
+    virtual bool resolveLocally() const // Whether files may be resolved locally for this package: never when compulsory, otherwise taken from the node's @resolveLocally attribute
+    {
+        if (isCompulsory()) // a compulsory package never permits local resolution
+            return false;
+        return (node) ? node->getPropBool("@resolveLocally", false) : true;  // default is false for explicit package files, but true for the default empty package
+    }
+
     virtual bool getSysFieldTranslationEnabled() const {return false;} // this implementation always reports false
     virtual bool getEnableFieldTranslation() const
     {

+ 10 - 1
dali/base/dacsds.ipp

@@ -41,7 +41,16 @@ public:
         if (connected)
         {
             bool deleteRoot = false;
-            manager.commit(*this, &deleteRoot);
+            try
+            {
+                manager.commit(*this, &deleteRoot);
+            }
+            catch (IException *e)
+            {
+                VStringBuffer errMsg("beforeDispose commit connectionid=%"I64F"x, xpath=%s", connectionId, xpath.get());
+                EXCLOG(e, errMsg.str());
+                e->Release();
+            }
         }
         root.clear();
     }

+ 11 - 6
dali/base/dadfs.cpp

@@ -670,7 +670,7 @@ class CClustersLockedSection
 {
     Owned<IRemoteConnection> conn;
 public:
-    CClustersLockedSection(CDfsLogicalFileName &dlfn)
+    CClustersLockedSection(CDfsLogicalFileName &dlfn, bool exclusive) // Connects to the file's ClusterLock node; exclusive selects a write lock, otherwise a read lock is requested
     {
         StringBuffer xpath;
         dlfn.makeFullnameQuery(xpath,DXB_File,true).append("/ClusterLock");
@@ -678,9 +678,14 @@ public:
         /* Avoid RTM_CREATE_QUERY connect() if possible by making 1st call without. This is to avoid write contention caused by RTM_CREATE*
          * NB: RTM_CREATE_QUERY should probably only gain exclusive access in Dali if node is missing.
          */
-        conn.setown(querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE, SDS_CONNECT_TIMEOUT));
+        conn.setown(querySDS().connect(xpath.str(), myProcessSession(), exclusive ? RTM_LOCK_WRITE : RTM_LOCK_READ, SDS_CONNECT_TIMEOUT));
         if (!conn.get()) // NB: ClusterLock is now created at File create time, so this can only be true for pre-existing File's
+        {
             conn.setown(querySDS().connect(xpath.str(), myProcessSession(), RTM_CREATE_QUERY | RTM_LOCK_WRITE, SDS_CONNECT_TIMEOUT));
+            assertex(conn.get()); // the creating connect is required to yield a connection
+            if (!exclusive)
+                conn->changeMode(RTM_LOCK_READ, SDS_CONNECT_TIMEOUT); // the create path connected with a write lock; switch to read for non-exclusive callers
+        }
     }
 };
 
@@ -3284,7 +3289,7 @@ public:
         logicalName.set(lname);
         parent = _parent;
         conn.setown(_conn);
-        CClustersLockedSection sect(logicalName);
+        CClustersLockedSection sect(logicalName, false);
         root.setown(conn->getRoot());
         root->queryBranch(".");     // load branch
 #ifdef EXTRA_LOGGING
@@ -3536,7 +3541,7 @@ public:
     {
         // Nothing to add for a null or empty cluster name. The test must be '||': with '&&' a null
         // pointer was dereferenced and an empty name was never rejected.
         if (!clustername||!*clustername)
             return;
-        CClustersLockedSection cls(CDistributedFileBase<IDistributedFile>::logicalName);
+        CClustersLockedSection cls(CDistributedFileBase<IDistributedFile>::logicalName, true);
         reloadClusters();
         if (findCluster(clustername)!=NotFound) {
             IDFS_Exception *e = new CDFS_Exception(DFSERR_ClusterAlreadyExists,clustername);
@@ -3555,7 +3560,7 @@ public:
 
     bool removeCluster(const char *clustername)
     {
-        CClustersLockedSection cls(CDistributedFileBase<IDistributedFile>::logicalName);
+        CClustersLockedSection cls(CDistributedFileBase<IDistributedFile>::logicalName, true);
         reloadClusters();
         unsigned i = findCluster(clustername);
         if (i!=NotFound) {
@@ -3641,7 +3646,7 @@ public:
 
     void updatePartDiskMapping(const char *clustername,const ClusterPartDiskMapSpec &spec)
     {
-        CClustersLockedSection cls(CDistributedFileBase<IDistributedFile>::logicalName);
+        CClustersLockedSection cls(CDistributedFileBase<IDistributedFile>::logicalName, true);
         reloadClusters();
         unsigned i = findCluster(clustername);
         if (i!=NotFound) {

+ 4 - 4
dali/base/dasds.cpp

@@ -7702,7 +7702,7 @@ void CCovenSDSManager::createConnection(SessionId sessionId, unsigned mode, unsi
                 if (!queryConnection(connectionId)) // aborted
                 {
                     connectionId = 0;
-                    return;
+                    throw MakeSDSException(SDSExcpt_AbortDuringConnection, " during connect");
                 }
             }
             freeExistingLocks.setConnectionId(connectionId);
@@ -7732,7 +7732,7 @@ void CCovenSDSManager::createConnection(SessionId sessionId, unsigned mode, unsi
                                 if (!queryConnection(connectionId)) // aborted
                                 {
                                     connectionId = 0;
-                                    return;
+                                    throw MakeSDSException(SDSExcpt_AbortDuringConnection, " during connect");
                                 }
                                 iter.setown(root->getElements(xpath+1));
                                 iter->first();
@@ -7846,7 +7846,7 @@ void CCovenSDSManager::createConnection(SessionId sessionId, unsigned mode, unsi
             if (!queryConnection(connectionId))
             {
                 connectionId = 0;
-                return;
+                throw MakeSDSException(SDSExcpt_AbortDuringConnection, " during connect");
             }
         }
     }
@@ -7863,7 +7863,7 @@ void CCovenSDSManager::createConnection(SessionId sessionId, unsigned mode, unsi
     {
         unlock(_tree->queryServerId(), connectionId);
         connectionId = 0;
-        return;
+        throw MakeSDSException(SDSExcpt_AbortDuringConnection, " during connect");
     }
     connection->setEstablished();
 

+ 21 - 11
dali/base/dasess.cpp

@@ -1320,12 +1320,13 @@ public:
         {
             /* There's existing ip:port match (client) in process table..
              * Old may be in process of closing or about to, but new has beaten the onClose() to it..
-             * Track old sessions in new CProcessSessionState instance, so that upcoming onClose() can find them
+             * Track old sessions in new CProcessSessionState instance, so that in-process or upcoming onClose()/stopSession() can find them
              */
             CProcessSessionState *previousState = processlookup.query(client);
             dbgassertex(previousState); // Must be there, it's reason add() failed
+            SessionId oldSessionId = previousState->getId();
             s->addSessionIds(*previousState, false); // merges sessions from previous process state into new one that replaces it
-            WARNLOG("Dali session manager: registerClient process session already registered, old replaced");
+            WARNLOG("Dali session manager: registerClient process session already registered, old (%"I64F"x) replaced", oldSessionId);
             processlookup.remove(previousState, this);
         }
     }
@@ -1697,28 +1698,37 @@ protected:
                     SessionId prevId = cState->dequeuePreviousSessionId();
                     if (prevId)
                     {
+                        PROGLOG("Session (%"I64F"x) in stopSession, detected %d pending previous states, reinstating session (%"I64F"x) as current", id, cState->previousSessionIdCount(), prevId);
                         CSessionState *prevSessionState = sessionstates.query(prevId);
                         dbgassertex(prevSessionState); // must be there
                         CProcessSessionState *prevProcessState = QUERYINTERFACE(prevSessionState, CProcessSessionState);
-                        dbgassertex(prevSessionState);
+                        dbgassertex(prevProcessState);
                         /* NB: prevProcessState's have 0 entries in their previousSessionIds, since they were merged at replacement time
                          * in addProcessSession()
                          */
-                        prevProcessState->addSessionIds(*cState, true); // add in any remaining
+
+                        /* add in any remaining to-be-stopped process sessions from current that's stopping into this previous state
+                         * that's being reinstated, so will be picked up on next onClose()/stopSession()
+                         */
+                        prevProcessState->addSessionIds(*cState, true);
                         processlookup.replace(prevProcessState);
                     }
                     else
                         processlookup.remove(pState, this);
                 }
-                else
+                else // Here because in stopSession for an previous process state, that has been replaced (in addProcessSession)
                 {
-                    if (processlookup.remove(pState, this)) // old may have been removed when replaced
+                    if (processlookup.remove(pState, this))
                     {
-                        if (cState)
-                        {
-                            PROGLOG("Session (%"I64F"x) was replaced, ensuring removed from new process state", id);
-                            cState->removeOldSessionId(id); // If already replaced, then must ensure no longer tracked by new
-                        }
+                        // Don't think possible to be here, if not current must have replaced afaics
+                        PROGLOG("Session (%"I64F"x) in stopSession, old process session removed", id);
+                    }
+                    else
+                        PROGLOG("Session (%"I64F"x) in stopSession, old process session was already removed", id); // because replaced
+                    if (cState)
+                    {
+                        PROGLOG("Session (%"I64F"x) was replaced, ensuring removed from new process state", id);
+                        cState->removeOldSessionId(id); // If already replaced, then must ensure no longer tracked by new
                     }
                 }
             }

+ 18 - 8
esp/services/ws_dfu/ws_dfuXRefService.cpp

@@ -535,16 +535,26 @@ void addUsedFilesFromActivePackageMaps(MapStringTo<bool> &usedFileMap, const cha
     Owned<IPropertyTree> packageSet = resolvePackageSetRegistry(process, true);
     if (!packageSet)
         throw MakeStringException(ECLWATCH_PACKAGEMAP_NOTRESOLVED, "Unable to retrieve package information from dali /PackageMaps");
-    Owned<IPropertyTreeIterator> activeMaps = packageSet->getElements("PackageMap[@active='1']");
-    //Add files referenced in all active maps, for all targets configured for this process cluster
-    ForEach(*activeMaps)
+    StringArray pmids;
+    Owned<IStringIterator> targets = getTargetClusters("RoxieCluster", process);
+    ForEach(*targets)
     {
-        Owned<IPropertyTree> packageMap = getPackageMapById(activeMaps->query().queryProp("@id"), true);
-        if (packageMap)
+        SCMStringBuffer target;
+        VStringBuffer xpath("PackageMap[@querySet='%s'][@active='1']", targets->str(target).str());
+        Owned<IPropertyTreeIterator> activeMaps = packageSet->getElements(xpath);
+        //Add files referenced in all active maps, for all targets configured for this process cluster
+        ForEach(*activeMaps)
         {
-            Owned<IPropertyTreeIterator> subFiles = packageMap->getElements("//SubFile");
-            ForEach(*subFiles)
-                addLfnToUsedFileMap(usedFileMap, subFiles->query().queryProp("@value"));
+            const char *pmid = activeMaps->query().queryProp("@id");
+            if (!pmids.appendUniq(pmid))
+                continue;
+            Owned<IPropertyTree> packageMap = getPackageMapById(pmid, true);
+            if (packageMap)
+            {
+                Owned<IPropertyTreeIterator> subFiles = packageMap->getElements("//SubFile");
+                ForEach(*subFiles)
+                    addLfnToUsedFileMap(usedFileMap, subFiles->query().queryProp("@value"));
+            }
         }
     }
 }

+ 16 - 4
esp/services/ws_ecl/ws_ecl_service.cpp

@@ -305,9 +305,21 @@ bool CWsEclService::init(const char * name, const char * type, IPropertyTree * c
     Owned<IConstEnvironment> environment = factory->openEnvironmentByFile();
     Owned<IPropertyTree> pRoot = &environment->getPTree();
 
-    xpath.clear().appendf("EspService[@name='%s']/VIPS", name);
-    IPropertyTree *vips = prc->queryPropTree(xpath.str());
+    xpath.clear().appendf("EspService[@name='%s']", name);
+    IPropertyTree *serviceTree = prc->queryPropTree(xpath);
+    if (!serviceTree)
+        throw MakeStringException(-1, "ESP Service %s not configured", name);
+
+    roxieTimeout = serviceTree->getPropInt("RoxieTimeout", 10 * 60);
+    if (!roxieTimeout)
+        roxieTimeout = WAIT_FOREVER;
+    workunitTimeout = serviceTree->getPropInt("WorkunitTimeout", 10 * 60);
+    if (workunitTimeout)
+        workunitTimeout *= 1000;
+    else
+        workunitTimeout = WAIT_FOREVER;
 
+    // serviceTree is already the EspService node, so VIPS is addressed relative to it
+    // (xpath still holds "EspService[@name='...']" at this point and would not name a child of serviceTree).
+    IPropertyTree *vips = serviceTree->queryPropTree("VIPS");
     Owned<IStringIterator> roxieTargets = getTargetClusters("RoxieCluster", NULL);
     ForEach(*roxieTargets)
     {
@@ -2120,8 +2132,7 @@ int CWsEclBinding::submitWsEclWorkunit(IEspContext & context, WsEclWuInfo &wsinf
     bool async = context.queryRequestParameters()->hasProp("_async");
 
     //don't wait indefinitely (unless so configured), in case submitted to an inactive queue: wait at most the service's workunit timeout
-    int wutimeout = 300000;
-    if (!async && waitForWorkUnitToComplete(wuid.str(), wutimeout))
+    if (!async && waitForWorkUnitToComplete(wuid.str(), wsecl->workunitTimeout))
     {
         Owned<IWuWebView> web = createWuWebView(wuid.str(), wsinfo.queryname.get(), getCFD(), true);
         if (!web)
@@ -2172,6 +2183,7 @@ void CWsEclBinding::sendRoxieRequest(const char *target, StringBuffer &req, Stri
         ep.getIpText(url).append(':').append(ep.port);
 
         Owned<IHttpClient> httpclient = httpctx->createHttpClient(NULL, url);
+        httpclient->setTimeOut(wsecl->roxieTimeout);
         if (0 > httpclient->sendRequest("POST", contentType, req, resp, status))
             throw MakeStringException(-1, "Roxie cluster communication error: %s", target);
     }

+ 2 - 0
esp/services/ws_ecl/ws_ecl_service.hpp

@@ -94,6 +94,8 @@ public:
     MapStringToMyClass<ISmartSocketFactory> connMap;
     StringAttr auth_method;
     StringAttr portal_URL;
+    unsigned roxieTimeout;
+    unsigned workunitTimeout;
 
 public:
     IMPLEMENT_IINTERFACE;

+ 26 - 0
esp/services/ws_workunits/ws_workunitsQuerySets.cpp

@@ -1544,6 +1544,28 @@ bool CWsWorkunitsEx::onWUQueryDetails(IEspContext &context, IEspWUQueryDetailsRe
     return true;
 }
 
+int StringArrayCompareFunc(const char **s1, const char **s2) // sort comparator: orders two string-array entries by strcmp
+{
+    if (!s1 || !*s1 || !s2 || !*s2) // a null slot pointer or null string compares as equal to anything
+        return 0;
+    return strcmp(*s1, *s2);
+}
+
+int EspQuerySuperFileCompareFunc(IInterface **i1, IInterface **i2) // sort comparator: orders two IEspQuerySuperFile entries by strcmp of their names
+{
+    if (!i1 || !*i1 || !i2 || !*i2) // missing entries compare as equal
+        return 0;
+    IEspQuerySuperFile *sf1 = QUERYINTERFACE(*i1, IEspQuerySuperFile);
+    IEspQuerySuperFile *sf2 = QUERYINTERFACE(*i2, IEspQuerySuperFile);
+    if (!sf1 || !sf2) // either entry is not an IEspQuerySuperFile: treated as equal
+        return 0;
+    const char *name1 = sf1->getName();
+    const char *name2 = sf2->getName();
+    if (!name1 || !name2) // an unnamed superfile compares as equal to anything
+        return 0;
+    return strcmp(name1, name2);
+}
+
 bool CWsWorkunitsEx::getQueryFiles(const char* query, const char* target, StringArray& logicalFiles, IArrayOf<IEspQuerySuperFile> *respSuperFiles)
 {
     try
@@ -1575,6 +1597,7 @@ bool CWsWorkunitsEx::getQueryFiles(const char* query, const char* target, String
             if (fileName && *fileName)
                 logicalFiles.append(fileName);
         }
+        logicalFiles.sort(StringArrayCompareFunc);
 
         if (respSuperFiles)
         {
@@ -1593,9 +1616,12 @@ bool CWsWorkunitsEx::getQueryFiles(const char* query, const char* target, String
                     if (fileName && *fileName)
                         respSubFiles.append(fileName);
                 }
+                respSubFiles.sort(StringArrayCompareFunc);
+
                 respSuperFile->setSubFiles(respSubFiles);
                 respSuperFiles->append(*respSuperFile.getClear());
             }
+            respSuperFiles->sort(EspQuerySuperFileCompareFunc);
         }
         return true;
     }

+ 7 - 0
initfiles/componentfiles/configxml/@temp/esp_service.xsl

@@ -926,6 +926,13 @@ xmlns:seisint="http://seisint.com"  xmlns:set="http://exslt.org/sets" exclude-re
                 <LanguageDirectory><xsl:value-of select="@LanguageDirectory"/></LanguageDirectory>
             </xsl:when>
             <xsl:when test="$serviceType='ws_ecl'">
+                <!-- Emit each timeout element only when its attribute is set; an unset attribute produces no element at all. -->
+                <xsl:if test="string(@roxieTimeout)!=''">
+                    <RoxieTimeout><xsl:value-of select="@roxieTimeout"/></RoxieTimeout>
+                </xsl:if>
+                <xsl:if test="string(@workunitTimeout)!=''">
+                    <WorkunitTimeout><xsl:value-of select="@workunitTimeout"/></WorkunitTimeout>
+                </xsl:if>
                 <VIPS>
                     <xsl:for-each select="ProcessCluster">
                         <xsl:if test="string(@roxie) != '' and string(@vip) != ''">

+ 14 - 0
initfiles/componentfiles/configxml/esp_service_wsecl2.xsd

@@ -85,6 +85,20 @@
                     </xs:appinfo>
                 </xs:annotation>
             </xs:attribute>
+            <xs:attribute name="roxieTimeout" type="xs:unsignedInt" use="optional" default="300">
+                <xs:annotation>
+                    <xs:appinfo>
+                        <tooltip>Timeout (in seconds) for WsEcl connections to roxie (0 == wait forever)</tooltip>
+                    </xs:appinfo>
+                </xs:annotation>
+            </xs:attribute>
+            <xs:attribute name="workunitTimeout" type="xs:unsignedInt" use="optional" default="600">
+                <xs:annotation>
+                    <xs:appinfo>
+                        <tooltip>Timeout (in seconds), for WsEcl to wait for workunit to complete (0 == wait forever)</tooltip>
+                    </xs:appinfo>
+                </xs:annotation>
+            </xs:attribute>
         </xs:complexType>
     </xs:element>
 </xs:schema>

+ 1 - 0
roxie/ccd/ccd.hpp

@@ -898,6 +898,7 @@ public:
     virtual void CTXLOGl(LogItem *logItem) const // Appends logItem to this context's log and then calls flush(false, false)
     {
         // NOTE - we don't actually print anything to logfile here - was already printed on slave
+        CriticalBlock b(crit); // take crit before touching log, so the append and flush below are guarded
         log.append(*logItem);
         flush(false, false);
     }

+ 23 - 3
roxie/ccd/ccdcontext.cpp

@@ -1431,7 +1431,7 @@ public:
     }
 
     Owned<IWUGraphProgress> graphProgress; // could make local to endGraph and pass to reset - might be cleaner
-    void endGraph(cycle_t startCycles, bool aborting)
+    virtual void endGraph(cycle_t startCycles, bool aborting)
     {
         if (graph)
         {
@@ -2555,6 +2555,7 @@ class CRoxieServerContext : public CSlaveContext, implements IRoxieServerContext
 
 protected:
     Owned<CRoxieWorkflowMachine> workflow;
+    mutable MapStringToMyClass<IResolvedFile> fileCache;
     SafeSocket *client;
     bool isBlocked;
     bool isHttp;
@@ -3533,9 +3534,22 @@ public:
         }
     }
 
-    virtual const IResolvedFile *resolveLFN(const char *filename, bool isOpt)
+    virtual const IResolvedFile *resolveLFN(const char *fileName, bool isOpt) // Resolves a logical file name, consulting fileCache (keyed by the expanded name) before falling back to the package lookup
     {
-        return factory->queryPackage().lookupFileName(filename, isOpt, false, true, workUnit);
+        CriticalBlock b(contextCrit); // lookup and cache update happen under contextCrit
+        StringBuffer expandedName;
+        expandLogicalFilename(expandedName, fileName, workUnit, false);
+        Linked<const IResolvedFile> ret = fileCache.getValue(expandedName); // cache hit, if any
+        if (!ret)
+        {
+            ret.setown(factory->queryPackage().lookupFileName(fileName, isOpt, false, false, workUnit));
+            if (ret) // only successful resolutions are cached
+            {
+                IResolvedFile *add = const_cast<IResolvedFile *>(ret.get()); // setValue is given a non-const pointer, hence the cast
+                fileCache.setValue(expandedName, add);
+            }
+        }
+        return ret.getClear();
     }
 
     virtual IRoxieWriteHandler *createLFN(const char *filename, bool overwrite, bool extend, const StringArray &clusters)
@@ -3543,6 +3557,12 @@ public:
         return factory->queryPackage().createFileName(filename, overwrite, extend, clusters, workUnit);
     }
 
+    virtual void endGraph(cycle_t startCycles, bool aborting) // Calls kill() on fileCache, then defers to CSlaveContext::endGraph with the same arguments
+    {
+        fileCache.kill(); // resolved-file cache entries are not carried past the end of a graph
+        CSlaveContext::endGraph(startCycles, aborting);
+    }
+
     virtual void onFileCallback(const RoxiePacketHeader &header, const char *lfn, bool isOpt, bool isLocal)
     {
         Owned<const IResolvedFile> dFile = resolveLFN(lfn, isOpt);

+ 2 - 2
roxie/ccd/ccddali.cpp

@@ -763,15 +763,15 @@ public:
             CriticalBlock b(daliConnectionCrit);
             if (isConnected)
             {
+                isConnected = false;
                 delete serverStatus;
                 serverStatus = NULL;
                 closeDllServer();
                 closeEnvironment();
                 clientShutdownWorkUnit();
+                disconnectRoxieQueues();
                 ::closedownClientProcess(); // dali client closedown
-                isConnected = false;
                 disconnectSem.signal();
-                disconnectRoxieQueues();
             }
         }
     }

+ 1 - 0
roxie/ccd/ccdlistener.cpp

@@ -879,6 +879,7 @@ public:
         {
             DBGLOG("RoxieWorkUnitListener::disconnectQueue");
             queue->cancelAcceptConversation();
+            queue.clear();
         }
     }
 

+ 8 - 4
roxie/ccd/ccdstate.cpp

@@ -446,7 +446,7 @@ protected:
     }
 
     // Use dali to resolve subfile into physical file info
-    static IResolvedFile *resolveLFNusingDaliOrLocal(const char *fileName, bool useCache, bool cacheResult, bool writeAccess, bool alwaysCreate)
+    static IResolvedFile *resolveLFNusingDaliOrLocal(const char *fileName, bool useCache, bool cacheResult, bool writeAccess, bool alwaysCreate, bool resolveLocal)
     {
         // MORE - look at alwaysCreate... This may be useful to implement earlier locking semantics.
         if (traceLevel > 9)
@@ -486,7 +486,7 @@ protected:
                     }
                 }
             }
-            if (!result)
+            if (!result && resolveLocal)
             {
                 StringBuffer useName;
                 bool wasDFS = false;
@@ -523,7 +523,7 @@ protected:
     {
         IResolvedFile *result = lookupFile(fileName, useCache, cacheResult, writeAccess, alwaysCreate);
         if (!result && (!checkCompulsory || !isCompulsory()))
-            result = resolveLFNusingDaliOrLocal(fileName, useCache, cacheResult, writeAccess, alwaysCreate);
+            result = resolveLFNusingDaliOrLocal(fileName, useCache, cacheResult, writeAccess, alwaysCreate, resolveLocally());
         return result;
     }
 
@@ -648,7 +648,7 @@ public:
         expandLogicalFilename(fileName, _fileName, wu, false);
         Owned<IResolvedFile> resolved = lookupFile(fileName, false, false, true, true);
         if (!resolved)
-            resolved.setown(resolveLFNusingDaliOrLocal(fileName, false, false, true, true));
+            resolved.setown(resolveLFNusingDaliOrLocal(fileName, false, false, true, true, resolveLocally()));
         if (resolved)
         {
             if (resolved->exists())
@@ -703,6 +703,10 @@ public:
     {
         return CPackageNode::queryId();
     }
+    virtual bool resolveLocally() const // forwards to the CPackageNode implementation
+    {
+        return CPackageNode::resolveLocally();
+    }
 };
 
 CResolvedFileCache CRoxiePackageNode::daliFiles;