Bläddra i källkod

Merge pull request #5188 from jakesmith/hpcc-10302

HPCC-10302 - Fix eclagent deadlock if fileservices calls in apply

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 år sedan
förälder
incheckning
fa7f95bdd0

+ 6 - 2
common/thorhelper/thorcommon.hpp

@@ -292,9 +292,9 @@ public:
     {
         return ctx->getExpandLogicalName(logicalName);
     }
-    virtual void addWuException(const char * text, unsigned code, unsigned severity)
+    virtual void addWuException(const char * text, unsigned code, unsigned severity, const char *source)
     {
-        ctx->addWuException(text, code, severity);
+        ctx->addWuException(text, code, severity, source);
     }
     virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort)
     {
@@ -424,6 +424,10 @@ public:
     {
         return ctx->getDaliServers();
     }
+    virtual IWorkUnit *updateWorkUnit() const
+    {
+        return ctx->updateWorkUnit();
+    }
 protected:
     ICodeContext * ctx;
 };

+ 1 - 1
ecl/eclagent/agentctx.hpp

@@ -75,7 +75,7 @@ struct IAgentContext : extends IGlobalCodeContext
     virtual ICodeContext *queryCodeContext() = 0;
 
     virtual IConstWorkUnit *queryWorkUnit() = 0;
-    virtual IWorkUnit *updateWorkUnit() = 0;
+    virtual IWorkUnit *updateWorkUnit() const = 0;
     virtual void unlockWorkUnit() = 0;
     
     virtual ILocalOrDistributedFile *resolveLFN(const char *logicalName, const char *errorTxt=NULL, bool optional=false, bool noteRead=true, bool write=false, StringBuffer * expandedlfn=NULL) = 0;

+ 1 - 6
ecl/eclagent/eclagent.cpp

@@ -705,7 +705,7 @@ const char *EclAgent::loadResource(unsigned id)
     return reinterpret_cast<const char *>(dll->getResource(id));  // stays loaded as long as dll stays loaded
 }
 
-IWorkUnit *EclAgent::updateWorkUnit()
+IWorkUnit *EclAgent::updateWorkUnit() const
 {
     CriticalBlock block(wusect);
     if (!wuWrite)
@@ -1482,11 +1482,6 @@ char * EclAgent::getExpandLogicalName(const char * logicalName)
     return lfn.detach();
 }
 
-void EclAgent::addWuException(const char * text, unsigned code, unsigned severity)
-{
-    addException((WUExceptionSeverity)severity, "user", code, text, NULL, 0, 0, false, false);
-}
-
 void EclAgent::addWuException(const char * text, unsigned code, unsigned severity, char const * source)
 {
     addException((WUExceptionSeverity)severity, source, code, text, NULL, 0, 0, false, false);

+ 4 - 5
ecl/eclagent/eclagent.ipp

@@ -178,7 +178,7 @@ public:
     {
         return ctx->queryWorkUnit();
     }
-    virtual IWorkUnit *updateWorkUnit()
+    virtual IWorkUnit *updateWorkUnit() const
     {
         return ctx->updateWorkUnit();
     }
@@ -351,7 +351,7 @@ private:
     friend class EclAgentWorkflowMachine;
 
     Owned<EclAgentWorkflowMachine> workflow;
-    Owned<IWorkUnit> wuWrite;
+    mutable Owned<IWorkUnit> wuWrite;
     Owned<IConstWorkUnit> wuRead;
     Owned<roxiemem::IRowManager> rowManager;
     StringAttr wuid;
@@ -370,7 +370,7 @@ private:
     Owned<IUserDescriptor> standAloneUDesc;
     outputFmts outputFmt;
     unsigned __int64 stopAfter;
-    CriticalSection wusect;
+    mutable CriticalSection wusect;
     StringArray tempFiles;
     CriticalSection tfsect;
     Array persistReadLocks;
@@ -501,7 +501,6 @@ public:
 
     virtual bool fileExists(const char * filename);
     virtual char * getExpandLogicalName(const char * logicalName);
-    virtual void addWuException(const char * text, unsigned code, unsigned severity);
     virtual void addWuException(const char * text, unsigned code, unsigned severity, char const * source);
     virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort);
     virtual IUserDescriptor *queryUserDescriptor();
@@ -576,7 +575,7 @@ public:
     virtual bool isResult(const char * name, unsigned sequence);
     virtual unsigned getWorkflowId();// { return workflow->queryCurrentWfid(); }
     virtual IConstWorkUnit *queryWorkUnit();  // no link
