소스 검색

Merge pull request #12160 from ghalliday/issue21463

HPCC-21463 Refactor CThorContiguousRowBuffer

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 6 년 전
부모
커밋
2308ffa4c5
4개의 변경된 파일, 140개의 추가 및 72개의 삭제
  1. 4 4
      ecl/hthor/hthor.cpp
  2. 3 13
      roxie/ccd/ccdkey.cpp
  3. 61 15
      rtl/eclrtl/rtlcommon.cpp
  4. 72 40
      rtl/eclrtl/rtlcommon.hpp

+ 4 - 4
ecl/hthor/hthor.cpp

@@ -8820,7 +8820,7 @@ const void *CHThorDiskNormalizeActivity::nextRow()
             localOffset += lastSizeRead;
             prefetchBuffer.finishedRow();
 
-            if (inputstream->eos())
+            if (prefetchBuffer.eos())
             {
                 lastSizeRead = 0;
                 break;
@@ -8917,7 +8917,7 @@ const void *CHThorDiskAggregateActivity::nextRow()
         helper.clearAggregate(outBuilder);
         while (!eofseen)
         {
-            while (!inputstream->eos())
+            while (!prefetchBuffer.eos())
             {
                 queryUpdateProgress();
 
@@ -8997,7 +8997,7 @@ const void *CHThorDiskCountActivity::nextRow()
         {
             if (eofseen) 
                 break;
-            while (!inputstream->eos())
+            while (!prefetchBuffer.eos())
             {
                 queryUpdateProgress();
 
@@ -9079,7 +9079,7 @@ const void *CHThorDiskGroupAggregateActivity::nextRow()
             if (!opened) open();
             while (!eofseen)
             {
-                while (!inputstream->eos())
+                while (!prefetchBuffer.eos())
                 {
                     queryUpdateProgress();
 

+ 3 - 13
roxie/ccd/ccdkey.cpp

@@ -491,7 +491,7 @@ public:
 
     virtual void serializeCursorPos(MemoryBuffer &mb) const override
     {
-        mb.append(tell());
+        mb.append(deserializeSource.tell());
     }
 
     virtual const byte *nextRow() override
@@ -830,12 +830,7 @@ public:
     {
         // MORE - could do with being faster than this!
         assertex(curStream != NULL);
-        unsigned __int64 pos = curStream->tell();
-        if (_ptr != buf.toByteArray())
-        {
-            size32_t dummy;
-            pos +=  (const char *)_ptr - (const char *)curStream->peek(1, dummy);
-        }
+        unsigned __int64 pos = deserializeSource.tell();
         return pos + thisFileStartPos;
     }
 
@@ -843,12 +838,7 @@ public:
     {
         // MORE - could do with being faster than this!
         assertex(curStream != NULL);
-        unsigned __int64 pos = curStream->tell();
-        if (_ptr != buf.toByteArray())
-        {
-            size32_t dummy;
-            pos +=  (const char *)_ptr - (const char *)curStream->peek(1, dummy);
-        }
+        unsigned __int64 pos = deserializeSource.tell();
         return makeLocalFposOffset(thisPartIdx-1, pos);
     }
 

+ 61 - 15
rtl/eclrtl/rtlcommon.cpp

@@ -4,17 +4,59 @@
 #include "junicode.hpp"
 #include "rtlcommon.hpp"
 
-CThorContiguousRowBuffer::CThorContiguousRowBuffer(ISerialStream * _in) : in(_in)
+CContiguousRowBuffer::CContiguousRowBuffer(ISerialStream * _in) : in(_in) // Bind the row buffer to the input stream _in
+{
+    clearBuffer(); // start with no buffered data: buffer and cur are null, available is 0
+}
+
+void CContiguousRowBuffer::setStream(ISerialStream *_in) // Switch this buffer to a different input stream
+{
+    in = _in;
+    clearBuffer(); // pointers into the previous stream's data are dropped rather than reused
+}
+
+bool CContiguousRowBuffer::checkInputEos() // Slow path of eos(): reached once no buffered bytes remain
+{
+    assertex(!available); // precondition: nothing is left unread in the current window
+    peekBytesDirect(0); // skip the consumed bytes on the input and re-peek before asking the stream
+    return in->eos();
+}
+
+
+const byte * CContiguousRowBuffer::peekBytes(size32_t maxSize) // Return a pointer to the unread data, first trying to make maxSize bytes available
+{
+    if (maxSize > maxAvailable()) // fix: refill only when MORE is requested than is buffered (the test was inverted)
+        peekBytesDirect(maxSize);
+    return cur; // NOTE(review): may address fewer than maxSize bytes near end of input - confirm callers check maxAvailable()
+}
+
+const byte * CContiguousRowBuffer::peekFirstByte() // Return a pointer to the next unread byte
+{
+    if (maxAvailable() == 0) // only touch the input when the current window is exhausted
+        peekBytesDirect(0);
+    return cur; // NOTE(review): presumably not dereferenceable at end of input - confirm callers test eos() first
+}
+
+void CContiguousRowBuffer::peekBytesDirect(unsigned maxSize) // Skip consumed data, then peek maxSize bytes straight from the input
+{
+    size_t toSkip = cur - buffer; // bytes of the current window that have already been consumed
+    if (toSkip)
+        in->skip(toSkip);
+    buffer = static_cast<const byte *>(in->peek(maxSize, available)); // 'available' is passed to peek(), presumably to receive the byte count - TODO confirm
+    cur = buffer; // the new window begins at the first unconsumed byte
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+CThorContiguousRowBuffer::CThorContiguousRowBuffer(ISerialStream * _in) : CContiguousRowBuffer(_in) // stream and buffer state are initialised by the base class
 {
-    buffer = NULL;
-    maxOffset = 0;
     readOffset = 0;
 }
 
 void CThorContiguousRowBuffer::doRead(size32_t len, void * ptr)
 {
     ensureAccessible(readOffset + len);
-    memcpy(ptr, buffer+readOffset, len);
+    memcpy(ptr, cur+readOffset, len); // readOffset is measured from cur, the start of the current row
     readOffset += len;
 }
 
@@ -70,7 +112,7 @@ size32_t CThorContiguousRowBuffer::readVUni(ARowBuilder & target, size32_t offse
 size32_t CThorContiguousRowBuffer::sizePackedInt()
 {
     ensureAccessible(readOffset+1);
-    return rtlGetPackedSizeFromFirst(buffer[readOffset]);
+    return rtlGetPackedSizeFromFirst(cur[readOffset]); // only the byte at the read position is passed to the size helper
 }
 
 size32_t CThorContiguousRowBuffer::sizeUtf8(size32_t len)
@@ -83,10 +125,11 @@ size32_t CThorContiguousRowBuffer::sizeUtf8(size32_t len)
     while (len)
     {
         ensureAccessible(nextOffset+1);
+        size32_t maxOffset = maxAvailable();
 
         for (;nextOffset < maxOffset;)
         {
-            nextOffset += readUtf8Size(buffer+nextOffset);  // This function only accesses the first byte
+            nextOffset += readUtf8Size(cur+nextOffset);  // This function only accesses the first byte
             if (--len == 0)
                 break;
         }
@@ -100,10 +143,11 @@ size32_t CThorContiguousRowBuffer::sizeVStr()
     for (;;)
     {
         ensureAccessible(nextOffset+1);
+        size32_t maxOffset = maxAvailable();
 
         for (; nextOffset < maxOffset; nextOffset++)
         {
-            if (buffer[nextOffset] == 0)
+            if (cur[nextOffset] == 0)
                 return (nextOffset + 1) - readOffset;
         }
     }
@@ -116,10 +160,11 @@ size32_t CThorContiguousRowBuffer::sizeVUni()
     for (;;)
     {
         ensureAccessible(nextOffset+sizeOfUChar);
+        size32_t maxOffset = maxAvailable();
 
         for (; nextOffset+1 < maxOffset; nextOffset += sizeOfUChar)
         {
-            if (buffer[nextOffset] == 0 && buffer[nextOffset+1] == 0)
+            if (cur[nextOffset] == 0 && cur[nextOffset+1] == 0)
                 return (nextOffset + sizeOfUChar) - readOffset;
         }
     }
@@ -134,9 +179,10 @@ void CThorContiguousRowBuffer::reportReadFail()
 
 const byte * CThorContiguousRowBuffer::peek(size32_t maxSize)
 {
-    if (maxSize+readOffset > maxOffset)
-        doPeek(maxSize+readOffset);
-    return buffer + readOffset;
+    size32_t required = readOffset+maxSize; // total bytes needed, counted from the start of the row
+    if (unlikely(required > maxAvailable())) // refill only when the current window is too small
+        peekBytesDirect(required);
+    return cur + readOffset; // pointer to the current read position within the row
 }
 
 offset_t CThorContiguousRowBuffer::beginNested()
@@ -190,11 +236,11 @@ void CThorContiguousRowBuffer::skipVUni()
 
 const byte * CThorContiguousRowBuffer::querySelf()
 {
-    if (maxOffset == 0)
-        doPeek(0);
+    if (maxAvailable() == 0) // nothing buffered: peek so that cur is refreshed from the input
+        peekBytesDirect(0);
     if (childStartOffsets.ordinality())
-        return buffer + childStartOffsets.tos();
-    return buffer;
+        return cur + childStartOffsets.tos(); // offset on top of the childStartOffsets stack, relative to cur
+    return cur; // no child offset recorded: the row itself starts at cur
 }
 
 void CThorContiguousRowBuffer::noteStartChild()

+ 72 - 40
rtl/eclrtl/rtlcommon.hpp

@@ -7,15 +7,76 @@
 #include "eclrtl.hpp"
 #include "eclhelper.hpp"
 
+//The CContiguousRowBuffer is a buffer used for reading ahead into a file, and ensuring there is a contiguous
+//block of data available to the reader.  Fixed size files could use this directly.
+class ECLRTL_API CContiguousRowBuffer
+{
+public:
+    CContiguousRowBuffer() = default; // no stream attached; all members keep their in-class defaults
+    CContiguousRowBuffer(ISerialStream * _in);
+
+    void setStream(ISerialStream *_in);
+    const byte * peekBytes(size32_t maxSize);
+    const byte * peekFirstByte();
+    void skipBytes(size32_t size) // consume 'size' bytes of the buffered window
+    {
+        cur += size;
+        available -= size; // NOTE(review): no check that size <= available - callers appear responsible for that
+        // call eos() to ensure stream->eos() is true if this class has got to the end of the stream
+        eos();
+    }
+
+    inline bool eos()
+    {
+        if (likely(available)) // fast path: unread bytes are still buffered, so this cannot be the end
+            return false;
+        return checkInputEos(); // window exhausted: ask the underlying stream
+    }
+    inline offset_t tell() const   { return in->tell() + (cur - buffer); } // stream position plus the bytes consumed from the window
+    inline const byte * queryRow() const { return cur; } // pointer to the next unconsumed byte
+    inline size_t maxAvailable() const { return available; } // bytes remaining in the window, counted from cur
+    inline void clearStream()      { setStream(nullptr); } // detach from the stream and clear the buffer state
+
+    inline void reset(offset_t offset, offset_t flen = (offset_t)-1)
+    {
+        in->reset(offset, flen); // arguments are forwarded unchanged to the input stream
+        clearBuffer(); // pointers obtained before the reset are discarded
+    }
+
+protected:
+    void peekBytesDirect(size32_t size); // skip any consumed data and directly peek bytes from the input
+
+private:
+    bool checkInputEos();
+    void clearBuffer() // return to the "nothing buffered" state
+    {
+        buffer = nullptr;
+        cur = nullptr;
+        available = 0;
+    }
+
+protected:
+    const byte * cur = nullptr; // current position within the peeked window
+private:
+    ISerialStream* in = nullptr; // the input stream being read
+    const byte * buffer = nullptr; // start of the window returned by the last peek on 'in'
+    size32_t available = 0; // bytes readable from cur
+};
+
+
 //The CThorContiguousRowBuffer is the source for a readAhead call to ensure the entire row
 //is in a contiguous block of memory.  The read() and skip() functions must be implemented
-class ECLRTL_API CThorContiguousRowBuffer : implements IRowPrefetcherSource
+class ECLRTL_API CThorContiguousRowBuffer : public CContiguousRowBuffer, implements IRowPrefetcherSource
 {
 public:
-    CThorContiguousRowBuffer() {};
+    CThorContiguousRowBuffer() = default;
     CThorContiguousRowBuffer(ISerialStream * _in);
 
-    inline void setStream(ISerialStream *_in) { in = _in; maxOffset = 0; readOffset = 0; }
+    inline void setStream(ISerialStream *_in)
+    {
+        CContiguousRowBuffer::setStream(_in); // the base class stores the stream and clears its buffer state
+        readOffset = 0; // no part of a row has been read from the new stream
+    }
 
     virtual const byte * peek(size32_t maxSize) override;
     virtual offset_t beginNested() override;
@@ -35,46 +96,24 @@ public:
     virtual void skipVStr() override;
     virtual void skipVUni() override;
 
-    virtual const byte * querySelf() override;
+    virtual const byte * querySelf() override;      // Dubious - used from ifblocks
     virtual void noteStartChild() override;
     virtual void noteFinishChild() override;
 
-    inline bool eos()
-    {
-        return in->eos();
-    }
-
-    inline offset_t tell() const
-    {
-        return in->tell();
-    }
-
-    inline void clearStream()
-    {
-        in = nullptr;
-        maxOffset = 0;
-        readOffset = 0;
-    }
-
-    inline const byte * queryRow() const { return buffer; }
+    inline const byte * queryRow() const { return cur; }
     inline size32_t queryRowSize() const { return readOffset; }
     inline void finishedRow()
     {
-        if (readOffset)
-            in->skip(readOffset);
-        maxOffset = 0;
+        skipBytes(readOffset); // advance cur past the row; the input itself is skipped when the window is next refilled
         readOffset = 0;
     }
 
     inline void reset(offset_t offset, offset_t flen = (offset_t)-1)
     {
-        in->reset(offset, flen);
-        buffer = nullptr;
-        maxOffset = 0;
+        CContiguousRowBuffer::reset(offset, flen); // resets the input stream and clears the base class buffer state
         readOffset = 0;
     }
 
-
 protected:
     size32_t sizePackedInt();
     size32_t sizeUtf8(size32_t len);
@@ -83,27 +122,20 @@ protected:
     void reportReadFail();
 
 private:
-    inline void doPeek(size32_t maxSize)
-    {
-        buffer = static_cast<const byte *>(in->peek(maxSize, maxOffset));
-    }
 
     void doRead(size32_t len, void * ptr);
 
     inline void ensureAccessible(size32_t required)
     {
-        if (required > maxOffset)
+        if (required > maxAvailable()) // 'required' is counted from the start of the current row
         {
-            doPeek(required);
-            assertex(required <= maxOffset);
+            peekBytesDirect(required); // skips any consumed bytes, then peeks 'required' bytes from the input
+            assertex(required <= maxAvailable()); // fail if the input could not supply that much data
         }
     }
 
 protected:
-    ISerialStream* in = nullptr;
-    const byte * buffer = nullptr;
-    size32_t maxOffset = 0;
-    size32_t readOffset = 0;
+    size32_t readOffset = 0;            // Offset within the current row
     UnsignedArray childStartOffsets;
 };