瀏覽代碼

HPCC-9246 SOAPCALL may lose records in 4.0.0-rc6

A regression while fixing HPCC-8978 (SOAPCALL in child queries) introduced
a race condition into SOAPCALL that could result in records being lost.

A separate race condition that meant the result of the sopcall.ecl regression
test might be indeterminate in the number of error rows returned was spotted
while tracking down this issue (rows are allocated to caller threads
indeterminately, but exceptions may be reported one per batch).  That one
is not fixed - it's pretty benign - but the test was changed to ensure that
it does not give false positives in the regression suite.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 年之前
父節點
當前提交
75b74932d7
共有 3 個文件被更改,包括 9 次插入4 次删除
  1. 8 2
      common/thorhelper/thorsoapcall.cpp
  2. 0 1
      roxie/ccd/ccdserver.cpp
  3. 1 1
      testing/ecl/roxie/soapcall.ecl

+ 8 - 2
common/thorhelper/thorsoapcall.cpp

@@ -686,7 +686,7 @@ public:
 class CWSCHelper : public CInterface, implements IWSCHelper
 {
 private:
-    SimpleInterThreadQueueOf<const void, false> outputQ;
+    SimpleInterThreadQueueOf<const void, true> outputQ;
     SpinLock outputQLock;
     CriticalSection toXmlCrit, transformCrit, onfailCrit, timeoutCrit;
     unsigned done;
@@ -972,6 +972,7 @@ protected:
 
     void putRow(const void * row)
     {
+        assertex(row);
         outputQ.enqueue(row);
     }
     void setDone()
@@ -983,7 +984,12 @@ protected:
             doStop = (done == numRowThreads);
         }
         if (doStop)
-            outputQ.stop();
+        {
+            // Note - Don't stop the queue - that effectively discards what's already on there,
+            // which is not what we want.
+            // Instead, push a NULL to indicate the end of the output.
+            outputQ.enqueue(NULL);
+        }
     }
     void setErrorOwn(IException * e)
     {

+ 0 - 1
roxie/ccd/ccdserver.cpp

@@ -13813,7 +13813,6 @@ public:
 //=================================================================================
 
 typedef SafeQueueOf<const void, true> SafeRowQueue;
-typedef SimpleInterThreadQueueOf<const void, true> InterThreadRowQueue;
 
 class CRowQueuePseudoInput : public CPseudoRoxieInput
 {

+ 1 - 1
testing/ecl/roxie/soapcall.ecl

@@ -54,7 +54,7 @@ END;
 
 output(SORT(SOAPCALL(d, 'http://127.0.0.1:9876|http://127.0.0.1:9875','soapbase', { unkname }, DATASET(ServiceOutRecord), onFail(doError(LEFT)),RETRY(0), log('SOAP: ' + unkname),TIMEOUT(1)), record));
 output(SORT(SOAPCALL('http://127.0.0.1:9876','soapbase', { string unkname := 'FAIL' }, dataset(ServiceOutRecord),onFail(doError2),RETRY(0), LOG(MIN)),record));
-output(SORT(SOAPCALL(d, 'http://127.0.0.1:9876','soapbaseNOSUCHQUERY', { unkname }, DATASET(ServiceOutRecord), onFail(doError(LEFT)),MERGE(25),RETRY(0), LOG(MIN)), record));
+output(SORT(SOAPCALL(d, 'http://127.0.0.1:9876','soapbaseNOSUCHQUERY', { unkname }, DATASET(ServiceOutRecord), onFail(doError(LEFT)),MERGE(25),PARALLEL(1),RETRY(0), LOG(MIN)), record));
 
 childRecord := record
 unsigned            id;