瀏覽代碼

Merge remote-tracking branch 'origin/candidate-3.10.x'

Conflicts:
	ecl/hqlcpp/hqlresource.cpp
	ecl/hqlcpp/hqlresource.ipp

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 年之前
父節點
當前提交
085347b65e

+ 1 - 2
ecl/ecl-package/ecl-package.cpp

@@ -250,8 +250,7 @@ public:
             for (unsigned i=0; i<num; i++)
             {
                 IConstPackageListMapData& req = pkgMapInfo.item(i);
-                const char *id = req.getId();
-                printf("\nPackage Name = %s\n", id);
+                printf("\nPackage Name = %s  active = %d\n", req.getId(), req.getActive());
                 IArrayOf<IConstPackageListData> &pkgInfo = req.getPkgListData();
 
                 unsigned int numPkgs = pkgInfo.ordinality();

+ 15 - 0
ecl/eclcmd/eclcmd_common.cpp

@@ -132,6 +132,21 @@ bool isValidMemoryValue(const char *value)
     return false;
 }
 
+bool isValidPriorityValue(const char *value)
+{
+    if (!value || !*value)
+        return false;
+    if (strieq("LOW", value))
+        return true;
+    if (strieq("HIGH", value))
+        return true;
+    if (strieq("SLA", value))
+        return true;
+    if (strieq("NONE", value))
+        return true;
+    return false;
+}
+
 //=========================================================================================
 
 #define PE_OFFSET_LOCATION_IN_DOS_SECTION 0x3C

+ 2 - 0
ecl/eclcmd/eclcmd_common.hpp

@@ -83,6 +83,7 @@ typedef IEclCommand *(*EclCommandFactory)(const char *cmdname);
 #define ECLOPT_TIME_LIMIT "--timeLimit"
 #define ECLOPT_MEMORY_LIMIT "--memoryLimit"
 #define ECLOPT_WARN_TIME_LIMIT "--warnTimeLimit"
+#define ECLOPT_PRIORITY "--priority"
 
 #define ECLOPT_RESULT_LIMIT "--limit"
 #define ECLOPT_RESULT_LIMIT_INI "resultLimit"
@@ -119,6 +120,7 @@ typedef IEclCommand *(*EclCommandFactory)(const char *cmdname);
 #define ECLOPT_VERBOSE_S "-v"
 
 bool isValidMemoryValue(const char *value);
+bool isValidPriorityValue(const char *value);
 
 bool extractEclCmdOption(StringBuffer & option, IProperties * globals, const char * envName, const char * propertyName, const char * defaultPrefix, const char * defaultSuffix);
 bool extractEclCmdOption(StringAttr & option, IProperties * globals, const char * envName, const char * propertyName, const char * defaultPrefix, const char * defaultSuffix);

+ 12 - 0
ecl/eclcmd/eclcmd_core.cpp

@@ -333,6 +333,8 @@ public:
                 continue;
             if (iter.matchOption(optMemoryLimit, ECLOPT_MEMORY_LIMIT))
                 continue;
+            if (iter.matchOption(optPriority, ECLOPT_PRIORITY))
+                continue;
             if (iter.matchFlag(optNoActivate, ECLOPT_NO_ACTIVATE))
             {
                 activateSet=true;
@@ -367,6 +369,11 @@ public:
             fprintf(stderr, "invalid --memoryLimit value of %s.\n\n", optMemoryLimit.get());
             return false;
         }
+        if (optPriority.length() && !isValidPriorityValue(optPriority))
+        {
+            fprintf(stderr, "invalid --priority value of %s.\n\n", optPriority.get());
+            return false;
+        }
         return true;
     }
     virtual int processCMD()
@@ -399,6 +406,8 @@ public:
             req->setWarnTimeLimit(optWarnTimeLimit);
         if (!optMemoryLimit.isEmpty())
             req->setMemoryLimit(optMemoryLimit);
+        if (!optPriority.isEmpty())
+            req->setPriority(optPriority);
 
         Owned<IClientWUPublishWorkunitResponse> resp = client->WUPublishWorkunit(req);
         const char *id = resp->getQueryId();
@@ -446,6 +455,8 @@ public:
             "   --warnTimeLimit=<ms>   Value to set for query warnTimeLimit configuration\n"
             "   --memoryLimit=<mem>    Value to set for query memoryLimit configuration\n"
             "                          format <mem> as 500000B, 550K, 100M, 10G, 1T etc.\n"
+            "   --priority=<val>       set the priority for this query. Value can be LOW,\n"
+            "                          HIGH, SLA, NONE. NONE will clear current setting.\n"
             "   --wait=<ms>            Max time to wait in milliseconds\n",
             stdout);
         EclCmdWithEclTarget::usage();
@@ -454,6 +465,7 @@ private:
     StringAttr optName;
     StringAttr optDaliIP;
     StringAttr optMemoryLimit;
+    StringAttr optPriority;
     unsigned optMsToWait;
     unsigned optTimeLimit;
     unsigned optWarnTimeLimit;

