瀏覽代碼

gh-2675 - Implementation of spilling hash dedup

Allow DEDUP,HASH to deal with datasets larger than memory.

The general idea is to spill to buckets when memory is full, then process each
bucket in turn, creating more buckets if there is still too much to deal with.
The implementation streams rows from the input, creating HT entries based on
the keys it is dedupping on. The 1st unmatched row is output, the rest are dedupped.

Each phase creates N in-memory buckets. Rows are streamed into the buckets;
the 1st row added is known to be unique and is returned as output from
hashdedup. On a spill request, the bucket is flushed to disk, to be used by
the next phase. Rows are still added to the HT, so that the rest of the
stream has some chance of being dedupped, but no rows will be returned as
output. On future spill requests to an already spilled bucket, the HT will
be discarded.

Once all rows have been filtered/spilt by the existing HT, any remaining HT is discarded, the buckets are read as input, and the process is repeated.

The # of buckets is estimated if possible, based on the # of input rows.
In the 1st round the # of rows may not be available (unless in meta).
On subsequent rounds (reading back buckets), the total # of rows and the # of rows reached at the spill point are known and are used as the basis for the # of buckets in the next round.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 13 年之前
父節點
當前提交
8473466da4

File diff suppressed because it is too large
+ 848 - 105
thorlcr/activities/hashdistrib/thhashdistribslave.cpp


+ 1 - 1
thorlcr/graph/thgraph.cpp

@@ -2591,7 +2591,7 @@ void CJobBase::runSubgraph(CGraphBase &graph, size32_t parentExtractSz, const by
     graph.executeSubGraph(parentExtractSz, parentExtract);
 }
 
-IEngineRowAllocator *CJobBase::getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
+IEngineRowAllocator *CJobBase::getRowAllocator(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const // NOTE(review): 'flags' is accepted but not passed on in the call below - confirm this is intentional
 {
     return thorAllocator->getRowAllocator(meta, activityId);
 }

+ 2 - 1
thorlcr/graph/thgraph.hpp

@@ -881,8 +881,9 @@ public:
         return LINK(allGraphs.find(gid));
     }
 
-    IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId) const;
+    IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags=roxiemem::RHFnone) const;
     roxiemem::IRowManager *queryRowManager() const;
+    IThorAllocator *queryThorAllocator() const { return thorAllocator; } // accessor for the thorAllocator member that getRowAllocator delegates to
     bool queryUseCheckpoints() const;
     const bool &queryPausing() const { return pausing; }
     const bool &queryResumed() const { return resumed; }

+ 11 - 1
thorlcr/shared/thor.hpp

@@ -33,15 +33,25 @@ typedef unsigned __int64 rowcount_t;
 #define RCUNBOUND RCMAX
 #define RCUNSET RCMAX
 typedef size32_t rowidx_t;
+#define RCIDXMAX ((rowidx_t)(size32_t)-1)
 #define RIPF ""
 
+// Converts X to rowcount_t, throwing an exception if X does not compare equal to its rowcount_t-cast value (i.e. type T's value would be truncated)
 template <class T>
 inline rowcount_t validRC(T X)
 {
-    assertex(X == (rowcount_t)X);
+    if (X != (rowcount_t)X) // compare the value before and after the cast
+        throw MakeStringException(0, "rowcount_t value truncation");
     return (rowcount_t)X;
 }
 
+template <class T>
+inline rowidx_t validRIDX(T X) // rowidx_t counterpart of validRC: converts X to rowidx_t, throwing if the cast changes the value
+{
+    if (X != (rowidx_t)X) // compare the value before and after the cast
+        throw MakeStringException(0, "rowidx_t value truncation");
+    return (rowidx_t)X;
+}
 
 #if 0
     #define CATCHALL ...

+ 3 - 41
thorlcr/thorutil/thmem.cpp

