Browse Source

HPCC-15404 Optimize roxie standalone queries

Step 1 - moving ownership of the threadpools into the RoxieQueue objects. This
commit should not change any behaviour.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 years ago
parent
commit
2f37699d93
1 changed files with 73 additions and 71 deletions
  1. 73 71
      roxie/ccd/ccdqueue.cpp

+ 73 - 71
roxie/ccd/ccdqueue.cpp

@@ -718,19 +718,49 @@ void doPing(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
 //
 // RoxieQueue - holds pending transactions on a roxie agent
 
-class RoxieQueue : public CInterface
+class RoxieQueue : public CInterface, implements IThreadFactory
 {
+    Owned <IThreadPool> workers;
     QueueOf<IRoxieQueryPacket, true> waiting;
     Semaphore available;
     CriticalSection qcrit;
     unsigned headRegionSize;
+    unsigned numWorkers;
 
 public:
     IMPLEMENT_IINTERFACE;
 
-    RoxieQueue(unsigned _headRegionSize)
+    RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers)
     {
         headRegionSize = _headRegionSize;
+        numWorkers = _numWorkers;
+        workers.setown(createThreadPool("RoxieWorkers", this, NULL, numWorkers));
+    }
+
+    virtual IPooledThread *createNew();
+    void abortChannel(unsigned channel);
+
+    void start()
+    {
+        for (unsigned i = 0; i < numWorkers; i++)
+            workers->start(this);
+    }
+
+    IPooledThreadIterator *running()
+    {
+        return workers->running();
+    }
+
+    void stopAll()
+    {
+        workers->stopAll(true);
+        signal(workers->runningCount());
+    }
+
+    void join()
+    {
+        workers->joinAll(true);
+        workers.clear();  // Breaks a cyclic reference count that would stop us from releasing RoxieReceiverThread otherwise
     }
 
     void enqueue(IRoxieQueryPacket *x)
@@ -1332,6 +1362,21 @@ public:
     }
 };
 
+IPooledThread *RoxieQueue::createNew()
+{
+    return new CRoxieWorker;
+}
+
+void RoxieQueue::abortChannel(unsigned channel)
+{
+    Owned<IPooledThreadIterator> wi = workers->running();
+    ForEach(*wi)
+    {
+        CRoxieWorker &w = (CRoxieWorker &) wi->query();
+        w.abortChannel(channel);
+    }
+}
+
 //=================================================================================
 
 class CallbackEntry : public CInterface, implements IPendingCallback
@@ -1377,43 +1422,25 @@ public:
     }
 };
 
-class RoxieReceiverBase : public CInterface, implements IThreadFactory, implements IRoxieOutputQueueManager
+class RoxieReceiverBase : public CInterface, implements IRoxieOutputQueueManager
 {
 protected:
 #ifdef ROXIE_SLA_LOGIC
     RoxieQueue slaQueue;
-    Owned <IThreadPool> slaWorkers;
 #endif
     RoxieQueue hiQueue;
-    Owned <IThreadPool> hiWorkers;
     RoxieQueue loQueue;
-    Owned <IThreadPool> loWorkers;
     unsigned numWorkers;
 
-    void abortChannel(unsigned channel, IThreadPool *workers)
-    {
-        Owned<IPooledThreadIterator> wi = workers->running();
-        ForEach(*wi)
-        {
-            CRoxieWorker &w = (CRoxieWorker &) wi->query();
-            w.abortChannel(channel);
-        }
-    }
-
 public:
     IMPLEMENT_IINTERFACE;
 
 #ifdef ROXIE_SLA_LOGIC
-    RoxieReceiverBase(unsigned _numWorkers) : numWorkers(_numWorkers), slaQueue(headRegionSize), hiQueue(headRegionSize), loQueue(headRegionSize)
+    RoxieReceiverBase(unsigned _numWorkers) : numWorkers(_numWorkers), slaQueue(headRegionSize, _numWorkers), hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers)
 #else
-    RoxieReceiverBase(unsigned _numWorkers) : numWorkers(_numWorkers), hiQueue(headRegionSize), loQueue(headRegionSize)
+    RoxieReceiverBase(unsigned _numWorkers) : numWorkers(_numWorkers), hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers)
 #endif
     {
-        loWorkers.setown(createThreadPool("RoxieLoWorkers", this, NULL, numWorkers));
-        hiWorkers.setown(createThreadPool("RoxieHiWorkers", this, NULL, numWorkers));
-#ifdef ROXIE_SLA_LOGIC
-        slaWorkers.setown(createThreadPool("RoxieSLAWorkers", this, NULL, numWorkers));
-#endif
         CriticalBlock b(ccdChannelsCrit);
         Owned<IPropertyTreeIterator> it = ccdChannels->getElements("RoxieSlaveProcess");
         ForEach(*it)
@@ -1476,10 +1503,10 @@ public:
         {
             logctx.CTXLOG("ERROR: suspending channel %d - aborting active queries", channel);
 #ifdef ROXIE_SLA_LOGIC
-            abortChannel(channel, slaWorkers);
+            slaQueue.abortChannel(channel);
 #endif
-            abortChannel(channel, hiWorkers);
-            abortChannel(channel, loWorkers);
+            hiQueue.abortChannel(channel);
+            loQueue.abortChannel(channel);
         }
         return prev;
     }
