浏览代码

Merge pull request #9360 from richardkchapman/issue16516

HPCC-16516 Associate results with the activity that creates them

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 8 年之前
父节点
当前提交
81a8c86c88
共有 4 个文件被更改,包括 76 次插入31 次删除
  1. 14 0
      common/workunit/workunit.cpp
  2. 2 0
      common/workunit/workunit.hpp
  3. 18 8
      ecl/hqlcpp/hqlhtcpp.cpp
  4. 42 23
      plugins/cassandra/cassandrawu.cpp

+ 14 - 0
common/workunit/workunit.cpp

@@ -1750,6 +1750,7 @@ public:
     virtual IProperties *queryResultXmlns();
     virtual IStringVal& getResultFieldOpt(const char *name, IStringVal &str) const;
     virtual void getSchema(IArrayOf<ITypeInfo> &types, StringAttrArray &names, IStringVal * ecl=NULL) const;
+    virtual void        getResultWriteLocation(IStringVal & _graph, unsigned & _activityId) const;
 
     // interface IWUResult
     virtual void        setResultStatus(WUResultStatus status);
@@ -1784,6 +1785,7 @@ public:
     virtual void        setResultRow(unsigned len, const void * data);
     virtual void        setResultXmlns(const char *prefix, const char *uri);
     virtual void        setResultFieldOpt(const char *name, const char *value);
+    virtual void        setResultWriteLocation(const char * _graph, unsigned _activityId);
 
     virtual IPropertyTree *queryPTree() { return p; }
 };
@@ -7948,6 +7950,12 @@ IStringVal& CLocalWUResult::getResultFieldOpt(const char *name, IStringVal &str)
     return str;
 }
 
+void CLocalWUResult::getResultWriteLocation(IStringVal & _graph, unsigned & _activityId) const
+{
+    _graph.set(p->queryProp("@graph"));
+    _activityId = p->getPropInt("@activity", 0);
+}
+
 void CLocalWUResult::setResultStatus(WUResultStatus status)
 {
     setEnum(p, "@status", status, resultStatuses);
@@ -7991,6 +7999,12 @@ void CLocalWUResult::setResultFieldOpt(const char *name, const char *value)
     format->setProp(xpath, value);
 }
 
+void CLocalWUResult::setResultWriteLocation(const char * _graph, unsigned _activityId)
+{
+    p->setProp("@graph", _graph);
+    p->setPropInt("@activity", _activityId);
+}
+
 void CLocalWUResult::setResultScalar(bool isScalar)
 {
     p->setPropInt("@isScalar", (int) isScalar);

+ 2 - 0
common/workunit/workunit.hpp

@@ -295,6 +295,7 @@ interface IConstWUResult : extends IInterface
     virtual const IProperties *queryResultXmlns() = 0;
     virtual IStringVal &getResultFieldOpt(const char *name, IStringVal &str) const = 0;
     virtual void getSchema(IArrayOf<ITypeInfo> &types, StringAttrArray &names, IStringVal * eclText) const = 0;
+    virtual void getResultWriteLocation(IStringVal & _graph, unsigned & _activityId) const = 0;
 };
 
 
@@ -332,6 +333,7 @@ interface IWUResult : extends IConstWUResult
     virtual void setResultRow(unsigned len, const void * data) = 0;
     virtual void setResultXmlns(const char *prefix, const char *uri) = 0;
     virtual void setResultFieldOpt(const char *name, const char *value)=0;
+    virtual void setResultWriteLocation(const char * _graph, unsigned _activityId) = 0;
 
     virtual IPropertyTree *queryPTree() = 0;
 };

+ 18 - 8
ecl/hqlcpp/hqlhtcpp.cpp

@@ -5369,16 +5369,26 @@ void HqlCppTranslator::buildSetResultInfo(BuildCtx & ctx, IHqlExpression * origi
             }
         }
 
-        IHqlExpression * format =  originalExpr->queryAttribute(storedFieldFormatAtom);
-        if (format)
+        if (result)
         {
-            ForEachChild(i, format)
+            ActivityInstance * activity = queryCurrentActivity(ctx);
+            if (activity)
             {
-                StringBuffer name;
-                StringBuffer value;
-                OwnedHqlExpr folded = foldHqlExpression(format->queryChild(i));
-                getHintNameValue(folded, name, value);
-                result->setResultFieldOpt(name, value);
+                const char * graphName = activeGraph->name;
+                result->setResultWriteLocation(graphName, activity->activityId);
+            }
+
+            IHqlExpression * format = originalExpr->queryAttribute(storedFieldFormatAtom);
+            if (format)
+            {
+                ForEachChild(i, format)
+                {
+                    StringBuffer name;
+                    StringBuffer value;
+                    OwnedHqlExpr folded = foldHqlExpression(format->queryChild(i));
+                    getHintNameValue(folded, name, value);
+                    result->setResultFieldOpt(name, value);
+                }
             }
         }
     }

+ 42 - 23
plugins/cassandra/cassandrawu.cpp

@@ -834,7 +834,7 @@ struct CassandraTableInfo
 };
 
 static const int majorVersion = 1;  // If this does not match the value in the repository, you cannot proceed - a conversion tool is needed
-static const int minorVersion = 1;  // If this is less that the value in the repository, we should be fine (but there may be columns we don't know about and thus don't read - and will write as NULL in new rows)
+static const int minorVersion = 2;  // If this is less that the value in the repository, we should be fine (but there may be columns we don't know about and thus don't read - and will write as NULL in new rows)
                                     // If this is greater than the value in the repository, we need to update the repository (using add column) and its version before proceeding
                                     // Make sure to increment this if any column is ever added below
 
