Procházet zdrojové kódy

Merge pull request #4118 from jakesmith/hpcc-8978

HPCC-8978 - Issues with SOAPCALL in child queries.

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman před 12 roky
rodič
revize
4051109b26

+ 40 - 78
common/thorhelper/thorsoapcall.cpp

@@ -686,18 +686,17 @@ public:
 class CWSCHelper : public CInterface, implements IWSCHelper
 {
 private:
-    QueueOf<const void, false> outputQ;                     // all access is protected by outputCrit
-    CriticalSection outputCrit, errorCrit, toXmlCrit, transformCrit, onfailCrit, timeoutCrit;
+    SimpleInterThreadQueueOf<const void, false> outputQ;
+    SpinLock outputQLock;
+    CriticalSection toXmlCrit, transformCrit, onfailCrit, timeoutCrit;
     unsigned done;
-    InterruptableSemaphore resultAvailable;
-    bool complete;
     Linked<ClientCertificate> clientCert;
 
     static CriticalSection secureContextCrit;
     static Owned<ISecureSocketContext> secureContext;
 
     CTimeMon timeLimitMon;
-    bool timeLimitExceeded;
+    bool complete, timeLimitExceeded;
     IRoxieAbortMonitor * roxieAbortMonitor;
 
 protected:
@@ -705,6 +704,8 @@ protected:
     WSCType wscType;
 
 public:
+    IMPLEMENT_IINTERFACE;
+
     CWSCHelper(IWSCRowProvider *_rowProvider, IEngineRowAllocator * _outputAllocator, const char *_authToken, WSCMode _wscMode, ClientCertificate *_clientCert, const IContextLogger &_logctx, IRoxieAbortMonitor *_roxieAbortMonitor, WSCType _wscType)
         : logctx(_logctx), outputAllocator(_outputAllocator), clientCert(_clientCert), roxieAbortMonitor(_roxieAbortMonitor)
     {
@@ -812,14 +813,10 @@ public:
             acceptType.toLowerCase();
         }
 
-        if(callHelper)
-        {
+        if (callHelper)
             rowTransformer = callHelper->queryInputTransformer();
-        }
         else
-        {
             rowTransformer = NULL;
-        }
 
         OwnedRoxieString hosts(helper->getHosts());
         UrlListParser urlListParser(hosts);
@@ -864,100 +861,68 @@ public:
         for (unsigned i=0; i<numRowThreads; i++)
             threads.append(*new CWSCHelperThread(this));
     }
-
-    IMPLEMENT_IINTERFACE;
-
     ~CWSCHelper()
     {
         complete = true;
         waitUntilDone();
         threads.kill();
-        while (outputQ.ordinality())
-            outputAllocator->releaseRow(outputQ.dequeue());
     }
-
     void waitUntilDone()
     {
         ForEachItemIn(i,threads)
             threads.item(i).join();
+        loop
+        {
+            const void *row = outputQ.dequeueNow();
+            if (!row)
+                break;
+            outputAllocator->releaseRow(row);
+        }
+        outputQ.reset();
     }
-
     void start()
     {
         if (timeLimitMS != WAIT_FOREVER)
             timeLimitMon.reset(timeLimitMS);
 
+        done = 0;
+        complete = aborted = timeLimitExceeded = false;
+
         ForEachItemIn(i,threads)
             threads.item(i).start();
     }
-
     void abort()
     {
         aborted = true;
         complete = true;
-        resultAvailable.signal();
+        outputQ.stop();
     }