+ 34 - 4
ecl/eclcmd/queries/ecl-queries.cpp

@@ -192,10 +192,16 @@ public:
                 line.appendN(41 - line.length(), ' ');
             line.append(' ').append(query.getWarnTimeLimit());
         }
-        if (query.getMemoryLimit())
+        if (query.getPriority())
         {
             if (line.length() < 48)
                 line.appendN(48 - line.length(), ' ');
+            line.append(' ').append(query.getPriority());
+        }
+        if (query.getMemoryLimit())
+        {
+            if (line.length() < 53)
+                line.appendN(53 - line.length(), ' ');
             line.append(' ').append(query.getMemoryLimit());
         }
         fputs(line.append('\n').str(), stdout);
@@ -207,9 +213,9 @@ public:
         if (qs.getQuerySetName())
             fprintf(stdout, "\nQuerySet: %s\n", qs.getQuerySetName());
         fputs("\n", stdout);
-        fputs("                                   Time   Warn   Memory\n", stdout);
-        fputs("Flags Query Id                     Limit  Limit  Limit\n", stdout);
-        fputs("----- ---------------------------- ------ ------ ----------\n", stdout);
+        fputs("                                   Time   Warn   Pri  Memory\n", stdout);
+        fputs("Flags Query Id                     Limit  Limit       Limit\n", stdout);
+        fputs("----- ---------------------------- ------ ------ ---- ----------\n", stdout);
 
         IArrayOf<IConstQuerySetQuery> &queries = qs.getQueries();
         ForEachItemIn(id, queries)
@@ -316,6 +322,8 @@ public:
                 continue;
             if (iter.matchOption(optMemoryLimit, ECLOPT_MEMORY_LIMIT))
                 continue;
+            if (iter.matchOption(optPriority, ECLOPT_PRIORITY))
+                continue;
             if (EclCmdCommon::matchCommandLineOption(iter, true)!=EclCmdOptionMatch)
                 return false;
         }
@@ -340,6 +348,11 @@ public:
             fprintf(stderr, "invalid --memoryLimit value of %s.\n\n", optMemoryLimit.get());
             return false;
         }
+        if (optPriority.length() && !isValidPriorityValue(optPriority))
+        {
+            fprintf(stderr, "invalid --priority value of %s.\n\n", optPriority.get());
+            return false;
+        }
 
         return true;
     }
@@ -364,6 +377,8 @@ public:
             req->setWarnTimeLimit(optWarnTimeLimit);
         if (!optMemoryLimit.isEmpty())
             req->setMemoryLimit(optMemoryLimit);
+        if (!optPriority.isEmpty())
+            req->setPriority(optPriority);
 
         Owned<IClientWUQuerySetCopyQueryResponse> resp = client->WUQuerysetCopyQuery(req);
         if (resp->getExceptions().ordinality())
@@ -403,6 +418,8 @@ public:
             "   --warnTimeLimit=<sec>  Value to set for query warnTimeLimit configuration\n"
             "   --memoryLimit=<mem>    Value to set for query memoryLimit configuration\n"
             "                          format <mem> as 500000B, 550K, 100M, 10G, 1T etc.\n"
+            "   --priority=<val>       set the priority for this query. Value can be LOW,\n"
+            "                          HIGH, SLA, NONE. NONE will clear current setting.\n"
             " Common Options:\n",
             stdout);
         EclCmdCommon::usage();
@@ -413,6 +430,7 @@ private:
     StringAttr optTargetCluster;
     StringAttr optDaliIP;
     StringAttr optMemoryLimit;
+    StringAttr optPriority;
     unsigned optMsToWait;
     unsigned optTimeLimit;
     unsigned optWarnTimeLimit;
@@ -464,6 +482,8 @@ public:
                 continue;
             if (iter.matchOption(optMemoryLimit, ECLOPT_MEMORY_LIMIT))
                 continue;
+            if (iter.matchOption(optPriority, ECLOPT_PRIORITY))
+                continue;
             if (EclCmdCommon::matchCommandLineOption(iter, true)!=EclCmdOptionMatch)
                 return false;
         }
@@ -483,6 +503,11 @@ public:
             fprintf(stderr, "invalid --memoryLimit value of %s.\n\n", optMemoryLimit.get());
             return false;
         }
+        if (optPriority.length() && !isValidPriorityValue(optPriority))
+        {
+            fprintf(stderr, "invalid --priority value of %s.\n\n", optPriority.get());
+            return false;
+        }
         return true;
     }
 
@@ -501,6 +526,8 @@ public:
             req->setWarnTimeLimit(optWarnTimeLimit);
         if (!optMemoryLimit.isEmpty())
             req->setMemoryLimit(optMemoryLimit);
+        if (!optPriority.isEmpty())
+            req->setPriority(optPriority);
 
         Owned<IClientWUQueryConfigResponse> resp = client->WUQueryConfig(req);
         if (resp->getExceptions().ordinality())
