Browse Source

HPCC-14825 Helper classes for managing multiple strands

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 9 years ago
parent
commit
40630e200e

+ 2 - 0
common/thorhelper/CMakeLists.txt

@@ -38,6 +38,7 @@ set (    SRCS
          thorsoapcall.cpp 
          thorstep.cpp 
          thorstep2.cpp 
+         thorstrand.cpp
          thortalgo.cpp 
          thortlex.cpp 
          thortparse.cpp 
@@ -57,6 +58,7 @@ set (    SRCS
          thorrparse.hpp 
          thorsoapcall.hpp 
          thorstep.hpp 
+         thorstrand.hpp
          thorxmlread.hpp 
          thorxmlwrite.hpp
          roxierow.hpp

File diff suppressed because it is too large
+ 1441 - 0
common/thorhelper/thorstrand.cpp


+ 187 - 0
common/thorhelper/thorstrand.hpp

@@ -0,0 +1,187 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef THORSTRAND_HPP
+#define THORSTRAND_HPP
+
+#include "jqueue.hpp"
+#include "thorhelper.hpp"
+#include "roxiestream.hpp"
+#include "roxiemem.hpp"
+
+class IStrandJunction : extends IInterface
+{
+public:
+    virtual IEngineRowStream * queryOutput(unsigned n) = 0;
+    virtual void setInput(unsigned n, IEngineRowStream * _stream) = 0;
+    virtual void ready() = 0;
+    virtual void reset() = 0;
+    virtual void abort() = 0;
+};
+
+inline void startJunction(IStrandJunction * junction) { if (junction) junction->ready(); }
+inline void resetJunction(IStrandJunction * junction) { if (junction) junction->reset(); }
+
+interface IManyToOneRowStream : extends IRowStream
+{
+public:
+    virtual IRowWriterEx * getWriter(unsigned n) = 0;
+    virtual void abort() = 0;
+};
+
+interface IStrandBranch : extends IInterface
+{
+    virtual IStrandJunction * queryInputJunction() = 0;
+    virtual IStrandJunction * queryOutputJunction() = 0;
+};
+
+extern THORHELPER_API IStrandJunction * createStrandJunction(roxiemem::IRowManager & _rowManager, unsigned numInputs, unsigned numOutputs, unsigned blockSize, bool isOrdered);
+extern THORHELPER_API IStrandBranch * createStrandBranch(roxiemem::IRowManager & _rowManager, unsigned numStrands, unsigned blockSize, bool isOrdered, bool isGrouped);
+extern THORHELPER_API void clearRowQueue(IRowQueue * queue);
+
+extern THORHELPER_API IManyToOneRowStream * createManyToOneRowStream(roxiemem::IRowManager & _rowManager, unsigned numInputs, unsigned blockSize, bool isOrdered);
+
+//---------------------------------------------------------------------------------------------------------------------
+
+class RowBlockAllocator;
+class THORHELPER_API RoxieRowBlock
+{
+public:
+    const static unsigned numDummyDynamicRows = 1;
+    explicit RoxieRowBlock(unsigned _maxRows) noexcept : maxRows(_maxRows)
+    {
+        readPos = 0;
+        writePos = 0;
+        endOfChunk = false;
+    }
+    ~RoxieRowBlock();
+
+    inline bool addRowNowFull(const void * row)
+    {
+        dbgassertex(writePos < maxRows);
+        rows[writePos] = row;
+        return (++writePos == maxRows);
+    }
+
+    bool empty() const;
+    IException * getClearException()
+    {
+         return exception.getClear();
+    }
+    inline bool isEndOfChunk() const { return endOfChunk; }
+    inline bool nextRow(const void * & row)
+    {
+        if (readPos >= writePos)
+            return false;
+        row = rows[readPos++];
+        return true;
+    }
+    inline size32_t numRows() const { return writePos - readPos; }
+
+    bool readFromStream(IRowStream * stream);
+    inline void releaseBlock()
+    {
+        //This function is called instead of directly calling delete in case a cache is introduced later.
+        delete this;
+    }
+    void releaseRows();
+
+    inline void setEndOfChunk() { endOfChunk = true; }
+    inline void setExceptionOwn(IException * e) { exception.setown(e); }
+
+    void throwAnyPendingException();
+
+    static void operator delete (void * ptr);
+
+protected:
+    Owned<IException> exception;
+    const size32_t maxRows;
+    size32_t readPos;
+    size32_t writePos;
+    bool endOfChunk;
+    const void * rows[numDummyDynamicRows];        // Actually multiple rows.  Memory is allocated by the RowBlockAllocator.
+};
+
+
+class THORHELPER_API RowBlockAllocator
+{
+public:
+    RowBlockAllocator(roxiemem::IRowManager & _rowManager, unsigned rowsPerBlock);
+    RoxieRowBlock * newBlock();
+
+    size32_t maxRowsPerBlock() const { return rowsPerBlock; }
+
+public:
+    size32_t rowsPerBlock;
+    Owned<roxiemem::IFixedRowHeap> heap;
+};
+
+
+//---------------------------------------------------------------------------------------------------------------------
+
+typedef IQueueOf<RoxieRowBlock *> IRowBlockQueue;
+
+
+//MORE:  This implementation should be improved!  Directly use the correct queue implementation??
+class CRowBlockQueue : implements CInterfaceOf<IRowBlockQueue>
+{
+public:
+    CRowBlockQueue(unsigned numReaders, unsigned numWriters, unsigned maxItems, unsigned maxSlots)
+    {
+        queue.setown(createRowQueue(numReaders, numWriters, maxItems, maxSlots));
+    }
+
+    virtual bool enqueue(RoxieRowBlock * const item)
+    {
+        return queue->enqueue(reinterpret_cast<const void *>(item));
+    }
+    virtual bool dequeue(RoxieRowBlock * & result)
+    {
+        const void * tempResult;
+        bool ok = queue->dequeue(tempResult);
+        result = const_cast<RoxieRowBlock *>(reinterpret_cast<const RoxieRowBlock *>(tempResult));
+        return ok;
+    }
+    virtual bool tryDequeue(RoxieRowBlock * & result)
+    {
+        const void * tempResult;
+        bool ok = queue->tryDequeue(tempResult);
+        result = const_cast<RoxieRowBlock *>(reinterpret_cast<const RoxieRowBlock *>(tempResult));
+        return ok;
+    }
+    virtual void reset()
+    {
+        queue->reset();
+    }
+    virtual void noteWriterStopped()
+    {
+        queue->noteWriterStopped();
+    }
+    virtual void abort()
+    {
+        queue->abort();
+    }
+
+private:
+    Owned<IRowQueue> queue;
+};
+
+
+
+
+
+#endif // THORSTRAND_HPP

+ 4 - 4
system/jlib/jatomic.hpp

@@ -32,11 +32,11 @@ inline static void spinPause() { __pause(); }
 # else
 inline static void spinPause() { _mm_pause(); }
 # endif
+#elif defined(_ARCH_PPC64EL_)
+inline static void spinPause() { } // MORE: Is there an equivalent?
+#elif defined(_ARCH_ARM64_)
+inline static void spinPause() { } // MORE: Is there an equivalent?
 #else
-// _ARCH_ARM64_ || _ARCH_ARM32_
-// inline static void spinPause() { __nop(); }
-// _ARCH_PPC64EL_
-// __asm__ __volatile__ ("or 0,0,0");
 inline static void spinPause() { }
 #endif
 

+ 6 - 0
system/jlib/jio.hpp

@@ -169,6 +169,12 @@ interface IRowWriter: extends IInterface
     virtual void flush() = 0;
 };
 
+interface IRowWriterEx : extends IRowWriter
+{
+public:
+    virtual void noteStopped() = 0;
+};
+
 interface IRowLinkCounter: extends IInterface
 {
     virtual void linkRow(const void *row)=0;

+ 32 - 0
system/jlib/jthread.cpp

@@ -715,6 +715,38 @@ void CAsyncFor::For(unsigned num,unsigned maxatonce,bool abortFollowingException
         throw e;
 }
 
+//---------------------------------------------------------------------------------------------------------------------
+
+class CSimpleFunctionThread : public Thread
+{
+    std::function<void()> func;
+public:
+    inline CSimpleFunctionThread(std::function<void()> _func) : Thread("TaskProcessor"), func(_func) { }
+    virtual int run()
+    {
+        func();
+        return 1;
+    }
+};
+
+void asyncStart(IThreaded & threaded)
+{
+    CThreaded * thread = new CThreaded("AsyncStart", &threaded);
+    thread->startRelease();
+}
+
+void asyncStart(const char * name, IThreaded & threaded)
+{
+    CThreaded * thread = new CThreaded(name, &threaded);
+    thread->startRelease();
+}
+
+//Experimental - is this a useful function to replace some uses of IThreaded?
+void asyncStart(std::function<void()> func)
+{
+    (new CSimpleFunctionThread(func))->startRelease();
+}
+
 // ---------------------------------------------------------------------------
 // Thread Pools
 // ---------------------------------------------------------------------------

+ 7 - 0
system/jlib/jthread.hpp

@@ -24,6 +24,7 @@
 #include "jmutex.hpp"
 #include "jexcept.hpp"
 #include "jhash.hpp"
+#include <functional>
 
 #ifdef _WIN32
 #define DEFAULT_THREAD_PRIORITY THREAD_PRIORITY_NORMAL
@@ -153,6 +154,12 @@ public:
     virtual int run() { owner->main(); return 1; }
 };
 
+extern jlib_decl void asyncStart(IThreaded & threaded);
+extern jlib_decl void asyncStart(const char * name, IThreaded & threaded);
+#if defined(__cplusplus) and __cplusplus >= 201100
+extern jlib_decl void asyncStart(std::function<void()> func);
+#endif
+
 // Similar to above, but the underlying thread always remains running. This can make repeated start + join's significantly quicker
 class jlib_decl CThreadedPersistent : public CInterface
 {

+ 2 - 7
testing/unittests/jlibtests.cpp

@@ -528,6 +528,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( JlibStringBufferTest );
 CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibStringBufferTest, "JlibStringBufferTest" );
 
 
+
 /* =========================================================== */
 
 static const unsigned split4_2[] = {0, 2, 4 };
@@ -617,7 +618,6 @@ public:
 CPPUNIT_TEST_SUITE_REGISTRATION( JlibQuantileTest );
 CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibQuantileTest, "JlibQuantileTest" );
 
-
 /* =========================================================== */
 
 class JlibReaderWriterTest : public CppUnit::TestFixture
@@ -705,13 +705,12 @@ class JlibReaderWriterTest : public CppUnit::TestFixture
                     work = spinCalculation(work, workScale);
                 target.enqueue(buffer + i);
             }
+            target.noteWriterStopped();
             doneSem.signal();
             return 0;
         }
     };
 public:
-    JlibReaderWriterTest() { unitWorkTimeMs = 0; }
-
     const static size_t bufferSize = 0x100000;//0x100000*64;
     void testQueue(IRowQueue & queue, unsigned numProducers, unsigned numConsumers, unsigned queueElements, unsigned readerWork, unsigned writerWork)
     {
@@ -746,10 +745,6 @@ public:
         for (unsigned i7 = 0; i7 < numProducers; i7++)
             writerDoneSem.wait();
 
-        //Now add NULL records to the queue so the consumers know to terminate
-        for (unsigned i8 = 0; i8 < numConsumers; i8++)
-            queue.enqueue(NULL);
-
         //Wait for the readers to complete
         for (unsigned i3 = 0; i3 < numConsumers; i3++)
             stopSem.wait();