-
-    size32_t __deprecated__getRow(void *buffer)
-    {
-        const void * row = getRow();
-        if (row)
-        {
-            size32_t sizeGot = outputAllocator->queryOutputMeta()->getRecordSize(row);
-            memcpy(buffer, row, sizeGot);
-            outputAllocator->releaseRow(row);
-            return sizeGot;
-        }
-        return 0;
-    }
-
     const void * getRow()
     {
-        if (complete) return NULL;
+        if (complete)
+            return NULL;
         loop
         {
-            resultAvailable.wait();
+            const void *row = outputQ.dequeue();
             if (aborted)
                 break;
-            {
-                CriticalBlock block(outputCrit);
-                if (outputQ.ordinality())
-                {
-                    return outputQ.dequeue();
-                }
-                else if (done == numRowThreads)
-                {
-                    complete = true;
-                    if (error.get())
-                        throw error.getLink();
-                    break;
-                }
-                // should never get here
-            }
+            if (row)
+                return row;
+            // should only be here if setDone() triggered
+            complete = true;
+            Owned<IException> e = getError();
+            if (e)
+                throw e.getClear();
+            break;
         }
         return NULL;
     }
-
-    bool rowAvailable()
-    {
-        CriticalBlock block(outputCrit);
-        return (outputQ.ordinality() > 0);
-    }
-
-    bool queryDone()
-    {
-        CriticalBlock block(outputCrit);
-        return (done == numRowThreads);
-    }
-
     IException * getError()
     {
-        CriticalBlock block(errorCrit);
+        SpinBlock sb(outputQLock);
         return error.getLink();
     }
-
     inline IEngineRowAllocator * queryOutputAllocator() const { return outputAllocator; }
-
     ISecureSocket *createSecureSocket(ISocket *sock)
     {
         {
@@ -972,7 +937,6 @@ public:
         }
         return secureContext->createSecureSocket(sock);
     }
-
     bool isTimeLimitExceeded(unsigned *_remainingMS)
     {
         if (timeLimitMS != WAIT_FOREVER)
@@ -999,7 +963,6 @@ public:
             logctx.CTXLOG("%s: %.*s", wscCallTypeText(), lenText, text.getstr());
         }
     }
-
     inline IXmlToRowTransformer * getRowTransformer() { return rowTransformer; }
     inline const char * wscCallTypeText() const { return wscType == STsoap ? "SOAPCALL" : "HTTPCALL"; }
 
@@ -1009,28 +972,27 @@ protected:
 
     void putRow(const void * row)
     {
-        CriticalBlock block(outputCrit);
         outputQ.enqueue(row);
-        resultAvailable.signal();
     }
-
     void setDone()
     {
-        CriticalBlock block(outputCrit);
-        done++;
-        if (done == numRowThreads)
-            resultAvailable.signal();
+        bool doStop;
+        {
+            SpinBlock sb(outputQLock);
+            done++;
+            doStop = (done == numRowThreads);
+        }
+        if (doStop)
+            outputQ.stop();
     }
-
     void setErrorOwn(IException * e)
     {
-        CriticalBlock block(errorCrit);
+        SpinBlock sb(outputQLock);
         if (error)
             ::Release(e);
         else
             error.setown(e);
     }
