瀏覽代碼

Merge pull request #6606 from richardkchapman/roxie-splitter-selftest

HPCC-12200 Roxie splitters aren't very fair

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 10 年之前
父節點
當前提交
ad8575cbd2
共有 6 個文件被更改,包括 158 次插入27 次删除
  1. 1 0
      roxie/ccd/ccd.hpp
  2. 14 9
      roxie/ccd/ccdcontext.cpp
  3. 3 1
      roxie/ccd/ccdmain.cpp
  4. 1 2
      roxie/ccd/ccdquery.cpp
  5. 138 13
      roxie/ccd/ccdserver.cpp
  6. 1 2
      roxie/ccd/ccdstate.cpp

+ 1 - 0
roxie/ccd/ccd.hpp

@@ -401,6 +401,7 @@ extern bool mergeSlaveStatistics;
 extern bool roxieMulticastEnabled;   // enable use of multicast for sending requests to slaves
 extern bool preloadOnceData;
 extern bool reloadRetriesFailed;
+extern bool selfTestMode;
 
 extern unsigned roxiePort;     // If listening on multiple, this is the first. Used for lock cascading
 

+ 14 - 9
roxie/ccd/ccdcontext.cpp

@@ -2279,17 +2279,22 @@ public:
     CSlaveContext(const IQueryFactory *_factory, const SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, bool _hasChildren)
     : CRoxieContextBase(_factory, _logctx)
     {
-        header = &_packet->queryHeader();
-        const byte *traceInfo = _packet->queryTraceInfo();
-        options.setFromSlaveLoggingFlags(*traceInfo);
-        bool debuggerActive = (*traceInfo & LOGGING_DEBUGGERACTIVE) != 0 && _hasChildren;  // No option to debug simple remote activity
-        if (debuggerActive)
+        if (_packet)
         {
-            CSlaveDebugContext *slaveDebugContext = new CSlaveDebugContext(this, logctx, *header);
-            slaveDebugContext->init(_packet);
-            debugContext.setown(slaveDebugContext);
-            probeManager.setown(createDebugManager(debugContext, "slaveDebugger"));
+            header = &_packet->queryHeader();
+            const byte *traceInfo = _packet->queryTraceInfo();
+            options.setFromSlaveLoggingFlags(*traceInfo);
+            bool debuggerActive = (*traceInfo & LOGGING_DEBUGGERACTIVE) != 0 && _hasChildren;  // No option to debug simple remote activity
+            if (debuggerActive)
+            {
+                CSlaveDebugContext *slaveDebugContext = new CSlaveDebugContext(this, logctx, *header);
+                slaveDebugContext->init(_packet);
+                debugContext.setown(slaveDebugContext);
+                probeManager.setown(createDebugManager(debugContext, "slaveDebugger"));
+            }
         }
+        else
+            assertex(selfTestMode);
     }
     virtual void beforeDispose()
     {

+ 3 - 1
roxie/ccd/ccdmain.cpp

@@ -115,6 +115,7 @@ unsigned preabortKeyedJoinsThreshold = 100;
 unsigned preabortIndexReadsThreshold = 100;
 bool preloadOnceData;
 bool reloadRetriesFailed;
+bool selfTestMode = false;
 
 unsigned memoryStatsInterval = 0;
 memsize_t defaultMemoryLimit;
@@ -428,7 +429,8 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
     #ifdef _USE_CPPUNIT
     if (argc>=2 && stricmp(argv[1], "-selftest")==0)
     {
-        queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_prefix);
+        selfTestMode = true;
+        queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_milliTime | MSGFIELD_prefix);
         CppUnit::TextUi::TestRunner runner;
         if (argc==2)
         {

+ 1 - 2
roxie/ccd/ccdquery.cpp

@@ -1634,9 +1634,8 @@ extern IQueryFactory *createServerQueryFactory(const char *id, const IQueryDll *
         ::Release(dll);
         return cached;
     }
-    if (dll)
+    if (dll && !selfTestMode)
     {
-        checkWorkunitVersionConsistency(dll);
         Owned<ISharedOnceContext> sharedOnceContext;
         IPropertyTree *workflow = dll->queryWorkUnit()->queryWorkflowTree();
         if (workflow && workflow->hasProp("Item[@mode='once']"))

+ 138 - 13
roxie/ccd/ccdserver.cpp

@@ -8405,7 +8405,6 @@ public:
 #ifdef TRACE_SPLIT
             parent->CTXLOG("Adaptor %d got back %p for record %d", oid, ret, idx);
 #endif
-            idx++;
             if (ret)
             {
                 processed++;
@@ -8484,7 +8483,7 @@ public:
     unsigned minIndex(unsigned exceptOid)
     {
         // MORE - yukky code (and slow). Could keep them heapsorted by idx or something
-        // this is trying to determine whethwe any of the adaptors will in the future read a given record
+        // this is trying to determine whether any of the adaptors will in the future read a given record
         unsigned minIdx = (unsigned) -1;
         for (unsigned i = 0; i < numOutputs; i++)
         {
@@ -8527,14 +8526,14 @@ public:
         delete [] used;
     }
 
-    const void *readBuffered(unsigned idx, unsigned oid)
+    const void *readBuffered(unsigned &idx, unsigned oid)
     {
         CriticalBlock b(crit);
         ActivityTimer t(totalCycles, timeActivities); // NOTE - time spent waiting for crit not included here. But it will have been included on the totalTime of the person holding the crit, so that is right
         if (idx == headIdx) // test once without getting the crit2 sec
         {
-            CriticalUnblock b1(crit);
-            CriticalBlock b2(crit2);
+            CriticalUnblock b1(crit); // Allow other pullers to read (so long as they are not at the head)
+            CriticalBlock b2(crit2);  // but only one puller gets to read the head
             if (error)
             {
                 throw error.getLink();
@@ -8546,6 +8545,7 @@ public:
                     const void *row = input->nextInGroup();
                     CriticalBlock b3(crit);
                     headIdx++;
+                    idx++;
                     if (activeOutputs==1)
                     {
 #ifdef TRACE_SPLIT
@@ -8554,6 +8554,8 @@ public:
                         return row;  // optimization for the case where only one output still active.
                     }
                     buffer.enqueue(row);
+                    if (row) LinkRoxieRow(row);
+                    return row;
                 }
                 catch (IException *E)
                 {
@@ -8571,11 +8573,13 @@ public:
                 }
             }
         }
-        idx -= tailIdx;
-        if (!idx)
+        unsigned lidx = idx - tailIdx;
+        idx++;
+        if (!lidx)
         {
-            unsigned min = minIndex(oid);
-            if (min > tailIdx)
+            // The numOutputs <= 2 check optimizes slightly the common case of 2-way splitters.
+            // The rationale is that I MUST be the last reader if there are only 2 - the other guy must have put the record there in the first place
+            if (numOutputs <= 2 || minIndex(oid) > tailIdx) // NOTE - this includes the case where minIndex returns (unsigned) -1, meaning no other active pullers
             {
                 tailIdx++;
                 const void *ret = buffer.dequeue(); // no need to link - last puller
@@ -8585,7 +8589,7 @@ public:
                 return ret;
             }
         }
-        const void *ret = buffer.item(idx);
+        const void *ret = buffer.item(lidx);
         if (ret) LinkRoxieRow(ret);
 #ifdef TRACE_SPLIT
         CTXLOG("standard return of %p", ret);
@@ -26404,6 +26408,7 @@ class TestInput : public CInterface, implements IRoxieInput
 public:
     enum { STATEreset, STATEstarted, STATEstopped } state;
     bool allRead;
+    unsigned repeat;
     IMPLEMENT_IINTERFACE;
     TestInput(IRoxieSlaveContext *_ctx, char const * const *_input) 
     { 
@@ -26416,6 +26421,7 @@ public:
         recordSize = testMeta.getFixedSize();
         state = STATEreset;
         activityId = 1;
+        repeat = 0;
     }
     virtual IOutputMetaData * queryOutputMeta() const { return &testMeta; }
     virtual void prestart(unsigned parentExtractSize, const byte *parentExtract)
@@ -26457,6 +26463,7 @@ public:
         ASSERT(allRead || !eof);
         if (eof)
             return NULL;
+    again:
         const char *nextSource = input[count++];
         if (nextSource)
         {
@@ -26466,6 +26473,12 @@ public:
             strncpy((char *) ret, nextSource, recordSize);
             return ret;
         }
+        else if (repeat)
+        {
+            repeat--;
+            count = 0;
+            goto again;
+        }
         else
         {
             endSeen++;
@@ -26527,6 +26540,16 @@ public:
 bool MergeActivityTest::isDedup = false;
 extern "C" IHThorArg * mergeActivityTestFactory() { return new MergeActivityTest; }
 
+struct SplitActivityTest : public ccdserver_hqlhelper::CThorSplitArg {
+public:
+    virtual unsigned numBranches() { return 2; }
+    virtual IOutputMetaData * queryOutputMeta()
+    {
+        return &testMeta;
+    }
+};
+extern "C" IHThorArg * splitActivityTestFactory() { return new SplitActivityTest; }
+
 class CcdServerTest : public CppUnit::TestFixture  
 {
     CPPUNIT_TEST_SUITE(CcdServerTest);
@@ -26537,6 +26560,7 @@ class CcdServerTest : public CppUnit::TestFixture
         CPPUNIT_TEST(testMerge);
         CPPUNIT_TEST(testMergeDedup);
         CPPUNIT_TEST(testMiscellaneous);
+        CPPUNIT_TEST(testSplitter);
         CPPUNIT_TEST(testCleanup);
     CPPUNIT_TEST_SUITE_END();
 protected:
@@ -26572,15 +26596,15 @@ protected:
         testActivity(activity, input, NULL, output);
     }
 
-    void testActivity(IRoxieServerActivity *activity, char const * const *input, char const * const *input2, char const * const *output)
+    void testActivity(IRoxieServerActivity *activity, char const * const *input, char const * const *input2, char const * const *output, unsigned outputIdx = 0)
     {
         TestInput in(ctx, input);
         TestInput in2(ctx, input2);
-        IRoxieInput *out = activity->queryOutput(0);
-        IOutputMetaData *meta = out->queryOutputMeta();
         activity->setInput(0, &in);
         if (input2)
             activity->setInput(1, &in2);
+        IRoxieInput *out = activity->queryOutput(outputIdx);
+        IOutputMetaData *meta = out->queryOutputMeta();
         void *buf = alloca(meta->getFixedSize());
 
         for (unsigned iteration = 0; iteration < 8; iteration++)
@@ -26637,6 +26661,81 @@ protected:
         }
     }
 
+    void testSplitActivity(IRoxieServerActivityFactory *factory, char const * const *input, char const * const *output, unsigned numOutputs, unsigned repeat)
+    {
+        Owned <IRoxieServerActivity> activity = factory->createActivity(NULL);
+        TestInput in(ctx, input);
+        in.repeat = repeat;
+        activity->setInput(0, &in);
+        ArrayOf<IRoxieInput *> out;
+        for (unsigned i = 0; i < numOutputs; i++)
+        {
+            out.append(activity->queryOutput(i));
+        }
+        activity->onCreate(ctx, NULL);
+
+        class casyncfor: public CAsyncFor
+        {
+        public:
+            casyncfor(CcdServerTest *_parent, IRoxieServerActivity *_activity, ArrayOf<IRoxieInput *> &_outputs, char const * const *_input, char const * const *_output, unsigned _repeat)
+            : parent(_parent), activity(_activity), outputs(_outputs), input(_input), output(_output), repeat(_repeat)
+            {}
+            void Do(unsigned i)
+            {
+                IRoxieInput *out = outputs.item(i);
+                out->start(0, NULL, false);
+                IOutputMetaData *meta = out->queryOutputMeta();
+                void *buf = alloca(meta->getFixedSize());
+                unsigned count = 0;
+                unsigned repeats = repeat;
+                loop
+                {
+                    const void *next = out->nextInGroup();
+                    if (!next)
+                    {
+                        ASSERT(repeats==0);
+                        ASSERT(output[count++] == NULL);
+                        next = out->nextInGroup();
+                        if (!next)
+                        {
+                            ASSERT(output[count++] == NULL);
+                            break;
+                        }
+                    }
+                    if (output[count] == NULL && repeats)
+                    {
+                        repeats--;
+                        count = 0;
+                    }
+                    ASSERT(output[count] != NULL);
+                    unsigned outsize = meta->getRecordSize(next);
+                    memset(buf, 0, outsize);
+                    strncpy((char *) buf, output[count++], outsize);
+                    ASSERT(memcmp(next, buf, outsize) == 0);
+                    ReleaseRoxieRow(next);
+                }
+                out->stop(false);
+            }
+        private:
+            IRoxieServerActivity *activity;
+            ArrayOf<IRoxieInput *> &outputs;
+            char const * const *input;
+            char const * const *output;
+            unsigned repeat;
+            CcdServerTest *parent;
+        } afor(this, activity, out, input, output, repeat);
+        afor.For(numOutputs, numOutputs);
+
+        ASSERT(in.state == TestInput::STATEstopped);
+        for (unsigned i2 = 0; i2 < numOutputs; i2++)
+        {
+            out.item(i2)->reset();
+        }
+        ASSERT(in.state == TestInput::STATEreset);
+        ctx->queryRowManager().reportLeaks();
+        ASSERT(ctx->queryRowManager().numPagesAfterCleanup(true) == 0);
+    }
+
     static int compareFunc(const void *l, const void *r)
     {
         return strcmp(*(char **) l, *(char **) r);
@@ -26848,6 +26947,32 @@ protected:
         DBGLOG("testMergeDedup done");
     }
 
+    void doTestSplitter(unsigned numOutputs)
+    {
+        DBGLOG("testSplit %d", numOutputs);
+        init();
+        Owned <IRoxieServerActivityFactory> factory = createRoxieServerThroughSpillActivityFactory(*queryFactory, splitActivityTestFactory, numOutputs);
+        factory->setInput(0,0,0);
+        Owned <IRoxieServerActivity> activity = factory->createActivity(NULL);
+
+        const char * test[] = { NULL, NULL };
+        const char * test12345[] = { "1", "2", "3", "4", "5", NULL, NULL };
+
+        testSplitActivity(factory, test, test, numOutputs, 0);
+        testSplitActivity(factory, test12345, test12345, numOutputs, 0);
+        testSplitActivity(factory, test12345, test12345, numOutputs, 1000000);
+
+        DBGLOG("testSplit %d done", numOutputs);
+    }
+
+    void testSplitter()
+    {
+        doTestSplitter(1);
+        doTestSplitter(2);
+        doTestSplitter(3);
+        doTestSplitter(10);
+    }
+
     void testMiscellaneous()
     {
         DBGLOG("sizeof(CriticalSection)=%u", (unsigned) sizeof(CriticalSection));

+ 1 - 2
roxie/ccd/ccdstate.cpp

@@ -717,8 +717,7 @@ typedef CResolvedPackage<CRoxiePackageNode> CRoxiePackage;
 IRoxiePackage *createRoxiePackage(IPropertyTree *p, IRoxiePackageMap *packages)
 {
     Owned<CRoxiePackage> pkg = new CRoxiePackage(p);
-    if (packages)
-        pkg->resolveBases(packages);
+    pkg->resolveBases(packages);
     return pkg.getClear();
 }