浏览代码

HPCC-18903 Fix issues in continuation info when using in-memory indexes

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 年之前
父节点
当前提交
52d737de80
共有 6 个文件被更改,包括 45 次插入27 次删除
  1. 21 15
      roxie/ccd/ccdactivities.cpp
  2. 18 10
      roxie/ccd/ccdkey.cpp
  3. 1 1
      roxie/ccd/ccdkey.hpp
  4. 1 0
      roxie/ccd/ccdmain.cpp
  5. 3 0
      roxie/ccd/ccdserver.cpp
  6. 1 1
      testing/regress/environment.xml.in

+ 21 - 15
roxie/ccd/ccdactivities.cpp

@@ -835,7 +835,6 @@ protected:
     Linked<IInMemoryIndexManager> manager;
     Linked<ITranslatorSet> translators;
     Owned<IInMemoryFileProcessor> processor;
-    StringBuffer indexSig;
     CriticalSection pcrit;
 
 public:
@@ -859,14 +858,6 @@ public:
         readPos = 0;
         if (resent)
         {
-            bool usedKey;
-            resentInfo.read(processed);
-            resentInfo.read(usedKey);
-            if (usedKey)
-                resentInfo.read(indexSig);
-            else
-                resentInfo.read(readPos);
-            assertex(resentInfo.remaining() == 0);
         }
     }
 
@@ -874,15 +865,30 @@ public:
     {
         CRoxieSlaveActivity::onCreate();
         helper->createSegmentMonitors(this);
+        const IKeyTranslator *keyTranslator = translators->queryKeyTranslator(0);  // any part would do - in-memory requires all actuals to have same layout
+        if (keyTranslator)
+            keyTranslator->translate(postFilter);
         if (resent)
         {
-            if (indexSig.length())
-                reader.setown(manager->selectKey(indexSig, postFilter, translators));
+            bool usedKey;
+            resentInfo.read(processed);
+            resentInfo.read(usedKey);
+            if (usedKey)
+                reader.setown(manager->selectKey(resentInfo, postFilter, translators));
+            else
+            {
+                resentInfo.read(readPos);
+                reader.setown(manager->createReader(postFilter, isGrouped, readPos, parallelPartNo, numParallel, translators));
+            }
+            assertex(resentInfo.remaining() == 0);
+        }
+        else
+        {
+            if (!forceUnkeyed && !isGrouped)
+                reader.setown(manager->selectKey(postFilter, translators, logctx));
+            if (!reader)
+                reader.setown(manager->createReader(postFilter, isGrouped, readPos, parallelPartNo, numParallel, translators));
         }
-        else if (!forceUnkeyed && !isGrouped)
-            reader.setown(manager->selectKey(postFilter, translators, logctx));
-        if (!reader)
-            reader.setown(manager->createReader(postFilter, isGrouped, readPos, parallelPartNo, numParallel, translators));
     }
 
     virtual const char *queryDynamicFileName() const

+ 18 - 10
roxie/ccd/ccdkey.cpp

@@ -534,7 +534,7 @@ public:
     offset_t memsize;
     const PtrToOffsetMapper &baseMap;
 
-    InMemoryDirectReader(const RowFilter &_postFilter, offset_t _readPos, bool _grouped,
+    InMemoryDirectReader(const RowFilter &_postFilter, bool _grouped, offset_t _readPos,
                          const char *_start, memsize_t _memsize,
                          const PtrToOffsetMapper &_baseMap, unsigned _partNo, unsigned _numParts,
                          const ITranslatorSet *_translators)
@@ -927,7 +927,7 @@ public:
         free (fileStart);
     }
 
-    virtual IDirectReader *selectKey(const char *sig, ScoredRowFilter &postFilters, const ITranslatorSet *translators) const override;
+    virtual IDirectReader *selectKey(MemoryBuffer &sig, ScoredRowFilter &postFilters, const ITranslatorSet *translators) const override;
     virtual IDirectReader *selectKey(ScoredRowFilter &filter, const ITranslatorSet *translators, IRoxieContextLogger &logctx) const override;
 
     InMemoryIndex &findIndex(const char *indexSig) const
@@ -1101,7 +1101,7 @@ public:
 
     InMemoryIndexCursor(const InMemoryIndexManager *_manager, const InMemoryIndex *_index,
                         const PtrToOffsetMapper &_baseMap, RowFilter &_postFilter, const RtlRecord &_recInfo,
-                        const ITranslatorSet *_translators)
+                        const ITranslatorSet *_translators, MemoryBuffer *serializedInfo = nullptr)
     : manager(_manager), index(_index), baseMap(_baseMap), rowInfo(_recInfo), postFilter(_postFilter), translators(_translators)
     {
         ForEachItemIn(idx, index->sortFields)
@@ -1122,6 +1122,8 @@ public:
         numPtrs = index->numPtrs;
         postFilter.recalcFieldsRequired();
         eof = false;
+        if (serializedInfo)
+            deserializeCursorPos(*serializedInfo);
     }
 
     ~InMemoryIndexCursor()