@@ -531,6 +558,8 @@ public:
             "   --warnTimeLimit=<sec>  Value to set for query warnTimeLimit configuration\n"
             "   --memoryLimit=<mem>    Value to set for query memoryLimit configuration\n"
             "                          format <mem> as 500000B, 550K, 100M, 10G, 1T etc.\n"
+            "   --priority=<val>       set the priority for this query. Value can be LOW,\n"
+            "                          HIGH, SLA, NONE. NONE will clear current setting.\n"
             " Common Options:\n",
             stdout);
         EclCmdCommon::usage();
@@ -539,6 +568,7 @@ private:
     StringAttr optTargetCluster;
     StringAttr optQueryId;
     StringAttr optMemoryLimit;
+    StringAttr optPriority;
     unsigned optMsToWait;
     unsigned optTimeLimit;
     unsigned optWarnTimeLimit;

+ 65 - 3
ecl/hqlcpp/hqlresource.cpp

@@ -1756,6 +1756,7 @@ EclResourcer::EclResourcer(IErrorReceiver * _errors, IConstWorkUnit * _wu, Clust
     targetClusterType = _targetClusterType; 
     clusterSize = _clusterSize ? _clusterSize : FIXED_CLUSTER_SIZE;
     insideNeverSplit = false;
+    insideSteppedNeverSplit = false;
     sequential = false;
     options.mangleSpillNameWithWuid = false;
     options.minimizeSpillSize = _translatorOptions.minimizeSpillSize;
@@ -2584,14 +2585,71 @@ protected:
 };
 
 
+static bool isPotentialCompoundSteppedIndexRead(IHqlExpression * expr)
+{
+    loop
+    {
+        switch (expr->getOperator())
+        {
+        case no_compound_diskread:
+        case no_compound_disknormalize:
+        case no_compound_diskaggregate:
+        case no_compound_diskcount:
+        case no_compound_diskgroupaggregate:
+        case no_compound_childread:
+        case no_compound_childnormalize:
+        case no_compound_childaggregate:
+        case no_compound_childcount:
+        case no_compound_childgroupaggregate:
+        case no_compound_selectnew:
+        case no_compound_inline:
+            return false;
+        case no_compound_indexread:
+        case no_newkeyindex:
+            return true;
+        case no_keyedlimit:
+        case no_preload:
+        case no_filter:
+        case no_hqlproject:
+        case no_newusertable:
+        case no_limit:
+        case no_sorted:
+        case no_preservemeta:
+        case no_distributed:
+        case no_grouped:
+        case no_stepped:
+        case no_section:
+        case no_sectioninput:
+        case no_dataset_alias:
+            break;
+        case no_choosen:
+            {
+                IHqlExpression * arg2 = expr->queryChild(2);
+                if (arg2 && !arg2->isPure())
+                    return false;
+                break;
+            }
+        default:
+            return false;
+        }
+        expr = expr->queryChild(0);
+    }
+}
 
 bool EclResourcer::findSplitPoints(IHqlExpression * expr)
 {
     ResourcerInfo * info = queryResourceInfo(expr);
     bool savedInsideNeverSplit = insideNeverSplit;
+    bool savedInsideSteppedNeverSplit = insideSteppedNeverSplit;
+    if (insideSteppedNeverSplit && info)
+    {
+        if (!isPotentialCompoundSteppedIndexRead(expr) && (expr->getOperator() != no_datasetlist))
+            insideSteppedNeverSplit = false;
+    }
+
     if (info && info->numUses)
     {
-        if (insideNeverSplit)
+        if (insideNeverSplit || insideSteppedNeverSplit)
             info->neverSplit = true;
         if (info->isAlreadyInScope && (info->numUses == 0) && expr->isDatarow())
         {
@@ -2615,7 +2673,7 @@ bool EclResourcer::findSplitPoints(IHqlExpression * expr)
     {
         info = queryCreateResourceInfo(expr);
         info->numUses++;
-        if (insideNeverSplit)
+        if (insideNeverSplit || insideSteppedNeverSplit)
             info->neverSplit = true;
 
         bool isActivity = true;
@@ -2687,7 +2745,7 @@ bool EclResourcer::findSplitPoints(IHqlExpression * expr)
         case no_mergejoin:
         case no_nwayjoin:
             if (options.preventSteppedSplit)
-                insideNeverSplit = true;
+                insideSteppedNeverSplit = true;
             break;
         case no_compound_diskread:
         case no_compound_disknormalize:
@@ -2714,6 +2772,7 @@ bool EclResourcer::findSplitPoints(IHqlExpression * expr)
         if (!type || type->isScalar())
         {
             insideNeverSplit = savedInsideNeverSplit;
+            insideSteppedNeverSplit = savedInsideSteppedNeverSplit;
             return false;
         }
 
@@ -2746,12 +2805,14 @@ bool EclResourcer::findSplitPoints(IHqlExpression * expr)
                     findSplitPoints(cur);
                 }
                 insideNeverSplit = savedInsideNeverSplit;
+                insideSteppedNeverSplit = savedInsideSteppedNeverSplit;
                 gatherChildSplitPoints(expr, alwaysHoistChild, info, first, last);
                 break;
             }
         }
 
         insideNeverSplit = false;
+        insideSteppedNeverSplit = false;
         ForEachItemIn(i2, info->childDependents)
         {
             IHqlExpression & cur = info->childDependents.item(i2);
@@ -2768,6 +2829,7 @@ bool EclResourcer::findSplitPoints(IHqlExpression * expr)
     }
 
     insideNeverSplit = savedInsideNeverSplit;
+    insideSteppedNeverSplit = savedInsideSteppedNeverSplit;
     return info->containsActivity;
 }
 

