Browse Source

Merge remote-tracking branch 'origin/candidate-3.10.x'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 years ago
parent
commit
7a757a0c64

+ 3 - 3
common/roxiemanager/roxiequerymanager.cpp

@@ -139,7 +139,7 @@ private:
             IPropertyTree *pkgInfo = wuProcessor->queryPackageInfo();
             StringBuffer newQueryId;
             const char *qsName = resolveQuerySetName(querySetName);
-            addQueryToQuerySet(wu, qsName, queryName.str(), pkgInfo, activateOption, newQueryId);
+            addQueryToQuerySet(wu, qsName, queryName.str(), pkgInfo, activateOption, newQueryId, NULL);
 
             const char *queryComment = processingInfo.queryComment();
             if (queryComment)
@@ -441,7 +441,7 @@ public:
     {
         try
         {
-            setSuspendQuerySetQuery(resolveQuerySetName(querySetName), id, true);
+            setSuspendQuerySetQuery(resolveQuerySetName(querySetName), id, true, NULL);
             status.s.appendf("successfully suspended query %s", id);
         }
         catch(IException *e)
@@ -461,7 +461,7 @@ public:
     {
         try
         {
-            setSuspendQuerySetQuery(resolveQuerySetName(querySetName), id, false);
+            setSuspendQuerySetQuery(resolveQuerySetName(querySetName), id, false, NULL);
             status.s.appendf("successfully unsuspended query %s", id);
         }
         catch(IException *e)

+ 18 - 7
common/workunit/workunit.cpp

@@ -9033,7 +9033,7 @@ static void clearAliases(IPropertyTree * queryRegistry, const char * id)
     }
 }
 