@@ -1500,54 +1527,29 @@ public:
 
     virtual void start() 
     {
-        for (unsigned i = 0; i < numWorkers; i++)
-        {
-            // MORE - why would we have same number of each?
-            // MORE - All workers (hi or low) have same sys priority, same number of workers,
-            //        and same queue size ... What can make a query marked high priority
-            //        get prioity of resource over low one ?
-            // MORE: I think we may want to have one set of worker threads that gets jobs of 2 (or 3 with SLA)
-            //       queues (sla, high, and low). These workers will give higher priority for jobs
-            //       on higher priority queues but without starving low ones... something similar to what
-            //       I implemented in UDP output queues.
-            //       For the time being, I will keep as is, and just add SLA queue and workers.
-            loWorkers->start(&loQueue);
-            hiWorkers->start(&hiQueue);
+        loQueue.start();
+        hiQueue.start();
 #ifdef ROXIE_SLA_LOGIC
-            slaWorkers->start(&slaQueue);
+        slaQueue.start();
 #endif
-        }
     }
 
     virtual void stop() 
     {
-        loWorkers->stopAll(true);
-        loQueue.signal(loWorkers->runningCount()); // MORE  - looks like a race here... interruptableSemaphore would be better
-        hiWorkers->stopAll(true);
-        hiQueue.signal(hiWorkers->runningCount());
+        loQueue.stopAll();
+        hiQueue.stopAll();
 #ifdef ROXIE_SLA_LOGIC
-        slaWorkers->stopAll(true);
-        slaQueue.signal(slaWorkers->runningCount());
+        slaQueue.stopAll();
 #endif
     }
 
     virtual void join()  
     { 
+        loQueue.join();
+        hiQueue.join();
 #ifdef ROXIE_SLA_LOGIC
-        slaWorkers->joinAll(true);
+        slaQueue.join();
 #endif
-        hiWorkers->joinAll(true);
-        loWorkers->joinAll(true);
-        loWorkers.clear();  // Breaks a cyclic reference count that would stop us from releasing RoxieReceiverThread otherwise
-        hiWorkers.clear();
-#ifdef ROXIE_SLA_LOGIC
-        slaWorkers.clear();
-#endif
-    }
-
-    virtual IPooledThread *createNew()
-    {
-        return new CRoxieWorker;
     }
 
     IArrayOf<CallbackEntry> callbacks;
@@ -1905,11 +1907,11 @@ public:
         return sendManager->abortData(header.uid, header.getSequenceId(), header.serverIdx);
     }
 
-    bool abortRunning(RoxiePacketHeader &header, IThreadPool *workers, bool checkRank, bool &preActivity)
+    bool abortRunning(RoxiePacketHeader &header, RoxieQueue &queue, bool checkRank, bool &preActivity)
     {
         bool queryFound = false;
         bool ret = false;
-        Owned<IPooledThreadIterator> wi = workers->running();
+        Owned<IPooledThreadIterator> wi = queue.running();
         ForEach(*wi)
         {
             CRoxieWorker &w = (CRoxieWorker &) wi->query();
@@ -1933,7 +1935,7 @@ public:
         return ret;
     }
 
-    void doIbyti(RoxiePacketHeader &header, RoxieQueue &queue, IThreadPool *workers)
+    void doIbyti(RoxiePacketHeader &header, RoxieQueue &queue)
     {
         assertex(!localSlave);
         atomic_inc(&ibytiPacketsReceived);
@@ -1951,7 +1953,7 @@ public:
         
         if (header.retries == QUERY_ABORTED)
         {
-            abortRunning(header, workers, false, preActivity);
+            abortRunning(header, queue, false, preActivity);
             queue.remove(header);
 
             if (traceLevel > 10)
@@ -1985,7 +1987,7 @@ public:
                     atomic_inc(&ibytiPacketsWorked);
                     return;
                 }
-                if (abortRunning(header, workers, true, preActivity))
+                if (abortRunning(header, queue, true, preActivity))
                 {
                     if (preActivity)
                         atomic_inc(&ibytiPacketsWorked); // MORE - may want to have a diff counter for this (not in queue but in IBYTI wait or before) 
@@ -2000,7 +2002,7 @@ public:
         }
     }
 
-    void processMessage(MemoryBuffer &mb, RoxiePacketHeader &header, RoxieQueue &queue, IThreadPool *workers)
+    void processMessage(MemoryBuffer &mb, RoxiePacketHeader &header, RoxieQueue &queue)
     {
         // NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
         // DO NOT put tracing on this thread except at very high tracelevels!
@@ -2016,7 +2018,7 @@ public:
             doFileCallback(packet);
         }
         else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == 0)
-            doIbyti(header, queue, workers); // MORE - check how fast this is!
+            doIbyti(header, queue); // MORE - check how fast this is!
         else
         {
             Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
@@ -2047,7 +2049,7 @@ public:
                 else  
                     atomic_inc(&retriesReceivedSec); 
                 bool alreadyRunning = false;
-                Owned<IPooledThreadIterator> wi = workers->running();
+                Owned<IPooledThreadIterator> wi = queue.running();
                 ForEach(*wi)
                 {
                     CRoxieWorker &w = (CRoxieWorker &) wi->query();
@@ -2135,13 +2137,13 @@ public:
                 }
 #ifdef ROXIE_SLA_LOGIC
                 if (header.activityId & ROXIE_SLA_PRIORITY)
-                    processMessage(mb, header, slaQueue, slaWorkers);
+                    processMessage(mb, header, slaQueue);
                 else
 #endif
                 if (header.activityId & ROXIE_HIGH_PRIORITY)
-                    processMessage(mb, header, hiQueue, hiWorkers);
+                    processMessage(mb, header, hiQueue);
                 else
-                    processMessage(mb, header, loQueue, loWorkers);
+                    processMessage(mb, header, loQueue);
             }
             catch (IException *E)
             {