+ 1 - 0
ecl/hqlcpp/hqlresource.ipp

@@ -424,6 +424,7 @@ protected:
     bool spillMultiCondition;
     bool spotThroughAggregate;
     bool insideNeverSplit;
+    bool insideSteppedNeverSplit;
     bool sequential;
     CResourceOptions options;
     HqlExprArray rootConditions;

+ 41 - 0
ecl/regress/bug8413.ecl

@@ -0,0 +1,41 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+d := dataset('in', { string15 name, unsigned value }, thor);
+
+i := index(d, { name, value } , {}, '\\home\\person.name_first.key');
+
+f := i(name != 'Gavin');
+
+r1 := RECORD
+string15 name;
+unsigned value;
+unsigned seq;
+  END;
+  
+p := PROJECT(f, TRANSFORM(r1, SELF.seq := RANDOM(), SELF := LEFT));
+s1 := SORT(p, value);
+
+output(s1(name != 'Jim'));
+
+p2 := DEDUP(s1, name);
+
+output(p2);
+
+f2 := JOIN([s1,p2,p2], LEFT.value = RIGHT.value, TRANSFORM(LEFT), SORTED(value));
+
+output(f2);

+ 1 - 0
esp/scm/ws_packageprocess.ecm

@@ -98,6 +98,7 @@ ESPstruct PackageListMapData
     string Id;
     string Target;
     ESParray<ESPstruct PackageListData> PkgListData;
+    boolean Active;
 };
 
 ESPresponse [exceptions_inline] ListPackageResponse

+ 4 - 0
esp/scm/ws_workunits.ecm

@@ -1070,6 +1070,7 @@ ESPrequest [nil_remove] WUPublishWorkunitRequest
     string memoryLimit;
     nonNegativeInteger TimeLimit(0);
     nonNegativeInteger WarnTimeLimit(0);
+    string Priority;
     string RemoteDali;
 };
 
@@ -1095,6 +1096,7 @@ ESPrequest [nil_remove] WUQueryConfigRequest
     string memoryLimit;
     nonNegativeInteger TimeLimit(0);
     nonNegativeInteger WarnTimeLimit(0);
+    string Priority;
 };
 
 ESPStruct WUQueryConfigResult
@@ -1141,6 +1143,7 @@ ESPStruct [nil_remove] QuerySetQuery
     string memoryLimit;
     nonNegativeInteger timeLimit;
     nonNegativeInteger warnTimeLimit;
+    string priority;
 };
 
 ESPStruct QuerySetAlias
@@ -1290,6 +1293,7 @@ ESPrequest [nil_remove] WUQuerySetCopyQueryRequest
     string memoryLimit;
     nonNegativeInteger TimeLimit(0);
     nonNegativeInteger WarnTimeLimit(0);
+    string priority;
 };
 
 ESPresponse [exceptions_inline] WUQuerySetCopyQueryResponse

+ 7 - 0
esp/services/ws_machine/ws_machineService.cpp

