Browse Source

Merge branch 'candidate-4.2.8' into closedown-4.2.x

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 years ago
parent
commit
c55c7019c7

+ 3 - 1
ecl/hql/hqlexpr.cpp

@@ -3463,7 +3463,9 @@ void CHqlExpression::updateFlagsAfterOperands()
     case no_attr_link:
     case no_attr_expr:
         if (queryName() == onFailAtom)
-            infoFlags &= ~HEFonFailDependent;
+        {
+            infoFlags &= ~(HEFonFailDependent|HEFcontainsSkip); // ONFAIL(SKIP) - skip shouldn't extend any further
+        }
         infoFlags &= ~(HEFthrowscalar|HEFthrowds|HEFoldthrows);
         break;
     case no_clustersize:

+ 23 - 11
ecl/hthor/hthorkey.cpp

@@ -225,7 +225,8 @@ const void *CHThorNullAggregateActivity::nextInGroup()
 class CHThorNullCountActivity : public CHThorNullActivity
 {
 public:
-    CHThorNullCountActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorArg & _arg, ThorActivityKind _kind) : CHThorNullActivity(agent, _activityId, _subgraphId, _arg, _kind) {}
+    CHThorNullCountActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorArg & _arg, ThorActivityKind _kind)
+        : CHThorNullActivity(agent, _activityId, _subgraphId, _arg, _kind), finished(false) {}
 
     //interface IHThorInput
     virtual void ready();
@@ -804,11 +805,9 @@ CHThorIndexReadActivity::CHThorIndexReadActivity(IAgentContext &_agent, unsigned
 {
     steppedExtra = static_cast<IHThorSteppedSourceExtra *>(helper.selectInterface(TAIsteppedsourceextra_1));
     needTransform = helper.needTransform();
-    keyedLimit = helper.getKeyedLimit();
-    rowLimit = helper.getRowLimit();
-    if (helper.getFlags() & TIRlimitskips)
-        rowLimit = (unsigned __int64) -1;
-    stopAfter = helper.getChooseNLimit();
+    keyedLimit = (unsigned __int64)-1;
+    rowLimit = (unsigned __int64)-1;
+    stopAfter = (unsigned __int64)-1;
     keyedLimitReached = false;
     keyedLimitSkips = ((helper.getFlags() & TIRkeyedlimitskips) != 0);
     keyedLimitCreates = ((helper.getFlags() & TIRkeyedlimitcreates) != 0);
@@ -844,6 +843,11 @@ void CHThorIndexReadActivity::ready()
 {
     keyedLimitReached = false;
     keyedLimitRowCreated = false;
+    keyedLimit = helper.getKeyedLimit();
+    rowLimit = helper.getRowLimit();
+    if (helper.getFlags() & TIRlimitskips)
+        rowLimit = (unsigned __int64) -1;
+    stopAfter = helper.getChooseNLimit();
     keyedProcessed = 0;
     if(!gotLayoutTrans)
     {
@@ -1117,13 +1121,12 @@ protected:
 
 CHThorIndexNormalizeActivity::CHThorIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &_arg, ThorActivityKind _kind, IDistributedFile * _df) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df), helper(_arg), outBuilder(NULL)
 {
-    keyedLimit = helper.getKeyedLimit();
+    keyedLimit = (unsigned __int64)-1;
     skipLimitReached = false;
     keyedProcessed = 0;
-    rowLimit = helper.getRowLimit();
-    if (helper.getFlags() & TIRlimitskips)
-        rowLimit = (unsigned __int64) -1;
-    stopAfter = helper.getChooseNLimit();
+    rowLimit = (unsigned __int64)-1;
+    stopAfter = (unsigned __int64)-1;
+    expanding = false;
 }
 
 CHThorIndexNormalizeActivity::~CHThorIndexNormalizeActivity()
@@ -1132,8 +1135,13 @@ CHThorIndexNormalizeActivity::~CHThorIndexNormalizeActivity()
 
 void CHThorIndexNormalizeActivity::ready()
 {
+    keyedLimit = helper.getKeyedLimit();
     skipLimitReached = false;
     keyedProcessed = 0;
+    rowLimit = helper.getRowLimit();
+    if (helper.getFlags() & TIRlimitskips)
+        rowLimit = (unsigned __int64) -1;
+    stopAfter = helper.getChooseNLimit();
     expanding = false;
     CHThorIndexReadActivityBase::ready();
     outBuilder.setAllocator(rowAllocator);
@@ -1406,6 +1414,8 @@ protected:
 CHThorIndexCountActivity::CHThorIndexCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexCountArg &_arg, ThorActivityKind _kind, IDistributedFile * _df) 
     : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df), helper(_arg)
 {
+    choosenLimit = (unsigned __int64)-1;
+    finished = false;
 }
 
 void CHThorIndexCountActivity::ready()
@@ -1514,6 +1524,8 @@ protected:
 
 CHThorIndexGroupAggregateActivity::CHThorIndexGroupAggregateActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexGroupAggregateArg &_arg, ThorActivityKind _kind, IDistributedFile * _df) : CHThorIndexReadActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _df), helper(_arg), aggregated(_arg, _arg)
 {
+    eof = false;
+    gathered = false;
 }
 
 void CHThorIndexGroupAggregateActivity::ready()

