Explorar o código

HPCC-12200 Roxie splitters aren't very fair

Extended unit testing code.

Fixed race condition that could lead to rows being left behind in splitter
after no-one wanted them.

Slight optimization of common case of 2-way splitter

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman %!s(int64=10) %!d(string=hai) anos
pai
achega
3da4e56d69
Modificáronse 2 ficheiros con 51 adicións e 21 borrados
  1. 1 1
      roxie/ccd/ccdmain.cpp
  2. 50 20
      roxie/ccd/ccdserver.cpp

+ 1 - 1
roxie/ccd/ccdmain.cpp

@@ -430,7 +430,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
     if (argc>=2 && stricmp(argv[1], "-selftest")==0)
     {
         selfTestMode = true;
-        queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_prefix);
+        queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_milliTime | MSGFIELD_prefix);
         CppUnit::TextUi::TestRunner runner;
         if (argc==2)
         {

+ 50 - 20
roxie/ccd/ccdserver.cpp

@@ -8405,7 +8405,6 @@ public:
 #ifdef TRACE_SPLIT
             parent->CTXLOG("Adaptor %d got back %p for record %d", oid, ret, idx);
 #endif
-            idx++;
             if (ret)
             {
                 processed++;
@@ -8527,14 +8526,14 @@ public:
         delete [] used;
     }
 
-    const void *readBuffered(unsigned idx, unsigned oid)
+    const void *readBuffered(unsigned &idx, unsigned oid)
     {
         CriticalBlock b(crit);
         ActivityTimer t(totalCycles, timeActivities); // NOTE - time spent waiting for crit not included here. But it will have been included on the totalTime of the person holding the crit, so that is right
         if (idx == headIdx) // test once without getting the crit2 sec
         {
-            CriticalUnblock b1(crit);
-            CriticalBlock b2(crit2);
+            CriticalUnblock b1(crit); // Allow other pullers to read (so long as they are not at the head)
+            CriticalBlock b2(crit2);  // but only one puller gets to read the head
             if (error)
             {
                 throw error.getLink();
@@ -8546,6 +8545,7 @@ public:
                     const void *row = input->nextInGroup();
                     CriticalBlock b3(crit);
                     headIdx++;
+                    idx++;
                     if (activeOutputs==1)
                     {
 #ifdef TRACE_SPLIT
@@ -8554,6 +8554,8 @@ public:
                         return row;  // optimization for the case where only one output still active.
                     }
                     buffer.enqueue(row);
+                    if (row) LinkRoxieRow(row);
+                    return row;
                 }
                 catch (IException *E)
                 {
@@ -8571,11 +8573,13 @@ public:
                 }
             }
         }
-        idx -= tailIdx;
-        if (!idx)
+        unsigned lidx = idx - tailIdx;
+        idx++;
+        if (!lidx)
         {
-            unsigned min = minIndex(oid);
-            if (min > tailIdx) // NOTE - this includes the case where min==(unsigned) -1, meaning no other active pullers
+            // The numOutputs <= 2 check optimizes slightly the common case of 2-way splitters.
+            // The rationale is that I MUST be the last reader if there are only 2 - the other guy must have put the record there in the first place
+            if (numOutputs <= 2 || minIndex(oid) > tailIdx) // NOTE - this includes the case where minIndex returns (unsigned) -1, meaning no other active pullers
             {
                 tailIdx++;
                 const void *ret = buffer.dequeue(); // no need to link - last puller
@@ -8585,7 +8589,7 @@ public:
                 return ret;
             }
         }
-        const void *ret = buffer.item(idx);
+        const void *ret = buffer.item(lidx);
         if (ret) LinkRoxieRow(ret);
 #ifdef TRACE_SPLIT
         CTXLOG("standard return of %p", ret);
@@ -26404,6 +26408,7 @@ class TestInput : public CInterface, implements IRoxieInput
 public:
     enum { STATEreset, STATEstarted, STATEstopped } state;
     bool allRead;
+    unsigned repeat;
     IMPLEMENT_IINTERFACE;
     TestInput(IRoxieSlaveContext *_ctx, char const * const *_input) 
     { 
@@ -26416,6 +26421,7 @@ public:
         recordSize = testMeta.getFixedSize();
         state = STATEreset;
         activityId = 1;
+        repeat = 0;
     }
     virtual IOutputMetaData * queryOutputMeta() const { return &testMeta; }
     virtual void prestart(unsigned parentExtractSize, const byte *parentExtract)
@@ -26457,6 +26463,7 @@ public:
         ASSERT(allRead || !eof);
         if (eof)
             return NULL;
+    again:
         const char *nextSource = input[count++];
         if (nextSource)
         {
@@ -26466,6 +26473,12 @@ public:
             strncpy((char *) ret, nextSource, recordSize);
             return ret;
         }
+        else if (repeat)
+        {
+            repeat--;
+            count = 0;
+            goto again;
+        }
         else
         {
             endSeen++;
@@ -26648,10 +26661,11 @@ protected:
         }
     }
 
