소스 검색

Merge pull request #5129 from richardkchapman/ordered-concat

HPCC-9253 Significant semaphore overhead in the unordered concat

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 11 년 전
부모
커밋
8fd6b1763f
5개의 변경된 파일245개의 추가작업 그리고 80개의 파일을 삭제
  1. 153 80
      roxie/ccd/ccdserver.cpp
  2. 3 0
      testing/ecl/key/threadedConcat.xml
  3. 9 0
      testing/ecl/key/threadedGroupedConcat.xml
  4. 33 0
      testing/ecl/threadedConcat.ecl
  5. 47 0
      testing/ecl/threadedGroupedConcat.ecl

+ 153 - 80
roxie/ccd/ccdserver.cpp

@@ -12168,26 +12168,128 @@ IRoxieServerActivityFactory *createRoxieServerJoinActivityFactory(unsigned _id,
 
 #define CONCAT_READAHEAD 1000
 
-MAKEPointerArray(RecordPullerThread, RecordPullerArray);
+class CRoxieThreadedConcatReader : public CInterface, implements IRecordPullerCallback
+{
+public:
+    IMPLEMENT_IINTERFACE;
+    CRoxieThreadedConcatReader(InterruptableSemaphore &_ready, bool _grouped)
+    : puller(false), ready(_ready), eof(false)
+    {
+
+    }
+
+    void start(unsigned parentExtractSize, const byte *parentExtract, bool paused, IRoxieSlaveContext *ctx)
+    {
+        space.reinit(CONCAT_READAHEAD);
+        puller.start(parentExtractSize, parentExtract, paused, ctx->concatPreload(), false, ctx);
+    }
+
+    void stop(bool aborting)
+    {
+        space.interrupt();
+        puller.stop(aborting);
+    }
+
+    IRoxieInput *queryInput() const
+    {
+        return puller.queryInput();
+    }
+
+    void reset()
+    {
+        puller.reset();
+        ForEachItemIn(idx, buffer)
+            ReleaseRoxieRow(buffer.item(idx));
+        buffer.clear();
+        eof = false;
+    }
+
+    void setInput(IRoxieInput *_in)
+    {
+        puller.setInput(this, _in);
+    }
+
+    virtual void processRow(const void *row)
+    {
+        buffer.enqueue(row);
+        ready.signal();
+        space.wait();
+    }
+
+    virtual void processGroup(const ConstPointerArray &rows)
+    {
+        // We use record-by-record input mode of the puller thread even in grouped mode.
+        throwUnexpected();
+    }
+
+    virtual void processEOG()
+    {
+        processRow(NULL);
+    }
+
+    virtual void processDone()
+    {
+        eof = true;
+        ready.signal();
+    }
+
+    virtual bool fireException(IException *e)
+    {
+        // called from puller thread on failure
+        ready.interrupt(LINK(e));
+        space.interrupt(e);
+        return true;
+    }
 
-class CRoxieServerThreadedConcatActivity : public CRoxieServerActivity, implements IRecordPullerCallback
+    bool peek(const void * &row, bool &anyActive)
+    {
+        if (buffer.ordinality())
+        {
+            space.signal();
+            row = buffer.dequeue();
+            return true;
+        }
+        if (!eof)
+            anyActive = true;
+        return false;
+    }
+
+    inline bool atEof() const
+    {
+        return eof;
+    }
+
+protected:
+    RecordPullerThread puller;
+    InterruptableSemaphore space;
+    InterruptableSemaphore &ready;
+    SafeQueueOf<const void, true> buffer;
+    bool eof;
+};
+
+MAKEPointerArray(CRoxieThreadedConcatReader, ReaderArray);
+
+class CRoxieServerThreadedConcatActivity : public CRoxieServerActivity
 {
-    QueueOf<const void, true> buffer;
     InterruptableSemaphore ready;
-    InterruptableSemaphore space;
-    CriticalSection crit;
-    unsigned eofs;
-    RecordPullerArray pullers;
+    ReaderArray pullers;
     unsigned numInputs;
+    unsigned nextPuller; // for round robin
+    unsigned readyPending;
+    bool eof;
+    bool inGroup;
+    bool grouped;
 
 public:
     CRoxieServerThreadedConcatActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, bool _grouped, unsigned _numInputs)
-        : CRoxieServerActivity(_factory, _probeManager)
+        : CRoxieServerActivity(_factory, _probeManager), grouped(_grouped)
     {
-        eofs = 0;
+        eof = false;
         numInputs = _numInputs;
+        nextPuller = 0;
+        readyPending = 0;
         for (unsigned i = 0; i < numInputs; i++)
-            pullers.append(*new RecordPullerThread(_grouped));
+            pullers.append(*new CRoxieThreadedConcatReader(ready, _grouped));
 
     }
 
@@ -12199,23 +12301,23 @@ public:
 
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
     {
-        space.reinit(CONCAT_READAHEAD);
+        eof = false;
+        inGroup = false;
+        nextPuller = 0;
+        readyPending = 0;
         ready.reinit();
-        eofs = 0;
         CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
         ForEachItemIn(idx, pullers)
         {
-            pullers.item(idx).start(parentExtractSize, parentExtract, paused, ctx->concatPreload(), false, ctx);  
+            pullers.item(idx).start(parentExtractSize, parentExtract, paused, ctx);
             // NOTE - it is ok to start the thread running while parts of the subgraph are still being started, since everything 
             // in the part of the subgraph that the thread uses has been started.
             // Note that splitters are supposed to cope with being used when only some outputs have been started.
         }
     }
 
-
     virtual void stop(bool aborting)    
     {
-        space.interrupt();
         ready.interrupt();
         ForEachItemIn(idx, pullers)
             pullers.item(idx).stop(aborting);
@@ -12240,15 +12342,16 @@ public:
         CRoxieServerActivity::reset();
         ForEachItemIn(idx, pullers)
             pullers.item(idx).reset();
-        ForEachItemIn(idx1, buffer)
-            ReleaseRoxieRow(buffer.item(idx1));
-        buffer.clear();
+        eof = false;
+        inGroup = false;
+        nextPuller = 0;
+        readyPending = 0;
     }
 
     virtual void setInput(unsigned idx, IRoxieInput *_in)
     {
         if (pullers.isItem(idx))
-            pullers.item(idx).setInput(this, _in);
+            pullers.item(idx).setInput(_in);
         else
             throw MakeStringException(ROXIE_SET_INPUT, "Internal error: setInput() parameter out of bounds at %s(%d)", __FILE__, __LINE__); 
     }
@@ -12256,74 +12359,44 @@ public:
     virtual const void * nextInGroup()
     {
         ActivityTimer t(totalCycles, timeActivities, ctx->queryDebugContext());
+        if (eof)
+            return NULL;
+        bool anyActive = false;
         loop
         {
+            if (readyPending && !inGroup)
+                readyPending--;
+            else
+                ready.wait();
+            ForEachItemIn(unused_index, pullers)
             {
-                CriticalBlock b(crit);
-                if (eofs==numInputs && !buffer.ordinality())
-                    return NULL;  // eof
-            }
-            ready.wait();
-            const void *ret;
-            {
-                CriticalBlock b(crit);
-                ret = buffer.dequeue();
+                // NOTE - we round robin not just because it's more efficient, but because it ensures the preservation of grouping information
+                const void *ret;
+                bool fetched = pullers.item(nextPuller).peek(ret, anyActive);
+                if (fetched)
+                {
+                    inGroup = (ret != NULL);
+                    return ret;
+                }
+                if (inGroup && grouped)
+                {
+                    // Some other puller has data, but we can't consume it until the group we are reading is complete.
+                    readyPending++;
+                    anyActive = true;
+                    break;
+                }
+                nextPuller++;
+                if (nextPuller==pullers.ordinality())
+                    nextPuller = 0;
             }
-            if (ret)
-                processed++;
-            space.signal();
-            return ret;
-        }
-    }
-
-    virtual bool fireException(IException *e)
-    {
-        // called from puller thread on failure
-        ready.interrupt(LINK(e));
-        space.interrupt(e);
-        return true;
-    }
-
-    virtual void processRow(const void *row)
-    {
-        {
-            CriticalBlock b(crit);
-            buffer.enqueue(row);
-        }
-        ready.signal();
-        space.wait();
-    }
-
-    virtual void processGroup(const ConstPointerArray &rows)
-    {
-        // NOTE - a bit bizzare in that it waits for the space AFTER using it.
-        // But the space semaphore is only there to stop infinite readahead. And otherwise it would deadlock
-        // if group was larger than max(space)
-        {
-            CriticalBlock b(crit);
-            ForEachItemIn(idx, rows)
-                buffer.enqueue(rows.item(idx));
-            buffer.enqueue(NULL);
-        }
-        for (unsigned i2 = 0; i2 <= rows.length(); i2++) // note - does 1 extra for the null
-        {
-            ready.signal();
-            space.wait();
+            if (!anyActive)
+                break;
+            // A ready signal without anything being ready means someone reached end-of-file.
         }
+        eof = true;
+        return NULL;
     }
 
-    virtual void processEOG()
-    {
-        // Used when output is not grouped - just ignore
-    }
-
-    virtual void processDone()
-    {
-        CriticalBlock b(crit);
-        eofs++;
-        if (eofs == numInputs)
-            ready.signal();
-    }
 };
 
 class CRoxieServerOrderedConcatActivity : public CRoxieServerActivity

