Explorar o código

Merge pull request #2925 from jakesmith/hashdedup-spill

gh-2675 - Implementation of spilling hash dedup

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman %!s(int64=13) %!d(string=hai) anos
pai
achega
6f1c6fc8f6

A diferenza do arquivo foi suprimida porque é demasiado grande
+ 848 - 105
thorlcr/activities/hashdistrib/thhashdistribslave.cpp


+ 1 - 1
thorlcr/graph/thgraph.cpp

@@ -2591,7 +2591,7 @@ void CJobBase::runSubgraph(CGraphBase &graph, size32_t parentExtractSz, const by
     graph.executeSubGraph(parentExtractSz, parentExtract);
 }
 
-IEngineRowAllocator *CJobBase::getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
+IEngineRowAllocator *CJobBase::getRowAllocator(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
 {
     return thorAllocator->getRowAllocator(meta, activityId);
 }

+ 2 - 1
thorlcr/graph/thgraph.hpp

@@ -881,8 +881,9 @@ public:
         return LINK(allGraphs.find(gid));
     }
 
-    IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId) const;
+    IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags=roxiemem::RHFnone) const;
     roxiemem::IRowManager *queryRowManager() const;
+    IThorAllocator *queryThorAllocator() const { return thorAllocator; }
     bool queryUseCheckpoints() const;
     const bool &queryPausing() const { return pausing; }
     const bool &queryResumed() const { return resumed; }

+ 11 - 1
thorlcr/shared/thor.hpp

@@ -33,15 +33,25 @@ typedef unsigned __int64 rowcount_t;
 #define RCUNBOUND RCMAX
 #define RCUNSET RCMAX
 typedef size32_t rowidx_t;
+#define RCIDXMAX ((rowidx_t)(size32_t)-1)
 #define RIPF ""
 
+// validate that type T doesn't truncate
 template <class T>
 inline rowcount_t validRC(T X)
 {
-    assertex(X == (rowcount_t)X);
+    if (X != (rowcount_t)X)
+        throw MakeStringException(0, "rowcount_t value truncation");
     return (rowcount_t)X;
 }
 
+template <class T>
+inline rowidx_t validRIDX(T X)
+{
+    if (X != (rowidx_t)X)
+        throw MakeStringException(0, "rowidx_t value truncation");
+    return (rowidx_t)X;
+}
 
 #if 0
     #define CATCHALL ...

+ 3 - 41
thorlcr/thorutil/thmem.cpp

@@ -1067,20 +1067,6 @@ IRowStream *CThorSpillableRowArray::createRowStream()
 
 class CThorRowCollectorBase : public CSimpleInterface, implements roxiemem::IBufferedRowCallback
 {
-    class CFileOwner : public CSimpleInterface, implements IInterface
-    {
-        IFile *iFile;
-    public:
-        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-        CFileOwner(IFile *_iFile) : iFile(_iFile)
-        {
-        }
-        ~CFileOwner()
-        {
-            iFile->remove();
-        }
-        IFile &queryIFile() const { return *iFile; }
-    };
 protected:
     CActivityBase &activity;
     CThorSpillableRowArray spillableRows;
@@ -1180,33 +1166,6 @@ protected:
         }
         ++outStreams;
 
-        class CStreamFileOwner : public CSimpleInterface, implements IExtRowStream
-        {
-            Linked<CFileOwner> fileOwner;
-            IExtRowStream *stream;
-        public:
-            IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-            CStreamFileOwner(CFileOwner *_fileOwner, IExtRowStream *_stream) : fileOwner(_fileOwner)
-            {
-                stream = LINK(_stream);
-            }
-            ~CStreamFileOwner()
-            {
-                stream->Release();
-            }
-        // IExtRowStream
-            virtual const void *nextRow() { return stream->nextRow(); }
-            virtual void stop() { stream->stop(); }
-            virtual offset_t getOffset() { return stream->getOffset(); }
-            virtual void stop(CRC32 *crcout=NULL) { stream->stop(); }
-            virtual const void *prefetchRow(size32_t *sz=NULL) { return stream->prefetchRow(sz); }
-            virtual void prefetchDone() { stream->prefetchDone(); }
-            virtual void reinit(offset_t offset, offset_t len, unsigned __int64 maxRows)
-            {
-                stream->reinit(offset, len, maxRows);
-            }
-        };
-
         // NB: CStreamFileOwner, shares reference so CFileOwner, last usage, will auto delete file
         // which may be one of these streams of CThorRowCollectorBase itself
         IArrayOf<IRowStream> instrms;
@@ -1694,6 +1653,8 @@ public:
     {
         return rowManager;
     }
+    virtual roxiemem::RoxieHeapFlags queryFlags() const { return flags; }
+    virtual bool queryCrc() const { return false; }
 
 // IRowAllocatorCache
     virtual unsigned getActivityId(unsigned cacheId) const
@@ -1796,6 +1757,7 @@ public:
         allAllocators.append(*ret);
         return ret;
     }