-    virtual IWorkUnit *updateWorkUnit();        // links
+    virtual IWorkUnit *updateWorkUnit() const; // links
     virtual void unlockWorkUnit();      
     virtual void reloadWorkUnit();
     void addTimings();

+ 0 - 2
ecl/hqlcpp/hqlcatom.cpp

@@ -106,7 +106,6 @@ IIdAtom * addAggregateRowId;
 IIdAtom * addAllId;
 IIdAtom * addRangeId;
 IIdAtom * addWorkunitAssertFailureId;
-IIdAtom * addWorkunitExceptionId;
 IIdAtom * an2bId;
 IIdAtom * an2fId;
 IIdAtom * an2l4Id;
@@ -741,7 +740,6 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM-1)
     MAKEID(addAll);
     MAKEID(addRange);
     MAKEID(addWorkunitAssertFailure);
-    MAKEID(addWorkunitException);
     MAKEID(an2b);
     MAKEID(an2f);
     MAKEID(an2l4);

+ 0 - 1
ecl/hqlcpp/hqlcatom.hpp

@@ -106,7 +106,6 @@ extern IIdAtom * addAggregateRowId;
 extern IIdAtom * addAllId;
 extern IIdAtom * addRangeId;
 extern IIdAtom * addWorkunitAssertFailureId;
-extern IIdAtom * addWorkunitExceptionId;
 extern IIdAtom * an2bId;
 extern IIdAtom * an2fId;
 extern IIdAtom * an2l4Id;

+ 0 - 1
ecl/hqlcpp/hqlcppsys.ecl

@@ -684,7 +684,6 @@ const char * cppSystemText[]  = {
     "   doNotifyTarget(const varstring name, const varstring text, const varstring _target) : gctxmethod,entrypoint='doNotify';",
     "   setWorkflowCondition(boolean value) : gctxmethod,entrypoint='setWorkflowCondition';",
     "   returnPersistVersion(const varstring name, unsigned4 eclCRC, unsigned8 allCRC, boolean isFile) : gctxmethod,entrypoint='returnPersistVersion';",
-    "   addWorkunitException(const varstring txt, unsigned code, unsigned severity) : ctxmethod,entrypoint='addWuException'; ",
     "   addWorkunitAssertFailure(unsigned4 errNo, const varstring _msg, const varstring _filename, unsigned4 _lineno, unsigned4 _column, boolean _isAbort) : ctxmethod,entrypoint='addWuAssertFailure'; ",
 
     //

+ 4 - 41
plugins/fileservices/fileservices.cpp

@@ -168,28 +168,6 @@ static IConstWorkUnit * getWorkunit(ICodeContext * ctx)
     return factory->openWorkUnit(wuid, false);
 }
 