-IPropertyTree * addNamedQuery(IPropertyTree * queryRegistry, const char * name, const char * wuid, const char * dll, bool library)
+IPropertyTree * addNamedQuery(IPropertyTree * queryRegistry, const char * name, const char * wuid, const char * dll, bool library, const char *userid)
 {
     StringBuffer lcName(name);
     lcName.toLowerCase();
@@ -9063,6 +9063,8 @@ IPropertyTree * addNamedQuery(IPropertyTree * queryRegistry, const char * name,
     newEntry->setPropInt("@seq", seq);
     if (library)
         newEntry->setPropBool("@isLibrary", true);
+    if (userid && *userid)
+        newEntry->setProp("@publishedBy", userid);
     return queryRegistry->addPropTree("Query", newEntry);
 }
 
@@ -9163,7 +9165,7 @@ extern WORKUNIT_API IPropertyTree * resolveQueryAlias(const char *queryset, cons
     return resolveQueryAlias(queryRegistry, alias);
 }
 
-void setQuerySuspendedState(IPropertyTree * queryRegistry, const char *id, bool suspend)
+void setQuerySuspendedState(IPropertyTree * queryRegistry, const char *id, bool suspend, const char *userid)
 {
     StringBuffer lcId(id);
     lcId.toLowerCase();
@@ -9173,10 +9175,19 @@ void setQuerySuspendedState(IPropertyTree * queryRegistry, const char *id, bool
     IPropertyTree *tree = queryRegistry->queryPropTree(xpath);
     if (tree)
     {
+        if (tree->getPropBool("@suspended", false) == suspend)
+            return;
         if (suspend)
+        {
             tree->addPropBool("@suspended", true);
+            if (userid && *userid)
+                tree->addProp("@suspendedBy", userid);
+        }
         else
+        {
             tree->removeProp("@suspended");
+            tree->removeProp("@suspendedBy");
+        }
     }
     else
         throw MakeStringException((suspend)? QUERRREG_SUSPEND : QUERRREG_UNSUSPEND, "Modifying query suspended state failed.  Could not find query %s", id);
@@ -9303,7 +9314,7 @@ extern WORKUNIT_API IPropertyTree * getPackageSetRegistry(const char * wsEclId,
     return conn->getRoot();
 }
 
-void addQueryToQuerySet(IWorkUnit *workunit, const char *querySetName, const char *queryName, IPropertyTree *packageInfo, WUQueryActivationOptions activateOption, StringBuffer &newQueryId)
+void addQueryToQuerySet(IWorkUnit *workunit, const char *querySetName, const char *queryName, IPropertyTree *packageInfo, WUQueryActivationOptions activateOption, StringBuffer &newQueryId, const char *userid)
 {
     StringBuffer cleanQueryName;
     appendUtf8XmlName(cleanQueryName, strlen(queryName), queryName);
@@ -9337,7 +9348,7 @@ void addQueryToQuerySet(IWorkUnit *workunit, const char *querySetName, const cha
         }
     }
 
-    IPropertyTree *newEntry = addNamedQuery(queryRegistry, cleanQueryName, wuid.str(), dllName.str(), isLibrary(workunit));
+    IPropertyTree *newEntry = addNamedQuery(queryRegistry, cleanQueryName, wuid.str(), dllName.str(), isLibrary(workunit), userid);
     newQueryId.append(newEntry->queryProp("@id"));
     workunit->setIsQueryService(true); //will check querysets before delete
     workunit->commit();
@@ -9350,7 +9361,7 @@ void addQueryToQuerySet(IWorkUnit *workunit, const char *querySetName, const cha
         if (aliasTree)
         {
             if (activateOption == ACTIVATE_SUSPEND_PREVIOUS)
-                setQuerySuspendedState(queryRegistry, cleanQueryName, true);
+                setQuerySuspendedState(queryRegistry, cleanQueryName, true, userid);
             else 
                 removeNamedQuery(queryRegistry, aliasTree->queryProp("@id"));
         }
@@ -9374,10 +9385,10 @@ void addQuerySetAlias(const char *querySetName, const char *alias, const char *i
     setQueryAlias(queryRegistry, alias, id);
 }
 
-void setSuspendQuerySetQuery(const char *querySetName, const char *id, bool suspend)
+void setSuspendQuerySetQuery(const char *querySetName, const char *id, bool suspend, const char *userid)
 {
     Owned<IPropertyTree> queryRegistry = getQueryRegistry(querySetName, true);
-    setQuerySuspendedState(queryRegistry, id, suspend);
+    setQuerySuspendedState(queryRegistry, id, suspend, userid);
 }
 
 void deleteQuerySetQuery(const char *querySetName, const char *id)

+ 4 - 4
common/workunit/workunit.hpp

@@ -1211,7 +1211,7 @@ enum WUQueryActivationOptions
 };
 
 
-extern WORKUNIT_API IPropertyTree * addNamedQuery(IPropertyTree * queryRegistry, const char * name, const char * wuid, const char * dll, bool library);       // result not linked
+extern WORKUNIT_API IPropertyTree * addNamedQuery(IPropertyTree * queryRegistry, const char * name, const char * wuid, const char * dll, bool library, const char *userid);       // result not linked
 extern WORKUNIT_API void removeNamedQuery(IPropertyTree * queryRegistry, const char * id);
 extern WORKUNIT_API void removeWuidFromNamedQueries(IPropertyTree * queryRegistry, const char * wuid);
 extern WORKUNIT_API void removeDllFromNamedQueries(IPropertyTree * queryRegistry, const char * dll);
@@ -1227,16 +1227,16 @@ extern WORKUNIT_API IPropertyTree * getQueryRegistryRoot();
 
 extern WORKUNIT_API void setQueryCommentForNamedQuery(IPropertyTree * queryRegistry, const char *id, const char *queryComment);
 
-extern WORKUNIT_API void setQuerySuspendedState(IPropertyTree * queryRegistry, const char * name, bool suspend);
+extern WORKUNIT_API void setQuerySuspendedState(IPropertyTree * queryRegistry, const char * name, bool suspend, const char *userid);
 
 extern WORKUNIT_API IPropertyTree * addNamedPackageSet(IPropertyTree * packageRegistry, const char * name, IPropertyTree *packageInfo, bool overWrite);     // result not linked
 extern WORKUNIT_API void removeNamedPackage(IPropertyTree * packageRegistry, const char * id);
 extern WORKUNIT_API IPropertyTree * getPackageSetRegistry(const char * wsEclId, bool readonly);
 
-extern WORKUNIT_API void addQueryToQuerySet(IWorkUnit *workunit, const char *querySetName, const char *queryName, IPropertyTree *packageInfo, WUQueryActivationOptions activateOption, StringBuffer &newQueryId);
+extern WORKUNIT_API void addQueryToQuerySet(IWorkUnit *workunit, const char *querySetName, const char *queryName, IPropertyTree *packageInfo, WUQueryActivationOptions activateOption, StringBuffer &newQueryId, const char *userid);
 extern WORKUNIT_API bool removeQuerySetAlias(const char *querySetName, const char *alias);
 extern WORKUNIT_API void addQuerySetAlias(const char *querySetName, const char *alias, const char *id);
-extern WORKUNIT_API void setSuspendQuerySetQuery(const char *querySetName, const char *id, bool suspend);
+extern WORKUNIT_API void setSuspendQuerySetQuery(const char *querySetName, const char *id, bool suspend, const char *userid);
 extern WORKUNIT_API void deleteQuerySetQuery(const char *querySetName, const char *id);
 extern WORKUNIT_API const char *queryIdFromQuerySetWuid(const char *querySetName, const char *wuid, IStringVal &id);
 extern WORKUNIT_API void removeQuerySetAliasesFromNamedQuery(const char *querySetName, const char * id);

+ 19 - 3
common/workunit/wujobq.cpp

@@ -1143,9 +1143,8 @@ public:
     }
 
 
-    unsigned copyItems(sQueueData &qd,CJobQueueContents &dest)
+    unsigned copyItemsImpl(sQueueData &qd,CJobQueueContents &dest)
     {
-        Cconnlockblock block(this,false);
         unsigned ret=0;
         StringBuffer path;
         for (unsigned i=0;;i++) {
@@ -1158,6 +1157,24 @@ public:
         return ret;
     }
 
+    unsigned copyItems(sQueueData &qd,CJobQueueContents &dest)
+    {
+        Cconnlockblock block(this,false);
+        return copyItemsImpl(qd,dest);
+    }
+
+    void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state)
+    {
+        assertex(qdata);
+        Cconnlockblock block(this,false);
+        assertex(qdata->root);
+
+        copyItemsImpl(*qdata,contents);
+
+        const char *st = qdata->root->queryProp("@state");
+        if (st&&*st)
+            state.set(st);
+    }
 
     unsigned takeItems(sQueueData &qd,CJobQueueContents &dest)
     {
@@ -1956,4 +1973,3 @@ extern bool WORKUNIT_API switchWorkUnitQueue(IWorkUnit* wu, const char *cluster)
 
     return wu->switchThorQueue(cluster, &switcher);
 }
-

+ 1 - 0
common/workunit/wujobq.hpp

@@ -91,6 +91,7 @@ interface IJobQueue: extends IInterface
     virtual unsigned findRank(const char *wuid)=0;
     virtual unsigned copyItems(CJobQueueContents &dest)=0;  // takes a snapshot copy of the entire queue (returns number copied)
     virtual bool getLastDequeuedInfo(StringAttr &wuid, CDateTime &enqueuedt, int &priority)=0;
+    virtual void copyItemsAndState(CJobQueueContents& contents, StringBuffer& state)=0;
 
 
 //manipulation

+ 34 - 9
esp/eclwatch/ws_XSLT/WUQuerysetQueries.xslt

@@ -66,6 +66,25 @@
               var selectedRows = 0;
               var sortableTable = null;
 
+              function viewDetails(active) {
+                var selectedRows;
+                if (active)
+                    selectedRows = aliasDataTable.getSelectedRows();
+                else
+                    selectedRows = queryDataTable.getSelectedRows();
+                if (selectedRows.length > 1) {
+                    alert("Please select only one query for viewing Query Dertails.");
+                    return;
+                }
+
+                var record;
+                if (active)
+                    record = aliasDataTable.getRecord(selectedRows[0]);
+                else
+                    record = queryDataTable.getRecord(selectedRows[0]);
+                document.location.href = "/WsWorkunits/WUQueryDetails?QueryId=" + record.getData('Id') + "&QuerySet=" + querySet;
+              }
+
               function deleteQueries() {
                 actionWorkunits('Delete');
               }
@@ -169,12 +188,6 @@
                   }
               };
 
