|
@@ -12173,7 +12173,7 @@ class CRoxieThreadedConcatReader : public CInterface, implements IRecordPullerCa
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
CRoxieThreadedConcatReader(InterruptableSemaphore &_ready, bool _grouped)
|
|
|
- : puller(false), ready(_ready), eof(false)
|
|
|
+ : puller(false), grouped(_grouped), ready(_ready), eof(false)
|
|
|
{
|
|
|
|
|
|
}
|
|
@@ -12224,7 +12224,8 @@ public:
|
|
|
|
|
|
virtual void processEOG()
|
|
|
{
|
|
|
- processRow(NULL);
|
|
|
+ if (grouped)
|
|
|
+ processRow(NULL);
|
|
|
}
|
|
|
|
|
|
virtual void processDone()
|
|
@@ -12265,6 +12266,7 @@ protected:
|
|
|
InterruptableSemaphore &ready;
|
|
|
SafeQueueOf<const void, true> buffer;
|
|
|
bool eof;
|
|
|
+ bool grouped;
|
|
|
};
|
|
|
|
|
|
MAKEPointerArray(CRoxieThreadedConcatReader, ReaderArray);
|
|
@@ -12284,8 +12286,8 @@ public:
|
|
|
CRoxieServerThreadedConcatActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, bool _grouped, unsigned _numInputs)
|
|
|
: CRoxieServerActivity(_factory, _probeManager), grouped(_grouped)
|
|
|
{
|
|
|
- eof = false;
|
|
|
numInputs = _numInputs;
|
|
|
+ eof = (numInputs==0);
|
|
|
nextPuller = 0;
|
|
|
readyPending = 0;
|
|
|
for (unsigned i = 0; i < numInputs; i++)
|
|
@@ -12301,7 +12303,7 @@ public:
|
|
|
|
|
|
virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
|
|
|
{
|
|
|
- eof = false;
|
|
|
+ eof = (numInputs==0);
|
|
|
inGroup = false;
|
|
|
nextPuller = 0;
|
|
|
readyPending = 0;
|
|
@@ -12361,13 +12363,13 @@ public:
|
|
|
ActivityTimer t(totalCycles, timeActivities, ctx->queryDebugContext());
|
|
|
if (eof)
|
|
|
return NULL;
|
|
|
- bool anyActive = false;
|
|
|
loop
|
|
|
{
|
|
|
if (readyPending && !inGroup)
|
|
|
readyPending--;
|
|
|
else
|
|
|
ready.wait();
|
|
|
+ bool anyActive = false;
|
|
|
ForEachItemIn(unused_index, pullers)
|
|
|
{
|
|
|
// NOTE - we round robin not just because it's more efficient, but because it ensures the preservation of grouping information
|