Browse Source

Merge branch 'soapCallMS' of https://github.com/RussWhitehead/HPCC-Platform into RussWhitehead-soapCallMS

Conflicts:
	rtl/include/eclhelper.hpp

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 13 years ago
parent
commit
18d3460af8

+ 75 - 47
common/thorhelper/thorsoapcall.cpp

@@ -335,7 +335,7 @@ public:
     ISocket* connect(SocketEndpoint &ep,
                      const IContextLogger &logctx,
                      unsigned retries,
-                     unsigned timeout,
+                     unsigned timeoutMS,
                      IRoxieAbortMonitor * roxieAbortMonitor)
     {
         if (lookup(ep, logctx))
@@ -349,10 +349,10 @@ public:
         {
             checkRoxieAbortMonitor(roxieAbortMonitor);
             Owned<ISocket> sock;
-            Owned<ISocketConnectWait> scw = nonBlockingConnect(ep, timeout == WAIT_FOREVER ? 60000 : timeout*retries*1000);
+            Owned<ISocketConnectWait> scw = nonBlockingConnect(ep, timeoutMS == WAIT_FOREVER ? 60000 : timeoutMS*(retries+1));
             loop
             {
-                sock.setown(scw->wait(1000));//throws if connect fails or timeout
+                sock.setown(scw->wait(1000));//throws if connect fails or timeoutMS
                 checkRoxieAbortMonitor(roxieAbortMonitor);
                 if (sock)
                     return sock.getLink();
@@ -388,11 +388,11 @@ public:
                      char const* host,
                      const IContextLogger &logctx,
                      unsigned retries,
-                     unsigned timeout,
+                     unsigned timeoutMS,
                      IRoxieAbortMonitor * roxieAbortMonitor )
     {
         SocketEndpoint ep(host, port);
-        return connect(ep, logctx, retries, timeout, roxieAbortMonitor);
+        return connect(ep, logctx, retries, timeoutMS, roxieAbortMonitor);
     }
 
     virtual IPooledThread *createNew()
@@ -564,7 +564,7 @@ interface IWSCAsyncFor: public IInterface
 {
     virtual void processException(const Url &url, const void *row, IException *e) = 0;
     virtual void processException(const Url &url, ConstPointerArray &inputRows, IException *e) = 0;
-    virtual void checkTimeLimitExceeded() = 0;
+    virtual void checkTimeLimitExceeded(unsigned * _remainingMS) = 0;
 
     virtual void createHttpRequest(Url &url, StringBuffer &request) = 0;
     virtual int readHttpResponse(StringBuffer &response, ISocket *socket) = 0;
@@ -731,17 +731,39 @@ public:
         logXML = (flags & SOAPFlog) != 0;
         logUserMsg = (flags & SOAPFlogusermsg) != 0;
 
-        timeout = helper->getTimeout();
-        if (timeout == (unsigned)-1)
-            timeout = 300; // 300 second default
-        else if (timeout == 0)
-            timeout = WAIT_FOREVER;
+        IHThorWebServiceCallExtra2 * helperExtra2 = static_cast<IHThorWebServiceCallExtra2*>(helper->selectInterface(TAIsoapcallextra_2));
+        if (helperExtra2)
+        {
+            double dval = helperExtra2->getTimeoutMS();//double, indicating seconds and nanoseconds.
+            if (dval == -1.0)//not provided
+                timeoutMS = 300*1000; // 300 second default
+            else if (dval == 0)
+                timeoutMS = WAIT_FOREVER;
+            else
+                timeoutMS = dval * 1000;
 
-        timeLimit = helper->getTimeLimit();
-        if (timeLimit == 0  ||  timeLimit == (unsigned)-1)
-            timeLimit = WAIT_FOREVER;   //default
+            dval = helperExtra2->getTimeLimitMS();
+            if (dval <= 0.0)
+                timeLimitMS = WAIT_FOREVER;
+            else
+                timeLimitMS = dval * 1000;
+        }
         else
-            timeLimitMon.reset(timeLimit*1000);
+        {
+            timeoutMS = helper->getTimeout();//get timeout, in seconds
+            if (timeoutMS == (unsigned)-1)
+                timeoutMS = 300*1000; // 300 second default
+            else if (timeoutMS == 0)
+                timeoutMS = WAIT_FOREVER;
+            else
+                timeoutMS *= 1000;
+
+            timeLimitMS = helper->getTimeLimit();
+            if (timeLimitMS == 0  ||  timeLimitMS == (unsigned)-1)
+                timeLimitMS = WAIT_FOREVER;	//default
+            else
+                timeLimitMS *= 1000;
+        }
 
         if (wscType == STsoap)
         {
@@ -855,7 +877,6 @@ public:
         else
             throw MakeStringException(0, "%sCALL specified no URLs",wscType == STsoap ? "SOAP" : "HTTP");
 
-
         for (unsigned i=0; i<numRowThreads; i++)
             threads.append(*new CWSCHelperThread(this));
     }
@@ -879,6 +900,9 @@ public:
 
     void start()
     {
+        if (timeLimitMS != WAIT_FOREVER)
+            timeLimitMon.reset(timeLimitMS);
+
         ForEachItemIn(i,threads)
             threads.item(i).start();
     }
@@ -965,17 +989,19 @@ public:
         return secureContext->createSecureSocket(sock);
     }
 
-    bool isTimeLimitExceeded()
+    bool isTimeLimitExceeded(unsigned *_remainingMS)
     {
-        if (timeLimit != WAIT_FOREVER)
+        if (timeLimitMS != WAIT_FOREVER)
         {
             CriticalBlock block(timeoutCrit);
-            if (timeLimitExceeded || timeLimitMon.timedout())
+            if (timeLimitExceeded || timeLimitMon.timedout(_remainingMS))
             {
                 timeLimitExceeded = true;
                 return true;
             }
         }
+        else
+            *_remainingMS = (unsigned)-1;
         return false;
     }
 
@@ -1045,8 +1071,8 @@ protected:
     unsigned numRowThreads;
     unsigned numUrlThreads;
     unsigned maxRetries;
-    unsigned timeout; //seconds
-    unsigned timeLimit; //seconds
+    unsigned timeoutMS;
+    unsigned timeLimitMS;
     bool logXML;
     bool logMin;
     bool logUserMsg;
@@ -1294,10 +1320,10 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
         size32_t maxLen;
         size32_t curPosn;
         ISocket * socket;
-        unsigned timeout;
+        unsigned timeoutMS;
     public:
         CSocketDataProvider(const char * _buffer, size32_t _curPosn, size32_t _currLen, size32_t _maxLen, ISocket * _sock, unsigned _timeout ) 
-            : buffer(_buffer), currLen(_currLen), maxLen(_maxLen), curPosn(_curPosn), socket(_sock), timeout(_timeout)
+            : buffer(_buffer), currLen(_currLen), maxLen(_maxLen), curPosn(_curPosn), socket(_sock), timeoutMS(_timeout)
         {
         }
         size32_t getBytes(char * buf, size32_t len)
@@ -1317,7 +1343,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
                 count = 0;
                 do
                 {
-                    socket->read(buf + count, 0, len - count, bytesRead, timeout);
+                    socket->readtms(buf + count, 0, len - count, bytesRead, timeoutMS);
                     count += bytesRead;
                 } while (count != len);
                 currLen = curPosn = 0;
@@ -1331,7 +1357,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
                 do
                 {
                     size32_t read;
-                    socket->read(buf+avail+bytesRead, 0, len-avail-bytesRead, read, timeout);
+                    socket->readtms(buf+avail+bytesRead, 0, len-avail-bytesRead, read, timeoutMS);
                     bytesRead += read;
                 } while (len != (bytesRead + avail));
                 count += bytesRead;
@@ -1350,6 +1376,7 @@ private:
     StringBuffer responsePath;
     Owned<CSocketDataProvider> dataProvider;
     XmlReaderOptions options;
+    unsigned remainingMS;
 
     inline void checkRoxieAbortMonitor(IRoxieAbortMonitor * roxieAbortMonitor)
     {
@@ -1438,10 +1465,10 @@ private:
         bool chunked;
         size32_t read = 0;
         do {
-            checkTimeLimitExceeded();
+            checkTimeLimitExceeded(&remainingMS);
             checkRoxieAbortMonitor(master->roxieAbortMonitor);
-            socket->read(buffer+read, 0, WSCBUFFERSIZE-read, bytesRead, master->timeout);
-            checkTimeLimitExceeded();
+            socket->readtms(buffer+read, 0, WSCBUFFERSIZE-read, bytesRead, MIN(master->timeoutMS,remainingMS));
+            checkTimeLimitExceeded(&remainingMS);
             checkRoxieAbortMonitor(master->roxieAbortMonitor);
 
             read += bytesRead;
@@ -1485,7 +1512,8 @@ private:
                             read chunk-size and CRLF
                         }
 */
-                        dataProvider.setown(new CSocketDataProvider(buffer, payloadofs, read, WSCBUFFERSIZE, socket, master->timeout));
+                        checkTimeLimitExceeded(&remainingMS);
+                        dataProvider.setown(new CSocketDataProvider(buffer, payloadofs, read, WSCBUFFERSIZE, socket, MIN(master->timeoutMS,remainingMS)));
                         dataProvider->getBytes(&ch, 1);
                         while (isalpha(ch) || isdigit(ch))
                         {   //get chunk-size
@@ -1544,10 +1572,10 @@ private:
                 response.append(read,payload);
             if (payloadsize) {  // read directly into response
                 while (read<payloadsize) {
-                    checkTimeLimitExceeded();
+                    checkTimeLimitExceeded(&remainingMS);
                     checkRoxieAbortMonitor(master->roxieAbortMonitor);
-                    socket->read(response.reserve(payloadsize-read), 0, payloadsize-read, bytesRead, master->timeout);
-                    checkTimeLimitExceeded();
+                    socket->readtms(response.reserve(payloadsize-read), 0, payloadsize-read, bytesRead, MIN(master->timeoutMS,remainingMS));
+                    checkTimeLimitExceeded(&remainingMS);
                     checkRoxieAbortMonitor(master->roxieAbortMonitor);
 
                     read += bytesRead;
@@ -1560,10 +1588,10 @@ private:
             }
             else {
                 loop {
-                    checkTimeLimitExceeded();
+                    checkTimeLimitExceeded(&remainingMS);
                     checkRoxieAbortMonitor(master->roxieAbortMonitor);
-                    socket->read(buffer, 0, WSCBUFFERSIZE, bytesRead, master->timeout);
-                    checkTimeLimitExceeded();
+                    socket->readtms(buffer, 0, WSCBUFFERSIZE, bytesRead, MIN(master->timeoutMS,remainingMS));
+                    checkTimeLimitExceeded(&remainingMS);
                     checkRoxieAbortMonitor(master->roxieAbortMonitor);
 
                     if (bytesRead==0)
@@ -1690,10 +1718,10 @@ private:
             master->setErrorOwn(ne.getClear());
     }
 
-    inline void checkTimeLimitExceeded()
+    inline void checkTimeLimitExceeded(unsigned * remainingMS)
     {
-        if (master->isTimeLimitExceeded())
-            throw MakeStringException(TIMELIMIT_EXCEEDED, "%sCALL TIMELIMIT(%u) exceeded", master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimit);
+        if (master->isTimeLimitExceeded(remainingMS))
+            throw MakeStringException(TIMELIMIT_EXCEEDED, "%sCALL TIMELIMIT(%ums) exceeded", master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimitMS);
     }
 
 public:
@@ -1742,15 +1770,15 @@ public:
             {
                 try
                 {
-                    checkTimeLimitExceeded();
+                    checkTimeLimitExceeded(&remainingMS);
                     Url &connUrl = master->proxyUrlArray.empty() ? url : master->proxyUrlArray.item(0);
-                    socket.setown(blacklist->connect(connUrl.port, connUrl.host, master->logctx, (unsigned)master->maxRetries, master->timeout, master->roxieAbortMonitor));
+                    socket.setown(blacklist->connect(connUrl.port, connUrl.host, master->logctx, (unsigned)master->maxRetries, master->timeoutMS, master->roxieAbortMonitor));
                     if (stricmp(url.method, "https") == 0)
                     {
                         Owned<ISecureSocket> ssock = master->createSecureSocket(socket.getClear());
                         if (ssock) 
                         {
-                            checkTimeLimitExceeded();
+                            checkTimeLimitExceeded(&remainingMS);
                             int status = ssock->secure_connect();
                             if (status < 0)
                             {
@@ -1769,7 +1797,7 @@ public:
                 {
                     if (master->timeLimitExceeded)
                     {
-                        master->logctx.CTXLOG("%sCALL exiting: time limit (%u) exceeded",master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimit);
+                        master->logctx.CTXLOG("%sCALL exiting: time limit (%ums) exceeded",master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimitMS);
                         processException(url, inputRows, e);
                         return;
                     }
@@ -1798,12 +1826,12 @@ public:
             }
             try
             {
-                checkTimeLimitExceeded();
+                checkTimeLimitExceeded(&remainingMS);
                 checkRoxieAbortMonitor(master->roxieAbortMonitor);
                 socket->write(request.str(), request.length());
                 if (soapTraceLevel > 4)
                     master->logctx.CTXLOG("%sCALL: sent request (%s) to %s:%d", master->wscType == STsoap ? "SOAP" : "HTTP",master->service.str(), url.host.str(), url.port);
-                checkTimeLimitExceeded();
+                checkTimeLimitExceeded(&remainingMS);
                 checkRoxieAbortMonitor(master->roxieAbortMonitor);
 
                 int rval = readHttpResponse(response, socket);
@@ -1826,7 +1854,7 @@ public:
                     throw MakeStringException(-1, "Zero length response in processQuery");
                 }
                 endTime = msTick();
-                checkTimeLimitExceeded();
+                checkTimeLimitExceeded(&remainingMS);
                 ColumnProvider * meta = (ColumnProvider*)CreateColumnProvider(endTime-startTime, master->flags&SOAPFencoding?true:false);
                 processResponse(url, response, meta);
                 delete meta;
@@ -1867,7 +1895,7 @@ public:
                 if (master->timeLimitExceeded)
                 {
                     processException(url, inputRows, e);
-                    master->logctx.CTXLOG("%sCALL exiting: time limit (%u) exceeded", master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimit);
+                    master->logctx.CTXLOG("%sCALL exiting: time limit (%ums) exceeded", master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimitMS);
                     break;
                 }
 
@@ -1882,7 +1910,7 @@ public:
                 StringBuffer s;
                 master->logctx.CTXLOG("Exception %s - retrying? (%d<%d)", e->errorMessage(s).str(), attempts, master->maxRetries);
 
-                if (attempts >= master->maxRetries)
+                if (attempts > master->maxRetries)
                 {
                     // error affects all inputRows
                     master->logctx.CTXLOG("Exiting: maxRetries exceeded");

+ 2 - 2
ecl/hql/hqlgram.y

@@ -3129,13 +3129,13 @@ soapFlag
                         }
     | TIMEOUT '(' expression ')'
                         {
-                            parser->normalizeExpression($3, type_int, false);
+                            parser->normalizeExpression($3, type_real, false);
                             $$.setExpr(createExprAttribute(timeoutAtom, $3.getExpr()));
                             $$.setPosition($1);
                         }
     | TIMELIMIT '(' expression ')'
                         {
-                            parser->normalizeExpression($3, type_int, false);
+                            parser->normalizeExpression($3, type_real, false);
                             $$.setExpr(createExprAttribute(timeLimitAtom, $3.getExpr()));
                             $$.setPosition($1);
                         }

+ 1 - 0
ecl/hqlcpp/hqlcpp.ipp

@@ -1663,6 +1663,7 @@ public:
     void doBuildVarStringFunction(BuildCtx & ctx, const char * name, IHqlExpression * value);
     void doBuildDataFunction(BuildCtx & ctx, const char * name, IHqlExpression * value);
     void doBuildStringFunction(BuildCtx & ctx, const char * name, IHqlExpression * value);
+    void doBuildDoubleFunction(BuildCtx & ctx, const char * name, IHqlExpression * value);
     void doBuildFunction(BuildCtx & ctx, ITypeInfo * type, const char * name, IHqlExpression * value);
     void doBuildFunctionReturn(BuildCtx & ctx, ITypeInfo * type, IHqlExpression * value);
     void doBuildUserFunctionReturn(BuildCtx & ctx, ITypeInfo * type, IHqlExpression * value);

+ 18 - 2
ecl/hqlcpp/hqlhtcpp.cpp

@@ -3121,6 +3121,12 @@ void HqlCppTranslator::doBuildUnsignedFunction(BuildCtx & ctx, const char * name
     doBuildFunction(ctx, unsignedType, name, value);
 }
 
+void HqlCppTranslator::doBuildDoubleFunction(BuildCtx & ctx, const char * name, IHqlExpression * value)
+{
+    Owned<ITypeInfo> type = makeRealType(8);
+    doBuildFunction(ctx, doubleType, name, value);
+}
+
 void HqlCppTranslator::doBuildUnsigned64Function(BuildCtx & ctx, const char * name, IHqlExpression * value)
 {
     Owned<ITypeInfo> type = makeIntType(8, false);
@@ -16377,6 +16383,12 @@ ABoundActivity * HqlCppTranslator::doBuildActivitySOAP(BuildCtx & ctx, IHqlExpre
     //virtual unsigned getTimeLimit()
     doBuildUnsignedFunction(instance->classctx, "getTimeLimit", queryPropertyChild(expr, timeLimitAtom, 0));
 
+    //virtual double getTimeoutMS()
+    doBuildDoubleFunction(instance->classctx, "getTimeoutMS", queryPropertyChild(expr, timeoutAtom, 0));
+
+    //virtual double getTimeLimitMS()
+    doBuildDoubleFunction(instance->classctx, "getTimeLimitMS", queryPropertyChild(expr, timeLimitAtom, 0));
+
     if (namespaceAttr)
     {
         doBuildVarStringFunction(instance->startctx, "queryNamespaceName", namespaceAttr->queryChild(0));
@@ -16423,7 +16435,6 @@ ABoundActivity * HqlCppTranslator::doBuildActivitySOAP(BuildCtx & ctx, IHqlExpre
             buildTransformBody(onFailCtx, onFailTransform, dataset, NULL, expr, selSeq);
         }
     }
-
     buildInstanceSuffix(instance);
     if (boundDataset)
         buildConnectInputOutput(ctx, instance, boundDataset, 0, 0);
@@ -16529,6 +16540,12 @@ ABoundActivity * HqlCppTranslator::doBuildActivityHTTP(BuildCtx & ctx, IHqlExpre
     //virtual unsigned getTimeLimit()
     doBuildUnsignedFunction(instance->classctx, "getTimeLimit", queryPropertyChild(expr, timeLimitAtom, 0));
 
+    //virtual double getTimeoutMS()
+    doBuildDoubleFunction(instance->classctx, "getTimeoutMS", queryPropertyChild(expr, timeoutAtom, 0));
+
+    //virtual double getTimeLimitMS()
+    doBuildDoubleFunction(instance->classctx, "getTimeLimitMS", queryPropertyChild(expr, timeLimitAtom, 0));
+
     if (namespaceAttr)
     {
         doBuildVarStringFunction(instance->startctx, "queryNamespaceName", namespaceAttr->queryChild(0));
@@ -16567,7 +16584,6 @@ ABoundActivity * HqlCppTranslator::doBuildActivityHTTP(BuildCtx & ctx, IHqlExpre
             buildTransformBody(onFailCtx, onFail->queryChild(0), dataset, NULL, expr, selSeq);
         }
     }
-
     buildInstanceSuffix(instance);
     if (boundDataset)
         buildConnectInputOutput(ctx, instance, boundDataset, 0, 0);

+ 7 - 4
ecl/hthor/hthor.cpp

@@ -6972,10 +6972,13 @@ void CHThorWSCBaseActivity::init()
     // Build authentication token
     StringBuffer uidpair;
     IUserDescriptor *userDesc = agent.queryCodeContext()->queryUserDescriptor();
-    userDesc->getUserName(uidpair);
-    uidpair.append(":");
-    userDesc->getPassword(uidpair);
-    JBASE64_Encode(uidpair.str(), uidpair.length(), authToken);
+    if (userDesc)//NULL if standalone
+    {
+        userDesc->getUserName(uidpair);
+        uidpair.append(":");
+        userDesc->getPassword(uidpair);
+        JBASE64_Encode(uidpair.str(), uidpair.length(), authToken);
+    }
     soapTraceLevel = agent.queryWorkUnit()->getDebugValueInt("soapTraceLevel", 1);
 }
 

+ 8 - 1
rtl/include/eclhelper.hpp

@@ -959,6 +959,7 @@ enum ActivityInterfaceEnum
     TAIinlinetablearg_1,
     TAIshuffleextra_1,
     TAIhashdeduparg_2,
+    TAIsoapcallextra_2,
 
 //Should remain as last of all meaningful tags, but before aliases
     TAImax,
@@ -2106,8 +2107,14 @@ struct IHThorWebServiceCallExtra : public IInterface
 };
 typedef IHThorWebServiceCallExtra IHThorSoapCallExtra;
 
+struct IHThorWebServiceCallExtra2 : public IInterface
+{
+    virtual double getTimeoutMS()   { return (double)-1.0; }//not specified, use default
+    virtual double getTimeLimitMS() { return (double)-1.0; }//not specified, use default
+};
+typedef IHThorWebServiceCallExtra2 IHThorSoapCallExtra2;
 
-struct IHThorWebServiceCallArg : public IHThorWebServiceCallActionArg, public IHThorWebServiceCallExtra
+struct IHThorWebServiceCallArg : public IHThorWebServiceCallActionArg, public IHThorWebServiceCallExtra, public IHThorWebServiceCallExtra2
 {
     COMMON_NEWTHOR_FUNCTIONS
 };

+ 6 - 1
rtl/include/eclhelper_base.hpp

@@ -2344,7 +2344,7 @@ class CThorXmlWriteArg : public CThorArg, implements IHThorXmlWriteArg
 
 //-- SOAP --
 
-class CThorSoapActionArg : public CThorArg, implements IHThorSoapActionArg
+class CThorSoapActionArg : public CThorArg, implements IHThorSoapActionArg, public IHThorWebServiceCallExtra2
 {
     virtual void Link() const { RtlCInterface::Link(); }
     virtual bool Release() const { return RtlCInterface::Release(); }
@@ -2359,6 +2359,8 @@ class CThorSoapActionArg : public CThorArg, implements IHThorSoapActionArg
         case TAIsoapactionarg_1:
         case TAIsoapactionarg_2:
             return static_cast<IHThorSoapActionArg *>(this);
+        case TAIsoapcallextra_2:
+            return static_cast<IHThorWebServiceCallExtra2 *>(this);
         }
         return NULL;
     }
@@ -2401,7 +2403,10 @@ class CThorSoapCallArg : public CThorArg, implements IHThorSoapCallArg
             return static_cast<IHThorSoapActionArg *>(this);
         case TAIsoapcallextra_1:
             return static_cast<IHThorSoapCallExtra *>(this);
+        case TAIsoapcallextra_2:
+            return static_cast<IHThorWebServiceCallExtra2 *>(this);
         }
+
         return NULL;
     }