thorsoapcall.cpp 68 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "jqueue.tpp"
  15. #include "jisem.hpp"
  16. #include "thorxmlread.hpp"
  17. #include "thorxmlwrite.hpp"
  18. #include "thorcommon.ipp"
  19. #include "thorsoapcall.hpp"
  20. #include "securesocket.hpp"
  21. #include "eclrtl.hpp"
  22. #include "roxiemem.hpp"
  23. using roxiemem::OwnedRoxieString;
  24. #ifndef _WIN32
  25. #include <stdexcept>
  26. #endif
  27. #include <new>
// Text of the HTTP header field that announces the body length.
#define CONTENT_LENGTH "Content-Length: "
// Trace verbosity for web service calls; code in this file tests it against 0 and 3 before logging.
unsigned soapTraceLevel = 1;
#define WSCBUFFERSIZE 0x10000
#define MAXWSCTHREADS 50 //Max Web Service Call Threads
  32. interface IReceivedRoxieException : extends IException
  33. {
  34. public:
  35. virtual const void *errorRow() = 0;
  36. };
  37. #define EXCEPTION_PREFIX "ReceivedRoxieException:"
  38. class ReceivedRoxieException: public CInterface, public IReceivedRoxieException
  39. {
  40. public:
  41. IMPLEMENT_IINTERFACE;
  42. ReceivedRoxieException(int code, const char *_msg, const void *_row=NULL) : errcode(code), msg(_msg), row(_row) { };
  43. int errorCode() const { return (errcode); };
  44. StringBuffer & errorMessage(StringBuffer &str) const
  45. {
  46. if (strncmp(str.str(), EXCEPTION_PREFIX, strlen(EXCEPTION_PREFIX)) == 0)
  47. str.append(msg);
  48. else
  49. str.append(EXCEPTION_PREFIX).append(" (").append(msg).append(")");
  50. return str;
  51. };
  52. MessageAudience errorAudience() const { return (MSGAUD_user); };
  53. const void *errorRow() { return (row); };
  54. private:
  55. int errcode;
  56. StringAttr msg;
  57. const void *row;
  58. };
  59. //=================================================================================================
  60. class Url : public CInterface, implements IInterface
  61. {
  62. public:
  63. IMPLEMENT_IINTERFACE;
  64. StringBuffer method;
  65. StringBuffer host;
  66. unsigned port;
  67. StringBuffer path;
  68. StringBuffer userPasswordPair;
  69. StringBuffer &getUrlString(StringBuffer &url) const
  70. {
  71. return url.append(method).append("://").append(host).append(":").append(port).append(path);
  72. }
  73. IException *getUrlException(IException *e) const
  74. {
  75. StringBuffer url;
  76. StringBuffer text;
  77. e->errorMessage(text);
  78. rtlAddExceptionTag(text, "url", getUrlString(url).str());
  79. if (text.length() <= 1024)
  80. return MakeStringException(e->errorCode(), "%s", text.str());
  81. else
  82. return MakeStringExceptionDirect(e->errorCode(), text.str());
  83. }
  84. Url() : port(0)
  85. {
  86. }
  87. private:
  88. char translateHex(char hex) {
  89. if(hex >= 'A')
  90. return (hex & 0xdf) - 'A' + 10;
  91. else
  92. return hex - '0';
  93. }
  94. void userPassword_decode(const char* userpass, StringBuffer& result)
  95. {
  96. if(!userpass || !*userpass)
  97. return;
  98. const char *finger = userpass;
  99. while (*finger)
  100. {
  101. char c = *finger++;
  102. if (c == '%')
  103. {
  104. if(*finger != '\0')
  105. {
  106. c = translateHex(*finger);
  107. finger++;
  108. }
  109. if(*finger != '\0')
  110. {
  111. c = (char)(c*16 + translateHex(*finger));
  112. finger++;
  113. }
  114. }
  115. result.append(c);
  116. }
  117. return;
  118. }
  119. public:
  120. Url(char *urltext)
  121. {
  122. char *p;
  123. if ((p = strstr(urltext, "://")) != NULL)
  124. {
  125. *p = 0;
  126. p += 3; // skip past the colon-slash-slash
  127. method.append(urltext);
  128. urltext = p;
  129. }
  130. else
  131. throw MakeStringException(-1, "Malformed URL");
  132. if ((p = strchr(urltext, '@')) != NULL)
  133. {
  134. // extract username & password
  135. *p = 0;
  136. p++;
  137. userPassword_decode(urltext, userPasswordPair);
  138. urltext = p;
  139. }
  140. if ((p = strchr(urltext, ':')) != NULL)
  141. {
  142. // extract the port
  143. *p = 0;
  144. p++;
  145. port = atoi(p);
  146. host.append(urltext);
  147. if ((p = strchr(p, '/')) != NULL)
  148. path.append(p);
  149. else
  150. path.append("/");
  151. }
  152. else
  153. {
  154. // no port - look at method for port
  155. if (stricmp(method.str(), "https") == 0)
  156. port = 443;
  157. else if (stricmp(method.str(), "http") == 0)
  158. port = 80;
  159. else
  160. throw MakeStringException(-1, "Unsupported access method");
  161. if ((p = strchr(urltext, '/')) != NULL)
  162. {
  163. *p = 0;
  164. p++;
  165. host.append(urltext);
  166. path.append("/").append(p);
  167. }
  168. else
  169. {
  170. host.append(urltext);
  171. path.append("/");
  172. }
  173. }
  174. }
  175. };
  176. typedef IArrayOf<Url> UrlArray;
  177. //=================================================================================================
  178. class UrlListParser
  179. {
  180. public:
  181. UrlListParser(const char * text)
  182. {
  183. fullText = strdup(text);
  184. }
  185. ~UrlListParser()
  186. {
  187. free(fullText);
  188. }
  189. unsigned getUrls(UrlArray &array)
  190. {
  191. char *copyFullText = strdup(fullText);
  192. char *saveptr;
  193. char *url = strtok_r(copyFullText, "|", &saveptr);
  194. while (url != NULL)
  195. {
  196. array.append(*new Url(url));
  197. url = strtok_r(NULL, "|", &saveptr);
  198. }
  199. free(copyFullText);
  200. return array.ordinality();
  201. }
  202. private:
  203. char *fullText;
  204. };
  205. //=================================================================================================
  206. #define BLACKLIST_RETRIES 10
  207. #define ROXIE_ABORT_EVENT 1407
  208. #define TIMELIMIT_EXCEEDED 1408
  209. class BlackLister : public CInterface, implements IThreadFactory
  210. {
  211. SocketEndpointArray list;
  212. Owned<IThreadPool> pool;
  213. CriticalSection crit;
  214. private:
  215. inline void checkRoxieAbortMonitor(IRoxieAbortMonitor * roxieAbortMonitor)
  216. {
  217. if (roxieAbortMonitor)
  218. {
  219. try
  220. {
  221. roxieAbortMonitor->checkForAbort();//throws
  222. }
  223. catch (IException *e)
  224. {
  225. StringBuffer s;
  226. throw MakeStringException(ROXIE_ABORT_EVENT, "%s", e->errorMessage(s).str());
  227. }
  228. }
  229. }
  230. public:
  231. bool lookup(SocketEndpoint &ep, const IContextLogger &logctx)
  232. {
  233. CriticalBlock b(crit);
  234. if (list.find(ep)!=NotFound)
  235. {
  236. if (soapTraceLevel > 3)
  237. {
  238. StringBuffer s;
  239. logctx.CTXLOG("socket %s is blacklisted", ep.getUrlStr(s).str());
  240. }
  241. return true;
  242. }
  243. return false;
  244. }
  245. void blacklist(SocketEndpoint &ep, const IContextLogger &logctx)
  246. {
  247. CriticalBlock b(crit);
  248. if (list.find(ep)==NotFound)
  249. {
  250. if (soapTraceLevel > 0)
  251. {
  252. StringBuffer s;
  253. logctx.CTXLOG("Blacklisting socket %s", ep.getUrlStr(s).str());
  254. }
  255. list.append(ep);
  256. pool->start(&ep);
  257. }
  258. }
  259. void deblacklist(SocketEndpoint &ep)
  260. {
  261. CriticalBlock b(crit);
  262. unsigned idx = list.find(ep);
  263. if (idx!=NotFound)
  264. {
  265. if (soapTraceLevel > 0)
  266. {
  267. StringBuffer s;
  268. DBGLOG("De-blacklisting socket %s", ep.getUrlStr(s).str());
  269. }
  270. list.remove(idx);
  271. }
  272. }
  273. public:
  274. IMPLEMENT_IINTERFACE;
  275. BlackLister()
  276. {
  277. pool.setown(createThreadPool("SocketBlacklistPool", this, NULL, 0, 0));
  278. }
  279. ISocket* connect(SocketEndpoint &ep,
  280. const IContextLogger &logctx,
  281. unsigned retries,
  282. unsigned timeoutMS,
  283. IRoxieAbortMonitor * roxieAbortMonitor)
  284. {
  285. if (lookup(ep, logctx))
  286. {
  287. StringBuffer s;
  288. ep.getUrlStr(s);
  289. throw MakeStringException(-1, "blacklisted socket %s", s.str());
  290. }
  291. Owned<IException> exc;
  292. try
  293. {
  294. checkRoxieAbortMonitor(roxieAbortMonitor);
  295. Owned<ISocket> sock;
  296. Owned<ISocketConnectWait> scw = nonBlockingConnect(ep, timeoutMS == WAIT_FOREVER ? 60000 : timeoutMS*(retries+1));
  297. loop
  298. {
  299. sock.setown(scw->wait(1000));//throws if connect fails or timeoutMS
  300. checkRoxieAbortMonitor(roxieAbortMonitor);
  301. if (sock)
  302. return sock.getLink();
  303. }
  304. }
  305. catch (IJSOCK_Exception *e)
  306. {
  307. EXCLOG(e,"BlackLister::connect");
  308. if (exc)
  309. e->Release();
  310. else
  311. exc.setown(e);
  312. }
  313. blacklist(ep, logctx);
  314. if (exc->errorCode()==JSOCKERR_connection_failed) {
  315. StringBuffer s;
  316. ep.getUrlStr(s);
  317. throw MakeStringException(JSOCKERR_connection_failed, "connection failed %s", s.str());
  318. }
  319. throw exc.getClear();
  320. return NULL;
  321. }
  322. bool blacklisted (unsigned short port, char const* host)
  323. {
  324. SocketEndpoint ep(host, port);
  325. return (list.find(ep)!=NotFound);
  326. }
  327. ISocket* connect(unsigned short port,
  328. char const* host,
  329. const IContextLogger &logctx,
  330. unsigned retries,
  331. unsigned timeoutMS,
  332. IRoxieAbortMonitor * roxieAbortMonitor )
  333. {
  334. SocketEndpoint ep(host, port);
  335. return connect(ep, logctx, retries, timeoutMS, roxieAbortMonitor);
  336. }
  337. virtual IPooledThread *createNew()
  338. {
  339. class SocketDeblacklister : public CInterface, implements IPooledThread
  340. {
  341. SocketEndpoint ep;
  342. BlackLister &parent;
  343. Semaphore stopped;
  344. public:
  345. IMPLEMENT_IINTERFACE;
  346. SocketDeblacklister(BlackLister &_parent): parent(_parent)
  347. {
  348. }
  349. ~SocketDeblacklister()
  350. {
  351. }
  352. virtual void init(void *param)
  353. {
  354. ep.set(*(SocketEndpoint *) param);
  355. }
  356. virtual void main()
  357. {
  358. unsigned delay = 5000;
  359. for (unsigned i = 0; i < BLACKLIST_RETRIES; i++)
  360. {
  361. try
  362. {
  363. Owned<ISocket> s = ISocket::connect_timeout(ep, 10000);
  364. s->close();
  365. break;
  366. }
  367. catch (IJSOCK_Exception *E)
  368. {
  369. // EXCLOG(E, "While updating socket blacklist"); // MORE - may need to downgrade if this fires traps
  370. E->Release();
  371. if (stopped.wait(delay))
  372. return;
  373. delay += delay;
  374. }
  375. }
  376. parent.deblacklist(ep);
  377. }
  378. virtual bool stop()
  379. {
  380. stopped.signal();
  381. return true;
  382. }
  383. virtual bool canReuse() { return true; }
  384. };
  385. return new SocketDeblacklister(*this);
  386. }
  387. void stop()
  388. {
  389. pool->stopAll();
  390. }
  391. } *blacklist;
  392. MODULE_INIT(INIT_PRIORITY_STANDARD)
  393. {
  394. blacklist = new BlackLister;
  395. return true;
  396. }
  397. MODULE_EXIT()
  398. {
  399. blacklist->stop();
  400. delete blacklist;
  401. }
  402. //=================================================================================================
  403. class ColumnProvider : public CInterface, public IColumnProvider
  404. {
  405. public:
  406. ColumnProvider(unsigned _callLatencyMs) : callLatencyMs(_callLatencyMs), base(NULL) {}
  407. IMPLEMENT_IINTERFACE;
  408. virtual bool getBool(const char * path) { return base->getBool(path); }
  409. virtual void getData(size32_t len, void * text, const char * path) { base->getData(len, text, path); }
  410. virtual void getDataX(size32_t & len, void * & text, const char * path) { base->getDataX(len, text, path); }
  411. virtual __int64 getInt(const char * path)
  412. {
  413. __int64 ret = base->getInt(path);
  414. if((ret==0) && path && *path=='_')
  415. {
  416. if (stricmp(path, "_call_latency_ms")==0)
  417. ret = callLatencyMs;
  418. else if (stricmp(path, "_call_latency")==0)
  419. ret = (callLatencyMs + 500) / 1000;
  420. }
  421. return ret;
  422. }
  423. virtual void getQString(size32_t len, char * text, const char * path) { base->getQString(len, text, path); }
  424. virtual void getString(size32_t len, char * text, const char * path) { base->getString(len, text, path); }
  425. virtual void getStringX(size32_t & len, char * & text, const char * path) { base->getStringX(len, text, path); }
  426. virtual void getUnicodeX(size32_t & len, UChar * & text, const char * path) { base->getUnicodeX(len, text, path); }
  427. virtual bool getIsSetAll(const char * path) { return base->getIsSetAll(path); }
  428. virtual IColumnProviderIterator * getChildIterator(const char * path) { return base->getChildIterator(path); }
  429. virtual void getUtf8X(size32_t & len, char * & text, const char * path) { base->getUtf8X(len, text, path); }
  430. virtual bool readBool(const char * path, bool _default) { return base->readBool(path, _default); }
  431. virtual void readData(size32_t len, void * text, const char * path, size32_t _lenDefault, const void * _default) { base->readData(len, text, path, _lenDefault, _default); }
  432. virtual void readDataX(size32_t & len, void * & text, const char * path, size32_t _lenDefault, const void * _default) { base->readDataX(len, text, path, _lenDefault, _default); }
  433. virtual __int64 readInt(const char * path, __int64 _default)
  434. {
  435. if(path && *path=='_')
  436. {
  437. if (stricmp(path, "_call_latency_ms")==0)
  438. return callLatencyMs;
  439. else if (stricmp(path, "_call_latency")==0)
  440. return (callLatencyMs + 500) / 1000;
  441. }
  442. return base->readInt(path, _default);
  443. }
  444. virtual void readQString(size32_t len, char * text, const char * path, size32_t _lenDefault, const char * _default) { base->readQString(len, text, path, _lenDefault, _default); }
  445. virtual void readString(size32_t len, char * text, const char * path, size32_t _lenDefault, const char * _default) { base->readString(len, text, path, _lenDefault, _default); }
  446. virtual void readStringX(size32_t & len, char * & text, const char * path, size32_t _lenDefault, const char * _default) { base->readStringX(len, text, path, _lenDefault, _default); }
  447. virtual void readUnicodeX(size32_t & len, UChar * & text, const char * path, size32_t _lenDefault, const UChar * _default) { base->readUnicodeX(len, text, path, _lenDefault, _default); }
  448. virtual bool readIsSetAll(const char * path, bool _default) { return base->readIsSetAll(path, _default); }
  449. virtual void readUtf8X(size32_t & len, char * & text, const char * path, size32_t _lenDefault, const char * _default) { base->readUtf8X(len, text, path, _lenDefault, _default); }
  450. void setBase(IColumnProvider * _base) { base = _base; }
  451. virtual void getDataRaw(size32_t len, void * text, const char * path) { base->getDataRaw(len, text, path); }
  452. virtual void getDataRawX(size32_t & len, void * & text, const char * path) { base->getDataRawX(len, text, path); }
  453. virtual void readDataRaw(size32_t len, void * text, const char * path, size32_t _lenDefault, const void * _default) { base->readDataRaw(len, text, path, _lenDefault, _default); }
  454. virtual void readDataRawX(size32_t & len, void * & text, const char * path, size32_t _lenDefault, const void * _default) { base->readDataRawX(len, text, path, _lenDefault, _default); }
  455. protected:
  456. IColumnProvider * base;
  457. private:
  458. unsigned callLatencyMs;
  459. };
  460. //=================================================================================================
  461. //Same as ColumnProvider, except returns data as hex64Binary instead of 16 bit hex
  462. class ColumnProviderData64 : public ColumnProvider
  463. {
  464. public:
  465. ColumnProviderData64(unsigned _callLatencyMs) : ColumnProvider(_callLatencyMs) {}
  466. virtual void getData(size32_t len, void * text, const char * path) { ColumnProvider::getDataRaw(len, text, path); }
  467. virtual void getDataX(size32_t & len, void * & text, const char * path) { ColumnProvider::getDataRawX(len, text, path); }
  468. virtual void readData(size32_t len, void * text, const char * path, size32_t _lenDefault, const void * _default) { ColumnProvider::readDataRaw(len, text, path, _lenDefault, _default); }
  469. virtual void readDataX(size32_t len, void * text, const char * path, size32_t _lenDefault, const void * _default) { ColumnProvider::readDataRawX(len, text, path, _lenDefault, _default); }
  470. };
  471. //=================================================================================================
  472. IColumnProvider * CreateColumnProvider(unsigned _callLatencyMs, bool _encoding)
  473. {
  474. if (!_encoding)
  475. return new ColumnProvider(_callLatencyMs);
  476. else
  477. return new ColumnProviderData64(_callLatencyMs);
  478. }
  479. //=================================================================================================
  480. enum WSCType{STsoap, SThttp} ; //web service call type
  481. //Web Services Call Asynchronous For
  482. interface IWSCAsyncFor: public IInterface
  483. {
  484. virtual void processException(const Url &url, const void *row, IException *e) = 0;
  485. virtual void processException(const Url &url, ConstPointerArray &inputRows, IException *e) = 0;
  486. virtual void checkTimeLimitExceeded(unsigned * _remainingMS) = 0;
  487. virtual void createHttpRequest(Url &url, StringBuffer &request) = 0;
  488. virtual int readHttpResponse(StringBuffer &response, ISocket *socket) = 0;
  489. virtual void processResponse(Url &url, StringBuffer &response, ColumnProvider * meta) = 0;
  490. virtual StringBuffer getResponsePath() = 0;
  491. virtual ConstPointerArray & getInputRows() = 0;
  492. virtual IWSCHelper * getMaster() = 0;
  493. virtual IEngineRowAllocator * getOutputAllocator() = 0;
  494. virtual void For(unsigned num,unsigned maxatonce,bool abortFollowingException=false,bool shuffled=false) = 0;
  495. virtual void Do(unsigned idx=0)=0;
  496. };
  497. class CWSCHelper;
  498. IWSCAsyncFor * createWSCAsyncFor(CWSCHelper * _master, CommonXmlWriter &_xmlWriter, ConstPointerArray &_inputRows, PTreeReaderOptions _options);
  499. //=================================================================================================
  500. class CMatchCB : public CInterface, implements IXMLSelect
  501. {
  502. IWSCAsyncFor &parent;
  503. const Url &url;
  504. StringAttr tail;
  505. ColumnProvider * meta;
  506. public:
  507. IMPLEMENT_IINTERFACE;
  508. CMatchCB(IWSCAsyncFor &_parent, const Url &_url, const char *_tail, ColumnProvider * _meta) : parent(_parent), url(_url), tail(_tail), meta(_meta)
  509. {
  510. }
  511. virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
  512. {
  513. Owned<IException> e;
  514. if (tail.length())
  515. {
  516. StringBuffer path(parent.getResponsePath());
  517. unsigned idx = (unsigned)entry.getInt(path.append("/@sequence").str());
  518. Owned<IColumnProviderIterator> excIter = entry.getChildIterator("Exception");
  519. IColumnProvider *excptEntry = excIter->first();
  520. if (excptEntry)
  521. {
  522. int code = (int)excptEntry->getInt("Code");
  523. size32_t len;
  524. char *message;
  525. excptEntry->getStringX(len, message, "Message");
  526. StringBuffer mstr(len, message);
  527. rtlFree(message);
  528. DBGLOG("Roxie exception: %s", mstr.str());
  529. e.setown(new ReceivedRoxieException(code, mstr.str()));
  530. parent.processException(url, parent.getInputRows().item(idx), e.getLink());
  531. }
  532. }
  533. if (parent.getMaster()->getRowTransformer() && !e)
  534. {
  535. IEngineRowAllocator * outputAllocator = parent.getOutputAllocator();
  536. Owned<IColumnProviderIterator> iter = entry.getChildIterator(tail);
  537. IColumnProvider *rowEntry = iter->first();
  538. while (rowEntry)
  539. {
  540. RtlDynamicRowBuilder rowBuilder(outputAllocator);
  541. size32_t sizeGot;
  542. if(meta)
  543. {
  544. meta->setBase(rowEntry);
  545. sizeGot = parent.getMaster()->transformRow(rowBuilder, meta);
  546. }
  547. else
  548. {
  549. sizeGot = parent.getMaster()->transformRow(rowBuilder, rowEntry);
  550. }
  551. if (sizeGot > 0)
  552. parent.getMaster()->putRow(rowBuilder.finalizeRowClear(sizeGot));
  553. rowEntry = iter->next();
  554. }
  555. }
  556. }
  557. };
  558. //=================================================================================================
  559. bool isValidHttpValue(char const * in)
  560. {
  561. for (const byte * ptr = (byte*)in; *ptr; ++ptr)
  562. if (((*ptr <= '\x1F') && (*ptr != '\x09')) || (*ptr == '\x7F'))
  563. return false;
  564. return true;
  565. }
  566. //=================================================================================================
  567. //Web Service Call helper thread
  568. class CWSCHelperThread : public Thread
  569. {
  570. private:
  571. CWSCHelper * master;
  572. virtual void outputXmlRows(CommonXmlWriter &xmlWriter, ConstPointerArray &inputRows, const char *itemtag=NULL, bool encode_off=false, char const * itemns = NULL);
  573. virtual void createESPQuery(CommonXmlWriter &xmlWriter, ConstPointerArray &inputRows);
  574. virtual void createSOAPliteralOrEncodedQuery(CommonXmlWriter &xmlWriter, ConstPointerArray &inputRows);
  575. virtual void createXmlSoapQuery(CommonXmlWriter &xmlWriter, ConstPointerArray &inputRows);
  576. virtual void processQuery(ConstPointerArray &inputRows);
  577. //Thread
  578. virtual int run();
  579. public:
  580. CWSCHelperThread(CWSCHelper * _master) : Thread("CWSCHelperThread")
  581. {
  582. master = _master;
  583. }
  584. ~CWSCHelperThread()
  585. {
  586. }
  587. };
  588. //=================================================================================================
  589. class CWSCHelper : public CInterface, implements IWSCHelper
  590. {
  591. private:
  592. SimpleInterThreadQueueOf<const void, true> outputQ;
  593. SpinLock outputQLock;
  594. CriticalSection toXmlCrit, transformCrit, onfailCrit, timeoutCrit;
  595. unsigned done;
  596. Linked<ClientCertificate> clientCert;
  597. static CriticalSection secureContextCrit;
  598. static Owned<ISecureSocketContext> secureContext;
  599. CTimeMon timeLimitMon;
  600. bool complete, timeLimitExceeded;
  601. IRoxieAbortMonitor * roxieAbortMonitor;
  602. protected:
  603. CIArrayOf<CWSCHelperThread> threads;
  604. WSCType wscType;
  605. public:
  606. IMPLEMENT_IINTERFACE;
  607. CWSCHelper(IWSCRowProvider *_rowProvider, IEngineRowAllocator * _outputAllocator, const char *_authToken, WSCMode _wscMode, ClientCertificate *_clientCert, const IContextLogger &_logctx, IRoxieAbortMonitor *_roxieAbortMonitor, WSCType _wscType)
  608. : logctx(_logctx), outputAllocator(_outputAllocator), clientCert(_clientCert), roxieAbortMonitor(_roxieAbortMonitor)
  609. {
  610. wscMode = _wscMode;
  611. wscType = _wscType;
  612. done = 0;
  613. complete = aborted = timeLimitExceeded = false;
  614. rowProvider = _rowProvider;
  615. helper = rowProvider->queryActionHelper();
  616. callHelper = rowProvider->queryCallHelper(); //MORE: This should not be done this way!! Should use extra as below.
  617. helperExtra = static_cast<IHThorSoapCallExtra*>(helper->selectInterface(TAIsoapcallextra_1));
  618. flags = helper->getFlags();
  619. OwnedRoxieString s;
  620. authToken.append(_authToken);
  621. if (helper->numRetries() < 0)
  622. maxRetries = 3; // default to 3 retries
  623. else
  624. maxRetries = helper->numRetries();
  625. //Allow all of these options to be specified separately. Possibly useful, and the code is cleaner.
  626. logMin = (flags & SOAPFlogmin) != 0;
  627. logXML = (flags & SOAPFlog) != 0;
  628. logUserMsg = (flags & SOAPFlogusermsg) != 0;
  629. double dval = helper->getTimeout(); // In seconds, but may include fractions of a second...
  630. if (dval == -1.0) //not provided
  631. timeoutMS = 300*1000; // 300 second default
  632. else if (dval == 0)
  633. timeoutMS = WAIT_FOREVER;
  634. else
  635. timeoutMS = dval * 1000;
  636. dval = helper->getTimeLimit();
  637. if (dval <= 0.0)
  638. timeLimitMS = WAIT_FOREVER;
  639. else
  640. timeLimitMS = dval * 1000;
  641. if (wscType == STsoap)
  642. {
  643. soapaction.set(s.setown(helper->getSoapAction()));
  644. if(soapaction.get() && !isValidHttpValue(soapaction.get()))
  645. throw MakeStringException(-1, "SOAPAction value contained illegal characters: %s", soapaction.get());
  646. httpHeaderName.set(s.setown(helper->getHttpHeaderName()));
  647. if(httpHeaderName.get() && !isValidHttpValue(httpHeaderName.get()))
  648. throw MakeStringException(-1, "HTTPHEADER name contained illegal characters: %s", httpHeaderName.get());
  649. httpHeaderValue.set(s.setown(helper->getHttpHeaderValue()));
  650. if(httpHeaderValue.get() && !isValidHttpValue(httpHeaderValue.get()))
  651. throw MakeStringException(-1, "HTTPHEADER value contained illegal characters: %s", httpHeaderValue.get());
  652. StringAttr proxyAddress;
  653. proxyAddress.set(s.setown(helper->getProxyAddress()));
  654. if (!proxyAddress.isEmpty())
  655. {
  656. UrlListParser proxyUrlListParser(proxyAddress);
  657. if (0 == proxyUrlListParser.getUrls(proxyUrlArray))
  658. throw MakeStringException(0, "SOAPCALL PROXYADDRESS specified no URLs");
  659. }
  660. if ((flags & SOAPFliteral) && (flags & SOAPFencoding))
  661. throw MakeStringException(0, "SOAPCALL 'LITERAL' and 'ENCODING' options are mutually exclusive");
  662. header.set(s.setown(helper->getHeader()));
  663. footer.set(s.setown(helper->getFooter()));
  664. if(flags & SOAPFnamespace)
  665. {
  666. OwnedRoxieString ns = helper->getNamespaceName();
  667. if(ns && *ns)
  668. xmlnamespace.set(ns);
  669. }
  670. }
  671. if (wscType == SThttp)
  672. {
  673. //Check for unsupported flags
  674. if ((flags & SOAPFliteral) || (flags & SOAPFencoding))
  675. throw MakeStringException(0, "HTTPCALL 'LITERAL' and 'ENCODINGD' options not supported");
  676. }
  677. if (callHelper)
  678. {
  679. OwnedRoxieString iteratorPath(callHelper->getInputIteratorPath());
  680. char const * ipath = iteratorPath;
  681. if(ipath && (*ipath == '/'))
  682. ++ipath;
  683. inputpath.set(ipath);
  684. }
  685. service.set(s.setown(helper->getService()));
  686. service.trim();
  687. if (wscType == SThttp)
  688. {
  689. service.toUpperCase(); //GET/PUT/POST
  690. if (strcmp(service.str(), "GET"))
  691. throw MakeStringException(0, "HTTPCALL Only 'GET' service supported");
  692. OwnedRoxieString acceptTypeSupplied(helper->getAcceptType()); // text/html, text/xml, etc
  693. acceptType.set(acceptTypeSupplied);
  694. acceptType.trim();
  695. acceptType.toLowerCase();
  696. }
  697. if (callHelper)
  698. rowTransformer = callHelper->queryInputTransformer();
  699. else
  700. rowTransformer = NULL;
  701. OwnedRoxieString hosts(helper->getHosts());
  702. UrlListParser urlListParser(hosts);
  703. if ((numUrls = urlListParser.getUrls(urlArray)) > 0)
  704. {
  705. if (wscMode == SCrow)
  706. {
  707. numRowThreads = 1;
  708. numUrlThreads = helper->numParallelThreads();
  709. if (numUrlThreads == 0)
  710. numUrlThreads = 1;
  711. else if (numUrlThreads > MAXWSCTHREADS)
  712. numUrlThreads = MAXWSCTHREADS;
  713. numRecordsPerBatch = 1;
  714. }
  715. else
  716. {
  717. unsigned totThreads = helper->numParallelThreads();
  718. if (totThreads < 1)
  719. totThreads = 2; // default to 2 threads
  720. else if (totThreads > MAXWSCTHREADS)
  721. totThreads = MAXWSCTHREADS;
  722. numUrlThreads = (numUrls < totThreads)? numUrls: totThreads;
  723. numRowThreads = totThreads / numUrlThreads;
  724. if (numRowThreads < 1)
  725. numRowThreads = 1;
  726. else if (numRowThreads > MAXWSCTHREADS)
  727. numRowThreads = MAXWSCTHREADS;
  728. numRecordsPerBatch = helper->numRecordsPerBatch();
  729. if (numRecordsPerBatch < 1)
  730. numRecordsPerBatch = 1;
  731. }
  732. }
  733. else
  734. throw MakeStringException(0, "%sCALL specified no URLs",wscType == STsoap ? "SOAP" : "HTTP");
  735. for (unsigned i=0; i<numRowThreads; i++)
  736. threads.append(*new CWSCHelperThread(this));
  737. }
  738. ~CWSCHelper()
  739. {
  740. complete = true;
  741. waitUntilDone();
  742. threads.kill();
  743. }
  744. void waitUntilDone()
  745. {
  746. ForEachItemIn(i,threads)
  747. threads.item(i).join();
  748. loop
  749. {
  750. const void *row = outputQ.dequeueNow();
  751. if (!row)
  752. break;
  753. outputAllocator->releaseRow(row);
  754. }
  755. outputQ.reset();
  756. }
  757. void start()
  758. {
  759. if (timeLimitMS != WAIT_FOREVER)
  760. timeLimitMon.reset(timeLimitMS);
  761. done = 0;
  762. complete = aborted = timeLimitExceeded = false;
  763. ForEachItemIn(i,threads)
  764. threads.item(i).start();
  765. }
  766. void abort()
  767. {
  768. aborted = true;
  769. complete = true;
  770. outputQ.stop();
  771. }
  772. const void * getRow()
  773. {
  774. if (complete)
  775. return NULL;
  776. loop
  777. {
  778. const void *row = outputQ.dequeue();
  779. if (aborted)
  780. break;
  781. if (row)
  782. return row;
  783. // should only be here if setDone() triggered
  784. complete = true;
  785. Owned<IException> e = getError();
  786. if (e)
  787. throw e.getClear();
  788. break;
  789. }
  790. return NULL;
  791. }
  792. IException * getError()
  793. {
  794. SpinBlock sb(outputQLock);
  795. return error.getLink();
  796. }
  797. inline IEngineRowAllocator * queryOutputAllocator() const { return outputAllocator; }
  798. ISecureSocket *createSecureSocket(ISocket *sock)
  799. {
  800. {
  801. CriticalBlock b(secureContextCrit);
  802. if (!secureContext)
  803. {
  804. if (clientCert != NULL)
  805. secureContext.setown(createSecureSocketContextEx(clientCert->certificate, clientCert->privateKey, clientCert->passphrase, ClientSocket));
  806. else
  807. secureContext.setown(createSecureSocketContext(ClientSocket));
  808. }
  809. }
  810. return secureContext->createSecureSocket(sock);
  811. }
  812. bool isTimeLimitExceeded(unsigned *_remainingMS)
  813. {
  814. if (timeLimitMS != WAIT_FOREVER)
  815. {
  816. CriticalBlock block(timeoutCrit);
  817. if (timeLimitExceeded || timeLimitMon.timedout(_remainingMS))
  818. {
  819. timeLimitExceeded = true;
  820. return true;
  821. }
  822. }
  823. else
  824. *_remainingMS = (unsigned)-1;
  825. return false;
  826. }
  827. void addUserLogMsg(const byte * row)
  828. {
  829. if (logUserMsg)
  830. {
  831. size32_t lenText;
  832. rtlDataAttr text;
  833. helperExtra->getLogText(lenText, text.refstr(), row);
  834. logctx.CTXLOG("%s: %.*s", wscCallTypeText(), lenText, text.getstr());
  835. }
  836. }
  837. inline IXmlToRowTransformer * getRowTransformer() { return rowTransformer; }
  838. inline const char * wscCallTypeText() const { return wscType == STsoap ? "SOAPCALL" : "HTTPCALL"; }
  839. protected:
  840. friend class CWSCHelperThread;
  841. friend class CWSCAsyncFor;
  842. void putRow(const void * row)
  843. {
  844. assertex(row);
  845. outputQ.enqueue(row);
  846. }
  847. void setDone()
  848. {
  849. bool doStop;
  850. {
  851. SpinBlock sb(outputQLock);
  852. done++;
  853. doStop = (done == numRowThreads);
  854. }
  855. if (doStop)
  856. {
  857. // Note - Don't stop the queue - that effectively discards what's already on there,
  858. // which is not what we want.
  859. // Instead, push a NULL to indicate the end of the output.
  860. outputQ.enqueue(NULL);
  861. }
  862. }
  863. void setErrorOwn(IException * e)
  864. {
  865. SpinBlock sb(outputQLock);
  866. if (error)
  867. ::Release(e);
  868. else
  869. error.setown(e);
  870. }
  871. void toXML(const byte * self, IXmlWriter & out) { CriticalBlock block(toXmlCrit); helper->toXML(self, out); }
  872. size32_t transformRow(ARowBuilder & rowBuilder, IColumnProvider * row)
  873. {
  874. CriticalBlock block(transformCrit);
  875. NullDiskCallback callback;
  876. return rowTransformer->transform(rowBuilder, row, &callback);
  877. }
  878. unsigned onFailTransform(ARowBuilder & rowBuilder, const void * left, IException * e) { CriticalBlock block(onfailCrit); return callHelper->onFailTransform(rowBuilder, left, e); }
  879. StringBuffer authToken;
  880. WSCMode wscMode;
  881. IWSCRowProvider * rowProvider;
  882. IHThorWebServiceCallActionArg * helper;
  883. IHThorWebServiceCallArg * callHelper;
  884. IHThorWebServiceCallExtra * helperExtra;
  885. Linked<IEngineRowAllocator> outputAllocator;
  886. Owned<IException> error;
  887. UrlArray urlArray;
  888. UrlArray proxyUrlArray;
  889. unsigned numRecordsPerBatch;
  890. unsigned numUrls;
  891. unsigned numRowThreads;
  892. unsigned numUrlThreads;
  893. unsigned maxRetries;
  894. unsigned timeoutMS;
  895. unsigned timeLimitMS;
  896. bool logXML;
  897. bool logMin;
  898. bool logUserMsg;
  899. bool aborted;
  900. const IContextLogger &logctx;
  901. unsigned flags;
  902. StringAttr soapaction;
  903. StringAttr httpHeaderName;
  904. StringAttr httpHeaderValue;
  905. StringAttr inputpath;
  906. StringBuffer service;
  907. StringBuffer acceptType;//for httpcall, text/plain, text/html, text/xml, etc
  908. StringAttr header;
  909. StringAttr footer;
  910. StringAttr xmlnamespace;
  911. IXmlToRowTransformer * rowTransformer;
  912. };
