Browse Source

Merge pull request #4422 from jakesmith/hpcc-9385

HPCC-9385 - Fix thread pool runningCount()

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 years ago
parent
commit
922024209d
3 changed files with 56 additions and 8 deletions
  1. 50 8
      system/jlib/jthread.cpp
  2. 1 0
      system/jlib/jthread.hpp
  3. 5 0
      system/mp/mputil.hpp

+ 50 - 8
system/jlib/jthread.cpp

@@ -737,8 +737,8 @@ protected: friend class CPooledThreadWrapper;
     unsigned delay;
     Semaphore availsem;
     atomic_t numrunning;
+    virtual void notifyStarted(CPooledThreadWrapper *item)=0;
     virtual bool notifyStopped(CPooledThreadWrapper *item)=0;
-
 };
 
 
@@ -774,19 +774,32 @@ public:
     IPooledThread &queryThread() { return *thread; }
     void setThread(IPooledThread *_thread) { thread = _thread; } // takes ownership
     bool isStopped() { return (handle==0); }
-    PooledThreadHandle markStopped() { PooledThreadHandle ret=handle; handle = 0; if (ret) atomic_dec(&parent.numrunning); return ret; }
+    PooledThreadHandle markStopped()
+    {
+        PooledThreadHandle ret=handle;
+        handle = 0;
+        if (ret) // JCSMORE - I can't see how handle can not be set if here..
+            atomic_dec(&parent.numrunning);
+        return ret;
+    }
+    void markStarted()
+    {
+        atomic_inc(&parent.numrunning);
+    }
 
     int run()
     {
-        do {
+        do
+        {
             sem.wait();
             {
                 CriticalBlock block(parent.crit); // to synchronize
                 if (parent.stopall)
                     break;
-
             }
-            try {
+            parent.notifyStarted(this);
+            try
+            {
                 char *&threadname = cthreadname.threadname;
                 char *temp = threadname;    // swap running name and threadname
                 threadname = runningname;
@@ -898,9 +911,14 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter
     IThreadFactory *factory;
     unsigned stacksize;
     unsigned timeoutOnRelease;
+    unsigned traceStartDelayPeriod;
+    unsigned startsInPeriod;
+    cycle_t startDelayInPeriod;
+    CCycleTimer overAllTimer;
 
     PooledThreadHandle _start(void *param,const char *name, bool noBlock, unsigned timeout=0)
     {
+        CCycleTimer startTimer;
         bool timedout = defaultmax && !availsem.wait(noBlock ? 0 : (timeout>0?timeout:delay));
         PooledThreadHandle ret;
         {
@@ -908,7 +926,21 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter
             if (timedout&&!availsem.wait(0)) {  // make sure take allocated sem if has become available
                 if (noBlock || timeout > 0)
                     throw MakeStringException(0, "No threads available in pool %s", poolname.get());
-                PROGLOG("WARNING: Pool limit exceeded for %s", poolname.get());
+                WARNLOG("Pool limit exceeded for %s", poolname.get());
+            }
+            if (traceStartDelayPeriod)
+            {
+                ++startsInPeriod;
+                startDelayInPeriod += startTimer.elapsedCycles();
+                if (overAllTimer.elapsedCycles() >= queryOneSecCycles()*traceStartDelayPeriod) // check avg. delay per minute
+                {
+                    cycle_t avg = startDelayInPeriod/startsInPeriod;
+                    unsigned avgMs = static_cast<unsigned>(cycle_to_nanosec(avg)/1000000);
+                    PROGLOG("%s: %d threads started in last %d seconds, average delay = %d milliseconds", poolname.get(), startsInPeriod, traceStartDelayPeriod, avgMs);
+                    startsInPeriod = 0;
+                    startDelayInPeriod = 0;
+                    overAllTimer.reset();
+                }
             }
             CPooledThreadWrapper &t = allocThread();
             if (name)
@@ -937,6 +969,9 @@ public:
         timeoutOnRelease = _timeoutOnRelease;
         targetpoolsize = _targetpoolsize?_targetpoolsize:defaultmax;
         atomic_set(&numrunning,0);
+        traceStartDelayPeriod = 0;
+        startsInPeriod = 0;
+        startDelayInPeriod = 0;
     }
 
     ~CThreadPool()
@@ -983,7 +1018,6 @@ public:
         if (stacksize)
             ret.setStackSize(stacksize);
         ret.start();
-        atomic_inc(&numrunning);
         threadwrappers.append(ret);
         return ret;
     }
@@ -1105,7 +1139,7 @@ public:
         return true;
     }
 
-  IPooledThreadIterator *running()
+    IPooledThreadIterator *running()
     {
         CriticalBlock block(crit);
         CPooledThreadIterator *ret = new CPooledThreadIterator;
@@ -1125,6 +1159,10 @@ public:
         return (unsigned)atomic_read(&numrunning);
     }
 
+    void notifyStarted(CPooledThreadWrapper *item)
+    {
+        item->markStarted();
+    }
     bool notifyStopped(CPooledThreadWrapper *item)
     {
         CriticalBlock block(crit);
@@ -1147,6 +1185,10 @@ public:
         }
         return ret;
     }
+    void setStartDelayTracing(unsigned secs)
+    {
+        traceStartDelayPeriod = secs;
+    }
 };
 
 

+ 1 - 0
system/jlib/jthread.hpp

@@ -222,6 +222,7 @@ interface IThreadPool : extends IInterface
         virtual unsigned runningCount()=0;                  // number of currently running threads
         virtual PooledThreadHandle startNoBlock(void *param)=0; // starts a new thread if it can do so without blocking, else throws exception
         virtual PooledThreadHandle startNoBlock(void *param,const char *name)=0;    // starts a new thread if it can do so without blocking, else throws exception
+        virtual void setStartDelayTracing(unsigned secs) = 0;        // set start delay tracing period
 };
 
 extern jlib_decl IThreadPool *createThreadPool(

+ 5 - 0
system/mp/mputil.hpp

@@ -122,6 +122,11 @@ public:
             runname.append(' ').appendhex(*(b++),true);
         pool->start(&mb,runname.str());
     }
+    void setThreadPoolTracing(unsigned secs)
+    {
+        if (pool)
+            pool->setStartDelayTracing(secs);
+    }
 };