Prechádzať zdrojové kódy

Merge pull request #6704 from jakesmith/hpcc-12625b

HPCC-12625 Fix regression in last commit for this issue.

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 10 rokov pred
rodič
commit
572f3bd5fe

+ 53 - 53
dali/base/dasds.cpp

@@ -8173,7 +8173,8 @@ public:
         INIT_NAMEDCOUNT;
         SDSManager->querySubscriberTable().getSubscribers(subs);
     }
-    bool match(const char *head, const char *path, bool &sub)
+    enum SubCommitType { subCommitNone, subCommitExact, subCommitBelow, subCommitAbove };
+    SubCommitType match(const char *head, const char *path)
     {
         bool wild = false;
         loop
@@ -8194,30 +8195,23 @@ public:
                 wild = true;
             }
             else if (*head != *path)
-                return false;
+                return subCommitNone;
             else
                 path++;
 
             head++;
             if ('\0' == *path)
             {
-                if ('\0' == *head) // absolute match
-                {
-                    sub = false;
-                    return true;
-                }
+                if ('\0' == *head)
+                    return subCommitExact; // absolute match
                 else if (!wild && '/' != *head) // e.g. change=/a/bc, subscriber=/a/b
-                    return false;
-                sub = true; // e.g. change=/a/b/c, subscriber=/a/b
-                return true;
+                    return subCommitNone;
+                return subCommitBelow; // e.g. change=/a/b/c, subscriber=/a/b
             }
             else 
             {
-                if ('\0' == *head) // e.g. change=/a/b, subscriber=/a/b/c - not matched yet, but returning true keeps it from being pruned
-                {
-                    sub = false;
-                    return true;
-                }
+                if ('\0' == *head)
+                    return subCommitAbove; // e.g. change=/a/b, subscriber=/a/b/c - not matched yet, but returning true keeps it from being pruned
             }
         }
     }
@@ -8231,22 +8225,45 @@ public:
         scan(*rootChanges, stack, pruned);
     }
 
-    bool prune(const char *xpath, bool &sub, CSubscriberCopyArray *matches, CSubscriberArray &pruned)
+    bool prune(const char *xpath, CSubscriberCopyArray &candidates, CSubscriberArray &pruned)
     {
-        sub = false;
         ForEachItemInRev(s, subs)
         {
             CSubscriberContainer &subscriber = subs.item(s);
-            bool _sub; // false = (xpath NOT below subscriber), (true = xpath equals or is below subscriber)
-            if (subscriber.isUnsubscribed() || !match(xpath, subscriber.queryXPath(), _sub))
+            SubCommitType subCommit;
+            if (subscriber.isUnsubscribed())
+                subCommit = subCommitNone;
+            else
+                subCommit = match(xpath, subscriber.queryXPath());
+            switch (subCommit)
             {
-                pruned.append(*LINK(&subscriber));
-                subs.remove(s);
+                case subCommitNone:
+                {
+                    pruned.append(*LINK(&subscriber));
+                    subs.remove(s);
+                    break;
+                }
+                case subCommitExact:
+                {
+                    candidates.append(subscriber);
+                    break;
+                }
+                case subCommitBelow: // e.g. change=/a/b/c, subscriber=/a/b
+                {
+                    if (!subscriber.querySub())
+                    {
+                        pruned.append(*LINK(&subscriber));
+                        subs.remove(s);
+                    }
+                    else
+                        candidates.append(subscriber);
+                    break;
+                }
+                case subCommitAbove: // e.g. change=/a/b, subscriber=/a/b/c
+                    break; // keep in subs, deeper changes may match
+                default:
+                    throwUnexpected();
             }
-            else if (_sub)
-                sub = true;
-            else if (matches)
-                matches->append(subscriber);
         }
         return (subs.ordinality() > 0);
     }