+ 3 - 0
testing/ecl/key/threadedConcat.xml

@@ -0,0 +1,3 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>6000000</Result_1></Row>
+</Dataset>

+ 9 - 0
testing/ecl/key/threadedGroupedConcat.xml

@@ -0,0 +1,9 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>24501</Result_1></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><Result_2>24501</Result_2></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><Result_3>24501</Result_3></Row>
+</Dataset>

+ 33 - 0
testing/ecl/threadedConcat.ecl

@@ -0,0 +1,33 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+numRecords := 1000000;
+
+r1 := { unsigned id; };
+
+createIds(unsigned n, unsigned delta) := NOFOLD(DATASET(n, TRANSFORM(r1, SELF.id := COUNTER + delta)));
+
+x(unsigned i) := createIds(numRecords, i);
+
+combineDs(DATASET(r1) x1, DATASET(r1) x2, DATASET(r1) x3, DATASET(r1) x4, DATASET(r1) x5, DATASET(r1) x6) :=
+     x1 + x2 + x3 + x4 + x5 + x6;
+
+y(unsigned i) := combineDs(x(i), x(i+1000), x(i+2000), x(i+3000), x(i+4000), x(i+5000));
+
+//z(unsigned i) := combineDs(y(i), y(i+8), y(i+16), y(i+24), y(i+32), y(i+40));
+
+count(y(1));  // should be numRecords * 6

