Browse Source

Fix problems with global priority stepped join.

For global stepped joins (using the priority ordering) it was possible
for the seek list to contain an item too early (which triggered an
assert in the new code).  It may also have caused problems in older
builds.

There was another problem where rows could (in very rare situations)
get lost if there were large numbers of matches for a single document.

Fix problems with hint attributes - they were not allowed on stepped, were
lost when a project and table were combined, and were occasionally
duplicated in the graph.

Add hint to roxie to allow the seek readahead to be configured.

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 13 years ago
parent
commit
57437903bd

+ 19 - 25
common/thorhelper/thorstep.cpp

@@ -156,8 +156,6 @@ const void * CSteppedInputLookahead::nextInputRow()
 {
     if (readAheadRows.ordinality())
         return readAheadRows.dequeue();
-    if (seekRows.ordinality())
-        return seekRows.dequeue();
     return input->nextInputRow();
 }
     
@@ -172,22 +170,13 @@ const void * CSteppedInputLookahead::nextInputRowGE(const void * seek, unsigned
             return (void *)next.getClear();
         }
     }
-    while (seekRows.ordinality())
-    {
-        OwnedLCRow next = seekRows.dequeue();
-        if (compare->docompare(next, seek, numFields) >= 0)
-        {
-            assertex(wasCompleteMatch);
-            return (void *)next.getClear();
-        }
-    }
     return input->nextInputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
 }
 
 void CSteppedInputLookahead::ensureFilled(const void * seek, unsigned numFields, unsigned maxcount)
 {
-    //Transfer any rows with fields before the seek position to a list of pending rows, so we don't waste
-    //time sending seek rows that can't match..
+    const void * lastSeekRow = NULL;
+    //Remove any rows from the seek list that occur before the new seek row
     while (seekRows.ordinality())
     {
         const void * next = seekRows.head();
@@ -195,9 +184,18 @@ void CSteppedInputLookahead::ensureFilled(const void * seek, unsigned numFields,
         {
             //update the seek pointer to the best value - so that lowestInputProvider can skip its seekRows if necessary
             seek = seekRows.tail();
+            lastSeekRow = seek;
             break;
         }
-        readAheadRows.enqueue(seekRows.dequeue());
+        rowAllocator->releaseRow(seekRows.dequeue());
+    }
+
+    //Could the current readahead row be part of the seek set.
+    if (pending && compare->docompare(pending, seek, numFields) >= 0)
+    {
+        //Check not already added - could conceivably happen after rows are read directly beyond the matching seeks.
+        if (!lastSeekRow || compare->docompare(pending, lastSeekRow, numFields) > 0)
+            seekRows.enqueue(rowAllocator->linkRow(pending));
     }
 
     //Return mismatches is selected because we don't want it to seek exact matches beyond the last seek position
@@ -214,7 +212,13 @@ void CSteppedInputLookahead::ensureFilled(const void * seek, unsigned numFields,
         //but if so the next read request will do another blocked read, so just ignore this one.
         if (wasCompleteMatch)
         {
-            seekRows.enqueue(next);
+            readAheadRows.enqueue(next);
+            if (!lastSeekRow || compare->docompare(next, lastSeekRow, numFields) > 0)
+            {
+                //Only record unique seek positions in the seek rows
+                seekRows.enqueue(rowAllocator->linkRow(next));
+                lastSeekRow = next;
+            }
             //update the seek pointer to the best value.  
             seek = next;
         }
@@ -225,21 +229,11 @@ void CSteppedInputLookahead::ensureFilled(const void * seek, unsigned numFields,
 
 unsigned CSteppedInputLookahead::ordinality() const
 {
-    //pending <= readAheadRows.head(), so if there are any items in readAheadRows, then don't include pending
-    if ((readAheadRows.ordinality() == 0) && pending)
-        return seekRows.ordinality() + 1;
+    //Number of seek positions available via querySeek().  pending is no longer counted separately here -
+    //ensureFilled() enqueues it onto seekRows itself when it lies at or beyond the seek position.
     return seekRows.ordinality();
 }
 
 const void * CSteppedInputLookahead::querySeek(unsigned i) const
 {
-    //pending <= readAheadRows.head(), so if there are any items in readAheadRows, then don't include pending
-    if ((readAheadRows.ordinality() == 0) && pending)
-    {
-        if (i == 0)
-            return pending;
-        i--;
-    }
+    //Returns item i of the seek list.  pending is no longer special-cased as entry 0 -
+    //ensureFilled() is responsible for adding it to seekRows when appropriate.
     return seekRows.item(i);
 }
 

+ 2 - 3
common/thorhelper/thorstep.ipp

@@ -240,7 +240,6 @@ public:
         }
         return next();
     }
-    inline bool hasRecordPending() { return pending != NULL; }
     inline void resetInputEOF() { input->resetEOF(); }
 
 private:
@@ -258,8 +257,8 @@ protected:
 
 private:
     Linked<ISteppedInput> input;
-    LinkedRowQueue seekRows;                    // rows that have been read from the input to provide as seek pointers for the next term
-    LinkedRowQueue readAheadRows;               // previous seek rows, that have been read past, but still required to return as results.
+    LinkedRowQueue readAheadRows;               // rows that have been read ahead from the input, and are still required to be returned as results.
+    LinkedRowQueue seekRows;                    // unique seek positions (linked copies of read-ahead/pending rows) to provide as seek pointers for the next term
 
 protected:
     IRangeCompare * compare;

+ 1 - 0
ecl/hql/hqlgram.y

@@ -8845,6 +8845,7 @@ stepFlag
                         {
                             $$.setExpr(createExprAttribute(filteredAtom), $1);
                         }
+    | hintAttribute
     ;
 
 

+ 9 - 1
ecl/hql/hqlopt.cpp

@@ -3070,7 +3070,15 @@ IHqlExpression * CTreeOptimizer::doCreateTransformed(IHqlExpression * transforme
                         if (countProjectAttr)
                             expandedTransform.setown(createComma(LINK(expandedTransform), LINK(countProjectAttr)));
                         noteUnused(child);
-                        OwnedHqlExpr ret = createDataset(op, LINK(child->queryChild(0)), createComma(expandedTransform.getClear(), LINK(transformedSeq), LINK(transformKeyed)));
+                        HqlExprArray args;
+                        args.append(*LINK(child->queryChild(0)));
+                        args.append(*expandedTransform.getClear());
+                        args.append(*LINK(transformedSeq));
+                        if (transformKeyed)
+                            args.append(*LINK(transformKeyed));
+                        unwindHintAttrs(args, transformed);
+                        unwindHintAttrs(args, child);
+                        OwnedHqlExpr ret = createDataset(op, args);
                         ret.setown(child->cloneAllAnnotations(ret));
                         return transformed->cloneAllAnnotations(ret);
                     }

+ 9 - 0
ecl/hql/hqlutil.cpp

@@ -1368,6 +1368,15 @@ IHqlExpression * queryHintChild(IHqlExpression * expr, _ATOM name, unsigned idx)
     return NULL;
 }
 
+//Append to args every child of expr that is an attribute whose name is hintAtom.
+//Each matching child is appended via LINK(); all other children are ignored.
+void unwindHintAttrs(HqlExprArray & args, IHqlExpression * expr)
+{
+    ForEachChild(i, expr)
+    {
+        IHqlExpression * cur = expr->queryChild(i);
+        //Check the name as well as isAttribute() so only hint attributes are copied
+        if ((cur->queryName() == hintAtom) && cur->isAttribute())
+            args.append(*LINK(cur));
+    }
+}
 
 //---------------------------------------------------------------------------
 

+ 1 - 0
ecl/hql/hqlutil.hpp

@@ -43,6 +43,7 @@ extern HQL_API bool isFieldSelectedFromRecord(IHqlExpression * expr);
 extern HQL_API void gatherHints(HqlExprCopyArray & target, IHqlExpression * expr);
 extern HQL_API IHqlExpression * queryHint(IHqlExpression * expr, _ATOM name);
 extern HQL_API IHqlExpression * queryHintChild(IHqlExpression * expr, _ATOM name, unsigned idx);
+extern HQL_API void unwindHintAttrs(HqlExprArray & args, IHqlExpression * expr);
 
 extern HQL_API IHqlExpression * replaceChildDataset(IHqlExpression * expr, IHqlExpression * newChild, unsigned whichChild);
 extern HQL_API IHqlExpression * insertChildDataset(IHqlExpression * expr, IHqlExpression * newChild, unsigned whichChild);

+ 3 - 1
ecl/hqlcpp/hqlsource.cpp

@@ -1228,8 +1228,10 @@ void SourceBuilder::associateTargetCursor(BuildCtx & subctx, BuildCtx & ctx, Bou
 
 void SourceBuilder::buildTransformElements(BuildCtx & ctx, IHqlExpression * expr, bool ignoreFilters)
 {
-    if (expr != instance->dataset)
+    //This function can be called again for the unfiltered transform.  Don't process annotations again.
+    if ((expr != instance->dataset) && !ignoreFilters)
         instance->processAnnotations(expr);
+
     expr = expr->queryBody();
     node_operator op = expr->getOperator();
 

+ 1 - 1
ecl/regress/textsearch.ecl

@@ -755,7 +755,7 @@ doReadWord(searchRecord search) := FUNCTION
 
     steppedMatches := stepped(matches, doc, segment, wpos);
 
-    projected := project(steppedMatches, createMatchRecord(left));
+    projected := project(steppedMatches, createMatchRecord(left), hint(dontDuplicateMe));
 
     return projected;
 END;

+ 11 - 3
roxie/ccd/ccdserver.cpp

@@ -20874,10 +20874,11 @@ protected:
     unsigned * seekSizes;
     bool optimizeSteppedPostFilter;
     ISteppingMeta * projectedMeta;
+    unsigned maxSeekLookahead;
 
 public:
     CRoxieServerIndexReadActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, const RemoteActivityId &_remoteId,
-        IKeyArray * _keySet, TranslatorArray *_translators, unsigned _rawSize, unsigned _maxRecordSize, bool _sorted, bool _isLocal, bool _maySkip)
+        IKeyArray * _keySet, TranslatorArray *_translators, unsigned _rawSize, unsigned _maxRecordSize, bool _sorted, bool _isLocal, bool _maySkip, unsigned _maxSeekLookahead)
         : CRoxieServerIndexReadBaseActivity(_factory, _probeManager, _remoteId, _keySet, _translators, _rawSize, _maxRecordSize, _sorted, _isLocal, _maySkip), 
           readHelper((IHThorIndexReadArg &)basehelper)
     {
@@ -20885,6 +20886,7 @@ public:
         unsigned flags = indexHelper.getFlags();
         optimizeSteppedPostFilter = (flags & TIRunfilteredtransform) != 0;
         seekSizes = NULL;
+        maxSeekLookahead = _maxSeekLookahead;
 
         if (rawMeta)
         {
@@ -21066,7 +21068,11 @@ public:
         IMultipleStepSeekInfo *seeks = stepExtra.queryExtraSeeks();
         if (seeks)
         {
-            seeks->ensureFilled(seek, numFields, 40000/seekLen);  // MORE - could make this configurable
+            unsigned lookahead = 40000/seekLen;
+            if (maxSeekLookahead && (lookahead > maxSeekLookahead))
+                lookahead  = maxSeekLookahead;
+            seeks->ensureFilled(seek, numFields, lookahead);
+
             unsigned serialized = 1; // rawseek is always serialized...
             unsigned patchLength = out.length();
             out.append(serialized);  // NOTE - we come back and patch with the actual value...
@@ -21594,6 +21600,7 @@ public:
     bool variableFileName;
     bool enableFieldTranslation;
     unsigned rawSize;
+    unsigned maxSeekLookahead;
     Owned<const IResolvedFile> indexfile;
 
     CRoxieServerSideCache *cache;
@@ -21635,6 +21642,7 @@ public:
         }
         int cacheSize = _graphNode.getPropInt("hint[@name='cachehits']/@value", serverSideCacheSize);
         cache = cacheSize ? new CRoxieServerSideCache(cacheSize) : NULL;
+        maxSeekLookahead = _graphNode.getPropInt("hint[@name='maxseeklookahead']/@value", 0);
     }
 
     ~CRoxieServerBaseIndexActivityFactory()
@@ -21679,7 +21687,7 @@ public:
         else if (isSimple && !maySkip)
             return new CRoxieServerSimpleIndexReadActivity(this, _probeManager, remoteId, keySet, translatorArray, rawSize, maxRecordSize, isLocal);
         else
-            return new CRoxieServerIndexReadActivity(this, _probeManager, remoteId, keySet, translatorArray, rawSize, maxRecordSize, sorted, isLocal, maySkip);
+            return new CRoxieServerIndexReadActivity(this, _probeManager, remoteId, keySet, translatorArray, rawSize, maxRecordSize, sorted, isLocal, maySkip, maxSeekLookahead);
     }
 };