Prechádzať zdrojové kódy

Merge pull request #6267 from ghalliday/issue11937_4

HPCC-11937 Fix race condition in smart join

Reviewed-By: Jamie Noss <james.noss@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 rokov pred
rodič
commit
c11ea30a71

+ 3 - 1
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -1214,7 +1214,8 @@ public:
     virtual bool addLocalRHSRow(CThorSpillableRowArray &localRhsRows, const void *row)
     {
         LinkThorRow(row);
-        localRhsRows.append(row);
+        if (!localRhsRows.append(row))
+            throw MakeActivityException(this, 0, "Out of memory: Cannot append local rhs row");
         return true;
     }
 // ISmartBufferNotify
@@ -1337,6 +1338,7 @@ protected:
     inline void setBroadcastingSpilt(bool tf) { atomic_set(&spiltBroadcastingRHS, (int)tf); }
     rowidx_t clearNonLocalRows(CThorSpillableRowArray &rows, rowidx_t startPos)
     {
+        CThorArrayLockBlock block(rows);
         rowidx_t clearedRows = 0;
         rowidx_t numRows = rows.numCommitted();
         for (rowidx_t r=startPos; r<numRows; r++)

+ 1 - 3
thorlcr/thorutil/thbuf.cpp

@@ -1687,9 +1687,7 @@ public:
                             eos = true;
                             return NULL;
                         }
-                        const void **toRead = rows.getBlock(rowsToRead);
-                        memcpy(readRows, toRead, rowsToRead * sizeof(void *));
-                        rows.noteSpilled(rowsToRead);
+                        rows.readBlock(readRows, rowsToRead);
                         rowPos = 0;
                         if (writersBlocked)
                         {

+ 14 - 6
thorlcr/thorutil/thmem.cpp

@@ -195,7 +195,6 @@ protected:
         spillFile.setown(createIFile(tempname.str()));
 
         rows.save(*spillFile, useCompression); // saves committed rows
-        rows.noteSpilled(numRows);
         return true;
     }
 
@@ -365,9 +364,7 @@ public:
             if (fetch >= granularity)
                 fetch = granularity;
             // consume 'fetch' rows
-            const void **toRead = rows.getBlock(fetch);
-            memcpy(readRows, toRead, fetch * sizeof(void *));
-            rows.noteSpilled(fetch);
+            rows.readBlock(readRows, fetch);
             numReadRows = fetch;
             pos = 0;
         }
@@ -1148,6 +1145,7 @@ void CThorSpillableRowArray::clearRows()
 
 void CThorSpillableRowArray::compact()
 {
+    CThorArrayLockBlock block(*this);
     assertex(0 == firstRow && numRows == commitRows);
     CThorExpandingRowArray::compact();
     commitRows = numRows;
@@ -1234,6 +1232,7 @@ rowidx_t CThorSpillableRowArray::save(IFile &iFile, bool useCompression)
         rows[i] = NULL;
     }
     writer->flush();
+    firstRow += n;
     offset_t bytesWritten = writer->getPosition();
     writer.clear();
     ActPrintLog(&activity, "CThorSpillableRowArray::save done, bytes = %"I64F"d", (__int64)bytesWritten);
@@ -1308,12 +1307,21 @@ void CThorSpillableRowArray::swap(CThorSpillableRowArray &other)
     commitRows = otherCommitRows;
 }
 
+void CThorSpillableRowArray::readBlock(const void **outRows, rowidx_t readRows)
+{
+    CThorArrayLockBlock block(*this);
+    dbgassertex(firstRow + readRows <= commitRows);
+    memcpy(outRows, rows + firstRow, readRows*sizeof(void *));
+    firstRow += readRows;
+}
+
 void CThorSpillableRowArray::transferRowsCopy(const void **outRows, bool takeOwnership)
 {
+    CThorArrayLockBlock block(*this);
     if (0 == numRows)
         return;
     assertex(numRows == commitRows);
-    memcpy(outRows, rows, numRows*sizeof(void **));
+    memcpy(outRows, rows, numRows*sizeof(void *));
     if (takeOwnership)
         firstRow = commitRows = numRows = 0;
     else
@@ -1360,6 +1368,7 @@ protected:
 
     bool spillRows()
     {
+        //This must only be called while a lock is held on spillableRows
         rowidx_t numRows = spillableRows.numCommitted();
         if (numRows == 0)
             return false;
@@ -1378,7 +1387,6 @@ protected:
         Owned<IFile> iFile = createIFile(tempname.str());
         spillFiles.append(new CFileOwner(iFile.getLink()));
         spillableRows.save(*iFile, activity.getOptBool(THOROPT_COMPRESS_SPILLS, true)); // saves committed rows
-        spillableRows.noteSpilled(numRows);
 
         ++overflowCount;
 

+ 10 - 15
thorlcr/thorutil/thmem.hpp

@@ -408,11 +408,11 @@ public:
     void unregisterWriteCallback(IWritePosCallback &cb);
     inline void setAllowNulls(bool b) { CThorExpandingRowArray::setAllowNulls(b); }
     void kill();
-    void clearRows();
     void compact();
     void flush();
-    inline bool append(const void *row)
+    inline bool append(const void *row) __attribute__((warn_unused_result))
     {
+        //GH->JCS Should this really be inline?
         assertex(row || allowNulls);
         if (numRows >= maxRows)
         {
@@ -430,36 +430,26 @@ public:
     }
     bool appendRows(CThorExpandingRowArray &inRows, bool takeOwnership);
 
-    //The following can be accessed from the reader without any need to lock
+    //The following must either be accessed within a lock, or when no rows can be appended,
+    //(otherwise flush() might move all the rows, invalidating the indexes - or for query() the row)
     inline const void *query(rowidx_t i) const
     {
-        CThorArrayLockBlock block(*this);
         return CThorExpandingRowArray::query(i);
     }
     inline const void *get(rowidx_t i) const
     {
-        CThorArrayLockBlock block(*this);
         return CThorExpandingRowArray::get(i);
     }
     inline const void *getClear(rowidx_t i)
     {
-        CThorArrayLockBlock block(*this);
         return CThorExpandingRowArray::getClear(i);
     }
 
     //A thread calling the following functions must own the lock, or guarantee no other thread will access
     void sort(ICompare & compare, unsigned maxcores);
     rowidx_t save(IFile &file, bool useCompression);
-    const void **getBlock(rowidx_t readRows);
-    inline void noteSpilled(rowidx_t spilledRows)
-    {
-        firstRow += spilledRows;
-    }
-
-    //The block returned is only valid until the critical section is released
 
-    inline rowidx_t firstCommitted() const { return firstRow; }
-    inline rowidx_t numCommitted() const { return commitRows - firstRow; }
+    inline rowidx_t numCommitted() const { return commitRows - firstRow; } //MORE::Not convinced this is very safe!
 
 // access to
     void swap(CThorSpillableRowArray &src);
@@ -489,11 +479,16 @@ public:
     void deserializeRow(IRowDeserializerSource &in) { CThorExpandingRowArray::deserializeRow(in); }
     bool ensure(rowidx_t requiredRows) { return CThorExpandingRowArray::ensure(requiredRows); }
     void transferRowsCopy(const void **outRows, bool takeOwnership);
+    void readBlock(const void **outRows, rowidx_t readRows);
 
     virtual IThorArrayLock &queryLock() { return *this; }
 // IThorArrayLock
     virtual void lock() const { cs.enter(); }
     virtual void unlock() const { cs.leave(); }
+
+private:
+    void clearRows();
+    const void **getBlock(rowidx_t readRows);
 };