@@ -1067,20 +1067,6 @@ IRowStream *CThorSpillableRowArray::createRowStream()
 
 class CThorRowCollectorBase : public CSimpleInterface, implements roxiemem::IBufferedRowCallback
 {
-    class CFileOwner : public CSimpleInterface, implements IInterface
-    {
-        IFile *iFile;
-    public:
-        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-        CFileOwner(IFile *_iFile) : iFile(_iFile)
-        {
-        }
-        ~CFileOwner()
-        {
-            iFile->remove();
-        }
-        IFile &queryIFile() const { return *iFile; }
-    };
 protected:
     CActivityBase &activity;
     CThorSpillableRowArray spillableRows;
@@ -1180,33 +1166,6 @@ protected:
         }
         ++outStreams;
 
-        class CStreamFileOwner : public CSimpleInterface, implements IExtRowStream
-        {
-            Linked<CFileOwner> fileOwner;
-            IExtRowStream *stream;
-        public:
-            IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-            CStreamFileOwner(CFileOwner *_fileOwner, IExtRowStream *_stream) : fileOwner(_fileOwner)
-            {
-                stream = LINK(_stream);
-            }
-            ~CStreamFileOwner()
-            {
-                stream->Release();
-            }
-        // IExtRowStream
-            virtual const void *nextRow() { return stream->nextRow(); }
-            virtual void stop() { stream->stop(); }
-            virtual offset_t getOffset() { return stream->getOffset(); }
-            virtual void stop(CRC32 *crcout=NULL) { stream->stop(); }
-            virtual const void *prefetchRow(size32_t *sz=NULL) { return stream->prefetchRow(sz); }
-            virtual void prefetchDone() { stream->prefetchDone(); }
-            virtual void reinit(offset_t offset, offset_t len, unsigned __int64 maxRows)
-            {
-                stream->reinit(offset, len, maxRows);
-            }
-        };
-
         // NB: CStreamFileOwner, shares reference so CFileOwner, last usage, will auto delete file
         // which may be one of these streams of CThorRowCollectorBase itself
         IArrayOf<IRowStream> instrms;
@@ -1694,6 +1653,8 @@ public:
     {
         return rowManager;
     }
+    virtual roxiemem::RoxieHeapFlags queryFlags() const { return flags; } // exposes the 'flags' member of this allocator
+    virtual bool queryCrc() const { return false; } // this implementation always answers false; presumably means "no CRC checking" - TODO confirm
 
 // IRowAllocatorCache
     virtual unsigned getActivityId(unsigned cacheId) const
@@ -1796,6 +1757,7 @@ public:
         allAllocators.append(*ret);
         return ret;
     }
+    virtual bool queryCrc() const { return true; } // this implementation always answers true; presumably the CRC-checking allocator variant - TODO confirm
 };
 
 

+ 9 - 3
thorlcr/thorutil/thmem.hpp

@@ -154,6 +154,8 @@ interface IThorAllocator : extends IInterface
 {
     virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId) const = 0;
     virtual roxiemem::IRowManager *queryRowManager() const = 0;
+    virtual roxiemem::RoxieHeapFlags queryFlags() const = 0;
+    virtual bool queryCrc() const = 0;
 };
 
 IThorAllocator *createThorAllocator(memsize_t memSize, bool crcChecking, bool usePacked);