@@ -1173,14 +1175,14 @@ public:
 
     virtual void reset()
     {
-        cur = 0;
+        cur = (unsigned) -1;
         eof = false;
     }
 
     virtual const byte *findNext(const RowCursor & current)
     {
         size_t high = numPtrs;
-        size_t low = cur;
+        size_t low = cur+1;
 
         //Find the value of low,high where all rows 0..low-1 are < search and rows low..max are >= search
         while (low<high)
@@ -1240,21 +1242,27 @@ public:
     virtual void serializeCursorPos(MemoryBuffer &mb) const 
     {
         index->serializeCursorPos(mb);
+        mb.append(cur);
+        // MORE - we could save some of the state in keyCursor to avoid seeking the next row
     }
 
+    void deserializeCursorPos(MemoryBuffer &mb)
+    {
+        // Note - index signature already read
+        mb.read(cur);
+    }
 };
 
-IDirectReader *InMemoryIndexManager::selectKey(const char *indexSig, ScoredRowFilter &postFilters, const ITranslatorSet *translators) const
+IDirectReader *InMemoryIndexManager::selectKey(MemoryBuffer &serializedInfo, ScoredRowFilter &postFilters, const ITranslatorSet *translators) const
 {
+    StringBuffer indexSig;
+    serializedInfo.read(indexSig);
     InMemoryIndex &thisIndex = findIndex(indexSig);
-    return new InMemoryIndexCursor(this, &thisIndex, baseMap, postFilters, recInfo, translators);
+    return new InMemoryIndexCursor(this, &thisIndex, baseMap, postFilters, recInfo, translators, &serializedInfo);
 }
 
 IDirectReader *InMemoryIndexManager::selectKey(ScoredRowFilter &filter, const ITranslatorSet *translators, IRoxieContextLogger &logctx) const
 {
-    const IKeyTranslator *keyTranslator = translators->queryKeyTranslator(0);  // any part would do - in-memory requires all actuals to have same layout
-    if (keyTranslator)
-        keyTranslator->translate(filter);
     if (!inMemoryKeysEnabled)
         return nullptr;
     unsigned best = 0;

+ 1 - 1
roxie/ccd/ccdkey.hpp

@@ -66,7 +66,7 @@ interface IInMemoryIndexManager : extends IInterface
     virtual void load(IFileIOArray *, IOutputMetaData *preloadLayout, bool preload) = 0;
     virtual bool IsShared() const = 0;
     virtual IDirectReader *selectKey(ScoredRowFilter &filter, const ITranslatorSet *translators, IRoxieContextLogger &logctx) const = 0;
-    virtual IDirectReader *selectKey(const char *sig, ScoredRowFilter &filter, const ITranslatorSet *translators) const = 0;
+    virtual IDirectReader *selectKey(MemoryBuffer &sig, ScoredRowFilter &filter, const ITranslatorSet *translators) const = 0;
     virtual IDirectReader *createReader(const RowFilter &postFilter, bool _grouped, offset_t readPos, unsigned partNo, unsigned numParts, const ITranslatorSet *translators) const = 0;
     virtual void setKeyInfo(IPropertyTree &indexInfo) = 0;
 };

+ 1 - 0
roxie/ccd/ccdmain.cpp

@@ -549,6 +549,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
             topology->setPropInt("@allFilesDynamic", globals->getPropInt("--allFilesDynamic", 1));
             topology->setProp("@memTraceLevel", globals->queryProp("--memTraceLevel"));
             topology->setProp("@disableLocalOptimizations", globals->queryProp("--disableLocalOptimizations"));
+            topology->setPropInt("@indexReadChunkSize", globals->getPropInt("--indexReadChunkSize", 60000));
         }
         if (topology->hasProp("PreferredCluster"))
         {

+ 3 - 0
roxie/ccd/ccdserver.cpp

@@ -21600,6 +21600,9 @@ public:
                 }
                 assertex(manager != NULL);
                 helper.createSegmentMonitors(this);
+                const IKeyTranslator *keyTranslator = translators->queryKeyTranslator(0);  // any part would do - in-memory requires all actuals to have same layout
+                if (keyTranslator)
+                    keyTranslator->translate(postFilter);
                 reader.setown(manager->selectKey(postFilter, translators, *this));
                 if (!reader)
                     reader.setown(manager->createReader(postFilter, isGrouped, 0, 0, 1, translators));

+ 1 - 1
testing/regress/environment.xml.in

@@ -69,7 +69,7 @@
                 highTimeout="2000"
                 ignoreMissingFiles="false"
                 ignoreOrphans="true"
-                indexReadChunkSize="60000"
+                indexReadChunkSize="10"
                 initIbytiDelay="100"
                 jumboFrames="false"
                 lazyOpen="smart"