+ 106 - 105
esp/services/ws_smc/ws_smcService.cpp

@@ -76,26 +76,6 @@ void AccessFailure(IEspContext& context, char const * msg,...)
     AUDIT(AUDIT_TYPE_ACCESS_FAILURE,buf.str());
 }
 
-struct QueueWrapper
-{
-    QueueWrapper(const char* targetName, const char* queueExt)
-    {
-        StringBuffer name;
-        name.append(targetName).append('.').append(queueExt);
-        queue.setown(createJobQueue(name.str()));
-    }
-
-    QueueWrapper(const char* queueName)
-    {
-        queue.setown(createJobQueue(queueName));
-    }
-
-    operator IJobQueue*() { return queue.get(); }
-    IJobQueue* operator->() { return queue.get(); }
-
-    Owned<IJobQueue> queue;
-};
-
 struct QueueLock
 {
     QueueLock(IJobQueue* q): queue(q) { queue->lock(); }
@@ -660,12 +640,15 @@ ActivityInfo* CWsSMCEx::getActivityInfo(IEspContext &context, IEspActivityReques
         return activityInfoCache.getLink();
 
     DBGLOG("CWsSMCEx::getActivityInfo - rebuild cached information");
+    {
+        EspTimeSection timer("createActivityInfo");
+        activityInfoCache.setown(createActivityInfo(context));
+    }
 
-    activityInfoCache.setown(createActivityInfo(context, req));
     return activityInfoCache.getLink();
 }
 
-ActivityInfo* CWsSMCEx::createActivityInfo(IEspContext &context, IEspActivityRequest &req)
+ActivityInfo* CWsSMCEx::createActivityInfo(IEspContext &context)
 {
     Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
     Owned<IConstEnvironment> env = factory->openEnvironment();
@@ -1266,17 +1249,18 @@ bool CWsSMCEx::onMoveJobDown(IEspContext &context, IEspSMCJobRequest &req, IEspS
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        unsigned index=queue->findRank(req.getWuid());
-        if(index<queue->ordinality())
         {
-            IJobQueueItem * item0 = queue->getItem(index);
-            IJobQueueItem * item = queue->getItem(index+1);
-            if(item && item0 && (item0->getPriority() == item->getPriority()))
-                queue->moveAfter(req.getWuid(),item->queryWUID());
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
+            unsigned index=queue->findRank(req.getWuid());
+            if(index<queue->ordinality())
+            {
+                Owned<IJobQueueItem> item0 = queue->getItem(index);
+                Owned<IJobQueueItem> item = queue->getItem(index+1);
+                if(item && item0 && (item0->getPriority() == item->getPriority()))
+                    queue->moveAfter(req.getWuid(),item->queryWUID());
+            }
         }
-
         AccessSuccess(context, "Changed job priority %s",req.getWuid());
         clearActivityInfoCache();
         resp.setRedirectUrl("/WsSMC/");
@@ -1294,17 +1278,18 @@ bool CWsSMCEx::onMoveJobUp(IEspContext &context, IEspSMCJobRequest &req, IEspSMC
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        unsigned index=queue->findRank(req.getWuid());
-        if(index>0 && index<queue->ordinality())
         {
-            IJobQueueItem * item0 = queue->getItem(index);
-            IJobQueueItem * item = queue->getItem(index-1);
-            if(item && item0 && (item0->getPriority() == item->getPriority()))
-                queue->moveBefore(req.getWuid(),item->queryWUID());
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
+            unsigned index=queue->findRank(req.getWuid());
+            if(index>0 && index<queue->ordinality())
+            {
+                Owned<IJobQueueItem> item0 = queue->getItem(index);
+                Owned<IJobQueueItem> item = queue->getItem(index-1);
+                if(item && item0 && (item0->getPriority() == item->getPriority()))
+                    queue->moveBefore(req.getWuid(),item->queryWUID());
+            }
         }
-
         AccessSuccess(context, "Changed job priority %s",req.getWuid());
         clearActivityInfoCache();
         resp.setRedirectUrl("/WsSMC/");
@@ -1322,33 +1307,35 @@ bool CWsSMCEx::onMoveJobBack(IEspContext &context, IEspSMCJobRequest &req, IEspS
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        
-        unsigned index=queue->findRank(req.getWuid());
-        if(index<queue->ordinality())
         {
-            int priority0 = queue->getItem(index)->getPriority();
-            unsigned biggestIndoxInSamePriority = index;
-            unsigned nextIndex = biggestIndoxInSamePriority + 1;
-            while (nextIndex<queue->ordinality())
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
+
+            unsigned index=queue->findRank(req.getWuid());
+            if(index<queue->ordinality())
             {
-                IJobQueueItem * item = queue->getItem(nextIndex);
-                if (priority0 != item->getPriority())
+                Owned<IJobQueueItem> item = queue->getItem(index);
+                int priority0 = item->getPriority();
+                unsigned biggestIndoxInSamePriority = index;
+                unsigned nextIndex = biggestIndoxInSamePriority + 1;
+                while (nextIndex<queue->ordinality())
                 {
-                    break;
+                    item.setown(queue->getItem(nextIndex));
+                    if (priority0 != item->getPriority())
+                    {
+                        break;
+                    }
+                    biggestIndoxInSamePriority = nextIndex;
+                    nextIndex++;
                 }
-                biggestIndoxInSamePriority = nextIndex;
-                nextIndex++;
-            }
 
-            if (biggestIndoxInSamePriority != index)
-            {
-                IJobQueueItem * item = queue->getItem(biggestIndoxInSamePriority);
-                queue->moveAfter(req.getWuid(),item->queryWUID());
+                if (biggestIndoxInSamePriority != index)
+                {
+                    item.setown(queue->getItem(biggestIndoxInSamePriority));
+                    queue->moveAfter(req.getWuid(), item->queryWUID());
+                }
             }
         }
-
         AccessSuccess(context, "Changed job priority %s",req.getWuid());
         clearActivityInfoCache();
         resp.setRedirectUrl("/WsSMC/");
@@ -1366,30 +1353,33 @@ bool CWsSMCEx::onMoveJobFront(IEspContext &context, IEspSMCJobRequest &req, IEsp
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        
-        unsigned index=queue->findRank(req.getWuid());
-        if(index>0 && index<queue->ordinality())
         {
-            int priority0 = queue->getItem(index)->getPriority();
-            unsigned smallestIndoxInSamePriority = index;
-            int nextIndex = smallestIndoxInSamePriority - 1;
-            while (nextIndex >= 0)
+            Owned<IJobQueue> queue=createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
+
+            unsigned index=queue->findRank(req.getWuid());
+            if (index>0 && index<queue->ordinality())
             {
-                IJobQueueItem * item = queue->getItem(nextIndex);
-                if (priority0 != item->getPriority())
+                Owned<IJobQueueItem> item = queue->getItem(index);
+                int priority0 = item->getPriority();
+                unsigned smallestIndoxInSamePriority = index;
+                int nextIndex = smallestIndoxInSamePriority - 1;
+                while (nextIndex >= 0)
                 {
-                    break;
+                    item.setown(queue->getItem(nextIndex));
+                    if (priority0 != item->getPriority())
+                    {
+                        break;
+                    }
+                    smallestIndoxInSamePriority = nextIndex;
+                    nextIndex--;
                 }
-                smallestIndoxInSamePriority = nextIndex;
-                nextIndex--;
-            }
 
-            if (smallestIndoxInSamePriority != index)
-            {
-                IJobQueueItem * item = queue->getItem(smallestIndoxInSamePriority);
-                queue->moveBefore(req.getWuid(),item->queryWUID());
+                if (smallestIndoxInSamePriority != index)
+                {
+                    item.setown(queue->getItem(smallestIndoxInSamePriority));
+                    queue->moveBefore(req.getWuid(), item->queryWUID());
+                }
             }
         }
 
@@ -1412,16 +1402,17 @@ bool CWsSMCEx::onRemoveJob(IEspContext &context, IEspSMCJobRequest &req, IEspSMC
 
         secAbortWorkUnit(req.getWuid(), *context.querySecManager(), *context.queryUser());
 
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        
-        unsigned index=queue->findRank(req.getWuid());
-        if(index<queue->ordinality())
         {
-            if(!queue->cancelInitiateConversation(req.getWuid()))
-                throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT,"Failed to remove the workunit %s",req.getWuid());
-        }
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
 
+            unsigned index=queue->findRank(req.getWuid());
+            if(index<queue->ordinality())
+            {
+                if(!queue->cancelInitiateConversation(req.getWuid()))
+                    throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT,"Failed to remove the workunit %s",req.getWuid());
+            }
+        }
         AccessSuccess(context, "Removed job %s",req.getWuid());
         clearActivityInfoCache();
         resp.setRedirectUrl("/WsSMC/");
@@ -1439,10 +1430,12 @@ bool CWsSMCEx::onStopQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspS
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        StringBuffer info;
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        queue->stop(createQueueActionInfo(context, "stopped", req, info));
-        AccessSuccess(context, "Stopped queue %s",req.getCluster());
+        {
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            StringBuffer info;
+            queue->stop(createQueueActionInfo(context, "stopped", req, info));
+        }
+        AccessSuccess(context, "Stopped queue %s", req.getCluster());
         clearActivityInfoCache();
 
         resp.setRedirectUrl("/WsSMC/");
@@ -1460,10 +1453,12 @@ bool CWsSMCEx::onResumeQueue(IEspContext &context, IEspSMCQueueRequest &req, IEs
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        StringBuffer info;
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        queue->resume(createQueueActionInfo(context, "resumed", req, info));
-        AccessSuccess(context, "Resumed queue %s",req.getCluster());
+        {
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            StringBuffer info;
+            queue->resume(createQueueActionInfo(context, "resumed", req, info));
+        }
+        AccessSuccess(context, "Resumed queue %s", req.getCluster());
         clearActivityInfoCache();
 
         resp.setRedirectUrl("/WsSMC/");
@@ -1498,10 +1493,12 @@ bool CWsSMCEx::onPauseQueue(IEspContext &context, IEspSMCQueueRequest &req, IEsp
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        StringBuffer info;
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        queue->pause(createQueueActionInfo(context, "paused", req, info));
-        AccessSuccess(context, "Paused queue %s",req.getCluster());
+        {
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            StringBuffer info;
+            queue->pause(createQueueActionInfo(context, "paused", req, info));
+        }
+        AccessSuccess(context, "Paused queue %s", req.getCluster());
         clearActivityInfoCache();
 
         resp.setRedirectUrl("/WsSMC/");
@@ -1518,11 +1515,14 @@ bool CWsSMCEx::onClearQueue(IEspContext &context, IEspSMCQueueRequest &req, IEsp
     try
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
         {
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
             QueueLock lock(queue);
             for(unsigned i=0;i<queue->ordinality();i++)
-                secAbortWorkUnit(queue->getItem(i)->queryWUID(), *context.querySecManager(), *context.queryUser());
+            {
+                Owned<IJobQueueItem> item = queue->getItem(i);
+                secAbortWorkUnit(item->queryWUID(), *context.querySecManager(), *context.queryUser());
+            }
             queue->clear();
         }
         AccessSuccess(context, "Cleared queue %s",req.getCluster());
@@ -1559,10 +1559,11 @@ bool CWsSMCEx::onSetJobPriority(IEspContext &context, IEspSMCPriorityRequest &re
 
         // set job priority
         int priority = lw->getPriorityValue();
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        queue->changePriority(req.getWuid(),priority);
-
+        {
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
+            queue->changePriority(req.getWuid(),priority);
+        }
         clearActivityInfoCache();
         resp.setRedirectUrl("/WsSMC/");
     }

+ 1 - 1
esp/services/ws_smc/ws_smcService.hpp

@@ -195,7 +195,7 @@ private:
     void readWUsAndStateFromJobQueue(IEspContext& context, CWsSMCTargetCluster& targetCluster, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws);
     void readWUsAndStateFromJobQueue(IEspContext& context, CIArrayOf<CWsSMCTargetCluster>& targetClusters, BoolHash& uniqueWUIDs, IArrayOf<IEspActiveWorkunit>& aws);
     void setESPTargetClusters(IEspContext& context, const CIArrayOf<CWsSMCTargetCluster>& targetClusters, IArrayOf<IEspTargetCluster>& respTargetClusters);
-    ActivityInfo* createActivityInfo(IEspContext &context, IEspActivityRequest &req);
+    ActivityInfo* createActivityInfo(IEspContext &context);
     void clearActivityInfoCache();
     ActivityInfo* getActivityInfo(IEspContext &context, IEspActivityRequest &req);
     void setActivityResponse(IEspContext &context, ActivityInfo* activityInfo, IEspActivityRequest &req, IEspActivityResponse& resp);

+ 47 - 21
esp/services/ws_workunits/ws_workunitsQuerySets.cpp

@@ -1183,6 +1183,40 @@ unsigned CWsWorkunitsEx::getGraphIdsByQueryId(const char *target, const char *qu
     return graphIds.length();
 }
 
+void CWsWorkunitsEx::checkAndSetClusterQueryState(IEspContext &context, const char* cluster, const char* querySetId, IArrayOf<IEspQuerySetQuery>& queries)
+{
+    try
+    {
+        double version = context.getClientVersion();
+        if (isEmpty(cluster))
+            cluster = querySetId;
+        Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(cluster, querySetId);
+        if (!queriesOnCluster)
+        {
+            DBGLOG("getQueriesOnCluster() returns NULL for cluster<%s> and querySetId<%s>", cluster, querySetId);
+            return;
+        }
+
+        ForEachItemIn(i, queries)
+        {
+            IEspQuerySetQuery& query = queries.item(i);
+            const char* queryId = query.getId();
+            const char* querySetId0 = query.getQuerySetId();
+            if (!queryId || !querySetId0 || !strieq(querySetId0, querySetId))
+                continue;
+
+            IArrayOf<IEspClusterQueryState> clusters;
+            addClusterQueryStates(queriesOnCluster, cluster, queryId, clusters, version);
+            query.setClusters(clusters);
+        }
+    }
+    catch(IException *e)
+    {
+        EXCLOG(e, "CWsWorkunitsEx::checkAndSetClusterQueryState: Failed to read Query State On Cluster");
+        e->Release();
+    }
+}
+
 bool CWsWorkunitsEx::onWUListQueries(IEspContext &context, IEspWUListQueriesRequest & req, IEspWUListQueriesResponse & resp)
 {
     bool descending = req.getDescending();
@@ -1249,15 +1283,17 @@ bool CWsWorkunitsEx::onWUListQueries(IEspContext &context, IEspWUListQueriesRequ
     Owned<IConstQuerySetQueryIterator> it = factory->getQuerySetQueriesSorted(sortOrder, filters, filterBuf.bufferBase(), pageStartFrom, pageSize, &cacheHint, &numberOfQueries);
     resp.setCacheHint(cacheHint);
 
+    StringArray querySetIds;
     IArrayOf<IEspQuerySetQuery> queries;
     double version = context.getClientVersion();
     ForEach(*it)
     {
         IPropertyTree &query=it->query();
+        const char *queryTarget = query.queryProp("@querySetId");
         Owned<IEspQuerySetQuery> q = createQuerySetQuery();
         q->setId(query.queryProp("@id"));
         q->setName(query.queryProp("@name"));
-        q->setQuerySetId(query.queryProp("@querySetId"));
+        q->setQuerySetId(queryTarget);
         q->setDll(query.queryProp("@dll"));
         q->setWuid(query.queryProp("@wuid"));
         q->setActivated(query.getPropBool("@activated", false));
@@ -1282,28 +1318,18 @@ bool CWsWorkunitsEx::onWUListQueries(IEspContext &context, IEspWUListQueriesRequ
             q->setIsLibrary(query.getPropBool("@isLibrary"));
         }
 
-        try
-        {
-            const char* cluster = clusterReq;
-            const char* querySetId = query.queryProp("@querySetId");
-            if (isEmpty(cluster))
-                cluster = querySetId;
-            Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(cluster, querySetId);
-            if (queriesOnCluster)
-            {
-                IArrayOf<IEspClusterQueryState> clusters;
-                addClusterQueryStates(queriesOnCluster, cluster, query.queryProp("@id"), clusters, version);
-                q->setClusters(clusters);
-            }
-        }
-        catch(IException *e)
-        {
-            StringBuffer err;
-            DBGLOG("Get exception in WUListQueries: %s", e->errorMessage(err.append(e->errorCode()).append(' ')).str());
-            e->Release();
-        }
+        if (!querySetIds.contains(queryTarget))
+            querySetIds.append(queryTarget);
         queries.append(*q.getClear());
     }
+
+    ForEachItemIn(i, querySetIds)
+    {
+        const char* querySetId = querySetIds.item(i);
+        if(querySetId && *querySetId)
+            checkAndSetClusterQueryState(context, clusterReq, querySetId, queries);
+    }
+
     resp.setQuerysetQueries(queries);
     resp.setNumberOfQueries(numberOfQueries);
 

+ 1 - 0
esp/services/ws_workunits/ws_workunitsService.hpp

@@ -179,6 +179,7 @@ public:
     unsigned getGraphIdsByQueryId(const char *target, const char *queryId, StringArray& graphIds);
     bool getQueryFiles(const char* query, const char* target, StringArray& logicalFiles, IArrayOf<IEspQuerySuperFile> *superFiles);
     void getGraphsByQueryId(const char *target, const char *queryId, const char *graphName, const char *subGraphId, IArrayOf<IEspECLGraphEx>& ECLGraphs);
+    void checkAndSetClusterQueryState(IEspContext &context, const char* cluster, const char* querySetId, IArrayOf<IEspQuerySetQuery>& queries);
 
     bool onWUQuery(IEspContext &context, IEspWUQueryRequest &req, IEspWUQueryResponse &resp);
     bool onWUPublishWorkunit(IEspContext &context, IEspWUPublishWorkunitRequest & req, IEspWUPublishWorkunitResponse & resp);

+ 3 - 2
roxie/ccd/ccddali.cpp

@@ -251,6 +251,8 @@ private:
     static void writeCache(const char *foundLoc, const char *newLoc, IPropertyTree *val)
     {
         CriticalBlock b(cacheCrit);
+        if (!cache)
+            initCache();
         cache->removeProp(foundLoc);
         if (val)
             cache->addPropTree(newLoc, LINK(val));
@@ -692,12 +694,11 @@ public:
                     serverStatus = new CSDSServerStatus("RoxieServer");
                     serverStatus->queryProperties()->setProp("@cluster", roxieName.str());
                     serverStatus->commitProperties();
-                    initCache();
+                    isConnected = true; // Make sure this is set before the onReconnect calls, so that they refresh with info from Dali rather than from cache
                     ForEachItemIn(idx, watchers)
                     {
                         watchers.item(idx).onReconnect();
                     }
-                    isConnected = true;
                 }
                 catch(IException *e)
                 {

+ 1 - 1
system/jlib/jdebug.cpp

@@ -376,7 +376,7 @@ double getCycleToNanoScale()
 
 void display_time(const char *title, cycle_t diff)
 {
-    DBGLOG("Time taken for %s: %"I64F"d cycles (%"I64F"dM) = %"I64F"d msec\n", title, diff, diff/1000000, cycle_to_nanosec(diff)/1000000);
+    DBGLOG("Time taken for %s: %"I64F"d cycles (%"I64F"dM) = %"I64F"d msec", title, diff, diff/1000000, cycle_to_nanosec(diff)/1000000);
 }
 
 TimeSection::TimeSection(const char * _title) : title(_title)

File diff suppressed because it is too large
+ 11 - 0
testing/regress/ecl/key/sqidxlimit.xml


+ 14 - 0
testing/regress/ecl/key/sqidxlimit2.xml

@@ -0,0 +1,14 @@
+<Dataset name='Result 1'>
+ <Row><cnt>3</cnt></Row>
+ <Row><cnt>1</cnt></Row>
+ <Row><cnt>0</cnt></Row>
+ <Row><cnt>0</cnt></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><cnt>1</cnt></Row>
+ <Row><cnt>1</cnt></Row>
+ <Row><cnt>0</cnt></Row>
+ <Row><cnt>6</cnt></Row>
+ <Row><cnt>1</cnt></Row>
+ <Row><cnt>0</cnt></Row>
+</Dataset>

+ 48 - 0
testing/regress/ecl/sqidxlimit.ecl

@@ -0,0 +1,48 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#option ('globalAutoHoist', false);
+
+//Thor doesn't currently support catch correctly, or in a child query, so disable.
+//nothor
+
+import $.setup;
+sq := setup.sq('hthor');
+
+
+filtered(STRING name, unsigned klimit, unsigned slimit) := FUNCTION
+    f := sq.SimplePersonBookIndex(surname = name);
+    lim := LIMIT(f, klimit, KEYED);
+    x := LIMIT(lim, slimit);
+    RETURN CATCH(x, SKIP);
+END;
+
+
+ds1 := DATASET([
+            {'Halliday', 4, 3},
+            {'Halliday', 3, 2}
+            ], { string name, unsigned klimit, unsigned slimit });
+
+p1 := TABLE(ds1, { cnt := COUNT(NOFOLD(filtered(name, klimit, slimit))) });
+
+
+
+sequential(
+output(filtered('Halliday', 4, 3));
+output(filtered('Halliday', 4, 2));
+output(p1);
+);

+ 58 - 0
testing/regress/ecl/sqidxlimit2.ecl

@@ -0,0 +1,58 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#option ('globalAutoHoist', false);
+
+import $.setup;
+sq := setup.sq('hthor');
+
+
+ds2 := DATASET([
+            {'Halliday', 4},
+            {'Halliday', 1},
+            {'Halliday', 0}, // should be empty
+            {'Zingo', 2}
+            ], { string name, unsigned climit });
+
+filtered2(STRING name, unsigned climit) := FUNCTION
+    f := sq.SimplePersonBookIndex(surname = name);
+    RETURN CHOOSEN(f, climit);
+END;
+
+p2 := TABLE(ds2, { cnt := COUNT(NOFOLD(filtered2(name, climit))) });
+
+ds3 := DATASET([
+            {'Harper Lee', 4},
+            {'Harper Lee', 1},
+            {'Harper Lee', 0}, // should be empty
+            {'Various', 1000},
+            {'Various', 1},
+            {'Zingo', 2}
+            ], { string name, unsigned climit });
+
+filtered3(STRING searchname, unsigned climit) := FUNCTION
+    f := sq.SimplePersonBookIndex.books(author = searchname);
+    RETURN CHOOSEN(f, climit);
+END;
+
+p3 := TABLE(ds3, { cnt := COUNT(NOFOLD(filtered3(name, climit))) });
+
+
+sequential(
+output(p2);
+output(p3);
+);

+ 10 - 7
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2393,7 +2393,7 @@ public:
         else
             return htRows->clear();
     }
-    bool spillHashTable(); // returns true if freed mem
+    bool spillHashTable(bool critical); // returns true if freed mem
     bool flush(bool critical);
     bool rehash();
     void close()
@@ -2507,9 +2507,7 @@ public:
                 // spill whole bucket unless last
                 // The one left, will be last bucket standing and grown to fill mem
                 // it is still useful to use as much as poss. of remaining bucket HT as filter
-                if (bucket->spillHashTable())
-                    return true;
-                else if (critical && bucket->clearHashTable(true))
+                if (bucket->spillHashTable(critical))
                     return true;
             }
         }
@@ -2878,12 +2876,17 @@ void CBucket::doSpillHashTable()
     }
 }
 
-bool CBucket::spillHashTable()
+bool CBucket::spillHashTable(bool critical)
 {
     CriticalBlock b(lock);
     rowidx_t removeN = htRows->queryHtElements();
-    if (0 == removeN || spilt) // NB: if split, will be handled by CBucket on different priority
+    if (spilt) // NB: if split, will be handled by CBucket on different priority
         return false; // signal nothing to spill
+    else if (0 == removeN)
+    {
+        if (!critical || !clearHashTable(true))
+            return false; // signal nothing to spill
+    }
     doSpillHashTable();
     ActPrintLog(&owner, "Spilt bucket %d - %d elements of hash table", bucketN, removeN);
     return true;
@@ -2900,7 +2903,7 @@ bool CBucket::flush(bool critical)
         {
             if (clearHashTable(critical))
             {
-                PROGLOG("Flushed(%s) bucket %d - %d elements", critical?"(critical)":"", queryBucketNumber(), count);
+                PROGLOG("Flushed%s bucket %d - %d elements", critical?"(critical)":"", queryBucketNumber(), count);
                 return true;
             }
         }

+ 3 - 1
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -1214,7 +1214,8 @@ public:
     virtual bool addLocalRHSRow(CThorSpillableRowArray &localRhsRows, const void *row)
     {
         LinkThorRow(row);
-        localRhsRows.append(row);
+        if (!localRhsRows.append(row))
+            throw MakeActivityException(this, 0, "Out of memory: Cannot append local rhs row");
         return true;
     }
 // ISmartBufferNotify
@@ -1337,6 +1338,7 @@ protected:
     inline void setBroadcastingSpilt(bool tf) { atomic_set(&spiltBroadcastingRHS, (int)tf); }
     rowidx_t clearNonLocalRows(CThorSpillableRowArray &rows, rowidx_t startPos)
     {
+        CThorArrayLockBlock block(rows);
         rowidx_t clearedRows = 0;
         rowidx_t numRows = rows.numCommitted();
         for (rowidx_t r=startPos; r<numRows; r++)

+ 1 - 3
thorlcr/thorutil/thbuf.cpp

@@ -1687,9 +1687,7 @@ public:
                             eos = true;
                             return NULL;
                         }
-                        const void **toRead = rows.getBlock(rowsToRead);
-                        memcpy(readRows, toRead, rowsToRead * sizeof(void *));
-                        rows.noteSpilled(rowsToRead);
+                        rows.readBlock(readRows, rowsToRead);
                         rowPos = 0;
                         if (writersBlocked)
                         {

+ 14 - 6
thorlcr/thorutil/thmem.cpp

@@ -195,7 +195,6 @@ protected:
         spillFile.setown(createIFile(tempname.str()));
 
         rows.save(*spillFile, useCompression); // saves committed rows
-        rows.noteSpilled(numRows);
         return true;
     }
 
@@ -365,9 +364,7 @@ public:
             if (fetch >= granularity)
                 fetch = granularity;
             // consume 'fetch' rows
-            const void **toRead = rows.getBlock(fetch);
-            memcpy(readRows, toRead, fetch * sizeof(void *));
-            rows.noteSpilled(fetch);
+            rows.readBlock(readRows, fetch);
             numReadRows = fetch;
             pos = 0;
         }
@@ -1148,6 +1145,7 @@ void CThorSpillableRowArray::clearRows()
 
 void CThorSpillableRowArray::compact()
 {
+    CThorArrayLockBlock block(*this);
     assertex(0 == firstRow && numRows == commitRows);
     CThorExpandingRowArray::compact();
     commitRows = numRows;
@@ -1234,6 +1232,7 @@ rowidx_t CThorSpillableRowArray::save(IFile &iFile, bool useCompression)
         rows[i] = NULL;
     }
     writer->flush();
+    firstRow += n;
     offset_t bytesWritten = writer->getPosition();
     writer.clear();
     ActPrintLog(&activity, "CThorSpillableRowArray::save done, bytes = %"I64F"d", (__int64)bytesWritten);
@@ -1308,12 +1307,21 @@ void CThorSpillableRowArray::swap(CThorSpillableRowArray &other)
     commitRows = otherCommitRows;
 }
 
+void CThorSpillableRowArray::readBlock(const void **outRows, rowidx_t readRows)
+{
+    CThorArrayLockBlock block(*this);
+    dbgassertex(firstRow + readRows <= commitRows);
+    memcpy(outRows, rows + firstRow, readRows*sizeof(void *));
+    firstRow += readRows;
+}
+
 void CThorSpillableRowArray::transferRowsCopy(const void **outRows, bool takeOwnership)
 {
+    CThorArrayLockBlock block(*this);
     if (0 == numRows)
         return;
     assertex(numRows == commitRows);
-    memcpy(outRows, rows, numRows*sizeof(void **));
+    memcpy(outRows, rows, numRows*sizeof(void *));
     if (takeOwnership)
         firstRow = commitRows = numRows = 0;
     else
@@ -1360,6 +1368,7 @@ protected:
 
     bool spillRows()
     {
+        //This must only be called while a lock is held on spillableRows()
         rowidx_t numRows = spillableRows.numCommitted();
         if (numRows == 0)
             return false;
@@ -1378,7 +1387,6 @@ protected:
         Owned<IFile> iFile = createIFile(tempname.str());
         spillFiles.append(new CFileOwner(iFile.getLink()));
         spillableRows.save(*iFile, activity.getOptBool(THOROPT_COMPRESS_SPILLS, true)); // saves committed rows
-        spillableRows.noteSpilled(numRows);
 
         ++overflowCount;
 

+ 10 - 15
thorlcr/thorutil/thmem.hpp

@@ -408,11 +408,11 @@ public:
     void unregisterWriteCallback(IWritePosCallback &cb);
     inline void setAllowNulls(bool b) { CThorExpandingRowArray::setAllowNulls(b); }
     void kill();
-    void clearRows();
     void compact();
     void flush();
-    inline bool append(const void *row)
+    inline bool append(const void *row) __attribute__((warn_unused_result))
     {
+        //GH->JCS Should this really be inline?
         assertex(row || allowNulls);
         if (numRows >= maxRows)
         {
@@ -430,36 +430,26 @@ public:
     }
     bool appendRows(CThorExpandingRowArray &inRows, bool takeOwnership);
 
-    //The following can be accessed from the reader without any need to lock
+    //The following must either be accessed within a lock, or when no rows can be appended,
+    //(otherwise flush() might move all the rows, invalidating the indexes - or for query() the row)
     inline const void *query(rowidx_t i) const
     {
-        CThorArrayLockBlock block(*this);
         return CThorExpandingRowArray::query(i);
     }
     inline const void *get(rowidx_t i) const
     {
-        CThorArrayLockBlock block(*this);
         return CThorExpandingRowArray::get(i);
     }
     inline const void *getClear(rowidx_t i)
     {
-        CThorArrayLockBlock block(*this);
         return CThorExpandingRowArray::getClear(i);
     }
 
     //A thread calling the following functions must own the lock, or guarantee no other thread will access
     void sort(ICompare & compare, unsigned maxcores);
     rowidx_t save(IFile &file, bool useCompression);
-    const void **getBlock(rowidx_t readRows);
-    inline void noteSpilled(rowidx_t spilledRows)
-    {
-        firstRow += spilledRows;
-    }
-
-    //The block returned is only valid until the critical section is released
 
-    inline rowidx_t firstCommitted() const { return firstRow; }
-    inline rowidx_t numCommitted() const { return commitRows - firstRow; }
+    inline rowidx_t numCommitted() const { return commitRows - firstRow; } //MORE::Not convinced this is very safe!
 
 // access to
     void swap(CThorSpillableRowArray &src);
@@ -489,11 +479,16 @@ public:
     void deserializeRow(IRowDeserializerSource &in) { CThorExpandingRowArray::deserializeRow(in); }
     bool ensure(rowidx_t requiredRows) { return CThorExpandingRowArray::ensure(requiredRows); }
     void transferRowsCopy(const void **outRows, bool takeOwnership);
+    void readBlock(const void **outRows, rowidx_t readRows);
 
     virtual IThorArrayLock &queryLock() { return *this; }
 // IThorArrayLock
     virtual void lock() const { cs.enter(); }
     virtual void unlock() const { cs.leave(); }
+
+private:
+    void clearRows();
+    const void **getBlock(rowidx_t readRows);
 };