@@ -8254,15 +8271,15 @@ public:
     // recurse down all matching subscription stubs while qualified
     void scanAll(PDState state, CBranchChange &changes, CPTStack &stack, CSubscriberArray &pruned)
     {
-        bool sub;
-        if (prune(xpath.str(), sub, NULL, pruned))
+        CSubscriberCopyArray candidates;
+        if (prune(xpath.str(), candidates, pruned))
         {
             MemoryBuffer notifyData;
-            if (sub)
+            if (candidates.ordinality())
             {
-                ForEachItemInRev(s, subs)
+                ForEachItemInRev(s, candidates)
                 {
-                    CSubscriberContainer &subscriber = subs.item(s);
+                    CSubscriberContainer &subscriber = candidates.item(s);
                     if (!subscriber.isUnsubscribed())
                     {
                         if (subscriber.qualify(stack, true))
@@ -8274,7 +8291,7 @@ public:
                         else
                             pruned.append(*LINK(&subscriber));
                     }
-                    subs.remove(s);
+                    subs.zap(subscriber);
                 }
             }
             else
@@ -8320,9 +8337,8 @@ public:
 
     void scan(CBranchChange &changes, CPTStack &stack, CSubscriberArray &pruned)
     {
-        CSubscriberCopyArray matches;
-        bool sub;
-        if (!prune(xpath.str(), sub, &matches, pruned))
+        CSubscriberCopyArray candidates;
+        if (!prune(xpath.str(), candidates, pruned))
             return;
 
         PushPop pp(stack, *changes.tree);
@@ -8331,7 +8347,7 @@ public:
             scanAll(changes.local, changes, stack, pruned);
             return;
         }
-        else if (matches.ordinality()) // xpath matched some subscribers, and/or below some, need to check for sub subscribers
+        else if (candidates.ordinality()) // xpath matched some subscribers, and/or below some, need to check for sub subscribers
         {
             bool ret = false;
             // avoid notifying on PDS_Structure only, which signifies changes deeper down only
@@ -8339,9 +8355,9 @@ public:
             if (changes.state && changes.local && (changes.local != PDS_Structure))
             {
                 int lastSendValue = -1;
-                ForEachItemInRev(s, matches)
+                ForEachItemInRev(s, candidates)
                 {
-                    CSubscriberContainer &subscriber = matches.item(s);
+                    CSubscriberContainer &subscriber = candidates.item(s);
                     if (!subscriber.isUnsubscribed())
                     {
                         if (subscriber.qualify(stack, false))
@@ -8372,23 +8388,7 @@ public:
                     subs.zap(subscriber);
                 }
             }
-            else
-            {
-                // remove non-sub subcribers at this level
-                ForEachItemInRev(s, subs)
-                {
-                    CSubscriberContainer &subscriber = subs.item(s);
-                    unsigned subDepth = subscriber.queryDepth();
-                    unsigned stackDepth = stack.ordinality();
-                    if ((!subscriber.querySub() && subDepth==stackDepth) || 0 == changes.children.ordinality())
-                    {
-                        pruned.append(*LINK(&subscriber));
-                        subs.remove(s);
-                    }
-                }
-            }
         }
-
         ForEachItemIn(c, changes.children)
         {
             CBranchChange &childChanges = changes.children.item(c);

+ 1 - 1
testing/regress/ecl-test.json

@@ -22,7 +22,7 @@
             "thor",
             "roxie"
         ],
-        "timeout":"72000000",
+        "timeout":"720",
         "maxAttemptCount":"3",
         "defaultSetupClusters": [
             "all"

+ 112 - 0
testing/unittests/dalitests.cpp

@@ -446,6 +446,7 @@ class CDaliTests : public CppUnit::TestFixture
 //        CPPUNIT_TEST(testReadAllSDS); // Ignoring this test; See comments below
         CPPUNIT_TEST(testSDSRW);
         CPPUNIT_TEST(testSDSSubs);
+        CPPUNIT_TEST(testSDSSubs2);
         CPPUNIT_TEST(testFiles);
         CPPUNIT_TEST(testGroups);
         CPPUNIT_TEST(testMultiCluster);
@@ -2247,6 +2248,117 @@ public:
         pool->joinAll();
         PROGLOG("Hammer test took: %d ms", tm.elapsed());
     }
+
+    void testSDSSubs2()
+    {
+        class CSubscriber : public CSimpleInterfaceOf<ISDSSubscription>
+        {
+            StringAttr xpath;
+            bool sub;
+            StringBuffer &result;
+            SubscriptionId id;
+        public:
+            CSubscriber(StringBuffer &_result, const char *_xpath, bool _sub) : result(_result), xpath(_xpath), sub(_sub)
+            {
+                id = querySDS().subscribe(xpath, *this, sub, !sub);
+                PROGLOG("Subscribed to %s", xpath.get());
+            }
+            ~CSubscriber()
+            {
+                querySDS().unsubscribe(id);
+            }
+            virtual void notify(SubscriptionId id, const char *_xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
+            {
+                PROGLOG("CSubscriber notified path=%s for subscriber=%s, sub=%s", _xpath, xpath.get(), sub?"true":"false");
+                if (result.length())
+                    result.append("|");
+                result.append(xpath);
+                if (!sub && valueLen)
+                    result.append(",").append(valueLen, (const char *)valueData);
+            }
+        };
+        Owned<IRemoteConnection> conn = querySDS().connect("/DAREGRESS/TestSub2", myProcessSession(), RTM_CREATE, INFINITE);
+        Owned<IPropertyTree> tree = createPTreeFromXMLString("<a><b1><c/></b1><b2/><b3><d><e/></d></b3></a>");
+        IPropertyTree *root = conn->queryRoot();
+        root->setPropTree("a", tree.getClear());
+        conn->commit();
+
+        StringBuffer result;
+        Owned<ISDSSubscription> sub1 = new CSubscriber(result, "/DAREGRESS/TestSub2/a", true);
+        Owned<ISDSSubscription> sub2 = new CSubscriber(result, "/DAREGRESS/TestSub2/a/b1", false);
+        Owned<ISDSSubscription> sub3 = new CSubscriber(result, "/DAREGRESS/TestSub2/a/b2", false);
+        Owned<ISDSSubscription> sub4 = new CSubscriber(result, "/DAREGRESS/TestSub2/a/b1/c", false);
+        Owned<ISDSSubscription> sub5 = new CSubscriber(result, "/DAREGRESS/TestSub2/a/b3", true);
+
+        MilliSleep(1000);
+
+        StringArray expectedResults;
+        expectedResults.append("/DAREGRESS/TestSub2/a");
+        expectedResults.append("/DAREGRESS/TestSub2/a|/DAREGRESS/TestSub2/a/b1,testv");
+        expectedResults.append("/DAREGRESS/TestSub2/a|/DAREGRESS/TestSub2/a/b2,testv");
+        expectedResults.append("/DAREGRESS/TestSub2/a|/DAREGRESS/TestSub2/a/b1/c,testv");
+        expectedResults.append("/DAREGRESS/TestSub2/a|/DAREGRESS/TestSub2/a/b1,testv");
+        expectedResults.append("/DAREGRESS/TestSub2/a|/DAREGRESS/TestSub2/a/b2,testv");
+        expectedResults.append("/DAREGRESS/TestSub2/a");
+        expectedResults.append("/DAREGRESS/TestSub2/a|/DAREGRESS/TestSub2/a/b3");
+        expectedResults.append("/DAREGRESS/TestSub2/a|/DAREGRESS/TestSub2/a/b3");
+
+        StringArray props;
+        props.appendList("S:a,S:a/b1,S:a/b2,S:a/b1/c,S:a/b1/d,S:a/b2/e,S:a/b2/e/f,D:a/b3/d/e,D:a/b3/d", ",");
+
+        assertex(expectedResults.ordinality() == props.ordinality());
+
+        ForEachItemIn(p, props)
+        {
+            result.clear(); // filled by subscriber notifications
+            const char *cmd = props.item(p);
+            const char *propPath=cmd+2;
+            switch (*cmd)
+            {
+                case 'S':
+                {
+                    PROGLOG("Changing %s", propPath);
+                    root->setProp(propPath, "testv");
+                    break;
+                }
+                case 'D':
+                {
+                    PROGLOG("Deleting %s", propPath);
+                    root->removeProp(propPath);
+                    break;
+                }
+                default:
+                    throwUnexpected();
+            }
+            conn->commit();
+
+            MilliSleep(100); // time for notifications to come through
+
+            PROGLOG("Checking results");
+            StringArray resultArray;
+            resultArray.appendList(result, "|");
+            result.clear();
+            resultArray.sortAscii();
+            ForEachItemIn(r, resultArray)
+            {
+                if (result.length())
+                    result.append("|");
+                result.append(resultArray.item(r));
+            }
+            const char *expectedResult = expectedResults.item(p);
+            if (0 == strcmp(expectedResult, result))
+                PROGLOG("testSDSSubs2 [ %s ]: MATCH", cmd);
+            else
+            {
+                VStringBuffer errMsg("testSDSSubs2 [ %s ]: MISMATCH", cmd);
+                errMsg.newline().append("Expected: ").append(expectedResult);
+                errMsg.newline().append("Got: ").append(result);
+                PROGLOG("%s", errMsg.str());
+                CPPUNIT_ASSERT_MESSAGE(errMsg.str(), 0);
+            }
+        }
+    }
+
 };
 
 CPPUNIT_TEST_SUITE_REGISTRATION( CDaliTests );