-
     void toXML(const byte * self, IXmlWriter & out) { CriticalBlock block(toXmlCrit); helper->toXML(self, out); }
     size32_t transformRow(ARowBuilder & rowBuilder, IColumnProvider * row) 
     { 

+ 0 - 3
common/thorhelper/thorsoapcall.hpp

@@ -44,9 +44,6 @@ typedef IWSCRowProvider ISoapCallRowProvider;//DEPRECATED
 //Web Service Call Helper
 interface IWSCHelper : extends IInterface
 {
-    virtual bool rowAvailable() = 0;
-    virtual size32_t __deprecated__getRow(void * buffer) = 0;
-    virtual bool queryDone() = 0;
     virtual void start() = 0;
     virtual void abort() = 0;
     virtual void waitUntilDone() = 0;

+ 8 - 3
system/jlib/jqueue.tpp

@@ -343,10 +343,8 @@ protected:
 public:
     SimpleInterThreadQueueOf<BASE, ALLOWNULLS>() 
     {
-        enqwaiting = 0;
-        deqwaiting = 0;
-        stopped = false; 
         limit = 0; // no limit
+        reset();
     }
 
     ~SimpleInterThreadQueueOf<BASE, ALLOWNULLS>() 
@@ -354,6 +352,13 @@ public:
         stop();
     }
 
+    void reset()
+    {
+        enqwaiting = 0;
+        deqwaiting = 0;
+        stopped = false;
+    }
+
     bool enqueue(BASE *e,unsigned timeout=INFINITE) 
     { 
         CriticalBlock b(SELF::crit);    

+ 0 - 1
thorlcr/activities/activitymasters_lcr.cmake

@@ -59,7 +59,6 @@ set (    SRCS
          result/thresult.cpp 
          rollup/throllup.cpp 
          selectnth/thselectnth.cpp 
-         soapcall/thsoapcall.cpp 
          spill/thspill.cpp 
          thdiskbase.cpp 
          topn/thtopn.cpp 

+ 0 - 53
thorlcr/activities/soapcall/thsoapcall.cpp

@@ -1,53 +0,0 @@
-/*##############################################################################
-
-    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
-
-    Licensed under the Apache License, Version 2.0 (the "License");
-    you may not use this file except in compliance with the License.
-    You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-############################################################################## */
-
-#include "thexception.hpp"
-#include "thsoapcall.ipp"
-#include "dasess.hpp"
-
-class SoapCallActivityMaster : public CMasterActivity
-{
-private:
-    StringBuffer authToken;
-
-public:
-    SoapCallActivityMaster(CMasterGraphElement * info) : CMasterActivity(info)
-    {
-    }
-
-    virtual void init()
-    {
-        // Build authentication token
-        StringBuffer uidpair;
-        IUserDescriptor *userDesc = container.queryJob().queryUserDescriptor();
-        userDesc->getUserName(uidpair);
-        uidpair.append(":");
-        userDesc->getPassword(uidpair);
-        JBASE64_Encode(uidpair.str(), uidpair.length(), authToken);
-    }
-
-    void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
-    {
-        dst.append(authToken.str());
-    }
-};
-
-
-CActivityBase *createSoapCallActivityMaster(CMasterGraphElement *container)
-{
-    return new SoapCallActivityMaster(container);
-}

+ 0 - 27
thorlcr/activities/soapcall/thsoapcall.ipp

@@ -1,27 +0,0 @@
-/*##############################################################################
-
-    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
-
-    Licensed under the Apache License, Version 2.0 (the "License");
-    you may not use this file except in compliance with the License.
-    You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-############################################################################## */
-
-#ifndef _THSOAPCALL_IPP
-#define _THSOAPCALL_IPP
-
-#include "thactivitymaster.ipp"
-
-
-CActivityBase *createSoapCallActivityMaster(CMasterGraphElement *container);
-
-
-#endif

+ 28 - 16
thorlcr/activities/soapcall/thsoapcallslave.cpp

@@ -17,9 +17,21 @@
 
 #include "thsoapcallslave.ipp"
 #include "thactivityutil.ipp"
+#include "dasess.hpp"
 
 //---------------------------------------------------------------------------
 
+
+static StringBuffer &buildAuthToken(IUserDescriptor *userDesc, StringBuffer &authToken)
+{
+    StringBuffer uidpair;
+    userDesc->getUserName(uidpair);
+    uidpair.append(":");
+    userDesc->getPassword(uidpair);
+    JBASE64_Encode(uidpair.str(), uidpair.length(), authToken);
+    return authToken;
+}
+
 class SoapRowCallSlaveActivity : public CSlaveActivity, public CThorDataLink, implements ISoapCallRowProvider
 {
     bool eof;
@@ -33,11 +45,11 @@ public:
     // IThorSlaveActivity overloaded methods
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        const char *authToken;
-        data.read(authToken);
+        StringBuffer authToken;
+        buildAuthToken(queryJob().queryUserDescriptor(), authToken);
         appendOutputLinked(this);
         if (container.queryLocalOrGrouped() || firstNode())
-            soaphelper.setown(createSoapCallHelper(this, queryRowAllocator(), authToken, SCrow, NULL, queryDummyContextLogger(),NULL));
+            soaphelper.setown(createSoapCallHelper(this, queryRowAllocator(), authToken.str(), SCrow, NULL, queryDummyContextLogger(),NULL));
     }
     // IThorDataLink methods
     virtual void start()
@@ -51,7 +63,8 @@ public:
     }
     virtual void stop()
     {
-        abortSoon = true;
+        if (soaphelper)
+            soaphelper->waitUntilDone();
         dataLinkStop();
     }
     CATCH_NEXTROW()
@@ -112,10 +125,10 @@ public:
     // IThorSlaveActivity overloaded methods
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        const char *authToken;
-        data.read(authToken);
+        StringBuffer authToken;
+        buildAuthToken(queryJob().queryUserDescriptor(), authToken);
         appendOutputLinked(this);
-        soaphelper.setown(createSoapCallHelper(this, queryRowAllocator(), authToken, SCdataset, NULL, queryDummyContextLogger(),NULL));
+        soaphelper.setown(createSoapCallHelper(this, queryRowAllocator(), authToken.str(), SCdataset, NULL, queryDummyContextLogger(),NULL));
     }
     // IThorDataLink methods
     virtual void start()
@@ -130,7 +143,7 @@ public:
     }
     virtual void stop()
     {
-        abortSoon = true;
+        eof = true;
         stopInput(input);
         dataLinkStop();
     }