+ 47 - 0
testing/ecl/threadedGroupedConcat.ecl

@@ -0,0 +1,47 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+// Testing that threaded concat preserves grouping information
+
+numRecords := 10000;
+
+r1 := { unsigned id; unsigned gs };
+
+createIds(unsigned n, unsigned groupsize) :=
+   GROUP(
+       DATASET(n,
+               TRANSFORM(r1, SELF.id := (COUNTER+groupsize-1) / groupsize, SELF.gs := groupsize)
+              ),
+       id);
+
+x(unsigned i) := createIds(numRecords, i);
+
+combineDs(GROUPED DATASET(r1) x1, GROUPED DATASET(r1) x2, GROUPED DATASET(r1) x3, GROUPED DATASET(r1) x4, GROUPED DATASET(r1) x5, GROUPED DATASET(r1) x6) :=
+     x1 + x2 + x3 + x4 + x5 + x6;
+
+combineDsU(GROUPED DATASET(r1) x1, GROUPED DATASET(r1) x2, GROUPED DATASET(r1) x3, GROUPED DATASET(r1) x4, GROUPED DATASET(r1) x5, GROUPED DATASET(r1) x6) :=
+     x1 & x2 & x3 & x4 & x5 & x6;
+
+
+y := GROUP(combineDs(x(1), x(2), x(3), x(4), x(5), x(6)));
+yU := GROUP(combineDsU(x(1), x(2), x(3), x(4), x(5), x(6)));
+
+//y;
+// These three counts should be the same
+count(dedup(yU, RECORD));
+count(dedup(y, RECORD));
+count(dedup(y, RECORD, ALL));