@@ -1174,6 +1174,7 @@ void Cws_machineEx::readStorageData(const char* response, CMachineInfoThreadPara
     if (!pStr)
         DBGLOG("Storage information not found on %s", pParam->m_machineData.getNetworkAddress());
 
+    bool isTitleLine = true;
     CIArrayOf<CStorageData>& storage = pParam->m_machineData.getStorage();
     while (pStr)
     {
@@ -1190,6 +1191,12 @@ void Cws_machineEx::readStorageData(const char* response, CMachineInfoThreadPara
             pStr = NULL;
         }
 
+        if (isTitleLine)
+        {
+            isTitleLine = false;
+            continue;
+        }
+
         if (buf.length() > 0)
         {
             StringBuffer diskSpaceTitle;

+ 13 - 33
esp/services/ws_packageprocess/ws_packageprocessService.cpp

@@ -310,42 +310,22 @@ void listPkgInfo(const char *target, const char *process, IArrayOf<IConstPackage
         throw MakeStringException(PKG_DALI_LOOKUP_ERROR, "Unable to retrieve package information from dali for process %s", (process && *process) ? process : "*");
 
     StringBuffer xpath("PackageMap");
-    if (!target || !*target)
+    if (target && *target)
+        xpath.appendf("[@querySet='%s']", target);
+    Owned<IPropertyTreeIterator> iter = pkgSetRegistry->getElements(xpath.str());
+    ForEach(*iter)
     {
-        Owned<IPropertyTreeIterator> iter = pkgSetRegistry->getElements("PackageMap");
-        ForEach(*iter)
+        IPropertyTree &item = iter->query();
+        const char *id = item.queryProp("@id");
+        if (id)
         {
-            Owned<IEspPackageListMapData> res = createPackageListMapData("", "");
-            IPropertyTree &item = iter->query();
             StringBuffer xpath;
-            const char *id = item.queryProp("@id");
-            if (id)
-            {
-                xpath.append("PackageMap[@id='").append(id).append("']");
-                IPropertyTree *mapTree = root->queryPropTree(xpath);
-                Owned<IEspPackageListMapData> res = createPackageListMapData("", "");
-                getPackageListInfo(mapTree, res);
-                results->append(*res.getClear());
-            }
-        }
-    }
-    else
-    {
-        xpath.appendf("[@querySet='%s']", target);
-        Owned<IPropertyTreeIterator> iter = pkgSetRegistry->getElements(xpath.str());
-        ForEach(*iter)
-        {
-            IPropertyTree &item = iter->query();
-            const char *id = item.queryProp("@id");
-            if (id)
-            {
-                StringBuffer xpath;
-                xpath.append("PackageMap[@id='").append(id).append("']");
-                IPropertyTree *mapTree = root->queryPropTree(xpath);
-                Owned<IEspPackageListMapData> res = createPackageListMapData("", "");
-                getPackageListInfo(mapTree, res);
-                results->append(*res.getClear());
-            }
+            xpath.append("PackageMap[@id='").append(id).append("']");
+            IPropertyTree *mapTree = root->queryPropTree(xpath);
+            Owned<IEspPackageListMapData> res = createPackageListMapData("", "");
+            res->setActive(item.getPropBool("@active"));
+            getPackageListInfo(mapTree, res);
+            results->append(*res.getClear());
         }
     }
 }

+ 57 - 2
esp/services/ws_workunits/ws_workunitsQuerySets.cpp

@@ -381,6 +381,56 @@ static inline void updateMemoryLimitSetting(IPropertyTree *queryTree, const char
         queryTree->setPropInt64("@memoryLimit", limit);
 }
 
+enum QueryPriority {
+    QueryPriorityNone = -1,
+    QueryPriorityLow = 0,
+    QueryPriorityHigh = 1,
+    QueryPrioritySLA = 2,
+    QueryPriorityInvalid = 3
+};
+
+static inline const char *getQueryPriorityName(int value)
+{
+    switch (value)
+    {
+    case QueryPriorityLow:
+        return "LOW";
+    case QueryPriorityHigh:
+        return "HIGH";
+    case QueryPrioritySLA:
+        return "SLA";
+    case QueryPriorityNone:
+        return "NONE";
+    }
+    return "INVALID";
+}
+static inline void updateQueryPriority(IPropertyTree *queryTree, const char *value)
+{
+    if (!value || !*value || !queryTree)
+        return;
+    int priority = QueryPriorityInvalid;
+    if (strieq("LOW", value))
+        priority=QueryPriorityLow;
+    else if (strieq("HIGH", value))
+        priority=QueryPriorityHigh;
+    else if (strieq("SLA", value))
+        priority=QueryPrioritySLA;
+    else if (strieq("NONE", value))
+        priority=QueryPriorityNone;
+
+    switch (priority)
+    {
+    case QueryPriorityInvalid:
+        break;
+    case QueryPriorityNone:
+        queryTree->removeProp("@priority");
+        break;
+    default:
+        queryTree->setPropInt("@priority", priority);
+        break;
+    }
+}
+
 void copyQueryFilesToCluster(IEspContext &context, IConstWorkUnit *cw, const char *remoteIP, const char *target, const char *queryid, bool overwrite)
 {
     if (!target || !*target)
@@ -481,12 +531,13 @@ bool CWsWorkunitsEx::onWUPublishWorkunit(IEspContext &context, IEspWUPublishWork
     StringBuffer queryId;
     WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
     addQueryToQuerySet(wu, target.str(), queryName.str(), NULL, activate, queryId);
-    if (req.getMemoryLimit() || !req.getTimeLimit_isNull() || ! req.getWarnTimeLimit_isNull())
+    if (req.getMemoryLimit() || !req.getTimeLimit_isNull() || !req.getWarnTimeLimit_isNull() || req.getPriority())
     {
         Owned<IPropertyTree> queryTree = getQueryById(target.str(), queryId, false);
         updateMemoryLimitSetting(queryTree, req.getMemoryLimit());
         updateQuerySetting(req.getTimeLimit_isNull(), queryTree, "@timeLimit", req.getTimeLimit());
         updateQuerySetting(req.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", req.getWarnTimeLimit());
+        updateQueryPriority(queryTree, req.getPriority());
     }
     wu->commit();
     wu.clear();
@@ -550,6 +601,8 @@ void gatherQuerySetQueryDetails(IPropertyTree *query, IEspQuerySetQuery *queryIn
         queryInfo->setTimeLimit(query->getPropInt("@timeLimit"));
     if (query->hasProp("@warnTimeLimit"))
         queryInfo->setWarnTimeLimit(query->getPropInt("@warnTimeLimit"));
+    if (query->hasProp("@priority"))
+        queryInfo->setPriority(getQueryPriorityName(query->getPropInt("@priority")));
     if (queriesOnCluster)
     {
         IArrayOf<IEspClusterQueryState> clusters;
@@ -878,6 +931,7 @@ bool CWsWorkunitsEx::onWUQueryConfig(IEspContext &context, IEspWUQueryConfigRequ
         if (queryTree)
         {
             updateMemoryLimitSetting(queryTree, req.getMemoryLimit());
+            updateQueryPriority(queryTree, req.getPriority());
             updateQuerySetting(req.getTimeLimit_isNull(), queryTree, "@timeLimit", req.getTimeLimit());
             updateQuerySetting(req.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", req.getWarnTimeLimit());
         }
@@ -1093,10 +1147,11 @@ bool CWsWorkunitsEx::onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetC
     StringBuffer targetQueryId;
     WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
     addQueryToQuerySet(wu, target, queryName.str(), NULL, activate, targetQueryId);
-    if (req.getMemoryLimit() || !req.getTimeLimit_isNull() || ! req.getWarnTimeLimit_isNull())
+    if (req.getMemoryLimit() || !req.getTimeLimit_isNull() || ! req.getWarnTimeLimit_isNull() || req.getPriority())
     {
         Owned<IPropertyTree> queryTree = getQueryById(target, targetQueryId, false);
         updateMemoryLimitSetting(queryTree, req.getMemoryLimit());
+        updateQueryPriority(queryTree, req.getPriority());
         updateQuerySetting(req.getTimeLimit_isNull(), queryTree, "@timeLimit", req.getTimeLimit());
         updateQuerySetting(req.getWarnTimeLimit_isNull(), queryTree, "@warnTimeLimit", req.getWarnTimeLimit());
     }

+ 2 - 9
initfiles/componentfiles/configxml/thor.xsd.in

@@ -349,14 +349,14 @@
       <xs:attribute name="masterMemorySize" type="xs:nonNegativeInteger" use="optional">
         <xs:annotation>
           <xs:appinfo>
-            <tooltip>Memory (in MB) to use for Link Counted Rows on thor master. It will default to globalMemorySize if unset</tooltip>
+            <tooltip>Memory (in MB) to use for rows on thor master. It will default to globalMemorySize if unset</tooltip>
           </xs:appinfo>
         </xs:annotation>
       </xs:attribute>
       <xs:attribute name="globalMemorySize" type="xs:nonNegativeInteger" use="optional">
         <xs:annotation>
           <xs:appinfo>
-            <tooltip>Memory (in MB) to use for Link Counted Rows. It will default to physical available memory if unset</tooltip>
+            <tooltip>Memory (in MB) to use for rows. If unset, default = [75% of physical memory] / slavesPerNode</tooltip>
           </xs:appinfo>
         </xs:annotation>
       </xs:attribute>
@@ -525,13 +525,6 @@
           </xs:appinfo>
         </xs:annotation>
       </xs:attribute>
-      <xs:attribute name="largeMemSize" type="xs:nonNegativeInteger" use="optional">
-        <xs:annotation>
-          <xs:appinfo>
-            <tooltip>Memory available to thor for heavyweith operations (MB)</tooltip>
-          </xs:appinfo>
-        </xs:annotation>
-      </xs:attribute>
       <xs:attribute name="smallSortThreshold" type="xs:nonNegativeInteger" use="optional" default="1024">
         <xs:annotation>
           <xs:appinfo>

+ 3 - 3
initfiles/etc/bash_completion/ecl

@@ -89,12 +89,12 @@ _ecl_opts_queries()
             echo "--help list copy config"
             ;;
         copy)
-            echo -n "--no-reload --wait= --timeLimit= --warnTimeLimit= --memoryLimit= --daliip= "
+            echo -n "--no-reload --wait= --timeLimit= --warnTimeLimit= --memoryLimit= --priority= --daliip= "
             _ecl_opts_common
             ;;
         config)
             echo -n "-t= --target= --no-files --daliip= -A --activate --no-reload "
-            echo -n "-O --overwrite --wait= --timeLimit= --warnTimeLimit= --memoryLimit= "
+            echo -n "-O --overwrite --wait= --timeLimit= --warnTimeLimit= --memoryLimit= --priority= "
             _ecl_opts_common
             ;;
         list)
@@ -166,7 +166,7 @@ _ecl_opts_core_file()
                 _ecl_opts_common
                 ;;
             publish)
-                echo -n "-A --activate --no-reload --timeLimit= --warnTimeLimit= --memoryLimit= --daliip= "
+                echo -n "-A --activate --no-reload --timeLimit= --warnTimeLimit= --memoryLimit= --priority= --daliip= "
                 _ecl_opts_deploy
                 _ecl_opts_common
                 ;;

+ 1 - 0
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -691,6 +691,7 @@ public:
     }
     CATCH_NEXTROW()
     {
+        ActivityTimer t(totalCycles, timeActivities, NULL);
         if (eoi) 
             return NULL;
         if (RCMAX != keyedLimitCount)

+ 30 - 42
thorlcr/activities/nsplitter/thnsplitterslave.cpp

@@ -27,17 +27,20 @@ class NSplitterSlaveActivity;
 
 class CSplitterOutputBase : public CSimpleInterface, implements IRowStream
 {
+protected:
+    unsigned __int64 totalCycles;
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+    CSplitterOutputBase() { totalCycles = 0; }
 
     virtual void start() = 0;
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info) = 0;
+    virtual unsigned __int64 queryTotalCycles() const { return totalCycles; }
 };
 
 class CSplitterOutput : public CSplitterOutputBase
 {
     NSplitterSlaveActivity &activity;
-    unsigned __int64 totalCycles;
 
     unsigned output;
     rowcount_t rec, max;
@@ -49,33 +52,10 @@ public:
 
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info);
 
-    void addCycles(unsigned __int64 elapsedCycles);
     virtual void start();
     virtual void stop();
     virtual const void *nextRow();
-    unsigned __int64 queryTotalCycles() const;
-};
-
-#ifdef TIME_ACTIVITIES
-struct Timer : public ActivityTimer
-{
-    CSplitterOutput &output;
-    unsigned __int64 elapsedCycles;
-    inline Timer(CSplitterOutput &_output, const bool &enabled) : ActivityTimer(elapsedCycles, enabled, NULL), output(_output) { }
-    inline ~Timer()
-    {
-        if (enabled)
-            output.addCycles(elapsedCycles);
-    }
 };
-#else
-//optimized away completely?
-struct Timer
-{
-    inline Timer(CSplitterOutput &_output, const bool &enabled) { }
-};
-#endif
-
 
 
 //
@@ -90,7 +70,6 @@ class NSplitterSlaveActivity : public CSlaveActivity
     CriticalSection startLock;
     unsigned nstopped;
     rowcount_t recsReady;
-    SpinLock timingLock;
     IThorDataLink *input;
     bool grouped;
     Owned<IException> startException, writeAheadException;
@@ -180,10 +159,15 @@ class NSplitterSlaveActivity : public CSlaveActivity
         IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
         CInputWrapper(NSplitterSlaveActivity &_activity, IThorDataLink *_input) : activity(_activity), input(_input) { }
-        virtual const void *nextRow() { return input->nextRow(); }
+        virtual const void *nextRow()
+        {
+            ActivityTimer t(totalCycles, activity.queryTimeActivities(), NULL);
+            return input->nextRow();
+        }
         virtual void stop() { input->stop(); }
         virtual void start()
         {
+            ActivityTimer s(totalCycles, activity.queryTimeActivities(), NULL);
             input->start();
         }
         virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
@@ -236,6 +220,12 @@ class NSplitterSlaveActivity : public CSlaveActivity
                 activity->inputs.item(0)->getMetaInfo(info);
             info.canStall = !activity->spill;
         }
+        virtual unsigned __int64 queryTotalCycles() const
+        {
+            if (!input)
+                return 0;
+            return input->queryTotalCycles();
+        }
     };
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
@@ -327,6 +317,7 @@ public:
     }
     inline void more(rowcount_t &max)
     {
+        ActivityTimer t(totalCycles, queryTimeActivities(), NULL);
         writer.more(max);
     }
     void prepareInput(unsigned output)
@@ -427,6 +418,17 @@ public:
         if (smartBuf)
             smartBuf->cancel();
     }
