Browse Source

Merge pull request #8888 from jakesmith/hpcc-15829

HPCC-15829 Race condition spilling result stream after read

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 9 years ago
parent
commit
5b3236a59a
1 changed files with 47 additions and 25 deletions
  1. 47 25
      thorlcr/thorutil/thmem.cpp

+ 47 - 25
thorlcr/thorutil/thmem.cpp

@@ -267,18 +267,19 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
 {
     class CStream : public CSimpleInterface, implements IRowStream, implements IWritePosCallback
     {
-        rowidx_t pos;
-        offset_t outputOffset;
+        rowidx_t pos = 0;
+        offset_t outputOffset = (offset_t)-1;
         Owned<IRowStream> spillStream;
         Linked<CSharedSpillableRowSet> owner;
+        rowidx_t toRead = 0;
+        bool eos = false;
     public:
         IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-        CStream(CSharedSpillableRowSet &_owner) : owner(&_owner)
+        CStream(CSharedSpillableRowSet &_owner, rowidx_t _toRead) : owner(&_owner), toRead(_toRead)
         {
-            pos = 0;
-            outputOffset = (offset_t)-1;
-            owner->rows.registerWriteCallback(*this); // NB: CStream constructor called within rows lock
+            // NB: CStream constructor called within rows lock and only called if not yet spilled
+            owner->rows.registerWriteCallback(*this);
         }
         ~CStream()
         {
@@ -289,26 +290,43 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
     // IRowStream
         virtual const void *nextRow()
         {
-            if (spillStream)
-                return spillStream->nextRow();
-            CRowsLockBlock block(*owner);
-            if (owner->spillFile) // i.e. has spilt
-            {
-                block.clearCB = true;
-                assertex(((offset_t)-1) != outputOffset);
-                unsigned rwFlags = DEFAULT_RWFLAGS;
-                if (owner->preserveNulls)
-                    rwFlags |= rw_grouped;
-                spillStream.setown(::createRowStreamEx(owner->spillFile, owner->rowIf, outputOffset, (offset_t)-1, (unsigned __int64)-1, rwFlags));
-                owner->rows.unregisterWriteCallback(*this); // no longer needed
-                return spillStream->nextRow();
-            }
-            else if (pos == owner->rows.numCommitted())
+            if (!eos)
             {
-                owner->rows.unregisterWriteCallback(*this); // no longer needed
-                return NULL;
+                const void *ret;
+                if (spillStream)
+                    ret = spillStream->nextRow();
+                else
+                {
+                    CRowsLockBlock block(*owner);
+                    if (owner->spillFile) // i.e. has spilt
+                    {
+                        block.clearCB = true;
+                        assertex(((offset_t)-1) != outputOffset);
+                        unsigned rwFlags = DEFAULT_RWFLAGS;
+                        if (owner->preserveNulls)
+                            rwFlags |= rw_grouped;
+                        spillStream.setown(::createRowStreamEx(owner->spillFile, owner->rowIf, outputOffset, (offset_t)-1, (unsigned __int64)-1, rwFlags));
+                        owner->rows.unregisterWriteCallback(*this); // no longer needed
+                        ret = spillStream->nextRow();
+                    }
+                    else
+                    {
+                        // NB: would not reach here if nothing left to read
+                        ret = owner->rows.get(pos++);
+                        if (pos == toRead)
+                        {
+                            owner->rows.unregisterWriteCallback(*this); // no longer needed
+                            eos = true; // for any subsequent calls
+                        }
+                        return ret;
+                    }
+                }
+                if (ret)
+                    return ret;
+                if (!owner->preserveNulls)
+                    eos = true;
             }
-            return owner->rows.get(pos++);
+            return nullptr;
         }
         virtual void stop()
         {
@@ -345,7 +363,11 @@ public:
                 rwFlags |= rw_grouped;
             return ::createRowStream(spillFile, rowIf, rwFlags);
         }
-        return new CStream(*this);
+        rowidx_t toRead = rows.numCommitted();
+        if (toRead)
+            return new CStream(*this, toRead);
+        else
+            return createNullRowStream();
     }
 };