-static IWorkUnit * updateWorkunit(ICodeContext * ctx)
-{
-    // following bit of a kludge, as
-    // 1) eclagent keeps WU locked, and
-    // 2) rtti not available in generated .so's to convert to IAgentContext
-    IAgentContext * actx = dynamic_cast<IAgentContext *>(ctx);
-    if (actx == NULL) { // fall back to pure ICodeContext
-        // the following works for thor only
-        char * platform = ctx->getPlatform();
-        if (strcmp(platform,"thor")==0) {
-            CTXFREE(parentCtx, platform);
-            Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
-            StringAttr wuid;
-            wuid.setown(ctx->getWuid());
-            return factory->updateWorkUnit(wuid);
-        }
-        CTXFREE(parentCtx, platform);
-        return NULL;
-    }
-    return actx->updateWorkUnit();
-}
-
 static IPropertyTree *getEnvironment()
 {
     Owned<IPropertyTree> env;
@@ -285,26 +263,11 @@ StringBuffer & constructLogicalName(ICodeContext * ctx, const char * partialLogi
 
 static void WUmessage(ICodeContext *ctx, WUExceptionSeverity sev, const char *fn, const char *msg)
 {
-    StringBuffer s;
-    s.append("fileservices");
+    StringBuffer s("fileservices");
     if (fn)
         s.append(", ").append(fn);
-    IAgentContext * actx = dynamic_cast<IAgentContext *>(ctx); // doesn't work if called from helper .so (no rtti)
-    if (actx)
-        actx->addWuException(msg,0,sev,s.str());
-    else {
-        Owned<IWorkUnit> wu = updateWorkunit(ctx);
-        if (wu.get()) {
-            Owned<IWUException> we = wu->createException();
-            we->setSeverity(sev);
-            we->setExceptionMessage(msg);
-            we->setExceptionSource(s.str());
-        }
-        else {
-            s.append(" : ").append(msg);
-            ctx->addWuException(s.str(),0,sev); // use plain code context
-        }
-    }
+    ctx->addWuException(msg, 0, sev, s.str()); // use plain code context
+    return;
 }
 
 static void AuditMessage(ICodeContext *ctx,
@@ -549,7 +512,7 @@ static void blockUntilComplete(const char * label, IClientFileSpray &server, ICo
 
     while(true)
     {
-        Owned<IWorkUnit> wu = updateWorkunit(ctx); // may return NULL
+        Owned<IWorkUnit> wu = ctx->updateWorkUnit(); // may return NULL
 
         Owned<IClientGetDFUWorkunit> req = server.createGetDFUWorkunitRequest();
         req->setWuid(wuid);

+ 3 - 3
plugins/logging/logging.cpp

@@ -31,9 +31,9 @@ static const char * compatibleVersions[] = {
 static const char * EclDefinition =
 "export Logging := SERVICE\n"
 "  dbglog(const string src) : c,action,entrypoint='logDbgLog'; \n"
-"  addWorkunitInformation(const varstring txt, unsigned code=0, unsigned severity=0) : ctxmethod,action,entrypoint='addWuException'; \n"
-"  addWorkunitWarning(const varstring txt, unsigned code=0, unsigned severity=1) : ctxmethod,action,entrypoint='addWuException'; \n"
-"  addWorkunitError(const varstring txt, unsigned code=0, unsigned severity=2) : ctxmethod,action,entrypoint='addWuException'; \n"
+"  addWorkunitInformation(const varstring txt, unsigned code=0, unsigned severity=0, const varstring source='user') : ctxmethod,action,entrypoint='addWuException'; \n"
+"  addWorkunitWarning(const varstring txt, unsigned code=0, unsigned severity=1, const varstring source='user') : ctxmethod,action,entrypoint='addWuException'; \n"
+"  addWorkunitError(const varstring txt, unsigned code=0, unsigned severity=2, const varstring source='user') : ctxmethod,action,entrypoint='addWuException'; \n"
 "END;";
 
 LOGGING_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb) 

+ 2 - 1
roxie/ccd/ccdactivities.cpp

@@ -551,7 +551,7 @@ public:
     virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash)   { throwUnexpected(); return 0; }
 
     virtual char * getExpandLogicalName(const char * logicalName) { throwUnexpected(); }
-    virtual void addWuException(const char * text, unsigned code, unsigned severity) { throwUnexpected(); }
+    virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source) { throwUnexpected(); }
     virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort) { throwUnexpected(); }
     virtual IUserDescriptor *queryUserDescriptor() { throwUnexpected(); }
     virtual unsigned getNodes() { throwUnexpected(); }
@@ -608,6 +608,7 @@ public:
         return createRowFromXml(rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
     }
     virtual IEngineContext *queryEngineContext() { return NULL; }
+    virtual IWorkUnit *updateWorkUnit() const { throwUnexpected(); }
 };
 
 //================================================================================================

+ 4 - 4
roxie/ccd/ccdcontext.cpp

@@ -1194,7 +1194,7 @@ public:
     virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
 
     virtual char * getExpandLogicalName(const char * logicalName) { throwUnexpected(); }
-    virtual void addWuException(const char * text, unsigned code, unsigned severity) { throwUnexpected(); }
+    virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source) { throwUnexpected(); }
     virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort) { throwUnexpected(); }
     virtual IUserDescriptor *queryUserDescriptor() { throwUnexpected(); }
 
@@ -2804,7 +2804,7 @@ public:
         UNIMPLEMENTED;
     }
 
-    virtual void addWuException(const char * text, unsigned code, unsigned _severity)
+    virtual void addWuException(const char * text, unsigned code, unsigned _severity, const char * source)
     {
         WUExceptionSeverity severity = (WUExceptionSeverity) _severity;
         CTXLOG("%s", text);
@@ -2813,7 +2813,7 @@ public:
         if (workUnit)
         {
             WorkunitUpdate wu(&workUnit->lock());
-            addExceptionToWorkunit(wu, severity, "user", code, text, NULL, 0 ,0);
+            addExceptionToWorkunit(wu, severity, source, code, text, NULL, 0 ,0);
         }
     }
     virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort)
@@ -2880,7 +2880,7 @@ public:
     virtual void returnPersistVersion(const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC, bool isFile) { throwUnexpected(); }
     virtual void fail(int code, const char *text)
     {
-        addWuException(text, code, 2);
+        addWuException(text, code, 2, "user");
     }
 
     virtual unsigned getWorkflowId() { return workflow->queryCurrentWfid(); }

+ 3 - 1
rtl/include/eclhelper.hpp