+    virtual bool queryCrc() const { return true; }
 };
 
 

+ 9 - 3
thorlcr/thorutil/thmem.hpp

@@ -154,6 +154,8 @@ interface IThorAllocator : extends IInterface
 {
     virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId) const = 0;
     virtual roxiemem::IRowManager *queryRowManager() const = 0;
+    virtual roxiemem::RoxieHeapFlags queryFlags() const = 0;
+    virtual bool queryCrc() const = 0;
 };
 
 IThorAllocator *createThorAllocator(memsize_t memSize, bool crcChecking, bool usePacked);
@@ -206,6 +208,7 @@ graph_decl StringBuffer &getRecordString(const void *key, IOutputRowSerializer *
 #define SPILL_PRIORITY_HASHJOIN 10
 #define SPILL_PRIORITY_LARGESORT 10
 #define SPILL_PRIORITY_GROUPSORT 20
+#define SPILL_PRIORITY_HASHDEDUP 30
 #define SPILL_PRIORITY_OVERFLOWABLE_BUFFER SPILL_PRIORITY_DEFAULT
 #define SPILL_PRIORITY_SPILLABLE_STREAM SPILL_PRIORITY_DEFAULT
 #define SPILL_PRIORITY_RESULT SPILL_PRIORITY_DEFAULT
@@ -228,7 +231,7 @@ protected:
     bool allowNulls;
     StableSortFlag stableSort;
     rowidx_t maxRows;  // Number of rows that can fit in the allocated memory.
-    rowidx_t numRows;  // rows that have been added can only be updated by writing thread.
+    rowidx_t numRows;  // High water mark of rows added
 
     void init(rowidx_t initialSize, StableSortFlag stableSort);
     void **allocateStableTable(bool error); // allocates stable table based on std. ptr table
@@ -251,12 +254,12 @@ public:
     void setRow(rowidx_t idx, const void *row) // NB: takes ownership
     {
         OwnedConstThorRow _row = row;
-        assertex(idx < maxRows);
+        dbgassertex(idx < maxRows);
         const void *oldRow = rows[idx];
         if (oldRow)
             ReleaseThorRow(oldRow);
         rows[idx] = _row.getClear();
-        if (idx+1>numRows)
+        if (idx+1>numRows) // keeping high water mark
             numRows = idx+1;
     }
     inline bool append(const void *row) // NB: takes ownership on success
@@ -291,9 +294,12 @@ public:
             return NULL;
         const void *row = rows[i];
         rows[i] = NULL;
+        if (i == (numRows-1)) // keeping high water mark
+            --numRows;
         return row;
     }
     inline rowidx_t ordinality() const { return numRows; }
+    inline rowidx_t queryMaxRows() const { return maxRows; }
 
     inline const void **getRowArray() { return rows; }
     void swap(CThorExpandingRowArray &src);

+ 46 - 0
thorlcr/thorutil/thormisc.hpp

@@ -32,6 +32,7 @@
 #include "workunit.hpp"
 #include "eclhelper.hpp"
 #include "thexception.hpp"
+#include "thorcommon.hpp"
 #include "thor.hpp"
 
 
@@ -135,6 +136,51 @@ public:
     virtual bool action() = 0;
 };
 
+// simple class which takes ownership of the underlying file and deletes it on destruction
+class graph_decl CFileOwner : public CSimpleInterface, implements IInterface
+{
+    IFile *iFile;
+public:
+    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+    CFileOwner(IFile *_iFile) : iFile(_iFile)
+    {
+    }
+    ~CFileOwner()
+    {
+        iFile->remove();
+    }
+    IFile &queryIFile() const { return *iFile; }
+};
+
+// stream wrapper, that takes ownership of a CFileOwner
+class graph_decl CStreamFileOwner : public CSimpleInterface, implements IExtRowStream
+{
+    Linked<CFileOwner> fileOwner;
+    IExtRowStream *stream;
+public:
+    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+    CStreamFileOwner(CFileOwner *_fileOwner, IExtRowStream *_stream) : fileOwner(_fileOwner)
+    {
+        stream = LINK(_stream);
+    }
+    ~CStreamFileOwner()
+    {
+        stream->Release();
+    }
+// IExtRowStream
+    virtual const void *nextRow() { return stream->nextRow(); }
+    virtual void stop() { stream->stop(); }
+    virtual offset_t getOffset() { return stream->getOffset(); }
+    virtual void stop(CRC32 *crcout=NULL) { stream->stop(); }
+    virtual const void *prefetchRow(size32_t *sz=NULL) { return stream->prefetchRow(sz); }
+    virtual void prefetchDone() { stream->prefetchDone(); }
+    virtual void reinit(offset_t offset, offset_t len, unsigned __int64 maxRows)
+    {
+        stream->reinit(offset, len, maxRows);
+    }
+};
+
+
 #define DEFAULT_QUERYSO_LIMIT 10
 
 class graph_decl CFifoFileCache : public CSimpleInterface