|
@@ -8442,7 +8442,6 @@ public:
|
|
|
{
|
|
|
parent->stop(oid, idx, aborting); // NOTE - may call init()
|
|
|
stopped = true; // parent code relies on stop being called exactly once per adaptor, so make sure it is!
|
|
|
- idx = (unsigned) -1; // causes minIndex not to save rows for me...
|
|
|
}
|
|
|
};
|
|
|
|
|
@@ -8493,6 +8492,16 @@ public:
|
|
|
return minIdx;
|
|
|
}
|
|
|
|
|
|
+ inline bool isLastTailReader(unsigned exceptOid)
|
|
|
+ {
|
|
|
+ for (unsigned i = 0; i < numOutputs; i++)
|
|
|
+ {
|
|
|
+ if (i != exceptOid && adaptors[i].idx == tailIdx && used[i])
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
void initOutputs()
|
|
|
{
|
|
|
activeOutputs = numOutputs;
|
|
@@ -8529,10 +8538,11 @@ public:
|
|
|
const void *readBuffered(unsigned &idx, unsigned oid)
|
|
|
{
|
|
|
//False positives are fine, false negatives are not.. so headIdx must only be updated when a row will be available.
|
|
|
- if (idx == headIdx) // test once without getting the crit2 sec
|
|
|
+ const unsigned curIdx = idx;
|
|
|
+ if (curIdx == headIdx) // test once without getting the crit2 sec
|
|
|
{
|
|
|
CriticalBlock b2(crit2); // but only one puller gets to read the head
|
|
|
- if (idx == headIdx) // test again now that we have it
|
|
|
+ if (curIdx == headIdx) // test again now that we have it
|
|
|
{
|
|
|
ActivityTimer t(totalCycles, timeActivities); // NOTE - time spent waiting for crit not included here. But it will have been included on the totalTime of the person holding the crit, so that is right
|
|
|
if (error)
|
|
@@ -8579,13 +8589,13 @@ public:
|
|
|
|
|
|
CriticalBlock b(crit);
|
|
|
ActivityTimer t(totalCycles, timeActivities); // NOTE - time spent waiting for crit not included here. But it will have been included on the totalTime of the person holding the crit, so that is right
|
|
|
- unsigned lidx = idx - tailIdx;
|
|
|
+ unsigned lidx = curIdx - tailIdx;
|
|
|
idx++;
|
|
|
if (!lidx)
|
|
|
{
|
|
|
// The numOutputs <= 2 check optimizes slightly the common case of 2-way splitters.
|
|
|
// The rationale is that I MUST be the last reader if there are only 2 - the other guy must have put the record there in the first place
|
|
|
- if (numOutputs <= 2 || minIndex(oid) > tailIdx) // NOTE - this includes the case where minIndex returns (unsigned) -1, meaning no other active pullers
|
|
|
+ if (numOutputs <= 2 || isLastTailReader(oid)) // NOTE - this includes the case where minIndex returns (unsigned) -1, meaning no other active pullers
|
|
|
{
|
|
|
tailIdx++;
|
|
|
const void *ret = buffer.dequeue(); // no need to link - last puller
|
|
@@ -8640,7 +8650,7 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- void stop(unsigned oid, unsigned idx, bool aborting)
|
|
|
+ void stop(unsigned oid, unsigned & idx, bool aborting)
|
|
|
{
|
|
|
// Note that OutputAdaptor code ensures that stop is not called more than once per adaptor
|
|
|
CriticalBlock b(crit);
|
|
@@ -8678,6 +8688,7 @@ public:
|
|
|
}
|
|
|
}
|
|
|
activeOutputs--;
|
|
|
+ idx = (unsigned) -1; // causes minIndex not to save rows for me...
|
|
|
return;
|
|
|
}
|
|
|
#ifdef TRACE_SPLIT
|
|
@@ -26964,11 +26975,13 @@ protected:
|
|
|
const char * test[] = { NULL, NULL };
|
|
|
const char * test12345[] = { "1", "2", "3", "4", "5", NULL, NULL };
|
|
|
|
|
|
+ unsigned start = msTick();
|
|
|
testSplitActivity(factory, test, test, numOutputs, 0);
|
|
|
testSplitActivity(factory, test12345, test12345, numOutputs, 0);
|
|
|
testSplitActivity(factory, test12345, test12345, numOutputs, 1000000);
|
|
|
+ unsigned elapsed = msTick() - start;
|
|
|
|
|
|
- DBGLOG("testSplit %d done", numOutputs);
|
|
|
+ DBGLOG("testSplit %d done in %dms", numOutputs, elapsed);
|
|
|
}
|
|
|
|
|
|
void testSplitter()
|