@@ -499,6 +499,7 @@ interface IUserDescriptor;
 interface IHThorArg;
 interface IHThorHashLookupInfo;
 interface IEngineContext;
+interface IWorkUnit;
 
 interface ICodeContext : public IResourceContext
 {
@@ -555,7 +556,7 @@ interface ICodeContext : public IResourceContext
 
     // Exception handling
 
-    virtual void addWuException(const char * text, unsigned code, unsigned severity) = 0; //n.b. this might be better named: it should only be used for adding user-generated exceptions (via the logging plug-in) --- there's a call in IAgentContext which takes a source argument too
+    virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source) = 0;
     virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort) = 0;
 
     // File resolution etc
@@ -596,6 +597,7 @@ interface ICodeContext : public IResourceContext
     virtual char * queryIndexMetaData(char const * lfn, char const * xpath) = 0;
     virtual IEngineContext *queryEngineContext() = 0;
     virtual char *getDaliServers() = 0;
+    virtual IWorkUnit *updateWorkUnit() const = 0;
 };
 
 

+ 2 - 1
thorlcr/graph/thgraph.hpp

@@ -480,7 +480,7 @@ class graph_decl CGraphBase : public CInterface, implements IEclGraphResults, im
         virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { ctx->getExternalResultRaw(tlen, tgt, wuid, stepname, sequence, xmlTransformer, csvTransformer); }
         virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract) { ctx->executeGraph(graphName, realThor, parentExtractSize, parentExtract); }
         virtual char * getExpandLogicalName(const char * logicalName) { return ctx->getExpandLogicalName(logicalName); }
-        virtual void addWuException(const char * text, unsigned code, unsigned severity) { ctx->addWuException(text, code, severity); }
+        virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source) { ctx->addWuException(text, code, severity, source); }
         virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort) { ctx->addWuAssertFailure(code, text, filename, lineno, column, isAbort); }
         virtual IUserDescriptor *queryUserDescriptor() { return ctx->queryUserDescriptor(); }
         virtual IThorChildGraph * resolveChildQuery(__int64 activityId, IHThorArg * colocal) { return ctx->resolveChildQuery(activityId, colocal); }
@@ -526,6 +526,7 @@ class graph_decl CGraphBase : public CInterface, implements IEclGraphResults, im
         {
             return ctx->getDaliServers();
         }
+        virtual IWorkUnit *updateWorkUnit() const { return ctx->updateWorkUnit(); }
    } graphCodeContext;
 
 protected:

+ 3 - 3
thorlcr/graph/thgraphmaster.cpp

@@ -750,7 +750,7 @@ class CThorCodeContextMaster : public CThorCodeContextBase
     Linked<IConstWorkUnit> workunit;
     Owned<IDistributedFileTransaction> superfiletransaction;
 
-    IWorkUnit *updateWorkUnit() 
+    virtual IWorkUnit *updateWorkUnit() const
     {
         StringAttr wuid;
         workunit->getWuid(StringAttrAdaptor(wuid));
@@ -1061,7 +1061,7 @@ public:
             throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "Failed to retrieve external data value %s from workunit %s", stepname, wuid);
         }
     }
-    virtual void addWuException(const char * text, unsigned code, unsigned severity)
+    virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source)
     {
         DBGLOG("%s", text);
         try
@@ -1070,7 +1070,7 @@ public:
             Owned<IWUException> we = w->createException();
             we->setSeverity((WUExceptionSeverity)severity);
             we->setExceptionMessage(text);
-            we->setExceptionSource("user");
+            we->setExceptionSource(source);
             if (code)
                 we->setExceptionCode(code);
         }

+ 2 - 3
thorlcr/graph/thgraphslave.cpp

@@ -953,12 +953,11 @@ public:
 
     virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
 
-    virtual void addWuException(const char * text, unsigned code, unsigned severity)
+    virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source)
     {
         DBGLOG("%s", text);
         Owned<IThorException> e = MakeThorException(code, "%s", text);
-        e->setAction(tea_warning);
-        e->setOrigin("user");
+        e->setOrigin(source);
         e->setAction(tea_warning);
         e->setSeverity((WUExceptionSeverity)severity);
         job.fireException(e);

+ 1 - 0
thorlcr/thorcodectx/thcodectx.hpp

@@ -118,6 +118,7 @@ public:
     virtual const void * fromXml(IEngineRowAllocator * rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace);
     virtual IEngineContext *queryEngineContext() { return NULL; }
     virtual char *getDaliServers();
+    virtual IWorkUnit *updateWorkUnit() const { throwUnexpected(); }
 };
 
 #endif