浏览代码

HPCC-12710 Support persists based on results in other workunits

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 10 年之前
父节点
当前提交
f33cb28318

+ 4 - 0
common/thorhelper/thorcommon.hpp

@@ -360,6 +360,10 @@ public:
     {
         return ctx->getResultHash(name, sequence);
     }
+    virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence)
+    {
+        return ctx->getExternalResultHash(wuid, name, sequence);
+    }
     virtual char *getWuid()
     {
         return ctx->getWuid();

+ 10 - 0
ecl/eclagent/eclagent.cpp

@@ -860,6 +860,16 @@ unsigned EclAgent::getResultHash(const char * name, unsigned sequence)
     return r->getResultHash();
 }
 
+unsigned EclAgent::getExternalResultHash(const char * wuid, const char * name, unsigned sequence)
+{
+    logGetResult("ExternalHash", name, sequence);
+    Owned<IConstWUResult> r = getExternalResult(wuid, name, sequence);
+    if (!r)
+        failv(0, "Failed to retrieve hash value %s from workunit %s", name, wuid);
+    return r->getResultHash();
+}
+
+
 
 //---------------------------------------------------------------------------
 

+ 1 - 0
ecl/eclagent/eclagent.ipp

@@ -470,6 +470,7 @@ public:
     virtual double getResultReal(const char * name, unsigned sequence);
     virtual unsigned getResultHash(const char * name, unsigned sequence);
     virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer);
+    virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence);
     virtual void getResultRowset(size32_t & tcount, byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer);
     virtual void getResultDictionary(size32_t & tcount, byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher);
     virtual char *getJobName();

+ 8 - 6
ecl/hql/hqlutil.cpp

@@ -2348,12 +2348,12 @@ void DependenciesUsed::addFilenameWrite(IHqlExpression * expr)
         allWritten = true;
 }
 
-void DependenciesUsed::addResultRead(IHqlExpression * seq, IHqlExpression * name, bool isGraphResult)
+void DependenciesUsed::addResultRead(IHqlExpression * wuid, IHqlExpression * seq, IHqlExpression * name, bool isGraphResult)
 {
     if (!isGraphResult)
         if (!seq || !seq->queryValue())
             return;         //Can be called in parser when no sequence has been allocated
-    OwnedHqlExpr result = createAttribute(resultAtom, LINK(seq), LINK(name));
+    OwnedHqlExpr result = createAttribute(resultAtom, LINK(seq), LINK(name), LINK(wuid));
     if (resultsWritten.find(*result) == NotFound)
         appendUniqueExpr(resultsRead, LINK(result));
 }