-    void testSplitActivity(IRoxieServerActivityFactory *factory, char const * const *input, char const * const *output, unsigned numOutputs)
+    void testSplitActivity(IRoxieServerActivityFactory *factory, char const * const *input, char const * const *output, unsigned numOutputs, unsigned repeat)
     {
         Owned <IRoxieServerActivity> activity = factory->createActivity(NULL);
         TestInput in(ctx, input);
+        in.repeat = repeat;
         activity->setInput(0, &in);
         ArrayOf<IRoxieInput *> out;
         for (unsigned i = 0; i < numOutputs; i++)
@@ -26663,8 +26677,8 @@ protected:
         class casyncfor: public CAsyncFor
         {
         public:
-            casyncfor(CcdServerTest *_parent, IRoxieServerActivity *_activity, ArrayOf<IRoxieInput *> &_outputs, char const * const *_input, char const * const *_output)
-            : parent(_parent), activity(_activity), outputs(_outputs), input(_input), output(_output)
+            casyncfor(CcdServerTest *_parent, IRoxieServerActivity *_activity, ArrayOf<IRoxieInput *> &_outputs, char const * const *_input, char const * const *_output, unsigned _repeat)
+            : parent(_parent), activity(_activity), outputs(_outputs), input(_input), output(_output), repeat(_repeat)
             {}
             void Do(unsigned i)
             {
@@ -26673,11 +26687,13 @@ protected:
                 IOutputMetaData *meta = out->queryOutputMeta();
                 void *buf = alloca(meta->getFixedSize());
                 unsigned count = 0;
+                unsigned repeats = repeat;
                 loop
                 {
                     const void *next = out->nextInGroup();
                     if (!next)
                     {
+                        ASSERT(repeats==0);
                         ASSERT(output[count++] == NULL);
                         next = out->nextInGroup();
                         if (!next)
@@ -26686,6 +26702,11 @@ protected:
                             break;
                         }
                     }
+                    if (output[count] == NULL && repeats)
+                    {
+                        repeats--;
+                        count = 0;
+                    }
                     ASSERT(output[count] != NULL);
                     unsigned outsize = meta->getRecordSize(next);
                     memset(buf, 0, outsize);
@@ -26700,9 +26721,10 @@ protected:
             ArrayOf<IRoxieInput *> &outputs;
             char const * const *input;
             char const * const *output;
+            unsigned repeat;
             CcdServerTest *parent;
-        } afor(this, activity, out, input, output);
-        afor.For(numOutputs, 1);
+        } afor(this, activity, out, input, output, repeat);
+        afor.For(numOutputs, numOutputs);
 
         ASSERT(in.state == TestInput::STATEstopped);
         for (unsigned i2 = 0; i2 < numOutputs; i2++)
@@ -26925,21 +26947,29 @@ protected:
         DBGLOG("testMergeDedup done");
     }
 
-    void testSplitter()
+    void doTestSplitter(unsigned numOutputs)
     {
-        DBGLOG("testSplit");
+        DBGLOG("testSplit %d", numOutputs);
         init();
-        Owned <IRoxieServerActivityFactory> factory = createRoxieServerThroughSpillActivityFactory(*queryFactory, splitActivityTestFactory, 2);
+        Owned <IRoxieServerActivityFactory> factory = createRoxieServerThroughSpillActivityFactory(*queryFactory, splitActivityTestFactory, numOutputs);
         factory->setInput(0,0,0);
         Owned <IRoxieServerActivity> activity = factory->createActivity(NULL);
 
         const char * test[] = { NULL, NULL };
         const char * test12345[] = { "1", "2", "3", "4", "5", NULL, NULL };
 
-        testSplitActivity(factory, test, test, 2);
-        testSplitActivity(factory, test12345, test12345, 2);
+        testSplitActivity(factory, test, test, numOutputs, 0);
+        testSplitActivity(factory, test12345, test12345, numOutputs, 0);
+        testSplitActivity(factory, test12345, test12345, numOutputs, 1000000);
+
+        DBGLOG("testSplit %d done", numOutputs);
+    }
 
-        DBGLOG("testSplit done");
+    void testSplitter()
+    {
+        doTestSplitter(1);
+        doTestSplitter(2);
+        doTestSplitter(3);
     }
 
     void testMiscellaneous()