// Definitions of CWSCHelper's static members. secureContextCrit is the lock taken by
// createSecureSocket() while it creates secureContext.
CriticalSection CWSCHelper::secureContextCrit;
Owned<ISecureSocketContext> CWSCHelper::secureContext; // created on first use
  915. //=================================================================================================
  916. void CWSCHelperThread::outputXmlRows(CommonXmlWriter &xmlWriter, ConstPointerArray &inputRows, const char *itemtag, bool encode_off, char const * itemns)
  917. {
  918. ForEachItemIn(idx, inputRows)
  919. {
  920. if (itemtag) //TAG
  921. {
  922. xmlWriter.outputQuoted("<");
  923. xmlWriter.outputQuoted(itemtag);
  924. if(itemns)
  925. {
  926. xmlWriter.outputQuoted(" xmlns=\"");
  927. xmlWriter.outputQuoted(itemns);
  928. xmlWriter.outputQuoted("\"");
  929. }
  930. xmlWriter.outputQuoted(">");
  931. }
  932. if (master->header.get()) //OPTIONAL HEADER (specified by "HEADING" option)
  933. xmlWriter.outputQuoted(master->header.get());
  934. //XML ROW CONTENT
  935. master->toXML((const byte *)inputRows.item(idx), xmlWriter);
  936. if (master->footer.get()) //OPTION FOOTER
  937. xmlWriter.outputQuoted(master->footer.get());
  938. if (encode_off) //ENCODING
  939. xmlWriter.outputQuoted("<encode_>0</encode_>");
  940. if (itemtag) //CLOSE TAG
  941. {
  942. xmlWriter.outputQuoted("</");
  943. xmlWriter.outputQuoted(itemtag);
  944. xmlWriter.outputQuoted(">");
  945. }
  946. master->addUserLogMsg((const byte *)inputRows.item(idx));
  947. }
  948. }
  949. void CWSCHelperThread::createESPQuery(CommonXmlWriter &xmlWriter, ConstPointerArray &inputRows)
  950. {
  951. StringBuffer method_tag;
  952. method_tag.append(master->service).append("Request");
  953. StringAttr method_ns;
  954. if (inputRows.ordinality() > 1)
  955. {
  956. xmlWriter.outputQuoted("<");
  957. xmlWriter.outputQuoted(method_tag.str());
  958. xmlWriter.outputQuoted("Array");
  959. if (master->xmlnamespace.get())
  960. {
  961. xmlWriter.outputQuoted(" xmlns=\"");
  962. xmlWriter.outputQuoted(master->xmlnamespace.get());
  963. xmlWriter.outputQuoted("\"");
  964. }
  965. xmlWriter.outputQuoted(">");
  966. }
  967. else
  968. {
  969. if(master->xmlnamespace.get())
  970. method_ns.set(master->xmlnamespace.get());
  971. }
  972. outputXmlRows(xmlWriter, inputRows, method_tag.str(), (inputRows.ordinality() == 1), method_ns.get());
  973. if (inputRows.ordinality() > 1)
  974. {
  975. xmlWriter.outputQuoted("<encode_>0</encode_>");
  976. xmlWriter.outputQuoted("</");
  977. xmlWriter.outputQuoted(method_tag.str());
  978. xmlWriter.outputQuoted("Array>");
  979. }
  980. }
  981. //Create servce xml request body, with binding usage of either Literal or Encoded
  982. //Note that Encoded usage requires type encoding for data fields
  983. void CWSCHelperThread::createSOAPliteralOrEncodedQuery(CommonXmlWriter &xmlWriter, ConstPointerArray &inputRows)
  984. {
  985. xmlWriter.outputQuoted("<");
  986. xmlWriter.outputQuoted(master->service);
  987. if (master->flags & SOAPFencoding)
  988. xmlWriter.outputQuoted(" soapenv:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\"");
  989. if (master->xmlnamespace.get())
  990. {
  991. xmlWriter.outputQuoted(" xmlns=\"");
  992. xmlWriter.outputQuoted(master->xmlnamespace.get());
  993. xmlWriter.outputQuoted("\"");
  994. }
  995. xmlWriter.outputQuoted(">");
  996. outputXmlRows(xmlWriter, inputRows);
  997. xmlWriter.outputQuoted("</");
  998. xmlWriter.outputQuoted(master->service);
  999. xmlWriter.outputQuoted(">");
  1000. }
  1001. //Create SOAP body of http request
  1002. void CWSCHelperThread::createXmlSoapQuery(CommonXmlWriter &xmlWriter, ConstPointerArray &inputRows)
  1003. {
  1004. xmlWriter.outputQuoted("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
  1005. xmlWriter.outputQuoted("<soap:Envelope");
  1006. xmlWriter.outputQuoted(" xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\"");
  1007. if (master->flags & SOAPFencoding)
  1008. { //SOAP RPC/encoded. 'Encoded' usage includes type encoding
  1009. xmlWriter.outputQuoted(" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\"");
  1010. xmlWriter.outputQuoted(" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"");
  1011. }
  1012. xmlWriter.outputQuoted(">");
  1013. xmlWriter.outputQuoted("<soap:Body>");
  1014. if (master->flags & SOAPFliteral || master->flags & SOAPFencoding)
  1015. createSOAPliteralOrEncodedQuery(xmlWriter, inputRows);
  1016. else
  1017. createESPQuery(xmlWriter, inputRows);
  1018. xmlWriter.outputQuoted("</soap:Body></soap:Envelope>");
  1019. }
  1020. void CWSCHelperThread::processQuery(ConstPointerArray &inputRows)
  1021. {
  1022. unsigned xmlWriteFlags = 0;
  1023. unsigned xmlReadFlags = ptr_ignoreNameSpaces;
  1024. if (master->flags & SOAPFtrim)
  1025. xmlWriteFlags |= XWFtrim;
  1026. if ((master->flags & SOAPFpreserveSpace) == 0)
  1027. xmlReadFlags |= ptr_ignoreWhiteSpace;
  1028. XMLWriterType xmlType = !(master->flags & SOAPFencoding) ? WTStandard : WTEncodingData64;
  1029. CommonXmlWriter *xmlWriter = CreateCommonXmlWriter(xmlWriteFlags, 0, NULL, xmlType);
  1030. if (master->wscType == STsoap)
  1031. createXmlSoapQuery(*xmlWriter, inputRows);
  1032. Owned<IWSCAsyncFor> casyncfor = createWSCAsyncFor(master, *xmlWriter, inputRows, (PTreeReaderOptions) xmlReadFlags);
  1033. casyncfor->For(master->numUrls, master->numUrlThreads,false,true); // shuffle URLS for poormans load balance
  1034. delete xmlWriter;
  1035. }
  1036. int CWSCHelperThread::run()
  1037. {
  1038. ConstPointerArray inputRows;
  1039. if (master->wscMode == SCrow)
  1040. {
  1041. inputRows.append(NULL);
  1042. try
  1043. {
  1044. processQuery(inputRows);
  1045. }
  1046. catch (IException *e)
  1047. {
  1048. master->setErrorOwn(e);
  1049. }
  1050. inputRows.pop();
  1051. }
  1052. else
  1053. {
  1054. // following a bit odd but preserving previous semantics (except fixing abort leak)
  1055. loop
  1056. {
  1057. try
  1058. {
  1059. while (!master->complete && !master->error.get())
  1060. {
  1061. if (master->aborted) {
  1062. while (inputRows.ordinality() > 0)
  1063. master->rowProvider->releaseRow(inputRows.pop());
  1064. return 0;
  1065. }
  1066. const void *r = master->rowProvider->getNextRow();
  1067. if (!r)
  1068. break;
  1069. inputRows.append(r);
  1070. if (inputRows.ordinality() >= master->numRecordsPerBatch)
  1071. break;
  1072. }
  1073. if (inputRows.ordinality() == 0)
  1074. break;
  1075. processQuery(inputRows);
  1076. }
  1077. catch (IException *e)
  1078. {
  1079. master->setErrorOwn(e); // going to exit next time round
  1080. }
  1081. while (inputRows.ordinality() > 0)
  1082. master->rowProvider->releaseRow(inputRows.pop());
  1083. }
  1084. }
  1085. master->setDone();
  1086. return 0;
  1087. }
  1088. //=================================================================================================
  1089. IWSCHelper * createSoapCallHelper(IWSCRowProvider *r, IEngineRowAllocator * outputAllocator, const char *authToken, WSCMode wscMode, ClientCertificate *clientCert, const IContextLogger &logctx, IRoxieAbortMonitor * roxieAbortMonitor)
  1090. {
  1091. return new CWSCHelper(r, outputAllocator, authToken, wscMode, clientCert, logctx, roxieAbortMonitor, STsoap);
  1092. }
  1093. IWSCHelper * createHttpCallHelper(IWSCRowProvider *r, IEngineRowAllocator * outputAllocator, const char *authToken, WSCMode wscMode, ClientCertificate *clientCert, const IContextLogger &logctx, IRoxieAbortMonitor * roxieAbortMonitor)
  1094. {
  1095. return new CWSCHelper(r, outputAllocator, authToken, wscMode, clientCert, logctx, roxieAbortMonitor, SThttp);
  1096. }
  1097. //=================================================================================================
  1098. class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFor
  1099. {
  1100. class CSocketDataProvider : public CInterface
  1101. {
  1102. const char * buffer;
  1103. size32_t currLen;
  1104. size32_t curPosn;
  1105. ISocket * socket;
  1106. unsigned timeoutMS;
  1107. public:
  1108. CSocketDataProvider(const char * _buffer, size32_t _curPosn, size32_t _currLen, ISocket * _sock, unsigned _timeout )
  1109. : buffer(_buffer), currLen(_currLen), curPosn(_curPosn), socket(_sock), timeoutMS(_timeout)
  1110. {
  1111. }
  1112. size32_t getBytes(char * buf, size32_t len)
  1113. {
  1114. size32_t count;
  1115. if ( len <= (currLen-curPosn))
  1116. { //its already in the buffer
  1117. memcpy(buf, (buffer + curPosn), len);
  1118. curPosn += len;
  1119. count = len;
  1120. }
  1121. else if (curPosn >= currLen)
  1122. { //nothing in buffer, read from socket
  1123. size32_t bytesRead=0;
  1124. count = 0;
  1125. do
  1126. {
  1127. socket->readtms(buf + count, 0, len - count, bytesRead, timeoutMS);
  1128. count += bytesRead;
  1129. } while (count != len);
  1130. currLen = curPosn = 0;
  1131. }
  1132. else
  1133. { //only some is in buffer, read rest from socket
  1134. size32_t bytesRead=0;
  1135. size32_t avail = currLen - curPosn;
  1136. memcpy(buf, (buffer + curPosn), avail);
  1137. count = avail;
  1138. do
  1139. {
  1140. size32_t read;
  1141. socket->readtms(buf+avail+bytesRead, 0, len-avail-bytesRead, read, timeoutMS);
  1142. bytesRead += read;
  1143. } while (len != (bytesRead + avail));
  1144. count += bytesRead;
  1145. currLen = curPosn = 0;
  1146. }
  1147. return count;
  1148. }
  1149. };
  1150. private:
  1151. CWSCHelper * master;
  1152. ConstPointerArray &inputRows;
  1153. CommonXmlWriter &xmlWriter;
  1154. IEngineRowAllocator * outputAllocator;
  1155. CriticalSection processExceptionCrit;
  1156. StringBuffer responsePath;
  1157. Owned<CSocketDataProvider> dataProvider;
  1158. PTreeReaderOptions options;
  1159. unsigned remainingMS;
  1160. inline void checkRoxieAbortMonitor(IRoxieAbortMonitor * roxieAbortMonitor)
  1161. {
  1162. if (roxieAbortMonitor)
  1163. {
  1164. try
  1165. {
  1166. roxieAbortMonitor->checkForAbort();//throws
  1167. }
  1168. catch (IException *e)
  1169. {
  1170. StringBuffer s;
  1171. throw MakeStringException(ROXIE_ABORT_EVENT, "%s", e->errorMessage(s).str());
  1172. }
  1173. }
  1174. }
  1175. void createHttpRequest(Url &url, StringBuffer &request)
  1176. {
  1177. // Create the HTTP POST request
  1178. if (master->wscType == STsoap)
  1179. request.clear().append("POST ").append(url.path).append(" HTTP/1.1\r\n");
  1180. else
  1181. request.clear().append(master->service).append(" ").append(url.path).append(" HTTP/1.1\r\n");
  1182. if (url.userPasswordPair.length() > 0)
  1183. {
  1184. StringBuffer authToken;
  1185. JBASE64_Encode(url.userPasswordPair.str(), url.userPasswordPair.length(), authToken);
  1186. request.append("Authorization: Basic ").append(authToken).append("\r\n");
  1187. }
  1188. else if (master->authToken.length() > 0)
  1189. {
  1190. request.append("Authorization: Basic ").append(master->authToken).append("\r\n");
  1191. }
  1192. if (master->wscType == STsoap)
  1193. {
  1194. if (master->soapaction.get())
  1195. request.append("SOAPAction: ").append(master->soapaction.get()).append("\r\n");
  1196. if (master->httpHeaderName.get() && master->httpHeaderValue.get())
  1197. {
  1198. StringBuffer hdr = master->httpHeaderName.get();
  1199. hdr.append(": ").append(master->httpHeaderValue);
  1200. if (soapTraceLevel > 6 || master->logXML)
  1201. master->logctx.CTXLOG("SOAPCALL: Adding HTTP Header(%s)", hdr.str());
  1202. request.append(hdr.append("\r\n"));
  1203. }
  1204. request.append("Content-Type: text/xml\r\n");
  1205. }
  1206. else if(master->wscType == SThttp)
  1207. request.append("Accept: ").append(master->acceptType).append("\r\n");
  1208. else
  1209. assertex(false);
  1210. request.append("Host: ").append(url.host).append(":").append(url.port).append("\r\n");//http 1.1
  1211. if (master->wscType == STsoap)
  1212. {
  1213. request.append(CONTENT_LENGTH).append(xmlWriter.length()).append("\r\n\r\n");
  1214. request.append(xmlWriter.str());//add SOAP xml content
  1215. }
  1216. else
  1217. request.append("\r\n");//httpcall
  1218. if (soapTraceLevel > 6 || master->logXML)
  1219. master->logctx.CTXLOG("%s: request(%s)", master->wscCallTypeText(), request.str());
  1220. if (master->logMin)
  1221. master->logctx.CTXLOG("%s: request(%s:%u)", master->wscCallTypeText(), url.host.str(), url.port);
  1222. }
  1223. int readHttpResponse(StringBuffer &response, ISocket *socket)
  1224. {
  1225. // Read the POST reply
  1226. // not doesn't *assume* is valid HTTP post format but if it is takes advantage of
  1227. response.clear();
  1228. unsigned bytesRead;
  1229. MemoryAttr buf;
  1230. char *buffer=(char *)buf.allocate(WSCBUFFERSIZE+1);
  1231. int rval = 200;
  1232. // first read header
  1233. size32_t payloadofs = 0;
  1234. size32_t payloadsize = 0;
  1235. StringBuffer dbgheader;
  1236. bool chunked;
  1237. size32_t read = 0;
  1238. do {
  1239. checkTimeLimitExceeded(&remainingMS);
  1240. checkRoxieAbortMonitor(master->roxieAbortMonitor);
  1241. socket->readtms(buffer+read, 0, WSCBUFFERSIZE-read, bytesRead, MIN(master->timeoutMS,remainingMS));
  1242. checkTimeLimitExceeded(&remainingMS);
  1243. checkRoxieAbortMonitor(master->roxieAbortMonitor);
  1244. read += bytesRead;
  1245. buffer[read] = 0;
  1246. if (strncmp(buffer, "HTTP", 4) == 0) {
  1247. const char *s = strstr(buffer,"\r\n\r\n");
  1248. if (s) {
  1249. payloadofs = (size32_t)(s-buffer+4);
  1250. if (soapTraceLevel > 6 || master->logXML)
  1251. dbgheader.append(payloadofs,buffer); // needed for tracing below
  1252. s = strstr(buffer, " ");
  1253. if (s)
  1254. rval = atoi(s+1);
  1255. if (!strstr(buffer,"Transfer-Encoding: chunked"))
  1256. {
  1257. chunked = false;
  1258. s = strstr(buffer,CONTENT_LENGTH);
  1259. if (s) {
  1260. s += strlen(CONTENT_LENGTH);
  1261. if ((size32_t)(s-buffer) < payloadofs)
  1262. payloadsize = atoi(s);
  1263. }
  1264. }
  1265. else
  1266. {
  1267. chunked = true;
  1268. size32_t chunkSize = 0;
  1269. size32_t dataLen = 0;
  1270. char ch;
  1271. /*
  1272. //from http://www.w3.org/Protocols/rfc2616/rfc2616-sec19.html#sec19.4.4
  1273. "19.4.6 Introduction of Transfer-Encoding"
  1274. length := 0
  1275. read chunk-size, chunk-extension (if any) and CRLF
  1276. while (chunk-size > 0)
  1277. {
  1278. read chunk-data and CRLF
  1279. append chunk-data to entity-body
  1280. length := length + chunk-size
  1281. read chunk-size and CRLF
  1282. }
  1283. */
  1284. checkTimeLimitExceeded(&remainingMS);
  1285. dataProvider.setown(new CSocketDataProvider(buffer, payloadofs, read, socket, MIN(master->timeoutMS,remainingMS)));
  1286. dataProvider->getBytes(&ch, 1);
  1287. while (isalpha(ch) || isdigit(ch))
  1288. { //get chunk-size
  1289. if (isdigit(ch))
  1290. chunkSize = (chunkSize*16) + (ch - '0');
  1291. else
  1292. chunkSize = (chunkSize*16) + 10 + (toupper(ch) - 'A');
  1293. dataProvider->getBytes(&ch, 1);
  1294. }
  1295. while (chunkSize && ch != '\n')//consume chunk-extension and CRLF
  1296. dataProvider->getBytes(&ch, 1);
  1297. while (chunkSize)
  1298. {
  1299. if (chunkSize > WSCBUFFERSIZE)
  1300. DBGLOG("SOAPCALL chunk size %d", chunkSize);
  1301. //read chunk data directly into response
  1302. size32_t count = dataProvider->getBytes(response.reserve(dataLen + chunkSize), chunkSize);
  1303. assertex(count == chunkSize);
  1304. dataLen += count;
  1305. response.setLength(dataLen);
  1306. dataProvider->getBytes(&ch, 1);//consume CRLF at end of chunk
  1307. while (ch != '\n')
  1308. dataProvider->getBytes(&ch, 1);
  1309. chunkSize = 0;
  1310. dataProvider->getBytes(&ch, 1);
  1311. while (isalpha(ch) || isdigit(ch))
  1312. { //get next chunk size
  1313. if (isdigit(ch))
  1314. chunkSize = (chunkSize*16) + (ch - '0');
  1315. else
  1316. chunkSize = (chunkSize*16) + 10 + (toupper(ch) - 'A');
  1317. dataProvider->getBytes(&ch, 1);
  1318. }
  1319. while(chunkSize && ch != '\n')//consume chunk-extension and CRLF
  1320. dataProvider->getBytes(&ch, 1);
  1321. }
  1322. }
  1323. break;
  1324. }
  1325. }
  1326. if (bytesRead == 0) // socket closed
  1327. break;
  1328. } while (bytesRead&&(read<WSCBUFFERSIZE));
  1329. if (!chunked)
  1330. {
  1331. if (payloadsize)
  1332. response.ensureCapacity(payloadsize);
  1333. char *payload = buffer;
  1334. if (payloadofs) {
  1335. read -= payloadofs;
  1336. payload += payloadofs;
  1337. if (payloadsize&&(read>payloadsize))
  1338. read = payloadsize;
  1339. }
  1340. if (read)
  1341. response.append(read,payload);
  1342. if (payloadsize) { // read directly into response
  1343. while (read<payloadsize) {
  1344. checkTimeLimitExceeded(&remainingMS);
  1345. checkRoxieAbortMonitor(master->roxieAbortMonitor);
  1346. socket->readtms(response.reserve(payloadsize-read), 0, payloadsize-read, bytesRead, MIN(master->timeoutMS,remainingMS));
  1347. checkTimeLimitExceeded(&remainingMS);
  1348. checkRoxieAbortMonitor(master->roxieAbortMonitor);
  1349. read += bytesRead;
  1350. response.setLength(read);
  1351. if (bytesRead==0) {
  1352. master->logctx.CTXLOG("%sCALL: Warning %sHTTP response terminated prematurely",master->wscType == STsoap ? "SOAP" : "HTTP",chunked?"CHUNKED ":NULL);
  1353. break; // oops looks likesocket closed early
  1354. }
  1355. }
  1356. }
  1357. else {
  1358. loop {
  1359. checkTimeLimitExceeded(&remainingMS);
  1360. checkRoxieAbortMonitor(master->roxieAbortMonitor);
  1361. socket->readtms(buffer, 0, WSCBUFFERSIZE, bytesRead, MIN(master->timeoutMS,remainingMS));
  1362. checkTimeLimitExceeded(&remainingMS);
  1363. checkRoxieAbortMonitor(master->roxieAbortMonitor);
  1364. if (bytesRead==0)
  1365. break; // rely on socket closing to terminate message
  1366. response.append(bytesRead,buffer);
  1367. }
  1368. }
  1369. }
  1370. if (soapTraceLevel > 6 || master->logXML)
  1371. master->logctx.CTXLOG("%sCALL: LEN=%d %sresponse(%s%s)", master->wscType == STsoap ? "SOAP" : "HTTP",response.length(),chunked?"CHUNKED ":"", dbgheader.str(), response.str());
  1372. else if (soapTraceLevel > 8)
  1373. master->logctx.CTXLOG("%sCALL: LEN=%d %sresponse(%s)", master->wscType == STsoap ? "SOAP" : "HTTP",response.length(),chunked?"CHUNKED ":"", response.str()); // not sure this is that useful but...
  1374. return rval;
  1375. }
  1376. void processEspResponse(Url &url, StringBuffer &response, ColumnProvider * meta)
  1377. {
  1378. StringBuffer path(responsePath);
  1379. path.append("/Results/Result/");
  1380. const char *tail;
  1381. if (master->rowTransformer && master->inputpath.get())
  1382. {
  1383. StringBuffer ipath;
  1384. ipath.append("/Envelope/Body/").append(master->inputpath.get());
  1385. if((ipath.length() >= path.length()) && (0 == memcmp(ipath.str(), path.str(), path.length())))
  1386. {
  1387. tail = ipath.str() + path.length();
  1388. }
  1389. else
  1390. {
  1391. path.clear().append(ipath);
  1392. tail = NULL;
  1393. }
  1394. }
  1395. else
  1396. tail = "Dataset/Row";
  1397. CMatchCB matchCB(*this, url, tail, meta);
  1398. Owned<IXMLParse> xmlParser = createXMLParse((const void *)response.str(), (unsigned)response.length(), path.str(), matchCB, options, false);
  1399. while (xmlParser->next());
  1400. }
  1401. void processLiteralResponse(Url &url, StringBuffer &response, ColumnProvider * meta)
  1402. {
  1403. StringBuffer path("/Envelope/Body/");
  1404. if(master->rowTransformer && master->inputpath.get())
  1405. path.append(master->inputpath.get());
  1406. CMatchCB matchCB(*this, url, NULL, meta);
  1407. Owned<IXMLParse> xmlParser = createXMLParse((const void *)response.str(), (unsigned)response.length(), path.str(), matchCB, options, false);
  1408. while (xmlParser->next());
  1409. }
  1410. void processHttpResponse(Url &url, StringBuffer &response, ColumnProvider * meta)
  1411. {
  1412. StringBuffer path;
  1413. if(master->rowTransformer && master->inputpath.get())
  1414. path.append(master->inputpath.get());
  1415. CMatchCB matchCB(*this, url, NULL, meta);
  1416. Owned<IXMLParse> xmlParser = createXMLParse((const void *)response.str(), (unsigned)response.length(), path.str(), matchCB, options, false);
  1417. while (xmlParser->next());
  1418. }
  1419. void processResponse(Url &url, StringBuffer &response, ColumnProvider * meta)
  1420. {
  1421. if (master->wscType == SThttp)
  1422. processHttpResponse(url, response, meta);
  1423. else if (master->flags & SOAPFliteral)
  1424. processLiteralResponse(url, response, meta);
  1425. else if (master->flags & SOAPFencoding)
  1426. processLiteralResponse(url, response, meta);
  1427. else
  1428. processEspResponse(url, response, meta);
  1429. }
  1430. void processException(const Url &url, const void *row, IException *e)
  1431. {
  1432. CriticalBlock block(processExceptionCrit);
  1433. Owned<IException> ne = url.getUrlException(e);
  1434. e->Release();
  1435. if ((master->flags & SOAPFonfail) && master->callHelper)
  1436. {
  1437. try
  1438. {
  1439. RtlDynamicRowBuilder rowBuilder(outputAllocator);
  1440. size32_t sizeGot = master->onFailTransform(rowBuilder, row, ne);
  1441. if (sizeGot > 0)
  1442. master->putRow(rowBuilder.finalizeRowClear(sizeGot));
  1443. }
  1444. catch (IException *te)
  1445. {
  1446. master->setErrorOwn(te);
  1447. }
  1448. }
  1449. else
  1450. master->setErrorOwn(ne.getClear());
  1451. }
  1452. void processException(const Url &url, ConstPointerArray &inputRows, IException *e)
  1453. {
  1454. Owned<IException> ne = url.getUrlException(e);
  1455. e->Release();
  1456. if ((master->flags & SOAPFonfail) && master->callHelper)
  1457. {
  1458. ForEachItemIn(idx, inputRows)
  1459. {
  1460. try
  1461. {
  1462. RtlDynamicRowBuilder rowBuilder(outputAllocator);
  1463. size32_t sizeGot = master->onFailTransform(rowBuilder, inputRows.item(idx), ne);
  1464. if (sizeGot > 0)
  1465. master->putRow(rowBuilder.finalizeRowClear(sizeGot));
  1466. }
  1467. catch (IException *te)
  1468. {
  1469. master->setErrorOwn(te);
  1470. break;
  1471. }
  1472. }
  1473. }
  1474. else
  1475. master->setErrorOwn(ne.getClear());
  1476. }
  1477. inline void checkTimeLimitExceeded(unsigned * remainingMS)
  1478. {
  1479. if (master->isTimeLimitExceeded(remainingMS))
  1480. throw MakeStringException(TIMELIMIT_EXCEEDED, "%sCALL TIMELIMIT(%ums) exceeded", master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimitMS);
  1481. }
  1482. public:
  1483. CWSCAsyncFor(CWSCHelper * _master, CommonXmlWriter &_xmlWriter, ConstPointerArray &_inputRows, PTreeReaderOptions _options): xmlWriter(_xmlWriter), inputRows(_inputRows), options(_options)
  1484. {
  1485. master = _master;
  1486. outputAllocator = master->queryOutputAllocator();
  1487. responsePath.append("/Envelope/Body/");
  1488. if (inputRows.ordinality() > 1)
  1489. {
  1490. // can we receive a roxie exceptions for the whole RequestArray?
  1491. // if so, we need to handle them here
  1492. responsePath.append(master->service).append("ResponseArray/");
  1493. }
  1494. responsePath.append(master->service).append("Response");
  1495. }
  1496. ~CWSCAsyncFor()
  1497. {
  1498. }
  1499. IMPLEMENT_IINTERFACE;
  1500. void For(unsigned num,unsigned maxatonce,bool abortFollowingException, bool shuffled)
  1501. {
  1502. CAsyncFor::For(num, maxatonce, abortFollowingException, shuffled);
  1503. }
  1504. void Do(unsigned idx)
  1505. {
  1506. StringBuffer request;
  1507. StringBuffer response;
  1508. unsigned attempts = 0;
  1509. unsigned retryInterval = 0;
  1510. Url &url = master->urlArray.item(idx);
  1511. createHttpRequest(url, request);
  1512. unsigned startidx = idx;
  1513. while (!master->aborted)
  1514. {
  1515. Owned<ISocket> socket;
  1516. unsigned startTime, endTime;
  1517. startTime = msTick();
  1518. loop
  1519. {
  1520. try
  1521. {
  1522. checkTimeLimitExceeded(&remainingMS);
  1523. Url &connUrl = master->proxyUrlArray.empty() ? url : master->proxyUrlArray.item(0);
  1524. socket.setown(blacklist->connect(connUrl.port, connUrl.host, master->logctx, (unsigned)master->maxRetries, master->timeoutMS, master->roxieAbortMonitor));
  1525. if (stricmp(url.method, "https") == 0)
  1526. {
  1527. Owned<ISecureSocket> ssock = master->createSecureSocket(socket.getClear());
  1528. if (ssock)
  1529. {
  1530. checkTimeLimitExceeded(&remainingMS);
  1531. int status = ssock->secure_connect();
  1532. if (status < 0)
  1533. {
  1534. StringBuffer err;
  1535. err.append("Failure to establish secure connection to ");
  1536. connUrl.getUrlString(err);
  1537. err.append(": returned ").append(status);
  1538. throw MakeStringExceptionDirect(0, err.str());
  1539. }
  1540. socket.setown(ssock.getLink());
  1541. }
  1542. }
  1543. break;
  1544. }
  1545. catch (IException *e)
  1546. {
  1547. if (master->timeLimitExceeded)
  1548. {
  1549. master->logctx.CTXLOG("%sCALL exiting: time limit (%ums) exceeded",master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimitMS);
  1550. processException(url, inputRows, e);
  1551. return;
  1552. }
  1553. if (e->errorCode() == ROXIE_ABORT_EVENT)
  1554. {
  1555. StringBuffer s;
  1556. master->logctx.CTXLOG("%sCALL exiting: Roxie Abort : %s",master->wscType == STsoap ? "SOAP" : "HTTP",e->errorMessage(s).str());
  1557. throw;
  1558. }
  1559. do
  1560. {
  1561. idx++; // try next socket not blacklisted
  1562. if (idx==master->urlArray.ordinality())
  1563. idx = 0;
  1564. if (idx==startidx)
  1565. {
  1566. StringBuffer s;
  1567. master->logctx.CTXLOG("Exception %s", e->errorMessage(s).str());
  1568. processException(url, inputRows, e);
  1569. return;
  1570. }
  1571. } while (blacklist->blacklisted(url.port, url.host));
  1572. }
  1573. }
  1574. try
  1575. {
  1576. checkTimeLimitExceeded(&remainingMS);
  1577. checkRoxieAbortMonitor(master->roxieAbortMonitor);
  1578. socket->write(request.str(), request.length());
  1579. if (soapTraceLevel > 4)
  1580. master->logctx.CTXLOG("%sCALL: sent request (%s) to %s:%d", master->wscType == STsoap ? "SOAP" : "HTTP",master->service.str(), url.host.str(), url.port);
  1581. checkTimeLimitExceeded(&remainingMS);
  1582. checkRoxieAbortMonitor(master->roxieAbortMonitor);
  1583. int rval = readHttpResponse(response, socket);
  1584. if (soapTraceLevel > 4)
  1585. master->logctx.CTXLOG("%sCALL: received response (%s) from %s:%d", master->wscType == STsoap ? "SOAP" : "HTTP",master->service.str(), url.host.str(), url.port);
  1586. if (rval != 200)
  1587. {
  1588. if (rval == 503)
  1589. throw new ReceivedRoxieException(1001, "Server Too Busy");
  1590. StringBuffer text;
  1591. text.appendf("HTTP error (%d) in processQuery",rval);
  1592. rtlAddExceptionTag(text, "soapresponse", response.str());
  1593. throw MakeStringExceptionDirect(-1, text.str());
  1594. }
  1595. if (response.length() == 0)
  1596. {
  1597. throw MakeStringException(-1, "Zero length response in processQuery");
  1598. }
  1599. endTime = msTick();
  1600. checkTimeLimitExceeded(&remainingMS);
  1601. ColumnProvider * meta = (ColumnProvider*)CreateColumnProvider(endTime-startTime, master->flags&SOAPFencoding?true:false);
  1602. processResponse(url, response, meta);
  1603. delete meta;
  1604. break;
  1605. }
  1606. catch (IReceivedRoxieException *e)
  1607. {
  1608. // server busy ... Sleep and retry
  1609. if (e->errorCode() == 1001)
  1610. {
  1611. if (retryInterval)
  1612. {
  1613. int sleepTime = retryInterval + getRandom() % retryInterval;
  1614. master->logctx.CTXLOG("Server busy (1001), sleeping for %d milliseconds before retry", sleepTime);
  1615. Sleep(sleepTime);
  1616. retryInterval = (retryInterval*2 >= 10000)? 10000: retryInterval*2;
  1617. }
  1618. else
  1619. {
  1620. master->logctx.CTXLOG("Server busy (1001), retrying");
  1621. retryInterval = 10;
  1622. }
  1623. e->Release();
  1624. }
  1625. else
  1626. {
  1627. // other roxie exception ...
  1628. master->logctx.CTXLOG("Exiting: received Roxie exception");
  1629. if (e->errorRow())
  1630. processException(url, e->errorRow(), e);
  1631. else
  1632. processException(url, inputRows, e);
  1633. break;
  1634. }
  1635. }
  1636. catch (IException *e)
  1637. {
  1638. if (master->timeLimitExceeded)
  1639. {
  1640. processException(url, inputRows, e);
  1641. master->logctx.CTXLOG("%sCALL exiting: time limit (%ums) exceeded", master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimitMS);
  1642. break;
  1643. }
  1644. if (e->errorCode() == ROXIE_ABORT_EVENT)
  1645. {
  1646. StringBuffer s;
  1647. master->logctx.CTXLOG("%sCALL exiting: Roxie Abort : %s",master->wscType == STsoap ? "SOAP" : "HTTP",e->errorMessage(s).str());
  1648. throw;
  1649. }
  1650. // other IException ... retry up to maxRetries
  1651. StringBuffer s;
  1652. master->logctx.CTXLOG("Exception %s - retrying? (%d<%d)", e->errorMessage(s).str(), attempts, master->maxRetries);
  1653. if (attempts > master->maxRetries)
  1654. {
  1655. // error affects all inputRows
  1656. master->logctx.CTXLOG("Exiting: maxRetries exceeded");
  1657. processException(url, inputRows, e);
  1658. break;
  1659. }
  1660. master->logctx.CTXLOG("Retrying: maxRetries not exceeded");
  1661. attempts++;
  1662. e->Release();
  1663. }
  1664. catch (std::exception & es)
  1665. {
  1666. if(dynamic_cast<std::bad_alloc *>(&es))
  1667. throw MakeStringException(-1, "std::exception: out of memory (std::bad_alloc) in CWSCAsyncFor processQuery");
  1668. throw MakeStringException(-1, "std::exception: standard library exception (%s) in CWSCAsyncFor processQuery",es.what());
  1669. }
  1670. catch (...)
  1671. {
  1672. throw MakeStringException(-1, "Unknown exception in processQuery");
  1673. }
  1674. }
  1675. }
  1676. inline virtual StringBuffer getResponsePath() { return responsePath; }
  1677. inline virtual ConstPointerArray & getInputRows() { return inputRows; }
  1678. inline virtual CWSCHelper * getMaster() { return master; }
  1679. inline virtual IEngineRowAllocator * getOutputAllocator() { return outputAllocator; }
  1680. };
  1681. IWSCAsyncFor * createWSCAsyncFor(CWSCHelper * _master, CommonXmlWriter &_xmlWriter, ConstPointerArray &_inputRows, PTreeReaderOptions _options)
  1682. {
  1683. return new CWSCAsyncFor(_master, _xmlWriter, _inputRows, _options);
  1684. }