-              var formatLinkDetails = function(elCell, oRecord, oColumn, sData) {
-                  if (sData != "") {
-                      elCell.innerHTML = "<a href=\"/WsWorkunits/WUQueryDetails?QueryId=" + sData + "&QuerySet=" + querySet + "\">" + sData + "</a>";
-                  }
-              };
-
               function reloadPage()
               {
                   var clusterSelect = document.getElementById('Clusters');
@@ -196,7 +209,7 @@
               YAHOO.util.Event.addListener(window, "load", function() {
                 LoadQueries = function() {
                   var queryColumnDefs = [
-                    {key:"Id", sortable:true, resizeable:true, formatter: formatLinkDetails},
+                    {key:"Id", sortable:true, resizeable:true},
                     {key:"Name", sortable:true, resizeable:true},
                     {key:"Wuid", sortable:true, resizeable:true},
                     {key:"Dll", sortable:true, resizeable:true},
@@ -204,7 +217,7 @@
                   ];
                   if (clusterName != '')
                     queryColumnDefs = [
-                        {key:"Id", sortable:true, resizeable:true, formatter: formatLinkDetails},
+                        {key:"Id", sortable:true, resizeable:true},
                         {key:"Name", sortable:true, resizeable:true},
                         {key:"Wuid", sortable:true, resizeable:true},
                         {key:"Dll", sortable:true, resizeable:true},
@@ -228,6 +241,7 @@
 
                   queryDataTable.subscribe("rowMouseoverEvent", queryDataTable.onEventHighlightRow);   
                   queryDataTable.subscribe("rowMouseoutEvent", queryDataTable.onEventUnhighlightRow);   
+                  queryDataTable.subscribe("rowClickEvent", queryDataTable.onEventSelectRow);
 
                   return {
                     oqDS: queryDataSource,
@@ -238,7 +252,7 @@
                 LoadAliases = function() {
                   var aliasColumnDefs = [
                     {key:"Name", sortable:true, resizeable:true},
-                    {key:"Id", sortable:true, resizeable:true, formatter: formatLinkDetails}
+                    {key:"Id", sortable:true, resizeable:true}
                   ];
 
                   var aliasDataSource = new YAHOO.util.DataSource(querysetAliases);
@@ -252,6 +266,7 @@
 
                   aliasDataTable.subscribe("rowMouseoverEvent", aliasDataTable.onEventHighlightRow);   
                   aliasDataTable.subscribe("rowMouseoutEvent", aliasDataTable.onEventUnhighlightRow);   
+                  aliasDataTable.subscribe("rowClickEvent", aliasDataTable.onEventSelectRow);
 
                   return {
                     oqDS: aliasDataSource,
@@ -376,6 +391,11 @@
                                 <button type="button" name="ActivateButton" onclick="activateQueries();">Activate</button>
                             </em>
                           </span>
+                          <span id="DetailsButton1" class="yui-button yui-push-button">
+                            <em class="first-child">
+                                <button type="button" name="DetailsButton1" onclick="viewDetails(false);">Query Details</button>
+                            </em>
+                          </span>
                         </td>
                     </tr>
                 </table>
@@ -388,6 +408,11 @@
                   <button type="button" name="DeleteAliasButton" onclick="deleteAliases();">Delete</button>
                 </em>
               </span>
+              <span id="DetailsButton" class="yui-button yui-push-button">
+                <em class="first-child">
+                  <button type="button" name="DetailsButton" onclick="viewDetails(true);">Query Details</button>
+                </em>
+              </span>
             </div>
           </div>
         </div>

+ 1 - 1
esp/eclwatch/ws_XSLT/account.xslt

@@ -60,7 +60,7 @@
 <td>
 <br/>
 <br/>
-<a href="javascript:go('/ws_lnaccount/UpdateUserInput')">Change Password</a>
+<a href="javascript:go('/ws_account/UpdateUserInput')">Change Password</a>
 <br/>
 <a href="javascript:go('/WsSMC/Activity')">Home</a></td>
 </tr>

+ 71 - 55
esp/services/ws_smc/ws_smcService.cpp

@@ -206,10 +206,8 @@ bool isInWuList(IArrayOf<IEspActiveWorkunit>& aws, const char* wuid)
     return bFound;
 }
 
-void addQueuedWorkUnits(const char *queueName, IJobQueue *queue, IArrayOf<IEspActiveWorkunit> &aws, IEspContext &context, const char *serverName, const char *instanceName)
+void addQueuedWorkUnits(const char *queueName, CJobQueueContents &contents, IArrayOf<IEspActiveWorkunit> &aws, IEspContext &context, const char *serverName, const char *instanceName)
 {
-    CJobQueueContents contents;
-    queue->copyItems(contents);
     Owned<IJobQueueIterator> iter = contents.getIterator();
     unsigned count=0;
     ForEach(*iter)
@@ -240,44 +238,33 @@ void addQueuedWorkUnits(const char *queueName, IJobQueue *queue, IArrayOf<IEspAc
     }
 }
 
-const char *getQueueState(IJobQueue *queue, int runningJobsInQueue, int *colorTypePtr)
+void CWsSMCEx::getQueueState(int runningJobsInQueue, StringBuffer& queueState, BulletType& bulletType)
 {
-    int qStatus = 1;
-    const char *queueState = NULL;
-    if (queue->stopped())
-    {
-        queueState = "stopped";
-        qStatus = 3;
-    }
-    else if (queue->paused())
+    bool queuePausedOrStopped = false;
+    if ((queueState.length() > 0) && (strieq(queueState.str(),"stopped") || strieq(queueState.str(),"paused")))
+        queuePausedOrStopped = true;
+    else
+        queueState.set("running");
+
+    bulletType = bulletGreen;
+    if (NotFound == runningJobsInQueue)
     {
-        queueState = "paused";
-        qStatus = 2;
+        if (queuePausedOrStopped)
+            bulletType = bulletWhite;
+        else
+            bulletType = bulletError;
     }
-    else
-        queueState = "running";
-    if (NULL != colorTypePtr)
+    else if (runningJobsInQueue > 0)
     {
-        int &color_type = *colorTypePtr;
-        color_type = 6;
-        if (NotFound == runningJobsInQueue)
-        {
-            if (qStatus > 1)
-                color_type = 3;
-            else
-                color_type = 5;
-        }
-        else if (runningJobsInQueue > 0)
-        {
-            if (qStatus > 1)
-                color_type = 1;
-            else
-                color_type = 4;
-        }
-        else if (qStatus > 1)
-            color_type = 2;
+        if (queuePausedOrStopped)
+            bulletType = bulletOrange;
+        else
+            bulletType = bulletGreen;
     }
-    return queueState;
+    else if (queuePausedOrStopped)
+        bulletType = bulletYellow;
+
+    return;
 }
 
 bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspActivityResponse& resp)
@@ -517,16 +504,20 @@ bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspAc
                 str.clear();
                 const char *queueName = cluster.getThorQueue(str).str();
                 returnCluster->setQueueName(queueName);
+
+                StringBuffer queueState;
+                CJobQueueContents contents;
                 Owned<IJobQueue> queue = createJobQueue(queueName);
-                addQueuedWorkUnits(queueName, queue, aws, context, "ThorMaster", NULL);
+                queue->copyItemsAndState(contents, queueState);
+                addQueuedWorkUnits(queueName, contents, aws, context, "ThorMaster", NULL);
 
+                BulletType bulletType = bulletGreen;
                 int serverID = runningQueueNames.find(queueName);
                 int numRunningJobsInQueue = (NotFound != serverID) ? runningJobsInQueue[serverID] : -1;
-                int color_type;
-                const char *queueState = getQueueState(queue, numRunningJobsInQueue, &color_type);
-                returnCluster->setQueueStatus(queueState);
+                getQueueState(numRunningJobsInQueue, queueState, bulletType);
+                returnCluster->setQueueStatus(queueState.str());
                 if (version > 1.06)
-                    returnCluster->setQueueStatus2(color_type);
+                    returnCluster->setQueueStatus2(bulletType);
                 if (version > 1.10)
                     returnCluster->setClusterSize(cluster.getSize());
 
@@ -544,15 +535,18 @@ bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspAc
                     returnCluster->setQueueName(cluster.getAgentQueue(str).str());
                     str.clear();
                     const char *queueName = cluster.getAgentQueue(str).str();
+                    StringBuffer queueState;
+                    CJobQueueContents contents;
                     Owned<IJobQueue> queue = createJobQueue(queueName);
-                    addQueuedWorkUnits(queueName, queue, aws, context, "RoxieServer", NULL);
+                    queue->copyItemsAndState(contents, queueState);
+                    addQueuedWorkUnits(queueName, contents, aws, context, "RoxieServer", NULL);
 
-                    int color_type;
+                    BulletType bulletType = bulletGreen;
                     int serverID = runningQueueNames.find(queueName);
                     int numRunningJobsInQueue = (NotFound != serverID) ? runningJobsInQueue[serverID] : -1;
-                    const char *queueState = getQueueState(queue, numRunningJobsInQueue, &color_type);
-                    returnCluster->setQueueStatus(queueState);
-                    returnCluster->setQueueStatus2(color_type);
+                    getQueueState(numRunningJobsInQueue, queueState, bulletType);
+                    returnCluster->setQueueStatus(queueState.str());
+                    returnCluster->setQueueStatus2(bulletType);
                     if (version > 1.10)
                         returnCluster->setClusterSize(cluster.getSize());
 
@@ -568,15 +562,18 @@ bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspAc
                 returnCluster->setQueueName(cluster.getAgentQueue(str).str());
                 str.clear();
                 const char *queueName = cluster.getAgentQueue(str).str();
+                StringBuffer queueState;
+                CJobQueueContents contents;
                 Owned<IJobQueue> queue = createJobQueue(queueName);
-                addQueuedWorkUnits(queueName, queue, aws, context, "HThorServer", NULL);
+                queue->copyItemsAndState(contents, queueState);
+                addQueuedWorkUnits(queueName, contents, aws, context, "HThorServer", NULL);
 
-                int color_type;
+                BulletType bulletType = bulletGreen;
                 int serverID = runningQueueNames.find(queueName);
                 int numRunningJobsInQueue = (NotFound != serverID) ? runningJobsInQueue[serverID] : -1;
-                const char *queueState = getQueueState(queue, numRunningJobsInQueue, &color_type);
-                returnCluster->setQueueStatus(queueState);
-                returnCluster->setQueueStatus2(color_type);
+                getQueueState(numRunningJobsInQueue, queueState, bulletType);
+                returnCluster->setQueueStatus(queueState.str());
+                returnCluster->setQueueStatus2(bulletType);
                 HThorClusters.append(*returnCluster);
             }
         }
@@ -616,10 +613,11 @@ bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspAc
                 targetClusters->str(targetCluster);
 
                 StringBuffer queueName;
-                getClusterEclCCServerQueueName(queueName, targetCluster.str());
-                QueueWrapper queue(queueName.str());
+                StringBuffer queueState;
                 CJobQueueContents contents;
-                queue->copyItems(contents);
+                getClusterEclCCServerQueueName(queueName, targetCluster.str());
+                Owned<IJobQueue> queue = createJobQueue(queueName);
+                queue->copyItemsAndState(contents, queueState);
                 unsigned count=0;
                 Owned<IJobQueueIterator> iter = contents.getIterator();
                 ForEach(*iter) 
@@ -635,7 +633,7 @@ bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspAc
                     aws.append(*wu.getLink());
                 }
 
-                addServerJobQueue(serverJobQueues, queueName, serverName, "ECLCCserver");
+                addServerJobQueue(serverJobQueues, queueName, queueState.str(), serverName, "ECLCCserver");
             }
         }
 
@@ -778,6 +776,24 @@ void CWsSMCEx::addServerJobQueue(IArrayOf<IEspServerJobQueue>& jobQueues, const
     jobQueues.append(*jobQueue.getClear());
 }
 
+void CWsSMCEx::addServerJobQueue(IArrayOf<IEspServerJobQueue>& jobQueues, const char* queueName, const char* queueState, const char* serverName, const char* serverType)
+{
+    if (!queueName || !*queueName || !serverName || !*serverName || !serverType || !*serverType)
+        return;
+
+    Owned<IEspServerJobQueue> jobQueue = createServerJobQueue("", "");
+    jobQueue->setQueueName(queueName);
+    jobQueue->setServerName(serverName);
+    jobQueue->setServerType(serverType);
+
+    if (queueState && (strieq(queueState,"stopped") || strieq(queueState,"paused")))
+        jobQueue->setQueueStatus(queueState);
+    else
+        jobQueue->setQueueStatus("running");
+
+    jobQueues.append(*jobQueue.getClear());
+}
+
 void CWsSMCEx::addToThorClusterList(IArrayOf<IEspThorCluster>& clusters, IEspThorCluster* cluster, const char* sortBy, bool descending)
 {
     if (clusters.length() < 1)

+ 12 - 0
esp/services/ws_smc/ws_smcService.hpp

@@ -22,6 +22,16 @@
 #include "TpWrapper.hpp"
 #include "WUXMLInfo.hpp"
 
+enum BulletType
+{
+    bulletNONE = 0,
+    bulletOrange = 1,
+    bulletYellow = 2,
+    bulletWhite = 3,
+    bulletGreen = 4,
+    bulletError = 5
+};
+
 class CWsSMCEx : public CWsSMC
 {
     long m_counter;
@@ -68,6 +78,8 @@ private:
     void addToThorClusterList(IArrayOf<IEspThorCluster>& clusters, IEspThorCluster* cluster, const char* sortBy, bool descending);
     void addToRoxieClusterList(IArrayOf<IEspRoxieCluster>& clusters, IEspRoxieCluster* cluster, const char* sortBy, bool descending);
     void addServerJobQueue(IArrayOf<IEspServerJobQueue>& jobQueues, const char* queueName, const char* serverName, const char* serverType);
+    void addServerJobQueue(IArrayOf<IEspServerJobQueue>& jobQueues, const char* queueName, const char* queueState, const char* serverName, const char* serverType);
+    void getQueueState(int runningJobsInQueue, StringBuffer& queueState, BulletType& colorType);
 };
 
 

+ 5 - 5
esp/services/ws_workunits/ws_workunitsQuerySets.cpp

@@ -530,7 +530,7 @@ bool CWsWorkunitsEx::onWUPublishWorkunit(IEspContext &context, IEspWUPublishWork
 
     StringBuffer queryId;
     WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
-    addQueryToQuerySet(wu, target.str(), queryName.str(), NULL, activate, queryId);
+    addQueryToQuerySet(wu, target.str(), queryName.str(), NULL, activate, queryId, context.queryUserId());
     if (req.getMemoryLimit() || !req.getTimeLimit_isNull() || !req.getWarnTimeLimit_isNull() || req.getPriority() || req.getComment())
     {
         Owned<IPropertyTree> queryTree = getQueryById(target.str(), queryId, false);
@@ -1065,13 +1065,13 @@ bool CWsWorkunitsEx::onWUQuerysetQueryAction(IEspContext &context, IEspWUQuerySe
             switch (req.getAction())
             {
                 case CQuerySetQueryActionTypes_ToggleSuspend:
-                    setQuerySuspendedState(queryset, id, !queryIds->getPropBool(id));
+                    setQuerySuspendedState(queryset, id, !queryIds->getPropBool(id), context.queryUserId());
                     break;
                 case CQuerySetQueryActionTypes_Suspend:
-                    setQuerySuspendedState(queryset, id, true);
+                    setQuerySuspendedState(queryset, id, true, context.queryUserId());
                     break;
                 case CQuerySetQueryActionTypes_Unsuspend:
-                    setQuerySuspendedState(queryset, id, false);
+                    setQuerySuspendedState(queryset, id, false, NULL);
                     break;
                 case CQuerySetQueryActionTypes_Activate:
                 {
@@ -1236,7 +1236,7 @@ bool CWsWorkunitsEx::onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetC
 
     StringBuffer targetQueryId;
     WUQueryActivationOptions activate = (WUQueryActivationOptions)req.getActivate();
-    addQueryToQuerySet(wu, target, queryName.str(), NULL, activate, targetQueryId);
+    addQueryToQuerySet(wu, target, queryName.str(), NULL, activate, targetQueryId, context.queryUserId());
     if (req.getMemoryLimit() || !req.getTimeLimit_isNull() || ! req.getWarnTimeLimit_isNull() || req.getPriority())
     {
         Owned<IPropertyTree> queryTree = getQueryById(target, targetQueryId, false);

+ 17 - 13
roxie/ccd/ccdstate.cpp

@@ -1286,6 +1286,12 @@ private:
             loadStandaloneQuery(standAloneDll, numChannels, "roxie");
         else
         {
+            // We want to kill the old packages, but not until we have created the new ones (i.e. at the end of this function)
+            // So that the query/dll caching will work for anything that is not affected by the changes
+            CIArrayOf<CRoxieQueryPackageManager> oldQueryPackages;
+            appendArray(oldQueryPackages, allQueryPackages);   // Note - this LOOKS unused but do not delete (see above)
+            allQueryPackages.kill();
+            stateHash = 0;
             ForEachItemIn(idx, allQuerySetNames)
             {
                 createQueryPackageManagers(numChannels, allQuerySetNames.item(idx));
@@ -1304,12 +1310,14 @@ private:
             ForEach(*ids)
             {
                 const char *id = ids->query().queryProp("@id");
-                if (!id)
-                    throw MakeStringException(ROXIE_MISSING_PARAMS, "Query name missing");
-                Owned<IQueryFactory> query = getQuery(id, logctx);
-                if (!query)
-                    throw MakeStringException(ROXIE_MISSING_PARAMS, "Query %s not found", id);
-                query->getQueryInfo(reply, full, logctx);
+                if (id)
+                {
+                    Owned<IQueryFactory> query = getQuery(id, logctx);
+                    if (query)
+                        query->getQueryInfo(reply, full, logctx);
+                    else
+                        reply.appendf(" <Query id=\"%s\" error=\"Query not found\"/>\n", id);
+                }
             }
         }
         else
@@ -2158,16 +2166,11 @@ private:
     {
         // Called from reload inside write lock block
         unsubscribe();
-        // We want to kill the old packages, but not until we have created the new ones (i.e. at the end of this function)
-        // So that the query/dll caching will work for anything that is not affected by the changes
-        CIArrayOf<CRoxieQueryPackageManager> oldQueryPackages;
-        appendArray(oldQueryPackages, allQueryPackages);
-        allQueryPackages.kill();
-        stateHash = 0;
 
         Owned<IDaliPackageWatcher> notifier = daliHelper->getPackageSetsSubscription(this);
         if (notifier)
             notifiers.append(*notifier.getClear());
+        bool packageLoaded = false;
         Owned<IPropertyTree> packageTree = daliHelper->getPackageSets();
         Owned<IPropertyTreeIterator> packageSets = packageTree->getElements("PackageSet");
         ForEach(*packageSets)
@@ -2198,6 +2201,7 @@ private:
                             Owned<IPropertyTree> xml = daliHelper->getPackageMap(packageMapId);
                             packageMap->load(xml);
                             createQueryPackageManager(numChannels, packageMap.getLink(), querySet);
+                            packageLoaded = true;
                             notifiers.append(*daliHelper->getPackageMapSubscription(packageMapId, this));
                         }
                         catch (IException *E)
@@ -2211,7 +2215,7 @@ private:
                 }
             }
         }
-        if (!allQueryPackages.length())
+        if (!packageLoaded)
         {
             if (traceLevel)
                 DBGLOG("Loading empty package for QuerySet %s", querySet);

+ 1 - 2
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -1006,8 +1006,7 @@ public:
     virtual void abort()
     {
         CIndexReadSlaveBase::abort();
-        if (receiving)
-            cancelReceiveMsg(0, mpTag);
+        cancelReceiveMsg(0, mpTag);
     }
 };
 

+ 38 - 9
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -88,7 +88,7 @@ class CBroadcaster : public CSimpleInterface
     IBCastReceive *recvInterface;
     Semaphore allDoneSem;
     CriticalSection allDoneLock, bcastOtherCrit;
-    bool allDone, allDoneWaiting, allRequestStop, stopping;
+    bool allDone, allDoneWaiting, allRequestStop, stopping, stopRecv;
     Owned<IBitSet> slavesDone, slavesStopping;
 
     class CRecv : implements IThreaded
@@ -100,6 +100,15 @@ class CBroadcaster : public CSimpleInterface
         {
         }
         void start() { threaded.start(); }
+        void stop()
+        {
+            broadcaster.cancelReceive();
+            threaded.join();
+        }
+        void wait()
+        {
+            threaded.join();
+        }
     // IThreaded
         virtual void main() { broadcaster.recvLoop(); }
     } receiver;
@@ -132,6 +141,12 @@ class CBroadcaster : public CSimpleInterface
             }
             threaded.join();
         }
+        void wait()
+        {
+            ActPrintLog(&broadcaster.activity, "CSend::wait(), messages to send: %d", broadcastQueue.ordinality());
+            addBlock(NULL);
+            threaded.join();
+        }
     // IThreaded
         virtual void main()
         {
@@ -142,6 +157,7 @@ class CBroadcaster : public CSimpleInterface
                     break;
                 broadcaster.broadcastToOthers(sendItem);
             }
+            ActPrintLog(&broadcaster.activity, "Sender stopped");
         }
     } sender;
 
@@ -205,7 +221,7 @@ class CBroadcaster : public CSimpleInterface
                 if (0 == sendRecv) // send
                 {
 #ifdef _TRACEBROADCAST
-                    ActPrintLog(&activity, "Broadcast node %d Sending to node %d size %d", myNode, t, sendLen);
+                    ActPrintLog(&activity, "Broadcast node %d Sending to node %d, origin %d, size %d, code=%d", myNode, t, origin, sendLen, (unsigned)sendItem->queryCode());
 #endif
                     CMessageBuffer &msg = sendItem->queryMsg();
                     msg.setReplyTag(rt); // simulate sendRecv
@@ -214,7 +230,7 @@ class CBroadcaster : public CSimpleInterface
                 else // recv reply
                 {
 #ifdef _TRACEBROADCAST
-                    ActPrintLog(&activity, "Broadcast node %d Sent to node %d size %d received ack", myNode, t, sendLen);
+                    ActPrintLog(&activity, "Broadcast node %d Sent to node %d, origin %d, size %d, code=%d - received ack", myNode, t, origin, sendLen, (unsigned)sendItem->queryCode());
 #endif
                     if (!activity.receiveMsg(replyMsg, t, rt))
                         break;
@@ -223,10 +239,15 @@ class CBroadcaster : public CSimpleInterface
         }
     }
     // called by CRecv thread
+    void cancelReceive()
+    {
+        stopRecv = true;
+        activity.cancelReceiveMsg(RANK_ALL, mpTag);
+    }
     void recvLoop()
     {
         CMessageBuffer msg;
-        while (!activity.queryAbortSoon())
+        while (!stopRecv && !activity.queryAbortSoon())
         {
             rank_t sendRank;
             if (!activity.receiveMsg(msg, RANK_ALL, mpTag, &sendRank))
@@ -235,7 +256,7 @@ class CBroadcaster : public CSimpleInterface
             CMessageBuffer ackMsg;
             Owned<CSendItem> sendItem = new CSendItem(msg);
 #ifdef _TRACEBROADCAST
-            ActPrintLog(&activity, "Broadcast node %d received from node %d size %d, code=%d", myNode, (unsigned)sendRank, sendItem->length(), (unsigned)sendItem->queryCode());
+            ActPrintLog(&activity, "Broadcast node %d received from node %d, origin node %d, size %d, code=%d", myNode, (unsigned)sendRank, sendItem->queryOrigin(), sendItem->length(), (unsigned)sendItem->queryCode());
 #endif
             comm.send(ackMsg, sendRank, replyTag); // send ack
             sender.addBlock(sendItem.getLink());
@@ -248,15 +269,20 @@ class CBroadcaster : public CSimpleInterface
                     if (slaveStop(sendItem->queryOrigin()-1) || allDone)
                     {
                         recvInterface->bCastReceive(NULL); // signal last
-                        return; // finished
+                        ActPrintLog(&activity, "recvLoop, received last slaveStop");
+                        // NB: this slave has nothing more to receive.
+                        // However the sender will still be re-broadcasting some packets, including these stop packets
+                        return;
                     }
                     break;
                 }
                 case bcast_sendStopping:
+                {
                     slavesStopping->set(sendItem->queryOrigin()-1, true);
                     // allRequestStop=true, if I'm stopping and all others have requested also
                     allRequestStop = slavesStopping->scan(0, false) == slaves;
                     // fall through
+                }
                 case bcast_send:
                 {
                     if (!allRequestStop) // don't care if all stopping
@@ -271,7 +297,7 @@ class CBroadcaster : public CSimpleInterface
 public:
     CBroadcaster(CActivityBase &_activity) : activity(_activity), receiver(*this), sender(*this), comm(_activity.queryJob().queryJobComm())
     {
-        allDone = allDoneWaiting = allRequestStop = stopping = false;
+        allDone = allDoneWaiting = allRequestStop = stopping = stopRecv = false;
         myNode = activity.queryJob().queryMyRank();
         slaves = activity.queryJob().querySlaves();
         slavesDone.setown(createBitSet());
@@ -285,6 +311,7 @@ public:
         if (stopping)
             slavesStopping->set(myNode-1, true);
         recvInterface = _recvInterface;
+        stopRecv = false;
         mpTag = _mpTag;
         receiver.start();
         sender.start();
@@ -315,13 +342,14 @@ public:
     void end()
     {
         waitReceiverDone();
-        sender.stop();
-        // NB: receiver will have already stopped
+        receiver.wait(); // terminates when received stop from all others
+        sender.wait(); // terminates when any remaining packets, including final stop packets have been re-broadcast
     }
     void cancel()
     {
         allDoneWaiting = false;
         allDone = true;
+        receiver.stop();
         sender.stop();
         allDoneSem.signal();
     }
@@ -332,6 +360,7 @@ public:
     }
     void final()
     {
+        ActPrintLog(&activity, "CBroadcaster::final()");
         Owned<CSendItem> sendItem = newSendItem(bcast_stop);
         send(sendItem);
     }

+ 1 - 1
thorlcr/graph/thgraph.cpp

@@ -2699,7 +2699,7 @@ IThorResource &queryThor()
 CActivityBase::CActivityBase(CGraphElementBase *_container) : container(*_container), timeActivities(_container->queryJob().queryTimeActivities())
 {
     mpTag = TAG_NULL;
-    abortSoon = cancelledReceive = reInit = false;
+    abortSoon = receiving = cancelledReceive = reInit = false;
     baseHelper.set(container.queryHelper());
     parentExtractSz = 0;
     parentExtract = NULL;