Преглед на файлове

HPCC-15829 Race condition spilling result stream after read

A small window existed where a result stream could have been
fully read, then spilt, then nextRow() called again.
Under those circumstances, the spill would not setup the stream
read offset and the result stream was not marked as having been
completely ready (at eos). As a result an attempt to read the
spill stream was made, but an assert was hit.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith преди 9 години
родител
ревизия
7bc1971fb4
променени са 1 файла, в които са добавени 47 реда и са изтрити 25 реда
  1. 47 25
      thorlcr/thorutil/thmem.cpp

+ 47 - 25
thorlcr/thorutil/thmem.cpp

@@ -267,18 +267,19 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
 {
     class CStream : public CSimpleInterface, implements IRowStream, implements IWritePosCallback
     {
-        rowidx_t pos;
-        offset_t outputOffset;
+        rowidx_t pos = 0;
+        offset_t outputOffset = (offset_t)-1;
         Owned<IRowStream> spillStream;
         Linked<CSharedSpillableRowSet> owner;
+        rowidx_t toRead = 0;
+        bool eos = false;
     public:
         IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-        CStream(CSharedSpillableRowSet &_owner) : owner(&_owner)
+        CStream(CSharedSpillableRowSet &_owner, rowidx_t _toRead) : owner(&_owner), toRead(_toRead)
         {
-            pos = 0;
-            outputOffset = (offset_t)-1;
-            owner->rows.registerWriteCallback(*this); // NB: CStream constructor called within rows lock
+            // NB: CStream constructor called within rows lock and only called if not yet spilled
+            owner->rows.registerWriteCallback(*this);
         }
         ~CStream()
         {
@@ -289,26 +290,43 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
     // IRowStream
         virtual const void *nextRow()
         {
-            if (spillStream)
-                return spillStream->nextRow();
-            CRowsLockBlock block(*owner);
-            if (owner->spillFile) // i.e. has spilt
-            {
-                block.clearCB = true;
-                assertex(((offset_t)-1) != outputOffset);
-                unsigned rwFlags = DEFAULT_RWFLAGS;
-                if (owner->preserveNulls)
-                    rwFlags |= rw_grouped;
-                spillStream.setown(::createRowStreamEx(owner->spillFile, owner->rowIf, outputOffset, (offset_t)-1, (unsigned __int64)-1, rwFlags));
-                owner->rows.unregisterWriteCallback(*this); // no longer needed
-                return spillStream->nextRow();
-            }
-            else if (pos == owner->rows.numCommitted())
+            if (!eos)
             {
-                owner->rows.unregisterWriteCallback(*this); // no longer needed
-                return NULL;
+                const void *ret;
+                if (spillStream)
+                    ret = spillStream->nextRow();
+                else
+                {
+                    CRowsLockBlock block(*owner);
+                    if (owner->spillFile) // i.e. has spilt
+                    {
+                        block.clearCB = true;
+                        assertex(((offset_t)-1) != outputOffset);
+                        unsigned rwFlags = DEFAULT_RWFLAGS;
+                        if (owner->preserveNulls)
+                            rwFlags |= rw_grouped;
+                        spillStream.setown(::createRowStreamEx(owner->spillFile, owner->rowIf, outputOffset, (offset_t)-1, (unsigned __int64)-1, rwFlags));
+                        owner->rows.unregisterWriteCallback(*this); // no longer needed
+                        ret = spillStream->nextRow();
+                    }
+                    else
+                    {
+                        // NB: would not reach here if nothing left to read
+                        ret = owner->rows.get(pos++);
+                        if (pos == toRead)
+                        {
+                            owner->rows.unregisterWriteCallback(*this); // no longer needed
+                            eos = true; // for any subsequent calls
+                        }
+                        return ret;
+                    }
+                }
+                if (ret)
+                    return ret;
+                if (!owner->preserveNulls)
+                    eos = true;
             }
-            return owner->rows.get(pos++);
+            return nullptr;
         }
         virtual void stop()
         {
@@ -345,7 +363,11 @@ public:
                 rwFlags |= rw_grouped;
             return ::createRowStream(spillFile, rowIf, rwFlags);
         }
-        return new CStream(*this);
+        rowidx_t toRead = rows.numCommitted();
+        if (toRead)
+            return new CStream(*this, toRead);
+        else
+            return createNullRowStream();
     }
 };