@@ -206,6 +208,7 @@ graph_decl StringBuffer &getRecordString(const void *key, IOutputRowSerializer *
 #define SPILL_PRIORITY_HASHJOIN 10
 #define SPILL_PRIORITY_LARGESORT 10
 #define SPILL_PRIORITY_GROUPSORT 20
+#define SPILL_PRIORITY_HASHDEDUP 30
 #define SPILL_PRIORITY_OVERFLOWABLE_BUFFER SPILL_PRIORITY_DEFAULT
 #define SPILL_PRIORITY_SPILLABLE_STREAM SPILL_PRIORITY_DEFAULT
 #define SPILL_PRIORITY_RESULT SPILL_PRIORITY_DEFAULT
@@ -228,7 +231,7 @@ protected:
     bool allowNulls;
     StableSortFlag stableSort;
     rowidx_t maxRows;  // Number of rows that can fit in the allocated memory.
-    rowidx_t numRows;  // rows that have been added can only be updated by writing thread.
+    rowidx_t numRows;  // High water mark of rows added
 
     void init(rowidx_t initialSize, StableSortFlag stableSort);
     void **allocateStableTable(bool error); // allocates stable table based on std. ptr table
@@ -251,12 +254,12 @@ public:
     void setRow(rowidx_t idx, const void *row) // NB: takes ownership
     {
         OwnedConstThorRow _row = row;
-        assertex(idx < maxRows);
+        dbgassertex(idx < maxRows); // idx must address a slot below maxRows
         const void *oldRow = rows[idx];
         if (oldRow)
             ReleaseThorRow(oldRow); // release the row previously held in this slot before overwriting it
         rows[idx] = _row.getClear();
-        if (idx+1>numRows)
+        if (idx+1>numRows) // keeping high water mark: raise numRows so that it covers idx
             numRows = idx+1;
     }
     inline bool append(const void *row) // NB: takes ownership on success
@@ -291,9 +294,12 @@ public:
             return NULL;
         const void *row = rows[i];
         rows[i] = NULL;
+        if (i == (numRows-1)) // keeping high water mark
+            --numRows;
         return row;
     }
     inline rowidx_t ordinality() const { return numRows; }
+    inline rowidx_t queryMaxRows() const { return maxRows; } // returns maxRows, as opposed to ordinality() which returns numRows
 
     inline const void **getRowArray() { return rows; }
     void swap(CThorExpandingRowArray &src);

+ 46 - 0
thorlcr/thorutil/thormisc.hpp

@@ -32,6 +32,7 @@
 #include "workunit.hpp"
 #include "eclhelper.hpp"
 #include "thexception.hpp"
+#include "thorcommon.hpp"
 #include "thor.hpp"
 
 
@@ -135,6 +136,51 @@ public:
     virtual bool action() = 0;
 };
 
+// Simple class which takes ownership of the underlying file and deletes it (via IFile::remove) on destruction
+class graph_decl CFileOwner : public CSimpleInterface, implements IInterface
+{
+    IFile *iFile; // NOTE(review): raw pointer, neither linked in the constructor nor released in the destructor - confirm who holds the IFile reference
+public:
+    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+    CFileOwner(IFile *_iFile) : iFile(_iFile)
+    {
+    }
+    ~CFileOwner()
+    {
+        iFile->remove(); // called unconditionally; assumes iFile is never NULL - TODO confirm
+    }
+    IFile &queryIFile() const { return *iFile; } // access to the wrapped IFile
+};
+
+// Stream wrapper that holds a CFileOwner alongside an IExtRowStream and forwards every IExtRowStream call to that stream
+class graph_decl CStreamFileOwner : public CSimpleInterface, implements IExtRowStream
+{
+    Linked<CFileOwner> fileOwner; // presumably keeps the CFileOwner (and so the file) alive for the lifetime of this stream - TODO confirm
+    IExtRowStream *stream; // wrapped stream; linked in the constructor, released in the destructor
+public:
+    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
+    CStreamFileOwner(CFileOwner *_fileOwner, IExtRowStream *_stream) : fileOwner(_fileOwner)
+    {
+        stream = LINK(_stream);
+    }
+    ~CStreamFileOwner()
+    {
+        stream->Release();
+    }
+// IExtRowStream
+    virtual const void *nextRow() { return stream->nextRow(); }
+    virtual void stop() { stream->stop(); }
+    virtual offset_t getOffset() { return stream->getOffset(); }
+    virtual void stop(CRC32 *crcout=NULL) { stream->stop(); } // NOTE(review): crcout is not passed on to the wrapped stream; this overload and stop() above both match a zero-argument call
+    virtual const void *prefetchRow(size32_t *sz=NULL) { return stream->prefetchRow(sz); }
+    virtual void prefetchDone() { stream->prefetchDone(); }
+    virtual void reinit(offset_t offset, offset_t len, unsigned __int64 maxRows)
+    {
+        stream->reinit(offset, len, maxRows);
+    }
+};
+
+
 #define DEFAULT_QUERYSO_LIMIT 10
 
 class graph_decl CFifoFileCache : public CSimpleInterface