浏览代码

HPCC-22008 Fix race condition in non-linking cache implementations

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 6 年之前
父节点
当前提交
ea10bd0ca5

+ 1 - 2
common/deftype/deftype.cpp

@@ -1822,8 +1822,7 @@ static ITypeInfo * commonUpType(CHashedTypeInfo * candidate)
         match = globalTypeCache->addOrFind(*candidate);
         if (match == candidate)
             return match;
-        match->Link();
-        if (!static_cast<CHashedTypeInfo *>(match)->isAlive())
+        if (!static_cast<CHashedTypeInfo *>(match)->isAliveAndLink())
         {
             globalTypeCache->replace(*candidate);
             return candidate;

+ 1 - 5
ecl/hql/hqlexpr.cpp

@@ -3980,8 +3980,7 @@ IHqlExpression * CHqlExpression::commonUpExpression()
         if (match == this)
             return this;
 #endif
-        match->Link();
-        if (!static_cast<CHqlExpression *>(match)->isAlive())
+        if (!static_cast<CHqlExpression *>(match)->isAliveAndLink())
         {
             exprCache->replace(*this);
 #ifdef GATHER_COMMON_STATS
@@ -4973,7 +4972,6 @@ void CHqlRealExpression::setOperands(HqlExprArray & _ownedOperands)
 
 bool CHqlRealExpression::equals(const IHqlExpression & other) const
 {
-    if (!isAlive()) return false;
 #ifndef CONSISTENCY_CHECK
     if (this == &other)
         return true;
@@ -6191,7 +6189,6 @@ CHqlConstant *CHqlConstant::makeConstant(IValue *_val)
 
 bool CHqlConstant::equals(const IHqlExpression & other) const
 {
-    if (!isAlive()) return false;
     IValue * oval = other.queryValue();
     if (oval)
         if (val->queryType() == oval->queryType())
@@ -7089,7 +7086,6 @@ void CHqlAnnotation::sethash()
 
 bool CHqlAnnotation::equals(const IHqlExpression & other) const
 {
-    if (!isAlive()) return false;
     if (getAnnotationKind() != other.getAnnotationKind())
         return false;
 

+ 3 - 2
ecl/hql/hqlgram2.cpp

@@ -12686,13 +12686,14 @@ bool verifySimplifiedDefinition(IHqlExpression *origExpr, IHqlExpression *simpli
 
 int testHqlInternals()
 {
-    printf("Sizes: const(%u) expr(%u) select(%u) dataset(%u) annotation(%u) prop(%u)\n",
+    printf("Sizes: const(%u) expr(%u) select(%u) dataset(%u) annotation(%u) prop(%u) loc(%u)\n",
             (unsigned)sizeof(CHqlConstant),
             (unsigned)sizeof(CHqlExpressionWithType),
             (unsigned)sizeof(CHqlSelectExpression),
             (unsigned)sizeof(CHqlDataset),
             (unsigned)sizeof(CHqlAnnotation),
-            (unsigned)sizeof(CHqlDynamicProperty)
+            (unsigned)sizeof(CHqlDynamicProperty),
+            (unsigned)sizeof(CHqlLocationAnnotation)
             );
 
     //

+ 4 - 4
fs/dafsclient/rmtfile.cpp

@@ -616,13 +616,13 @@ public:
     CEndpointCS *getCrit(const SocketEndpoint &ep)
     {
         CriticalBlock b(crit);
-        Linked<CEndpointCS> clientCrit = find(ep);
-        if (!clientCrit || !clientCrit->isAlive()) // if !isAlive(), then it is in the process of being destroyed/removed.
+        CEndpointCS * clientCrit = find(ep);
+        if (!clientCrit || !clientCrit->isAliveAndLink()) // if !isAliveAndLink(), then it is in the process of being destroyed/removed.
         {
-            clientCrit.setown(new CEndpointCS(*this, ep));
+            clientCrit = new CEndpointCS(*this, ep);
             replace(*clientCrit); // NB table doesn't own
         }
-        return clientCrit.getClear();
+        return clientCrit;
     }
     unsigned getHashFromElement(const void *e) const
     {

+ 1 - 2
roxie/ccd/ccddali.cpp

@@ -668,8 +668,7 @@ public:
     static IRoxieDaliHelper *connectToDali(unsigned waitToConnect)
     {
         CriticalBlock b(daliHelperCrit);
-        LINK(daliHelper);
-        if (!daliHelper || !daliHelper->isAlive())
+        if (!daliHelper || !daliHelper->isAliveAndLink())
             daliHelper = new CRoxieDaliHelper();
         if (waitToConnect && fileNameServiceDali.length() && (!topology || !topology->getPropBool("@lockDali", false)))
         {

+ 31 - 24
roxie/ccd/ccdfile.cpp

@@ -390,7 +390,7 @@ public:
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { throwUnexpected(); return 0; }
 
     virtual const char *queryFilename() { return logical->queryFilename(); }
-    virtual bool isAlive() const { return CInterface::isAlive(); }
+    virtual bool isAliveAndLink() const { return CInterface::isAliveAndLink(); }
 
     virtual IMemoryMappedFile *getMappedFile() override
     {
@@ -998,9 +998,9 @@ public:
                     if (todo.ordinality())
                     {
                         ILazyFileIO *popped = &todo.popGet();
-                        if (popped->isAlive())
+                        if (popped->isAliveAndLink())
                         {
-                            next.set(popped);
+                            next.setown(popped);
                         }
                         numFilesToProcess--;    // must decrement counter for SNMP accuracy
                     }
@@ -1164,9 +1164,10 @@ public:
         try
         {
             CriticalBlock b(crit);
-            Linked<ILazyFileIO> f = files.getValue(localLocation);
-            if (f && f->isAlive())
+            ILazyFileIO * match = files.getValue(localLocation);
+            if (match && match->isAliveAndLink())
             {
+                Owned<ILazyFileIO> f = match;
                 if ((dfsSize != (offset_t) -1 && dfsSize != f->getSize()) ||
                     (!dfsDate.isNull() && !dfsDate.equals(*f->queryDateTime(), false)))
                 {
@@ -1274,29 +1275,35 @@ public:
             CriticalBlock b(cpcrit); // paranoid...
             closePending[remote] = false;
         }
-        CriticalBlock b(crit);
-        ICopyArrayOf<ILazyFileIO> goers;
-        HashIterator h(files);
-        ForEach(h)
+        IArrayOf<ILazyFileIO> goers;
         {
-            ILazyFileIO *f = files.mapToValue(&h.query());
-            if (f->isAlive() && f->isOpen() && f->isRemote()==remote && !f->isCopying())
+            CriticalBlock b(crit);
+            HashIterator h(files);
+            ForEach(h)
             {
-                unsigned age = msTick() - f->getLastAccessed();
-                if (age > maxFileAge[remote])
+                ILazyFileIO * match = files.mapToValue(&h.query());
+                if (match->isAliveAndLink())
                 {
-                    if (traceLevel > 5)
+                    Owned<ILazyFileIO> f = match;
+                    if (f->isOpen() && f->isRemote()==remote && !f->isCopying())
                     {
-                        // NOTE - querySource will cause the file to be opened if not already open
-                        // That's OK here, since we know the file is open and remote.
-                        // But don't be tempted to move this line outside these if's (eg. to trace the idle case)
-                        const char *fname = remote ? f->querySource()->queryFilename() : f->queryFilename();
-                        DBGLOG("Closing inactive %s file %s (last accessed %u ms ago)", remote ? "remote" : "local",  fname, age);
+                        unsigned age = msTick() - f->getLastAccessed();
+                        if (age > maxFileAge[remote])
+                        {
+                            if (traceLevel > 5)
+                            {
+                                // NOTE - querySource will cause the file to be opened if not already open
+                                // That's OK here, since we know the file is open and remote.
+                                // But don't be tempted to move this line outside these if's (eg. to trace the idle case)
+                                const char *fname = remote ? f->querySource()->queryFilename() : f->queryFilename();
+                                DBGLOG("Closing inactive %s file %s (last accessed %u ms ago)", remote ? "remote" : "local",  fname, age);
+                            }
+                            f->close();
+                        }
+                        else
+                            goers.append(*f.getClear());
                     }
-                    f->close();
                 }
-                else
-                    goers.append(*f);
             }
         }
         unsigned numFilesLeft = goers.ordinality(); 
@@ -2414,9 +2421,9 @@ public:
         cached = cache;
     }
 
-    virtual bool isAlive() const
+    virtual bool isAliveAndLink() const
     {
-        return CInterface::isAlive();
+        return CInterface::isAliveAndLink();
     }
     virtual const char *queryFileName() const
     {

+ 2 - 2
roxie/ccd/ccdfile.hpp

@@ -32,7 +32,7 @@ interface ILazyFileIO : extends IFileIO
 {
     virtual const char *queryFilename() = 0;
     virtual void checkOpen() = 0;
-    virtual bool isAlive() const = 0;
+    virtual bool isAliveAndLink() const = 0;
     virtual void addSource(IFile *source) = 0;
     virtual bool isRemote() = 0;
     virtual offset_t getSize() = 0;
@@ -105,7 +105,7 @@ interface IResolvedFile : extends ISimpleSuperFileEnquiry
     virtual const char *queryPhysicalName() const = 0; // Returns NULL unless in local file mode.
     virtual const char *queryFileName() const = 0;
     virtual void setCache(IResolvedFileCache *cache) = 0;
-    virtual bool isAlive() const = 0;
+    virtual bool isAliveAndLink() const = 0;
     virtual const IPropertyTree *queryProperties() const = 0;
 
     virtual void remove() = 0;

+ 4 - 4
roxie/ccd/ccdquery.cpp

@@ -107,8 +107,8 @@ public:
     static const CQueryDll *getQueryDll(const char *dllName, bool isExe)
     {
         CriticalBlock b(dllCacheLock);
-        CQueryDll *dll = LINK(dllCache.getValue(dllName));
-        if (dll && dll->isAlive())
+        CQueryDll *dll = dllCache.getValue(dllName);
+        if (dll && dll->isAliveAndLink())
             return dll;
         else
         {
@@ -1117,8 +1117,8 @@ public:
     {
         hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
         SpinBlock b(queriesCrit);
-        CQueryFactory *factory = LINK(queryMap.getValue(hv));
-        if (factory && factory->isAlive())
+        CQueryFactory *factory = queryMap.getValue(hv);
+        if (factory && factory->isAliveAndLink())
             return factory;
         else
             return NULL;

+ 1 - 2
roxie/ccd/ccdstate.cpp

@@ -375,8 +375,7 @@ public:
         IResolvedFile *cache = files.getValue(filename);
         if (cache)
         {
-            LINK(cache);
-            if (cache->isAlive())
+            if (cache->isAliveAndLink())
                 return cache;
             if (traceLevel)
                 DBGLOG("Not returning %s from cache as isAlive() returned false", filename);

+ 1 - 2
roxie/roxiemem/roxiemem.cpp

@@ -5006,8 +5006,7 @@ protected:
             CHeap & heap = fixedHeaps.item(i);
             if (heap.matches(chunkSize, activityId, flags))
             {
-                heap.Link();
-                if (heap.isAlive())
+                if (heap.isAliveAndLink())
                     return &heap;
             }
         }

+ 37 - 18
system/jlib/jiface.hpp

@@ -68,7 +68,7 @@ class CSimpleInterfaceOf : public INTERFACE
 {
     friend class CInterfaceOf<INTERFACE>;    // want to keep xxcount private outside this pair of classes
 public:
-    inline virtual ~CSimpleInterfaceOf() {}
+    inline virtual ~CSimpleInterfaceOf() { }
 
     inline CSimpleInterfaceOf() : xxcount(1) { }
     inline bool IsShared(void) const    { return xxcount.load(std::memory_order_relaxed) > 1; }
@@ -119,31 +119,46 @@ class CInterfaceOf : public CSimpleInterfaceOf<INTERFACE>
 public:
     virtual void beforeDispose() {}
 
-    inline bool isAlive() const         { return this->xxcount.load(std::memory_order_relaxed) < DEAD_PSEUDO_COUNT; }       //only safe if Link() is called first
+    //Function to be called when checking if an entry in a non-linking cache is valid
+    //must be called inside a critical section that protects access to the cache.
+    //This function must only increment the link count if the object is valid - otherwise it
+    //ends up being freed more than once.
+    inline bool isAliveAndLink() const
+    {
+        unsigned expected = this->xxcount.load(std::memory_order_acquire);
+        for (;;)
+        {
+            //If count is 0, or count >= DEAD_PSEUDO_COUNT then return false - combine both into a single test
+            if ((expected-1) >= (DEAD_PSEUDO_COUNT-1))
+                return false;
+
+            //Avoid incrementing the link count if xxcount=0, otherwise it introduces a race condition
+            //so use compare and exchange to increment it only if it hasn't changed
+            if (this->xxcount.compare_exchange_weak(expected, expected+1, std::memory_order_acq_rel))
+                return true;
+        }
+    }
 
     inline bool Release(void) const
     {
         if (this->xxcount.fetch_sub(1,std::memory_order_release) == 1)
         {
-            unsigned zero = 0;
-            //Because beforeDispose could cause this object to be linked/released or call isAlive(), this->xxcount is set
-            //to a a high mid-point positive number to avoid poss. of releasing again.
-            if (this->xxcount.compare_exchange_strong(zero, DEAD_PSEUDO_COUNT, std::memory_order_acq_rel))
+            //Overwrite with a special value so that any calls to Link()/Release within beforeDispose()
+            //do not cause the object to be released again.
+            this->xxcount.store(DEAD_PSEUDO_COUNT, std::memory_order_release);
+            try
+            {
+                const_cast<CInterfaceOf<INTERFACE> *>(this)->beforeDispose();
+            }
+            catch (...)
             {
-                try
-                {
-                    const_cast<CInterfaceOf<INTERFACE> *>(this)->beforeDispose();
-                }
-                catch (...)
-                {
 #if _DEBUG
-                    assert(!"ERROR - Exception in beforeDispose - object will be leaked");
+                assert(!"ERROR - Exception in beforeDispose - object will be leaked");
 #endif
-                    throw;
-                }
-                delete this;
-                return true;
+                throw;
             }
+            delete this;
+            return true;
         }
         return false;
     }
@@ -219,7 +234,11 @@ class CSingleThreadInterfaceOf : public CSingleThreadSimpleInterfaceOf<INTERFACE
 public:
     virtual void beforeDispose() {}
 
-    inline bool isAlive() const         { return this->xxcount < DEAD_PSEUDO_COUNT; }       //only safe if Link() is called first
+    inline bool isAliveAndLink() const
+    {
+        this->Link();
+        return this->xxcount < DEAD_PSEUDO_COUNT;
+    }
 
     inline bool Release(void) const
     {

+ 2 - 0
testing/unittests/CMakeLists.txt

@@ -34,6 +34,7 @@ set (    SRCS
          dalitests.cpp
          jlibtests.cpp
          cryptotests.cpp
+         hqltests.cpp
          configmgr/ConfigMgrUnitTests.cpp
          configmgr/ConfigMgrTemplateTests.cpp
          configmgr/ConfigMgrHPCCTests.cpp
@@ -52,6 +53,7 @@ include_directories (
          ./../../common/deftype
          ./../../system/security/cryptohelper
          ./../../configuration/configmgr/configmgrlib
+         ${HPCC_SOURCE_DIR}/ecl/hql
          ${HPCC_SOURCE_DIR}/configuration/configmgr/RapidJSON/include
          ${CMAKE_BINARY_DIR}
          ${CMAKE_BINARY_DIR}/oss

+ 128 - 0
testing/unittests/hqltests.cpp

@@ -0,0 +1,128 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifdef _USE_CPPUNIT
+#include "unittests.hpp"
+#include "hqlexpr.hpp"
+
+class ThreadedParseStressTest : public CppUnit::TestFixture
+{
+    CPPUNIT_TEST_SUITE( ThreadedParseStressTest  );
+        CPPUNIT_TEST(testAllocs);
+        CPPUNIT_TEST(testThreads);
+    CPPUNIT_TEST_SUITE_END();
+
+    void testThreads(unsigned maxParallel)
+    {
+        const unsigned numIter = 20000;
+        const unsigned numFor = 50;
+        CCycleTimer timer;
+        class casyncfor: public CAsyncFor
+        {
+        public:
+            void Do(unsigned i)
+            {
+                if (i & 1)
+                {
+                    const char * query =
+                            "RECORD\n"
+                            "  unsigned2 v21;\n"
+                            "  unsigned2 v12;\n"
+                            "  unsigned2 v8;\n"
+                            "  real8 factor;\n"
+                            " END;\n";
+                    OwnedHqlExpr parsed;
+                    for (unsigned i=0; i < numIter; i++)
+                    {
+                        OwnedHqlExpr next = parseQuery(query, nullptr);
+                    }
+                }
+                else
+                {
+                    for (unsigned i=0; i < numIter*2; i++)
+                    {
+                        //Allocate something the same size as CLocationAnnotation
+                        free(calloc(72, 1));
+                        /*
+                        try
+                        {
+                            SocketEndpoint ep(99);
+//                            Owned<ISocket> temp = ISocket::connect_timeout(ep, 0);
+                            Owned<ISocket> temp = ISocket::create(99);
+                        }
+                        catch (IException * e)
+                        {
+                            e->Release();
+                        }*/
+                    }
+                }
+            }
+        } afor;
+        afor.For(numFor, maxParallel, false, false);
+        unsigned __int64 numNs = timer.elapsedNs();
+        unsigned __int64 totalIters = numFor * numIter;
+        printf("test %u threads took %uns each parse\n", maxParallel, (unsigned)(numNs/(totalIters)));
+    }
+
+    void testThreads()
+    {
+        testThreads(2);
+        testThreads(3);
+        testThreads(4);
+        testThreads(8);
+        testThreads(16);
+        testThreads(32);
+    }
+
+    void testAllocs(unsigned maxParallel)
+    {
+        const unsigned numIter = 100000;
+        CCycleTimer timer;
+        class casyncfor: public CAsyncFor
+        {
+        public:
+            void Do(unsigned i)
+            {
+                OwnedHqlExpr zero = createConstant(0);
+                for (unsigned i=0; i < numIter; i++)
+                {
+                    OwnedHqlExpr annot = createLocationAnnotation(LINK(zero), nullptr, 0, 0);
+                }
+            }
+        } afor;
+        afor.For(100, maxParallel, false, false);
+        unsigned __int64 numNs = timer.elapsedNs();
+        unsigned __int64 totalIters = 100 * numIter;
+        printf("test %u threads took %uns each alloc\n", maxParallel, (unsigned)(numNs/(totalIters)));
+    }
+
+    void testAllocs()
+    {
+        testAllocs(2);
+        testAllocs(3);
+        testAllocs(4);
+        testAllocs(8);
+        testAllocs(16);
+        testAllocs(32);
+    }
+
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION( ThreadedParseStressTest );
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( ThreadedParseStressTest, "ThreadedParseStressTest" );
+
+#endif // _USE_CPPUNIT

+ 4 - 4
thorlcr/graph/thgraphslave.cpp

@@ -2111,12 +2111,12 @@ public:
         if (crc)
             id.append(crc);
         CriticalBlock b(crit);
-        Linked<CLazyFileIO> file = files.find(id);
-        if (!file || !file->isAlive())
+        CLazyFileIO * file = files.find(id);
+        if (!file || !file->isAliveAndLink())
         {
             Owned<IActivityReplicatedFile> repFile = createEnsurePrimaryPartFile(logicalFilename, &partDesc);
             bool compressed = partDesc.queryOwner().isCompressed();
-            file.setown(new CLazyFileIO(*this, filename, id, repFile.getClear(), compressed, expander));
+            file = new CLazyFileIO(*this, filename, id, repFile.getClear(), compressed, expander);
             files.replace(* file); // NB: files does not own 'file', CLazyFileIO will remove itself from cache on destruction
 
             /* NB: there will be 1 CLazyFileIO per physical file part name
@@ -2130,7 +2130,7 @@ public:
              */
         }
         file->setActivity(&activity); // an activity needed by IActivityReplicatedFile, mainly for logging purposes.
-        return file.getClear();
+        return file;
     }
 friend class CLazyFileIO;
 };