|
@@ -728,6 +728,24 @@ class RoxieQueue : public CInterface, implements IThreadFactory
|
|
|
unsigned numWorkers;
|
|
|
unsigned started;
|
|
|
std::atomic<unsigned> idle;
|
|
|
+
|
|
|
+ void noteQueued()
|
|
|
+ {
|
|
|
+ CriticalBlock b(counterCrit);
|
|
|
+ queueLength++;
|
|
|
+ if (queueLength>maxQueueLength)
|
|
|
+ maxQueueLength = queueLength;
|
|
|
+ // NOTE - there is a small race condition here - if idle is 1 but two enqueue's happen
|
|
|
+ // close enough together that the signal has not yet caused idle to come back down to zero, then the
|
|
|
+ // desired new thread may not be created. It's unlikely, and it's benign in that the query is still
|
|
|
+ // processed and the thread will be created next time the HWM is reached.
|
|
|
+ if (started < numWorkers && idle==0)
|
|
|
+ {
|
|
|
+ workers->start(this);
|
|
|
+ started++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
|
|
@@ -745,7 +763,14 @@ public:
|
|
|
|
|
|
void start()
|
|
|
{
|
|
|
- // We start on demand now...
|
|
|
+ if (prestartSlaveThreads)
|
|
|
+ {
|
|
|
+ while (started < numWorkers)
|
|
|
+ {
|
|
|
+ workers->start(this);
|
|
|
+ started++;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
IPooledThreadIterator *running()
|
|
@@ -773,16 +798,7 @@ public:
|
|
|
header.tick = msTick();
|
|
|
#endif
|
|
|
waiting.enqueue(x);
|
|
|
-
|
|
|
- CriticalBlock b(counterCrit);
|
|
|
- queueLength++;
|
|
|
- if (queueLength>maxQueueLength)
|
|
|
- maxQueueLength = queueLength;
|
|
|
- if (idle==0 && started < numWorkers)
|
|
|
- {
|
|
|
- workers->start(this);
|
|
|
- started++;
|
|
|
- }
|
|
|
+ noteQueued();
|
|
|
}
|
|
|
available.signal();
|
|
|
}
|
|
@@ -825,10 +841,7 @@ public:
|
|
|
l.CTXLOG("enqueued %s", header.toString(xx).str());
|
|
|
}
|
|
|
waiting.enqueue(x);
|
|
|
- CriticalBlock b(counterCrit);
|
|
|
- queueLength++;
|
|
|
- if (queueLength>maxQueueLength)
|
|
|
- maxQueueLength = queueLength;
|
|
|
+ noteQueued();
|
|
|
}
|
|
|
available.signal();
|
|
|
}
|
|
@@ -1378,6 +1391,7 @@ IPooledThread *RoxieQueue::createNew()
|
|
|
|
|
|
void RoxieQueue::abortChannel(unsigned channel)
|
|
|
{
|
|
|
+ CriticalBlock b(counterCrit);
|
|
|
Owned<IPooledThreadIterator> wi = workers->running();
|
|
|
ForEach(*wi)
|
|
|
{
|