+    unsigned __int64 queryTotalCycles() const
+    {
+        unsigned __int64 _totalCycles = totalCycles; // more() time
+        ForEachItemIn(o, outputs)
+        {
+            CDelayedInput *delayedInput = (CDelayedInput *)outputs.item(o);
+            _totalCycles += delayedInput->queryTotalCycles();
+        }
+        return _totalCycles;
+    }
+
 friend class CInputWrapper;
 friend class CSplitterOutput;
 friend class CWriter;
@@ -439,20 +441,12 @@ CSplitterOutput::CSplitterOutput(NSplitterSlaveActivity &_activity, unsigned _ou
    : activity(_activity), output(_output)
 {
     rec = max = 0;
-    totalCycles = 0;
-}
-
-void CSplitterOutput::addCycles(unsigned __int64 elapsedCycles)
-{
-    totalCycles += elapsedCycles; // per output
-    SpinBlock b(activity.timingLock);
-    activity.getTotalCyclesRef() += elapsedCycles; // Splitter act aggregate time.
 }
 
 // IThorDataLink
 void CSplitterOutput::start()
 {
-    Timer s(*this, activity.queryTimeActivities());
+    ActivityTimer s(totalCycles, activity.queryTimeActivities(), NULL);
     rec = max = 0;
     activity.prepareInput(output);
     if (activity.startException)
@@ -468,9 +462,9 @@ void CSplitterOutput::stop()
 
 const void *CSplitterOutput::nextRow()
 {
-    Timer t(*this, activity.queryTimeActivities());
     if (rec == max)
         activity.more(max);
+    ActivityTimer t(totalCycles, activity.queryTimeActivities(), NULL);
     const void *row = activity.nextRow(output); // pass ptr to max if need more
     ++rec;
     return row;
@@ -482,12 +476,6 @@ void CSplitterOutput::getMetaInfo(ThorDataLinkMetaInfo &info)
     CThorDataLink::calcMetaInfoSize(info, activity.inputs.item(0));
 }
 
-unsigned __int64 CSplitterOutput::queryTotalCycles() const
-{
-    return totalCycles;
-}
-
-
 CActivityBase *createNSplitterSlave(CGraphElementBase *container)
 {
     return new NSplitterSlaveActivity(container);

+ 2 - 0
thorlcr/graph/thgraphmaster.cpp

@@ -2355,6 +2355,8 @@ void CMasterGraph::executeSubGraph(size32_t parentExtractSz, const byte *parentE
             fatalHandler->clear();
     }
     fatalHandler.clear();
+    Owned<IWorkUnit> wu = &job.queryWorkUnit().lock();
+    queryJobManager().updateWorkUnitLog(*wu);
 }
 
 void CMasterGraph::sendGraph()

+ 1 - 0
thorlcr/graph/thgraphmaster.hpp

@@ -53,6 +53,7 @@ interface IJobManager : extends IInterface
     virtual IDeMonServer *queryDeMonServer() = 0;
     virtual void fatal(IException *e) = 0;
     virtual void addCachedSo(const char *name) = 0;
+    virtual void updateWorkUnitLog(IWorkUnit &workunit) = 0;
 };
 
 interface ILoadedDllEntry;

+ 3 - 2
thorlcr/graph/thgraphslave.cpp

@@ -265,9 +265,10 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const
                 break;
         }
     }