@@ -2477,12 +2477,13 @@ void DependenciesUsed::extractDependencies(IHqlExpression * expr, unsigned flags
         {
             IHqlExpression * sequence = queryAttributeChild(expr, sequenceAtom, 0);
             IHqlExpression * name = queryAttributeChild(expr, nameAtom, 0);
-            addResultRead(sequence, name, false);
+            IHqlExpression * wuid = expr->queryAttribute(wuidAtom);
+            addResultRead(wuid, sequence, name, false);
         }
         break;
     case no_getgraphresult:
         if (flags & GatherGraphResultRead)
-            addResultRead(expr->queryChild(1), expr->queryChild(2), true);
+            addResultRead(NULL, expr->queryChild(1), expr->queryChild(2), true);
         break;
     case no_setgraphresult:
         if (flags & GatherGraphResultWrite)
@@ -2493,7 +2494,8 @@ void DependenciesUsed::extractDependencies(IHqlExpression * expr, unsigned flags
         {
             IHqlExpression * sequence = queryAttributeChild(expr, sequenceAtom, 0);
             IHqlExpression * name = queryAttributeChild(expr, namedAtom, 0);
-            addResultRead(sequence, name, false);
+            IHqlExpression * wuid = expr->queryAttribute(wuidAtom);
+            addResultRead(wuid, sequence, name, false);
         }
         break;
     case no_ensureresult:
@@ -2515,7 +2517,7 @@ void DependenciesUsed::extractDependencies(IHqlExpression * expr, unsigned flags
     case no_callsideeffect:
         if (flags & GatherResultRead)
         {
-            addResultRead(expr->queryAttribute(_uid_Atom), NULL, false);
+            addResultRead(NULL, expr->queryAttribute(_uid_Atom), NULL, false);
         }
         break;
     }

+ 1 - 1
ecl/hql/hqlutil.hpp

@@ -299,7 +299,7 @@ protected:
     void addFilenameRead(IHqlExpression * expr);
     void addFilenameWrite(IHqlExpression * expr);
     void addRefDependency(IHqlExpression * expr);
-    void addResultRead(IHqlExpression * seq, IHqlExpression * name, bool isGraphResult);
+    void addResultRead(IHqlExpression * wuid, IHqlExpression * seq, IHqlExpression * name, bool isGraphResult);
     void addResultWrite(IHqlExpression * seq, IHqlExpression * name, bool isGraphResult);
     IHqlExpression * getNormalizedFilename(IHqlExpression * filename);
 

+ 2 - 0
ecl/hqlcpp/hqlcatom.cpp

@@ -340,6 +340,7 @@ IIdAtom * getEnvId;
 IIdAtom * getEventExtraId;
 IIdAtom * getEventNameId;
 IIdAtom * getExpandLogicalNameId;
+IIdAtom * getExternalResultHashId;
 IIdAtom * getFailMessageId;
 IIdAtom * getFilePositionId;
 IIdAtom * getGraphLoopCounterId;
@@ -989,6 +990,7 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM-1)
     MAKEID(getEventExtra);
     MAKEID(getEventName);
     MAKEID(getExpandLogicalName);
+    MAKEID(getExternalResultHash);
     MAKEID(getFailMessage);
     MAKEID(getFilePosition);
     MAKEID(getGraphLoopCounter);

+ 1 - 0
ecl/hqlcpp/hqlcatom.hpp

@@ -340,6 +340,7 @@ extern IIdAtom * getEnvId;
 extern IIdAtom * getEventExtraId;
 extern IIdAtom * getEventNameId;
 extern IIdAtom * getExpandLogicalNameId;
+extern IIdAtom * getExternalResultHashId;
 extern IIdAtom * getFailMessageId;
 extern IIdAtom * getFilePositionId;
 extern IIdAtom * getGraphLoopCounterId;

+ 1 - 0
ecl/hqlcpp/hqlcppsys.ecl

@@ -659,6 +659,7 @@ const char * cppSystemText[]  = {
     "   varstring   getResultVarString(const varstring stepname, unsigned4 sequence) : ctxmethod,pure,entrypoint='getResultVarString';",
     "   varunicode  getResultVarUnicode(const varstring stepname, unsigned4 sequence) : ctxmethod,pure,entrypoint='getResultVarUnicode';",
     "   set of any getResultSet(const varstring stepname, unsigned4 sequence, boolean xmltransformer, boolean csvtransformer) : ctxmethod,pure,entrypoint='getResultSet',newset;",
+    "   unsigned4 getExternalResultHash(const varstring wuid, const varstring stepname, unsigned4 sequence) : ctxmethod,pure;",
 
     "   _linkcounted_ dataset getResultRowset(const varstring stepname, unsigned4 sequence, boolean _allocator, boolean isGrouped, boolean xmltransformer, boolean csvtransformer) : ctxmethod,allocator(false),pure,entrypoint='getResultRowset';",
     "   linkcounted dictionary getResultDictionary(const varstring stepname, unsigned4 sequence, boolean xmltransformer, boolean csvtransformer, boolean hasher) : ctxmethod,pure,entrypoint='getResultDictionary';",

+ 10 - 1
ecl/hqlcpp/hqlhtcpp.cpp

@@ -7726,14 +7726,23 @@ IHqlExpression * HqlCppTranslator::calculatePersistInputCrc(BuildCtx & ctx, Depe
         IHqlExpression & cur = dependencies.resultsRead.item(idx2);
         IHqlExpression * seq = cur.queryChild(0);
         IHqlExpression * name = cur.queryChild(1);
+        IHqlExpression * wuid = cur.queryChild(2);
+        if (name->isAttribute())
+        {
+            assertex(name->queryName() == wuidAtom);
+            wuid = name;
+            name = NULL;
+        }
 
         //Not sure if we need to do this if the result is internal.  Leave on for the moment.
         //if (seq->queryValue()->getIntValue() != ResultSequenceInternal)
         bool expandLogical = matchesConstantValue(seq, ResultSequencePersist) && !cur.hasAttribute(_internal_Atom);
         HqlExprArray args;
+        if (wuid)
+            args.append(*LINK(wuid->queryChild(0)));
         args.append(*createResultName(name, expandLogical));
         args.append(*LINK(seq));
-        OwnedHqlExpr call = bindFunctionCall(getResultHashId, args);
+        OwnedHqlExpr call = bindFunctionCall(wuid ? getExternalResultHashId : getResultHashId, args);
         OwnedHqlExpr value = createValue(no_bxor, crcExpr->getType(), LINK(crcExpr), ensureExprType(call, crcExpr->queryType()));
         buildAssignToTemp(ctx, crcExpr, value);
     }

+ 1 - 0
roxie/ccd/ccdactivities.cpp

@@ -538,6 +538,7 @@ public:
     }
 
     virtual unsigned getResultHash(const char * name, unsigned sequence) { throwUnexpected(); }
+    virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence) { throwUnexpected(); }
 
     // Not yet thought about these....
 

+ 7 - 0
roxie/ccd/ccdcontext.cpp

@@ -324,6 +324,12 @@ private:
         return r->getResultHash();
     }
 
+    unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence)
+    {
+        //GH->RKC Roxie should implement reading external results when connected to dali
+        UNIMPLEMENTED;
+    }
+
     unsigned __int64 getResultInt(const char * name, unsigned sequence)
     {
         Owned<IConstWUResult> r = getWorkUnitResult(workunit, name, sequence);
@@ -1516,6 +1522,7 @@ public:
     virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value) { throwUnexpected(); }
 
     virtual unsigned getResultHash(const char * name, unsigned sequence) { throwUnexpected(); }
