소스 검색

HPCC-14630 Speed up group sort by only checking option once

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Gavin Halliday 9 년 전
부모
커밋
3a5be38215

+ 3 - 1
thorlcr/activities/filter/thfilterslave.cpp

@@ -242,6 +242,7 @@ class CFilterGroupSlaveActivity : public CFilterSlaveActivityBase, public CThorS
     IHThorFilterGroupArg *helper;
     Owned<IThorRowLoader> groupLoader;
     Owned<IRowStream> groupStream;
+    bool compressSpills;
 
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
@@ -249,6 +250,7 @@ public:
     CFilterGroupSlaveActivity(CGraphElementBase *container) : CFilterSlaveActivityBase(container), CThorSteppable(this)
     {
         groupLoader.setown(createThorRowLoader(*this, NULL, stableSort_none, rc_allMem));
+        compressSpills = getOptBool(THOROPT_COMPRESS_SPILLS, true);
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
@@ -286,7 +288,7 @@ public:
                 {
                     CThorSpillableRowArray spillableRows(*this, this);
                     spillableRows.transferFrom(rows);
-                    groupStream.setown(spillableRows.createRowStream());
+                    groupStream.setown(spillableRows.createRowStream(SPILL_PRIORITY_SPILLABLE_STREAM, compressSpills));
                 }
                 // else read next group
             }

+ 3 - 1
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -1402,6 +1402,7 @@ protected:
     // Handling failover to a) hashed local lookupjoin b) hash distributed standard join
     bool smart;
     bool rhsCollated;
+    bool compressSpills;
     Owned<IHashDistributor> lhsDistributor, rhsDistributor;
     ICompare *compareLeft;
     UnsignedArray flushedRowMarkers;
@@ -1758,7 +1759,7 @@ protected:
                             /* NB: will kill array when stream exhausted or if spilt
                              * Ensure spill priority of these spillable streams is lower than the stream in the loader in the next stage
                              */
-                            rightStreams.append(*rows.createRowStream()); // NB: default SPILL_PRIORITY_SPILLABLE_STREAM is lower than SPILL_PRIORITY_LOOKUPJOIN
+                            rightStreams.append(*rows.createRowStream(SPILL_PRIORITY_SPILLABLE_STREAM, compressSpills)); // NB: default SPILL_PRIORITY_SPILLABLE_STREAM is lower than SPILL_PRIORITY_LOOKUPJOIN
                         }
                     }
                     // NB: 'right' deliberately added after rhsNodeRow streams, so that rhsNodeRow can be consumed into loader 1st
@@ -1967,6 +1968,7 @@ public:
                 break;
         }
         overflowWriteCount = 0;
+        compressSpills = getOptBool(THOROPT_COMPRESS_SPILLS, true);
         ActPrintLog("Smart join = %s", smart?"true":"false");
     }
     bool exceedsLimit(rowidx_t count, const void *left, const void *right, const void *&failRow)

+ 4 - 3
thorlcr/msort/tsorts.cpp

@@ -440,18 +440,19 @@ public:
         size32_t blksize = 0x100000;
 
         // JCSMORE - at the moment, the localsort set is already sorted
+        bool compressSpills = activity.getOptBool(THOROPT_COMPRESS_SPILLS, true);
         if (1 == activity.queryJob().querySlaves())
         {
             CThorSpillableRowArray spillableRows(activity, &rowIf);
             rowCount = localRows.ordinality();
             spillableRows.transferFrom(localRows);
-            return spillableRows.createRowStream();
+            return spillableRows.createRowStream(SPILL_PRIORITY_SPILLABLE_STREAM, compressSpills);
         }
         if (partNo)
         {
             CThorSpillableRowArray spillableRows(activity, &rowIf);
             spillableRows.transferFrom(localRows);
-            Owned<IRowStream> spillableStream = spillableRows.createRowStream();
+            Owned<IRowStream> spillableStream = spillableRows.createRowStream(SPILL_PRIORITY_SPILLABLE_STREAM, compressSpills);
 
             CMessageBuffer mb;
             loop
@@ -552,7 +553,7 @@ public:
             rowCount = globalRows.ordinality();
             CThorSpillableRowArray spillableRows(activity, &rowIf);
             spillableRows.transferFrom(globalRows);
-            return spillableRows.createRowStream();
+            return spillableRows.createRowStream(SPILL_PRIORITY_SPILLABLE_STREAM, compressSpills);
         }
     }
 };