@@ -1081,7 +1081,9 @@ static const ChildTableInfo wuGraphMetasTable =
         {"totalrowcount", "bigint", "totalRowCount", bigintColumnMapper},  /* This is the number of rows in value */ \
         {"schemaRaw", "blob", "SchemaRaw", blobColumnMapper},              \
         {"logicalName", "text", "logicalName", stringColumnMapper},        /* either this or value will be present once result status is "calculated" */ \
-        {"value", "blob", "Value", blobColumnMapper}
+        {"value", "blob", "Value", blobColumnMapper}, \
+        {"graph", "text", "@graph", stringColumnMapper}, \
+        {"activity", "int", "@activity", intColumnMapper}
 
 static const CassandraXmlMapping wuResultsMappings [] =
 {
@@ -3056,36 +3058,53 @@ public:
         cluster.setOptions(options);
         if (!cluster.queryKeySpace())
             cluster.setKeySpace("hpcc");
-        cluster.connect();
-        Owned<IPTree> versionInfo = getVersionInfo();
-        if (versionInfo)
+        try
         {
-            int major = versionInfo->getPropInt("@major", 0);
-            int minor = versionInfo->getPropInt("@minor", 0);
-            if (major && minor)
+            cluster.connect();
+            Owned<IPTree> versionInfo = getVersionInfo();
+            if (versionInfo)
             {
-                // Note that if there is no version info at all, we have to assume that the repository is not yet created. We don't fail, otherwise no-one can call createRepository the first time...
-                if (major != majorVersion)
-                    throw makeStringExceptionV(WUERR_WorkunitVersionMismatch, "Incompatible workunit repository version (wanted %d.%d, found %d.%d)", majorVersion, minorVersion, major, minor);
-                if (minor != minorVersion)
+                int major = versionInfo->getPropInt("@major", 0);
+                int minor = versionInfo->getPropInt("@minor", 0);
+                if (major && minor)
                 {
-                    if (minor < minorVersion)
+                    // Note that if there is no version info at all, we have to assume that the repository is not yet created. We don't fail, otherwise no-one can call createRepository the first time...
+                    if (major != majorVersion)
+                        throw makeStringExceptionV(WUERR_WorkunitVersionMismatch, "Incompatible workunit repository version (wanted %d.%d, found %d.%d)", majorVersion, minorVersion, major, minor);
+                    if (minor != minorVersion)
                     {
-                        DBGLOG("WARNING: repository version %d.%d is older than current version %d.%d - adding required columns", major, minor, majorVersion, minorVersion);
-                        switch (minor)
+                        if (minor < minorVersion)
                         {
-                        // Add code here to create any columns that we need to to get from version "minor" to expected layout
+                            DBGLOG("WARNING: repository version %d.%d is older than current version %d.%d - adding required columns", major, minor, majorVersion, minorVersion);
+                            switch (minor)
+                            {
+                            case 1:
+                                executeSimpleCommand(querySession(), "ALTER TABLE wuresults ADD graph text;");
+                                executeSimpleCommand(querySession(), "ALTER TABLE wuresults ADD activity int;");
+                                executeSimpleCommand(querySession(), "ALTER TABLE wuvariables ADD graph text;");
+                                executeSimpleCommand(querySession(), "ALTER TABLE wuvariables ADD activity int;");
+                                executeSimpleCommand(querySession(), "ALTER TABLE wutemporaries ADD graph text;");
+                                executeSimpleCommand(querySession(), "ALTER TABLE wutemporaries ADD activity int;");
+                                break;
+                            }
+                            createVersionTable(true);
                         }
+                        else
+                            DBGLOG("WARNING: repository version %d.%d is newer than current version %d.%d - some columns will not be updated", major, minor, majorVersion, minorVersion);
                     }
-                    else
-                        DBGLOG("WARNING: repository version %d.%d is newer than current version %d.%d - some columns will not be updated", major, minor, majorVersion, minorVersion);
                 }
             }
+            else
+            {
+                DBGLOG("WARNING: repository version could not be retrieved (repository not yet created?)");
+                cluster.disconnect();
+            }
         }
-        else
+        catch (IException *E)
         {
+            EXCLOG(E);
+            E->Release();
             DBGLOG("WARNING: repository version could not be retrieved (repository not yet created?)");
-            cluster.disconnect();
         }
         cacheRetirer.start();
         LINK(_dll);  // Yes, this leaks. Not really sure how to avoid that.
@@ -3711,7 +3730,7 @@ public:
         executeSimpleCommand(s, create);
         s.set(NULL);
         cluster.connect();
-        createVersionTable();
+        createVersionTable(false);
         ensureTable(querySession(), workunitsMappings);
         ensureTable(querySession(), searchMappings);
         ensureTable(querySession(), uniqueSearchMappings);
@@ -3736,12 +3755,12 @@ public:
         return cluster.prepareStatement(query, traceLevel>=2);
     }
 private:
-    void createVersionTable()
+    void createVersionTable(bool force)
     {
         StringBuffer schema;
         executeSimpleCommand(querySession(), describeTable(versionMappings, schema));
         Owned<IPTree> oldVersion = getVersionInfo();
-        if (!oldVersion)
+        if (force || !oldVersion)
         {
             VStringBuffer versionInfo("<Version major='%d' minor='%d'/>", majorVersion, minorVersion);
             CassandraBatch versionBatch(cass_batch_new(CASS_BATCH_TYPE_LOGGED));