Преглед изворни кода

Merge pull request #7604 from richardkchapman/cassandra-result2

HPCC-13977 Fix issues running thor/hthor regression suites using C* workunits

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman пре 10 година
родитељ
комит
5514b01feb

+ 20 - 16
ecl/eclagent/eclagent.cpp

@@ -698,12 +698,6 @@ void EclAgent::abort()
         activeGraph->abort();
 }
 
-IWUResult *EclAgent::updateResult(const char *name, unsigned sequence)
-{
-    WorkunitUpdate w = updateWorkUnit();
-    return updateWorkUnitResult(w, name, sequence);
-}
-
 IConstWUResult *EclAgent::getResult(const char *name, unsigned sequence)
 {
     IConstWorkUnit *w = queryWorkUnit();
@@ -773,7 +767,8 @@ void EclAgent::outputFormattedResult(const char * name, unsigned sequence, bool
 void EclAgent::setResultInt(const char * name, unsigned sequence, __int64 val, unsigned size)
 {
     LOG(MCsetresult, unknownJob, "setResultInt(%s,%d,%" I64F "d)", nullText(name), sequence, val);
-    Owned<IWUResult> r = updateResult(name, sequence);
+    WorkunitUpdate w = updateWorkUnit();
+    Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
     if (r)
     {
         r->setResultInt(val);
@@ -796,7 +791,8 @@ void EclAgent::setResultInt(const char * name, unsigned sequence, __int64 val, u
 void EclAgent::setResultUInt(const char * name, unsigned sequence, unsigned __int64 val, unsigned size)
 {
     LOG(MCsetresult, unknownJob, "setResultUInt(%s,%d,%" I64F "u)", nullText(name), sequence, val);
-    Owned<IWUResult> r = updateResult(name, sequence);
+    WorkunitUpdate w = updateWorkUnit();
+    Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
     if (r)
     {
         r->setResultUInt(val);
@@ -820,7 +816,8 @@ void EclAgent::setResultReal(const char *name, unsigned sequence, double val)
 {
     // Still a bit of a mess - variables vs results
     LOG(MCsetresult, unknownJob, "setResultReal(%s,%d,%6f)", nullText(name), sequence, val);
-    Owned<IWUResult> r = updateResult(name, sequence);
+    WorkunitUpdate w = updateWorkUnit();
+    Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
     if (r)
     {
         r->setResultReal(val);  
@@ -1120,7 +1117,8 @@ void EclAgent::setResultData(const char * stepname, unsigned sequence, int len,
 void EclAgent::doSetResultString(type_t type, const char *name, unsigned sequence, int len, const char *val)
 {
     LOG(MCsetresult, unknownJob, "setResultString(%s,%d,'%.*s')", nullText(name), sequence, len, val);
-    Owned<IWUResult> r = updateResult(name, sequence);
+    WorkunitUpdate w = updateWorkUnit();
+    Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
     if (r)
     {
         r->setResultString(val, len);   
@@ -1144,7 +1142,8 @@ void EclAgent::doSetResultString(type_t type, const char *name, unsigned sequenc
 void EclAgent::setResultRaw(const char * name, unsigned sequence, int len, const void *val)
 {
     LOG(MCsetresult, unknownJob, "setResultRaw(%s,%d,(%d bytes))", nullText(name), sequence, len);
-    Owned<IWUResult> r = updateResult(name, sequence);
+    WorkunitUpdate w = updateWorkUnit();
+    Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
     if (r)
     {
         r->setResultRow(len, val);
@@ -1167,7 +1166,8 @@ void EclAgent::setResultRaw(const char * name, unsigned sequence, int len, const
 void EclAgent::setResultSet(const char * name, unsigned sequence, bool isAll, size32_t len, const void *val, ISetToXmlTransformer *xform)
 {
     LOG(MCsetresult, unknownJob, "setResultSet(%s,%d)", nullText(name), sequence);
-    Owned<IWUResult> r = updateResult(name, sequence);
+    WorkunitUpdate w = updateWorkUnit();
+    Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
     if (r)
     {
         r->setResultIsAll(isAll);
@@ -1227,7 +1227,8 @@ void EclAgent::setResultUnicode(const char * name, unsigned sequence, int len, U
 {
     LOG(MCsetresult, unknownJob, "setResultUnicode(%s,%d)", nullText(name), sequence);
 
-    Owned<IWUResult> r = updateResult(name, sequence);
+    WorkunitUpdate w = updateWorkUnit();
+    Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
     if (r)
     {
         r->setResultUnicode((char const *)val, len);
@@ -1254,7 +1255,8 @@ void EclAgent::setResultBool(const char *name, unsigned sequence, bool val)
 {
     LOG(MCsetresult, unknownJob, "setResultBool(%s,%d,%s)", nullText(name), sequence, val ? "true" : "false");
 
-    Owned<IWUResult> r = updateResult(name, sequence);
+    WorkunitUpdate w = updateWorkUnit();
+    Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
     if (r)
     {
         r->setResultBool(val);
@@ -1277,7 +1279,8 @@ void EclAgent::setResultBool(const char *name, unsigned sequence, bool val)
 void EclAgent::setResultDecimal(const char *name, unsigned sequence, int len, int precision, bool isSigned, const void *val)
 {
     LOG(MCsetresult, unknownJob, "setResultDecimal(%s,%d)", nullText(name), sequence);
-    Owned<IWUResult> r = updateResult(name, sequence);
+    WorkunitUpdate w = updateWorkUnit();
+    Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
     if (r)
     {
         r->setResultDecimal(val, len);
@@ -1305,7 +1308,8 @@ void EclAgent::setResultDecimal(const char *name, unsigned sequence, int len, in
 void EclAgent::setResultDataset(const char * name, unsigned sequence, size32_t len, const void *val, unsigned numRows, bool extend)
 {
     LOG(MCsetresult, unknownJob, "setResultDataset(%s,%d)", nullText(name), sequence);
-    Owned<IWUResult> result = updateResult(name, sequence);
+    WorkunitUpdate w = updateWorkUnit();
+    Owned<IWUResult> result = updateWorkUnitResult(w, name, sequence);
     if (!result)
         fail(0, "Unexpected parameters to setResultDataset");
 

+ 0 - 1
ecl/eclagent/eclagent.ipp

@@ -390,7 +390,6 @@ private:
     bool expandLogicalName(StringBuffer & fullname, const char * logicalName);
     IRemoteConnection *getPersistReadLock(const char * logicalName);
     void doSimpleResult(type_t type, int size, char * buffer, int sequence);
-    IWUResult *updateResult(const char *name, unsigned sequence);
     IConstWUResult *getResult(const char *name, unsigned sequence);
     IConstWUResult *getResultForGet(const char *name, unsigned sequence);
     IConstWUResult *getExternalResult(const char * wuid, const char *name, unsigned sequence);

+ 9 - 8
plugins/cassandra/cassandrawu.cpp

@@ -1137,7 +1137,7 @@ static const CassandraXmlMapping wuGraphProgressMappings [] =
     {"partition", "int", NULL, hashRootNameColumnMapper},
     {"wuid", "text", NULL, rootNameColumnMapper},
     {"graphID", "text", NULL, stringColumnMapper},
-    {"subgraphID", "text", NULL, stringColumnMapper},
+    {"subgraphID", "bigint", NULL, bigintColumnMapper},
     {"creator", "text", NULL, stringColumnMapper},
     {"progress", "blob", NULL, blobColumnMapper},
     { NULL, "wuGraphProgress", "((partition, wuid), graphID, subgraphID, creator)", stringColumnMapper}
@@ -1148,7 +1148,7 @@ static const CassandraXmlMapping wuGraphStateMappings [] =
     {"partition", "int", NULL, hashRootNameColumnMapper},
     {"wuid", "text", NULL, rootNameColumnMapper},
     {"graphID", "text", NULL, stringColumnMapper},
-    {"subgraphID", "text", NULL, stringColumnMapper},
+    {"subgraphID", "bigint", NULL, bigintColumnMapper},
     {"state", "int", NULL, intColumnMapper},
     { NULL, "wuGraphState", "((partition, wuid), graphID, subgraphID)", stringColumnMapper}
 };
@@ -1158,7 +1158,7 @@ static const CassandraXmlMapping wuGraphRunningMappings [] =
     {"partition", "int", NULL, hashRootNameColumnMapper},
     {"wuid", "text", NULL, rootNameColumnMapper},
     {"graphID", "text", NULL, stringColumnMapper},
-    {"subgraphID", "int", NULL, intColumnMapper},
+    {"subgraphID", "bigint", NULL, bigintColumnMapper},
     { NULL, "wuGraphRunning", "((partition, wuid))", stringColumnMapper}
 };
 
@@ -2181,6 +2181,7 @@ public:
             prev.clear();
             allDirty = false;
             dirtyPaths.kill();
+            dirtyResults.kill();
         }
         else
             DBGLOG("No batch present??");
@@ -2376,7 +2377,7 @@ public:
         while (cass_iterator_next(rows))
         {
             const CassRow *row = cass_iterator_get_row(rows);
-            unsigned subId = subId = getUnsignedResult(NULL, cass_row_get_column(row, 0));
+            WUGraphIDType subId = subId = getUnsignedResult(NULL, cass_row_get_column(row, 0));
             StringBuffer creator, xml;
             getCassString(creator, cass_row_get_column(row, 1));
             getCassString(xml, cass_row_get_column(row, 2));
@@ -2397,7 +2398,7 @@ public:
         statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
         statement.bindString(1, wuid);
         statement.bindString(2, graphName);
-        statement.bindInt32(3, nodeId);
+        statement.bindInt64(3, nodeId);
         CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
         future.wait("queryNodeState");
         CassandraResult result(cass_future_get_result(future));
@@ -2417,7 +2418,7 @@ public:
         statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
         statement.bindString(1, wuid);
         statement.bindString(2, graphName);
-        statement.bindInt32(3, nodeId);
+        statement.bindInt64(3, nodeId);
         statement.bindInt32(4, (int) state);
         CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement));
         future.wait("setNodeState update state");
@@ -2431,7 +2432,7 @@ public:
                     statement2.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
                     statement2.bindString(1, wuid);
                     statement2.bindString(2, graphName);
-                    statement2.bindInt32(3, nodeId);
+                    statement2.bindInt64(3, nodeId);
                     CassandraFuture future(cass_session_execute(sessionCache->querySession(), statement2));
                     future.wait("setNodeState update running");
                     break;
@@ -2463,7 +2464,7 @@ public:
             statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
             statement.bindString(1, wuid);
             statement.bindString(2, progress->queryName());
-            statement.bindInt32(3, id);
+            statement.bindInt64(3, id);
             statement.bindString(4, creator);
             StringBuffer tag;
             tag.append("sg").append(id);

+ 22 - 21
thorlcr/graph/thgraphmaster.cpp

@@ -800,16 +800,6 @@ class CThorCodeContextMaster : public CThorCodeContextBase
     Linked<IConstWorkUnit> workunit;
     Owned<IDistributedFileTransaction> superfiletransaction;
 
-    virtual IWorkUnit *updateWorkUnit() const
-    {
-        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
-        return factory->updateWorkUnit(workunit->queryWuid());
-    }
-    IWUResult *updateResult(const char *name, unsigned sequence)
-    {
-        Owned<IWorkUnit> w = updateWorkUnit();
-        return updateWorkUnitResult(w, name, sequence);
-    }
     IConstWUResult * getResult(const char * name, unsigned sequence)
     {
         return getWorkUnitResult(workunit, name, sequence);
@@ -835,7 +825,8 @@ public:
 // ICodeContext
     virtual void setResultBool(const char *name, unsigned sequence, bool result)
     {
-        Owned<IWUResult> r = updateResult(name, sequence);
+        WorkunitUpdate w(&workunit->lock());
+        Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
         if (r)
         {
             r->setResultBool(result);   
@@ -846,7 +837,8 @@ public:
     }
     virtual void setResultData(const char *name, unsigned sequence, int len, const void *result)
     {
-        Owned<IWUResult> r = updateResult(name, sequence);
+        WorkunitUpdate w(&workunit->lock());
+        Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
         if (r)
         {
             r->setResultData(result, len);  
@@ -857,7 +849,8 @@ public:
     }
     virtual void setResultDecimal(const char * name, unsigned sequence, int len, int precision, bool isSigned, const void *val)
     {
-        Owned<IWUResult> r = updateResult(name, sequence);
+        WorkunitUpdate w(&workunit->lock());
+        Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
         if (r)
         {
             r->setResultDecimal(val, len);
@@ -868,7 +861,8 @@ public:
     }
     virtual void setResultInt(const char *name, unsigned sequence, __int64 result, unsigned size)
     {
-        Owned<IWUResult> r = updateResult(name, sequence);
+        WorkunitUpdate w(&workunit->lock());
+        Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
         if (r)
         {
             r->setResultInt(result);
@@ -879,7 +873,8 @@ public:
     }
     virtual void setResultRaw(const char *name, unsigned sequence, int len, const void *result)
     {
-        Owned<IWUResult> r = updateResult(name, sequence);
+        WorkunitUpdate w(&workunit->lock());
+        Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
         if (r)
         {
             r->setResultRaw(len, result, ResultFormatRaw);  
@@ -890,7 +885,8 @@ public:
     }
     virtual void setResultReal(const char *name, unsigned sequence, double result)
     {
-        Owned<IWUResult> r = updateResult(name, sequence);
+        WorkunitUpdate w(&workunit->lock());
+        Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
         if (r)
         {
             r->setResultReal(result);   
@@ -901,7 +897,8 @@ public:
     }
     virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void *result, ISetToXmlTransformer *)
     {
-        Owned<IWUResult> r = updateResult(name, sequence);
+        WorkunitUpdate w(&workunit->lock());
+        Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
         if (r)
         {
             r->setResultIsAll(isAll);
@@ -913,7 +910,8 @@ public:
     }
     virtual void setResultString(const char *name, unsigned sequence, int len, const char *result)
     {
-        Owned<IWUResult> r = updateResult(name, sequence);
+        WorkunitUpdate w(&workunit->lock());
+        Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
         if (r)
         {
             r->setResultString(result, len);    
@@ -924,7 +922,8 @@ public:
     }
     virtual void setResultUnicode(const char * name, unsigned sequence, int len, UChar const * result)
     {
-        Owned<IWUResult> r = updateResult(name, sequence);
+        WorkunitUpdate w(&workunit->lock());
+        Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
         if (r)
         {
             r->setResultUnicode(result, len);
@@ -935,7 +934,8 @@ public:
     }
     virtual void setResultVarString(const char * name, unsigned sequence, const char *result)
     {
-        Owned<IWUResult> r = updateResult(name, sequence);
+        WorkunitUpdate w(&workunit->lock());
+        Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
         if (r)
         {
             r->setResultString(result, strlen(result)); 
@@ -946,7 +946,8 @@ public:
     }
     virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 result, unsigned size)
     {
-        Owned<IWUResult> r = updateResult(name, sequence);
+        WorkunitUpdate w(&workunit->lock());
+        Owned<IWUResult> r = updateWorkUnitResult(w, name, sequence);
         if (r)
         {
             r->setResultUInt(result);

+ 1 - 1
thorlcr/master/thactivitymaster.cpp

@@ -405,7 +405,7 @@ void updateActivityResult(IConstWorkUnit &workunit, unsigned helperFlags, unsign
 {
     Owned<IWorkUnit> wu = &workunit.lock();
     Owned<IWUResult> r;
-    r.setown(wu->updateResultBySequence(sequence));
+    r.setown(updateWorkUnitResult(wu, logicalFilename, sequence));
     r->setResultTotalRowCount(recordCount); 
     r->setResultStatus(ResultStatusCalculated);
     if (TDWresult & helperFlags)