-    if (totalCycles < inputCycles) // not sure how/if possible, but guard against
+    unsigned __int64 _totalCycles = queryTotalCycles();
+    if (_totalCycles < inputCycles) // not sure how/if possible, but guard against
         return 0;
-    return totalCycles-inputCycles;
+    return _totalCycles-inputCycles;
 }
 
 unsigned __int64 CSlaveActivity::queryTotalCycles() const

+ 1 - 1
thorlcr/graph/thgraphslave.hpp

@@ -70,7 +70,7 @@ public:
 
     unsigned __int64 &getTotalCyclesRef() { return totalCycles; }
     unsigned __int64 queryLocalCycles() const;
-    unsigned __int64 queryTotalCycles() const;
+    virtual unsigned __int64 queryTotalCycles() const; // some acts. may calculate accumulated total from inputs (e.g. splitter)
     virtual void serializeStats(MemoryBuffer &mb);
 };
 

+ 12 - 4
thorlcr/master/thgraphmanager.cpp

@@ -85,6 +85,7 @@ public:
     virtual IDeMonServer *queryDeMonServer() { return demonServer; }
     virtual void fatal(IException *e);
     virtual void addCachedSo(const char *name);
+    virtual void updateWorkUnitLog(IWorkUnit &workunit);
 };
 
 // CJobManager impl.
@@ -165,6 +166,16 @@ void CJobManager::fatal(IException *e)
 #endif
 }
 
+void CJobManager::updateWorkUnitLog(IWorkUnit &workunit)
+{
+    StringBuffer log, logUrl;
+    logHandler->getLogName(log);
+    createUNCFilename(log, logUrl, false);
+    workunit.addProcess("Thor", globals->queryProp("@name"), logUrl.str());
+}
+
+
+
 
 #define IDLE_RESTART_PERIOD (8*60) // 8 hours
 class CIdleShutdown : public CSimpleInterface, implements IThreaded
@@ -642,12 +653,9 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
     {
         Owned<IWorkUnit> wu = &workunit.lock();
         wu->setTracingValue("ThorBuild", BUILD_TAG);
-        StringBuffer log, logUrl;
-        logHandler->getLogName(log);
-        createUNCFilename(log, logUrl, false);
-        wu->addProcess("Thor", globals->queryProp("@name"), logUrl.str());
         StringBuffer tsStr("Thor - ");
         wu->setTimeStamp(tsStr.append(graphName).str(), GetCachedHostName(), "Started");
+        updateWorkUnitLog(*wu);
     }
     Owned<IException> exception;
     SCMStringBuffer wuid;