|
@@ -726,7 +726,8 @@ class RoxieQueue : public CInterface, implements IThreadFactory
|
|
|
CriticalSection qcrit;
|
|
|
unsigned headRegionSize;
|
|
|
unsigned numWorkers;
|
|
|
-
|
|
|
+ unsigned started;
|
|
|
+ std::atomic<unsigned> idle;
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
|
|
@@ -735,6 +736,8 @@ public:
|
|
|
headRegionSize = _headRegionSize;
|
|
|
numWorkers = _numWorkers;
|
|
|
workers.setown(createThreadPool("RoxieWorkers", this, NULL, numWorkers));
|
|
|
+ started = 0;
|
|
|
+ idle = 0;
|
|
|
}
|
|
|
|
|
|
virtual IPooledThread *createNew();
|
|
@@ -742,8 +745,7 @@ public:
|
|
|
|
|
|
void start()
|
|
|
{
|
|
|
- for (unsigned i = 0; i < numWorkers; i++)
|
|
|
- workers->start(this);
|
|
|
+ // We start on demand now...
|
|
|
}
|
|
|
|
|
|
IPooledThreadIterator *running()
|
|
@@ -776,6 +778,11 @@ public:
|
|
|
queueLength++;
|
|
|
if (queueLength>maxQueueLength)
|
|
|
maxQueueLength = queueLength;
|
|
|
+ if (idle==0 && started < numWorkers)
|
|
|
+ {
|
|
|
+ workers->start(this);
|
|
|
+ started++;
|
|
|
+ }
|
|
|
}
|
|
|
available.signal();
|
|
|
}
|
|
@@ -866,7 +873,9 @@ public:
|
|
|
|
|
|
void wait()
|
|
|
{
|
|
|
+ idle++;
|
|
|
available.wait();
|
|
|
+ idle--;
|
|
|
}
|
|
|
|
|
|
void signal(unsigned num)
|