+    virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence) { throwUnexpected(); }
     virtual void printResults(IXmlWriter *output, const char *name, unsigned sequence) { throwUnexpected(); }
 
     virtual char *getWuid() { throwUnexpected(); }

+ 1 - 0
rtl/include/eclhelper.hpp

@@ -645,6 +645,7 @@ interface ICodeContext : public IResourceContext
 
     virtual const void * fromJson(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace) = 0;
     virtual void getRowJSON(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags) = 0;
+    virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence) = 0;
 };
 
 

+ 28 - 0
testing/regress/ecl/aaawriteresult.ecl

@@ -0,0 +1,28 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#workunit ('name','ExportResult');
+
+namesRecord := 
+            RECORD
+string10        forename;
+string20        surname;
+            END;
+
+ds := dataset([{'Jo','Smith'},{'Jim','Smithe'},{'Joe','Schmitt'}], namesRecord);
+    
+output(ds,NAMED('ExportedNames'));

+ 5 - 0
testing/regress/ecl/key/aaawriteresult.xml

@@ -0,0 +1,5 @@
+<Dataset name='ExportedNames'>
+ <Row><forename>Jo        </forename><surname>Smith               </surname></Row>
+ <Row><forename>Jim       </forename><surname>Smithe              </surname></Row>
+ <Row><forename>Joe       </forename><surname>Schmitt             </surname></Row>
+</Dataset>

+ 8 - 0
testing/regress/ecl/key/readresult.xml

@@ -0,0 +1,8 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>W</Result_1></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><forename>Jo        </forename><surname>Smith               </surname></Row>
+ <Row><forename>Jim       </forename><surname>Smithe              </surname></Row>
+ <Row><forename>Joe       </forename><surname>Schmitt             </surname></Row>
+</Dataset>

+ 48 - 0
testing/regress/ecl/readresult.ecl

@@ -0,0 +1,48 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+//noroxie       - roxie doesn't currently support reading from external workunits
+
+namesRecord := 
+            RECORD
+string10        forename;
+string20        surname;
+            END;
+
+
+//Horrible code - get a list of workunits that match the name of the job that creates the result
+//which needs to be inside a nothor.
+
+import Std.System.Workunit as Wu;
+myWuid := workunit;
+startOfDay := myWuid[1..9] + '-000000';
+writers := Wu.WorkunitList(lowWuid := startOfDay,jobname := 'aaawriteresult*');
+
+//Now sort and extract the most recent wuid that matches the condition
+lastWriter := sort(nothor(writers), -wuid);
+wuid := TRIM(lastWriter[1].wuid) : independent;   // trim should not be needed
+
+ds := dataset(workunit(WUID,'ExportedNames'), namesRecord);
+
+p := ds : persist('readExported', single);
+
+import Std.File;
+sequential(
+    output(NOFOLD(wuid)[1..1]); /// yuk - output the wuid so it gets evaluated before the check in the persist call
+    File.DeleteLogicalFile('~readExported'),
+    output(p);
+);

+ 1 - 0
thorlcr/graph/thgraph.hpp

@@ -471,6 +471,7 @@ class graph_decl CGraphBase : public CInterface, implements IEclGraphResults, im
         virtual char *getResultVarString(const char * name, unsigned sequence) { return ctx->getResultVarString(name, sequence); }
         virtual UChar *getResultVarUnicode(const char * name, unsigned sequence) { return ctx->getResultVarUnicode(name, sequence); }
         virtual unsigned getResultHash(const char * name, unsigned sequence) { return ctx->getResultHash(name, sequence); }
+        virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence) { return ctx->getExternalResultHash(wuid, name, sequence); }
         virtual const char *cloneVString(const char *str) const { return ctx->cloneVString(str); }
         virtual const char *cloneVString(size32_t len, const char *str) const { return ctx->cloneVString(len, str); }
         virtual char *getWuid() { return ctx->getWuid(); }

+ 14 - 0
thorlcr/graph/thgraphmaster.cpp

@@ -1064,6 +1064,20 @@ public:
             return r->getResultHash();
         );
     }
+    virtual unsigned getExternalResultHash(const char * wuid, const char * stepname, unsigned sequence)
+    {
+        try
+        {
+            LOG(MCdebugProgress, unknownJob, "getExternalResultRaw %s", stepname);
+
+            Owned<IConstWUResult> r = getExternalResult(wuid, stepname, sequence);
+            return r->getResultHash();
+        }
+        catch (CATCHALL)
+        {
+            throw MakeStringException(TE_FailedToRetrieveWorkunitValue, "Failed to retrieve external data hash %s from workunit %s", stepname, wuid);
+        }
+    }
     virtual void getResultRowset(size32_t & tcount, byte * * & tgt, const char * stepname, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
     {
         tgt = NULL;

+ 1 - 0
thorlcr/graph/thgraphslave.cpp

@@ -951,6 +951,7 @@ public:
     virtual unsigned getResultHash(const char * name, unsigned sequence) { throwUnexpected(); }
 
     virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
+    virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence) { throwUnexpected(); }
 
     virtual void addWuException(const char * text, unsigned code, unsigned severity, const char * source)
     {