@@ -173,7 +186,7 @@ public:
     virtual const void * getNextRow()
     {
         CriticalBlock b(crit);
-        if (abortSoon)
+        if (eof)
             return NULL;
         return input->nextRow();
     }
@@ -197,10 +210,10 @@ public:
     // IThorSlaveActivity overloaded methods
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        const char *authToken;
-        data.read(authToken);
+        StringBuffer authToken;
+        buildAuthToken(queryJob().queryUserDescriptor(), authToken);
         if (container.queryLocalOrGrouped() || firstNode())
-            soaphelper.setown(createSoapCallHelper(this, NULL, authToken, SCrow, NULL, queryDummyContextLogger(),NULL));
+            soaphelper.setown(createSoapCallHelper(this, NULL, authToken.str(), SCrow, NULL, queryDummyContextLogger(),NULL));
     }
 
     // IThorSlaveProcess overloaded methods
@@ -249,10 +262,9 @@ public:
     // IThorSlaveActivity overloaded methods
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        const char *authToken;
-        data.read(authToken);
-
-        soaphelper.setown(createSoapCallHelper(this, NULL, authToken, SCdataset, NULL, queryDummyContextLogger(),NULL));
+        StringBuffer authToken;
+        buildAuthToken(queryJob().queryUserDescriptor(), authToken);
+        soaphelper.setown(createSoapCallHelper(this, NULL, authToken.str(), SCdataset, NULL, queryDummyContextLogger(),NULL));
     }
 
     // IThorSlaveProcess overloaded methods

+ 4 - 7
thorlcr/master/thactivitymaster.cpp

@@ -73,7 +73,6 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
 #include "xmlwrite/thxmlwrite.ipp"
 #include "merge/thmerge.ipp"
 #include "fetch/thfetch.ipp"
-#include "soapcall/thsoapcall.ipp"
 #include "loop/thloop.ipp"
 
 CActivityBase *createGroupActivityMaster(CMasterGraphElement *container);
@@ -147,6 +146,10 @@ public:
             case TAKnwayjoin:
             case TAKgraphloopresultread:
             case TAKstreamediterator:
+            case TAKsoap_rowdataset:
+            case TAKsoap_rowaction:
+            case TAKsoap_datasetdataset:
+            case TAKsoap_datasetaction:
                 ret = new CMasterActivity(this);
                 break;
             case TAKskipcatch:
@@ -341,12 +344,6 @@ public:
             case TAKmerge:
                 ret = createMergeActivityMaster(this);
                 break;
-            case TAKsoap_rowdataset:
-            case TAKsoap_rowaction:
-            case TAKsoap_datasetdataset:
-            case TAKsoap_datasetaction:
-                ret = createSoapCallActivityMaster(this);
-                break;
             case TAKkeydiff:
                 ret = createKeyDiffActivityMaster(this);
                 break;