+ 9 - 7
thorlcr/thorutil/thmem.cpp

@@ -348,10 +348,10 @@ class CSpillableStream : public CSpillableStreamBase, implements IRowStream
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CSpillableStream(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriority)
+    CSpillableStream(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriority, bool _compressSpills)
         : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls, _spillPriority)
     {
-        useCompression = activity.getOptBool(THOROPT_COMPRESS_SPILLS, true);
+        useCompression = _compressSpills;
         pos = numReadRows = 0;
         granularity = 500; // JCSMORE - rows
 
@@ -1423,10 +1423,10 @@ void CThorSpillableRowArray::transferRowsCopy(const void **outRows, bool takeOwn
     }
 }
 
-IRowStream *CThorSpillableRowArray::createRowStream(unsigned spillPriority)
+IRowStream *CThorSpillableRowArray::createRowStream(unsigned spillPriority, bool compressSpills)
 {
     assertex(rowIf);
-    return new CSpillableStream(activity, *this, rowIf, allowNulls, spillPriority);
+    return new CSpillableStream(activity, *this, rowIf, allowNulls, spillPriority, compressSpills);
 }
 
 
@@ -1452,6 +1452,7 @@ protected:
     bool mmRegistered;
     Owned<CSharedSpillableRowSet> spillableRowSet;
     unsigned options;
+    bool compressSpills;
 
     bool spillRows()
     {
@@ -1474,7 +1475,7 @@ protected:
         GetTempName(tempName, tempPrefix.str(), true);
         Owned<IFile> iFile = createIFile(tempName.str());
         VStringBuffer spillPrefixStr("RowCollector(%d)", spillPriority);
-        spillableRows.save(*iFile, activity.getOptBool(THOROPT_COMPRESS_SPILLS, true), spillPrefixStr.str()); // saves committed rows
+        spillableRows.save(*iFile, compressSpills, spillPrefixStr.str()); // saves committed rows
         spillFiles.append(new CFileOwner(iFile.getLink()));
         ++overflowCount;
 
@@ -1553,7 +1554,7 @@ protected:
         // NB: CStreamFileOwner links CFileOwner - last usage will auto delete file
         // which may be one of these streams or CThorRowCollectorBase itself
         unsigned rwFlags = DEFAULT_RWFLAGS;
-        if (activity.getOptBool(THOROPT_COMPRESS_SPILLS, true))
+        if (compressSpills)
             rwFlags |= rw_compress;
         if (preserveGrouping)
             rwFlags |= rw_grouped;
@@ -1592,7 +1593,7 @@ protected:
                     return NULL;
                 }
                 if (!shared)
-                    instrms.append(*spillableRows.createRowStream(spillPriority)); // NB: stream will take ownership of rows in spillableRows
+                    instrms.append(*spillableRows.createRowStream(spillPriority, compressSpills)); // NB: stream will take ownership of rows in spillableRows
                 else
                 {
                     spillableRowSet.setown(new CSharedSpillableRowSet(activity, spillableRows, rowIf, preserveGrouping, spillPriority));
@@ -1659,6 +1660,7 @@ public:
         maxCores = activity.queryMaxCores();
         options = 0;
         spillableRows.setup(rowIf, false, stableSort);
+        compressSpills = activity.getOptBool(THOROPT_COMPRESS_SPILLS, true);
     }
     ~CThorRowCollectorBase()
     {

+ 1 - 1
thorlcr/thorutil/thmem.hpp

@@ -474,7 +474,7 @@ public:
     void transferFrom(CThorExpandingRowArray &src);
     void transferFrom(CThorSpillableRowArray &src);
 
-    IRowStream *createRowStream(unsigned spillPriority=SPILL_PRIORITY_SPILLABLE_STREAM);
+    IRowStream *createRowStream(unsigned spillPriority, bool compressSpills);
 
     offset_t serializedSize()
     {