|
@@ -8442,7 +8442,6 @@ public:
|
|
|
{
|
|
|
parent->stop(oid, idx, aborting); // NOTE - may call init()
|
|
|
stopped = true; // parent code relies on stop being called exactly once per adaptor, so make sure it is!
|
|
|
- idx = (unsigned) -1; // causes minIndex not to save rows for me...
|
|
|
}
|
|
|
};
|
|
|
|
|
@@ -8493,6 +8492,16 @@ public:
|
|
|
return minIdx;
|
|
|
}
|
|
|
|
|
|
+ inline bool isLastTailReader(unsigned exceptOid)
|
|
|
+ {
|
|
|
+ for (unsigned i = 0; i < numOutputs; i++)
|
|
|
+ {
|
|
|
+ if (i != exceptOid && adaptors[i].idx == tailIdx && used[i])
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
void initOutputs()
|
|
|
{
|
|
|
activeOutputs = numOutputs;
|
|
@@ -8528,31 +8537,35 @@ public:
|
|
|
|
|
|
const void *readBuffered(unsigned &idx, unsigned oid)
|
|
|
{
|
|
|
- CriticalBlock b(crit);
|
|
|
- ActivityTimer t(totalCycles, timeActivities); // NOTE - time spent waiting for crit not included here. But it will have been included on the totalTime of the person holding the crit, so that is right
|
|
|
- if (idx == headIdx) // test once without getting the crit2 sec
|
|
|
+ //False positives are fine, false negatives are not.. so headIdx must only be updated when a row will be available.
|
|
|
+ const unsigned curIdx = idx;
|
|
|
+ if (curIdx == headIdx) // test once without getting the crit2 sec
|
|
|
{
|
|
|
- CriticalUnblock b1(crit); // Allow other pullers to read (so long as they are not at the head)
|
|
|
CriticalBlock b2(crit2); // but only one puller gets to read the head
|
|
|
- if (error)
|
|
|
- {
|
|
|
- throw error.getLink();
|
|
|
- }
|
|
|
- if (idx == headIdx) // test again now that we have it
|
|
|
+ if (curIdx == headIdx) // test again now that we have it
|
|
|
{
|
|
|
+ ActivityTimer t(totalCycles, timeActivities); // NOTE - time spent waiting for crit not included here. But it will have been included on the totalTime of the person holding the crit, so that is right
|
|
|
+ if (error)
|
|
|
+ throw error.getLink();
|
|
|
+
|
|
|
try
|
|
|
{
|
|
|
const void *row = input->nextInGroup();
|
|
|
- CriticalBlock b3(crit);
|
|
|
- headIdx++;
|
|
|
- idx++;
|
|
|
if (activeOutputs==1)
|
|
|
{
|
|
|
#ifdef TRACE_SPLIT
|
|
|
CTXLOG("spill %d optimised return of %p", activityId, row);
|
|
|
#endif
|
|
|
+ headIdx++;
|
|
|
+ idx++;
|
|
|
return row; // optimization for the case where only one output still active.
|
|
|
}
|
|
|
+ CriticalBlock b(crit);
|
|
|
+ headIdx++;
|
|
|
+ idx++;
|
|
|
+ if (activeOutputs==1)
|
|
|
+ return row; // optimization for the case where only one output still active.
|
|
|
+
|
|
|
buffer.enqueue(row);
|
|
|
if (row) LinkRoxieRow(row);
|
|
|
return row;
|
|
@@ -8573,13 +8586,16 @@ public:
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- unsigned lidx = idx - tailIdx;
|
|
|
+
|
|
|
+ CriticalBlock b(crit);
|
|
|
+ ActivityTimer t(totalCycles, timeActivities); // NOTE - time spent waiting for crit not included here. But it will have been included on the totalTime of the person holding the crit, so that is right
|
|
|
+ unsigned lidx = curIdx - tailIdx;
|
|
|
idx++;
|
|
|
if (!lidx)
|
|
|
{
|
|
|
// The numOutputs <= 2 check optimizes slightly the common case of 2-way splitters.
|
|
|
// The rationale is that I MUST be the last reader if there are only 2 - the other guy must have put the record there in the first place
|
|
|
- if (numOutputs <= 2 || minIndex(oid) > tailIdx) // NOTE - this includes the case where minIndex returns (unsigned) -1, meaning no other active pullers
|
|
|
+ if (numOutputs <= 2 || isLastTailReader(oid)) // NOTE - this includes the case where minIndex returns (unsigned) -1, meaning no other active pullers
|
|
|
{
|
|
|
tailIdx++;
|
|
|
const void *ret = buffer.dequeue(); // no need to link - last puller
|
|
@@ -8634,7 +8650,7 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- void stop(unsigned oid, unsigned idx, bool aborting)
|
|
|
+ void stop(unsigned oid, unsigned & idx, bool aborting)
|
|
|
{
|
|
|
// Note that OutputAdaptor code ensures that stop is not called more than once per adaptor
|
|
|
CriticalBlock b(crit);
|
|
@@ -8672,6 +8688,7 @@ public:
|
|
|
}
|
|
|
}
|
|
|
activeOutputs--;
|
|
|
+ idx = (unsigned) -1; // causes minIndex not to save rows for me...
|
|
|
return;
|
|
|
}
|
|
|
#ifdef TRACE_SPLIT
|
|
@@ -26958,11 +26975,13 @@ protected:
|
|
|
const char * test[] = { NULL, NULL };
|
|
|
const char * test12345[] = { "1", "2", "3", "4", "5", NULL, NULL };
|
|
|
|
|
|
+ unsigned start = msTick();
|
|
|
testSplitActivity(factory, test, test, numOutputs, 0);
|
|
|
testSplitActivity(factory, test12345, test12345, numOutputs, 0);
|
|
|
testSplitActivity(factory, test12345, test12345, numOutputs, 1000000);
|
|
|
+ unsigned elapsed = msTick() - start;
|
|
|
|
|
|
- DBGLOG("testSplit %d done", numOutputs);
|
|
|
+ DBGLOG("testSplit %d done in %dms", numOutputs, elapsed);
|
|
|
}
|
|
|
|
|
|
void testSplitter()
|