thorsoapcall.cpp 87 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "jqueue.tpp"
  15. #include "jisem.hpp"
  16. #include "jsecrets.hpp"
  17. #include "rtlformat.hpp"
  18. #include "thorxmlread.hpp"
  19. #include "thorxmlwrite.hpp"
  20. #include "thorcommon.ipp"
  21. #include "thorsoapcall.hpp"
  22. #include "securesocket.hpp"
  23. #include "eclrtl.hpp"
  24. #include "roxiemem.hpp"
  25. #include "zcrypt.hpp"
  26. #include "persistent.hpp"
  27. using roxiemem::OwnedRoxieString;
  28. #ifndef _WIN32
  29. #include <stdexcept>
  30. #endif
  31. #include <new>
  32. #define CONTENT_LENGTH "Content-Length: "
  33. #define CONTENT_ENCODING "Content-Encoding"
  34. #define ACCEPT_ENCODING "Accept-Encoding"
  35. #define CONNECTION "Connection"
  36. unsigned soapTraceLevel = 1;
  37. #define WSCBUFFERSIZE 0x10000
  38. #define MAXWSCTHREADS 50 //Max Web Service Call Threads
  39. interface DECL_EXCEPTION IReceivedRoxieException : extends IException
  40. {
  41. public:
  42. virtual const void *errorRow() = 0;
  43. };
  44. #define EXCEPTION_PREFIX "ReceivedRoxieException:"
  45. class DECL_EXCEPTION ReceivedRoxieException: public IReceivedRoxieException, public CInterface
  46. {
  47. public:
  48. IMPLEMENT_IINTERFACE;
  49. ReceivedRoxieException(int code, const char *_msg, const void *_row=NULL) : errcode(code), msg(_msg), row(_row) { };
  50. int errorCode() const { return (errcode); };
  51. StringBuffer & errorMessage(StringBuffer &str) const
  52. {
  53. if (strncmp(str.str(), EXCEPTION_PREFIX, strlen(EXCEPTION_PREFIX)) == 0)
  54. str.append(msg);
  55. else
  56. str.append(EXCEPTION_PREFIX).append(" (").append(msg).append(")");
  57. return str;
  58. };
  59. MessageAudience errorAudience() const { return (MSGAUD_user); };
  60. const void *errorRow() { return (row); };
  61. private:
  62. int errcode;
  63. StringAttr msg;
  64. const void *row;
  65. };
  66. //=================================================================================================
  67. class Url : public CInterface, implements IInterface
  68. {
  69. public:
  70. IMPLEMENT_IINTERFACE;
  71. StringAttr method;
  72. StringAttr host;
  73. unsigned port;
  74. StringBuffer path;
  75. StringBuffer userPasswordPair;
  76. StringBuffer &getUrlString(StringBuffer &url) const
  77. {
  78. return url.append(method).append("://").append(host).append(":").append(port).append(path);
  79. }
  80. IException *getUrlException(IException *e) const
  81. {
  82. StringBuffer url;
  83. StringBuffer text;
  84. e->errorMessage(text);
  85. rtlAddExceptionTag(text, "url", getUrlString(url).str());
  86. if (text.length() <= 1024)
  87. return MakeStringException(e->errorCode(), "%s", text.str());
  88. else
  89. return MakeStringExceptionDirect(e->errorCode(), text.str());
  90. }
  91. Url() : port(0)
  92. {
  93. }
  94. private:
  95. char translateHex(char hex) {
  96. if(hex >= 'A')
  97. return (hex & 0xdf) - 'A' + 10;
  98. else
  99. return hex - '0';
  100. }
  101. void userPassword_decode(const char* userpass, StringBuffer& result)
  102. {
  103. if(!userpass || !*userpass)
  104. return;
  105. const char *finger = userpass;
  106. while (*finger)
  107. {
  108. char c = *finger++;
  109. if (c == '%')
  110. {
  111. if(*finger != '\0')
  112. {
  113. c = translateHex(*finger);
  114. finger++;
  115. }
  116. if(*finger != '\0')
  117. {
  118. c = (char)(c*16 + translateHex(*finger));
  119. finger++;
  120. }
  121. }
  122. result.append(c);
  123. }
  124. return;
  125. }
  126. void cleanPath(const char *src, StringBuffer &output)
  127. {
  128. const char *srcPtr = src;
  129. char *dst = output.reserveTruncate(strlen(src));
  130. char *dstPtr = dst;
  131. while (*srcPtr)
  132. {
  133. *dstPtr++ = *srcPtr;
  134. if ('/' == *srcPtr++)
  135. {
  136. while ('/' == *srcPtr)
  137. ++srcPtr;
  138. }
  139. }
  140. *dstPtr = '\0';
  141. output.setLength(dstPtr-dst);
  142. }
  143. public:
  144. Url(char *urltext)
  145. {
  146. char *p;
  147. if ((p = strstr(urltext, "://")) != NULL)
  148. {
  149. *p = 0;
  150. p += 3; // skip past the colon-slash-slash
  151. method.set(urltext);
  152. urltext = p;
  153. }
  154. else
  155. throw MakeStringException(-1, "Malformed URL");
  156. if ((p = strchr(urltext, '@')) != NULL)
  157. {
  158. // extract username & password
  159. *p = 0;
  160. p++;
  161. userPassword_decode(urltext, userPasswordPair);
  162. urltext = p;
  163. }
  164. if ((p = strchr(urltext, ':')) != NULL)
  165. {
  166. // extract the port
  167. *p = 0;
  168. p++;
  169. port = atoi(p);
  170. host.set(urltext);
  171. if ((p = strchr(p, '/')) != NULL)
  172. cleanPath(p, path);
  173. else
  174. path.append("/");
  175. }
  176. else
  177. {
  178. // no port - look at method for port
  179. if (stricmp(method.str(), "https") == 0)
  180. port = 443;
  181. else if (stricmp(method.str(), "http") == 0)
  182. port = 80;
  183. else
  184. throw MakeStringException(-1, "Unsupported access method");
  185. if ((p = strchr(urltext, '/')) != NULL)
  186. {
  187. *p = 0;
  188. host.set(urltext);
  189. *p = '/';
  190. cleanPath(p, path);
  191. }
  192. else
  193. {
  194. host.set(urltext);
  195. path.append("/");
  196. }
  197. }
  198. // While the code below would make some sense, there is code that relies on being able to use invalid IP addresses and catching the
  199. // errors that result via ONFAIL.
  200. #if 0
  201. IpAddress ipaddr(host);
  202. if ( ipaddr.isNull())
  203. throw MakeStringException(-1, "Invalid IP address %s", host.str());
  204. #endif
  205. }
  206. };
  207. typedef IArrayOf<Url> UrlArray;
  208. //=================================================================================================
  209. //MORE: This whole class should really be replaced with a single function, avoiding at least one string clone.
  210. class UrlListParser
  211. {
  212. public:
  213. UrlListParser(const char * text)
  214. {
  215. fullText = text ? strdup(text) : NULL;
  216. }
  217. ~UrlListParser()
  218. {
  219. free(fullText);
  220. }
  221. unsigned getUrls(UrlArray &array, const char *basic_credentials=nullptr)
  222. {
  223. if (fullText)
  224. {
  225. char *copyFullText = strdup(fullText);
  226. char *saveptr;
  227. char *url = strtok_r(copyFullText, "|", &saveptr);
  228. while (url != NULL)
  229. {
  230. Owned<Url> item = new Url(url);
  231. if (basic_credentials)
  232. item->userPasswordPair.set(basic_credentials);
  233. array.append(*item.getClear());
  234. url = strtok_r(NULL, "|", &saveptr);
  235. }
  236. free(copyFullText);
  237. }
  238. return array.ordinality();
  239. }
  240. private:
  241. char *fullText;
  242. };
  243. //=================================================================================================
  244. #define BLACKLIST_RETRIES 10
  245. #define ROXIE_ABORT_EVENT 1407
  246. #define TIMELIMIT_EXCEEDED 1408
  247. class BlackLister : public CInterface, implements IThreadFactory
  248. {
  249. SocketEndpointArray list;
  250. Owned<IThreadPool> pool;
  251. CriticalSection crit;
  252. private:
  253. inline void checkRoxieAbortMonitor(IRoxieAbortMonitor * roxieAbortMonitor)
  254. {
  255. if (roxieAbortMonitor)
  256. {
  257. try
  258. {
  259. roxieAbortMonitor->checkForAbort();//throws
  260. }
  261. catch (IException *e)
  262. {
  263. StringBuffer s;
  264. e->errorMessage(s);
  265. e->Release();
  266. throw MakeStringException(ROXIE_ABORT_EVENT, "%s", s.str());
  267. }
  268. }
  269. }
  270. public:
  271. bool lookup(SocketEndpoint &ep, const IContextLogger &logctx)
  272. {
  273. CriticalBlock b(crit);
  274. if (list.find(ep)!=NotFound)
  275. {
  276. if (soapTraceLevel > 3)
  277. {
  278. StringBuffer s;
  279. logctx.CTXLOG("socket %s is blacklisted", ep.getUrlStr(s).str());
  280. }
  281. return true;
  282. }
  283. return false;
  284. }
  285. void blacklist(SocketEndpoint &ep, const IContextLogger &logctx)
  286. {
  287. CriticalBlock b(crit);
  288. if (list.find(ep)==NotFound)
  289. {
  290. if (soapTraceLevel > 0)
  291. {
  292. StringBuffer s;
  293. logctx.CTXLOG("Blacklisting socket %s", ep.getUrlStr(s).str());
  294. }
  295. list.append(ep);
  296. pool->start(&ep);
  297. }
  298. }
  299. void deblacklist(SocketEndpoint &ep)
  300. {
  301. CriticalBlock b(crit);
  302. unsigned idx = list.find(ep);
  303. if (idx!=NotFound)
  304. {
  305. if (soapTraceLevel > 0)
  306. {
  307. StringBuffer s;
  308. DBGLOG("De-blacklisting socket %s", ep.getUrlStr(s).str());
  309. }
  310. list.remove(idx);
  311. }
  312. }
  313. public:
  314. IMPLEMENT_IINTERFACE;
  315. BlackLister()
  316. {
  317. pool.setown(createThreadPool("SocketBlacklistPool", this, NULL, 0, 0));
  318. }
  319. ISocket* connect(SocketEndpoint &ep,
  320. const IContextLogger &logctx,
  321. unsigned retries,
  322. unsigned timeoutMS,
  323. IRoxieAbortMonitor * roxieAbortMonitor)
  324. {
  325. if (lookup(ep, logctx))
  326. {
  327. StringBuffer s;
  328. ep.getUrlStr(s);
  329. throw MakeStringException(-1, "blacklisted socket %s", s.str());
  330. }
  331. Owned<IException> exc;
  332. try
  333. {
  334. checkRoxieAbortMonitor(roxieAbortMonitor);
  335. Owned<ISocket> sock;
  336. Owned<ISocketConnectWait> scw = nonBlockingConnect(ep, timeoutMS == WAIT_FOREVER ? 60000 : timeoutMS*(retries+1));
  337. for (;;)
  338. {
  339. sock.setown(scw->wait(1000));//throws if connect fails or timeoutMS
  340. checkRoxieAbortMonitor(roxieAbortMonitor);
  341. if (sock)
  342. return sock.getLink();
  343. }
  344. }
  345. catch (IJSOCK_Exception *e)
  346. {
  347. EXCLOG(e,"BlackLister::connect");
  348. if (exc)
  349. e->Release();
  350. else
  351. exc.setown(e);
  352. }
  353. blacklist(ep, logctx);
  354. if (exc->errorCode()==JSOCKERR_connection_failed) {
  355. StringBuffer s;
  356. ep.getUrlStr(s);
  357. throw MakeStringException(JSOCKERR_connection_failed, "connection failed %s", s.str());
  358. }
  359. throw exc.getClear();
  360. return NULL;
  361. }
  362. bool blacklisted (unsigned short port, char const* host)
  363. {
  364. SocketEndpoint ep(host, port);
  365. return (list.find(ep)!=NotFound);
  366. }
  367. ISocket* connect(unsigned short port,
  368. char const* host,
  369. const IContextLogger &logctx,
  370. unsigned retries,
  371. unsigned timeoutMS,
  372. IRoxieAbortMonitor * roxieAbortMonitor )
  373. {
  374. SocketEndpoint ep(host, port);
  375. return connect(ep, logctx, retries, timeoutMS, roxieAbortMonitor);
  376. }
  377. virtual IPooledThread *createNew()
  378. {
  379. class SocketDeblacklister : public CInterface, implements IPooledThread
  380. {
  381. SocketEndpoint ep;
  382. BlackLister &parent;
  383. Semaphore stopped;
  384. public:
  385. IMPLEMENT_IINTERFACE;
  386. SocketDeblacklister(BlackLister &_parent): parent(_parent)
  387. {
  388. }
  389. ~SocketDeblacklister()
  390. {
  391. }
  392. virtual void init(void *param) override
  393. {
  394. ep.set(*(SocketEndpoint *) param);
  395. }
  396. virtual void threadmain() override
  397. {
  398. unsigned delay = 5000;
  399. for (unsigned i = 0; i < BLACKLIST_RETRIES; i++)
  400. {
  401. try
  402. {
  403. Owned<ISocket> s = ISocket::connect_timeout(ep, 10000);
  404. s->close();
  405. break;
  406. }
  407. catch (IJSOCK_Exception *E)
  408. {
  409. // EXCLOG(E, "While updating socket blacklist"); // MORE - may need to downgrade if this fires traps
  410. E->Release();
  411. if (stopped.wait(delay))
  412. return;
  413. delay += delay;
  414. }
  415. }
  416. parent.deblacklist(ep);
  417. }
  418. virtual bool stop() override
  419. {
  420. stopped.signal();
  421. return true;
  422. }
  423. virtual bool canReuse() const override { return true; }
  424. };
  425. return new SocketDeblacklister(*this);
  426. }
  427. void stop()
  428. {
  429. pool->stopAll();
  430. }
  431. } *blacklist;
  432. static IPersistentHandler* persistentHandler = nullptr;
  433. static CriticalSection persistentCrit;
  434. static std::atomic<bool> persistentInitDone{false};
  435. void initPersistentHandler()
  436. {
  437. CriticalBlock block(persistentCrit);
  438. if (!persistentInitDone)
  439. {
  440. #ifndef _CONTAINERIZED
  441. const IProperties &conf = queryEnvironmentConf();
  442. int maxPersistentRequests = conf.getPropInt("maxPersistentRequests", DEFAULT_MAX_PERSISTENT_REQUESTS);
  443. #else
  444. const IPropertyTree& conf = queryComponentConfig();
  445. int maxPersistentRequests = conf.getPropInt("@maxPersistentRequests", DEFAULT_MAX_PERSISTENT_REQUESTS);
  446. #endif
  447. if (maxPersistentRequests != 0)
  448. persistentHandler = createPersistentHandler(nullptr, DEFAULT_MAX_PERSISTENT_IDLE_TIME, maxPersistentRequests, PersistentLogLevel::PLogMin, true);
  449. persistentInitDone = true;
  450. }
  451. }
  452. MODULE_INIT(INIT_PRIORITY_STANDARD)
  453. {
  454. blacklist = new BlackLister;
  455. return true;
  456. }
  457. MODULE_EXIT()
  458. {
  459. blacklist->stop();
  460. delete blacklist;
  461. if (persistentHandler)
  462. {
  463. persistentHandler->stop(true);
  464. ::Release(persistentHandler);
  465. }
  466. }
  467. //=================================================================================================
  468. class ColumnProvider : public IColumnProvider, public CInterface
  469. {
  470. public:
  471. ColumnProvider(unsigned _callLatencyMs) : callLatencyMs(_callLatencyMs), base(NULL) {}
  472. IMPLEMENT_IINTERFACE;
  473. virtual bool getBool(const char * path) { return base->getBool(path); }
  474. virtual void getData(size32_t len, void * text, const char * path) { base->getData(len, text, path); }
  475. virtual void getDataX(size32_t & len, void * & text, const char * path) { base->getDataX(len, text, path); }
  476. virtual __int64 getInt(const char * path)
  477. {
  478. __int64 ret = base->getInt(path);
  479. if((ret==0) && path && *path=='_')
  480. {
  481. if (stricmp(path, "_call_latency_ms")==0)
  482. ret = callLatencyMs;
  483. else if (stricmp(path, "_call_latency")==0)
  484. ret = (callLatencyMs + 500) / 1000;
  485. }
  486. return ret;
  487. }
  488. virtual __uint64 getUInt(const char * path)
  489. {
  490. __uint64 ret = base->getUInt(path);
  491. if((ret==0) && path && *path=='_')
  492. {
  493. if (stricmp(path, "_call_latency_ms")==0)
  494. ret = callLatencyMs;
  495. else if (stricmp(path, "_call_latency")==0)
  496. ret = (callLatencyMs + 500) / 1000;
  497. }
  498. return ret;
  499. }
  500. virtual void getQString(size32_t len, char * text, const char * path) { base->getQString(len, text, path); }
  501. virtual void getString(size32_t len, char * text, const char * path) { base->getString(len, text, path); }
  502. virtual void getStringX(size32_t & len, char * & text, const char * path) { base->getStringX(len, text, path); }
  503. virtual void getUnicodeX(size32_t & len, UChar * & text, const char * path) { base->getUnicodeX(len, text, path); }
  504. virtual bool getIsSetAll(const char * path) { return base->getIsSetAll(path); }
  505. virtual IColumnProviderIterator * getChildIterator(const char * path) { return base->getChildIterator(path); }
  506. virtual void getUtf8X(size32_t & len, char * & text, const char * path) { base->getUtf8X(len, text, path); }
  507. virtual bool readBool(const char * path, bool _default) { return base->readBool(path, _default); }
  508. virtual void readData(size32_t len, void * text, const char * path, size32_t _lenDefault, const void * _default) { base->readData(len, text, path, _lenDefault, _default); }
  509. virtual void readDataX(size32_t & len, void * & text, const char * path, size32_t _lenDefault, const void * _default) { base->readDataX(len, text, path, _lenDefault, _default); }
  510. virtual __int64 readInt(const char * path, __int64 _default)
  511. {
  512. if(path && *path=='_')
  513. {
  514. if (stricmp(path, "_call_latency_ms")==0)
  515. return callLatencyMs;
  516. else if (stricmp(path, "_call_latency")==0)
  517. return (callLatencyMs + 500) / 1000;
  518. }
  519. return base->readInt(path, _default);
  520. }
  521. virtual __uint64 readUInt(const char * path, __uint64 _default)
  522. {
  523. if(path && *path=='_')
  524. {
  525. if (stricmp(path, "_call_latency_ms")==0)
  526. return callLatencyMs;
  527. else if (stricmp(path, "_call_latency")==0)
  528. return (callLatencyMs + 500) / 1000;
  529. }
  530. return base->readUInt(path, _default);
  531. }
  532. virtual void readQString(size32_t len, char * text, const char * path, size32_t _lenDefault, const char * _default) { base->readQString(len, text, path, _lenDefault, _default); }
  533. virtual void readString(size32_t len, char * text, const char * path, size32_t _lenDefault, const char * _default) { base->readString(len, text, path, _lenDefault, _default); }
  534. virtual void readStringX(size32_t & len, char * & text, const char * path, size32_t _lenDefault, const char * _default) { base->readStringX(len, text, path, _lenDefault, _default); }
  535. virtual void readUnicodeX(size32_t & len, UChar * & text, const char * path, size32_t _lenDefault, const UChar * _default) { base->readUnicodeX(len, text, path, _lenDefault, _default); }
  536. virtual bool readIsSetAll(const char * path, bool _default) { return base->readIsSetAll(path, _default); }
  537. virtual void readUtf8X(size32_t & len, char * & text, const char * path, size32_t _lenDefault, const char * _default) { base->readUtf8X(len, text, path, _lenDefault, _default); }
  538. void setBase(IColumnProvider * _base) { base = _base; }
  539. virtual void getDataRaw(size32_t len, void * text, const char * path) { base->getDataRaw(len, text, path); }
  540. virtual void getDataRawX(size32_t & len, void * & text, const char * path) { base->getDataRawX(len, text, path); }
  541. virtual void readDataRaw(size32_t len, void * text, const char * path, size32_t _lenDefault, const void * _default) { base->readDataRaw(len, text, path, _lenDefault, _default); }
  542. virtual void readDataRawX(size32_t & len, void * & text, const char * path, size32_t _lenDefault, const void * _default) { base->readDataRawX(len, text, path, _lenDefault, _default); }
  543. protected:
  544. IColumnProvider * base;
  545. private:
  546. unsigned callLatencyMs;
  547. };
  548. //=================================================================================================
  549. //Same as ColumnProvider, except returns data as hex64Binary instead of 16 bit hex
  550. class ColumnProviderData64 : public ColumnProvider
  551. {
  552. public:
  553. ColumnProviderData64(unsigned _callLatencyMs) : ColumnProvider(_callLatencyMs) {}
  554. virtual void getData(size32_t len, void * text, const char * path) { ColumnProvider::getDataRaw(len, text, path); }
  555. virtual void getDataX(size32_t & len, void * & text, const char * path) { ColumnProvider::getDataRawX(len, text, path); }
  556. virtual void readData(size32_t len, void * text, const char * path, size32_t _lenDefault, const void * _default) { ColumnProvider::readDataRaw(len, text, path, _lenDefault, _default); }
  557. virtual void readDataX(size32_t len, void * text, const char * path, size32_t _lenDefault, const void * _default) { ColumnProvider::readDataRawX(len, text, path, _lenDefault, _default); }
  558. };
  559. //=================================================================================================
  560. IColumnProvider * CreateColumnProvider(unsigned _callLatencyMs, bool _encoding)
  561. {
  562. if (!_encoding)
  563. return new ColumnProvider(_callLatencyMs);
  564. else
  565. return new ColumnProviderData64(_callLatencyMs);
  566. }
  567. //=================================================================================================
  568. enum WSCType{STsoap, SThttp} ; //web service call type
  569. //Web Services Call Asynchronous For
  570. interface IWSCAsyncFor: public IInterface
  571. {
  572. virtual void processException(const Url &url, const void *row, IException *e) = 0;
  573. virtual void processException(const Url &url, ConstPointerArray &inputRows, IException *e) = 0;
  574. virtual void checkTimeLimitExceeded(unsigned * _remainingMS) = 0;
  575. virtual void createHttpRequest(Url &url, StringBuffer &request) = 0;
  576. virtual int readHttpResponse(StringBuffer &response, ISocket *socket, bool &keepAlive) = 0;
  577. virtual void processResponse(Url &url, StringBuffer &response, ColumnProvider * meta) = 0;
  578. virtual const char *getResponsePath() = 0;
  579. virtual ConstPointerArray & getInputRows() = 0;
  580. virtual IWSCHelper * getMaster() = 0;
  581. virtual IEngineRowAllocator * getOutputAllocator() = 0;
  582. virtual void For(unsigned num,unsigned maxatonce,bool abortFollowingException=false,bool shuffled=false) = 0;
  583. virtual void Do(unsigned idx=0)=0;
  584. };
  585. class CWSCHelper;
  586. IWSCAsyncFor * createWSCAsyncFor(CWSCHelper * _master, IXmlWriterExt &_xmlWriter, ConstPointerArray &_inputRows, PTreeReaderOptions _options);
  587. //=================================================================================================
  588. #define CBExceptionPathExc 0x0001
  589. #define CBExceptionPathExcsExc 0x0002
  590. #define CBExceptionPathExcs 0x0004
  591. class CMatchCB : implements IXMLSelect, public CInterface
  592. {
  593. IWSCAsyncFor &parent;
  594. const Url &url;
  595. StringAttr tail;
  596. ColumnProvider * meta;
  597. StringAttr excPath;
  598. unsigned excFlags = 0;
  599. public:
  600. IMPLEMENT_IINTERFACE;
  601. CMatchCB(IWSCAsyncFor &_parent, const Url &_url, const char *_tail, ColumnProvider * _meta, const char *_excPath, unsigned _excFlags) : parent(_parent), url(_url), tail(_tail), meta(_meta), excFlags(_excFlags), excPath(_excPath)
  602. {
  603. }
  604. bool checkGetExceptionEntry(bool check, Owned<IColumnProviderIterator> &excIter, IColumnProvider &parent, IColumnProvider *&excEntry, const char *path)
  605. {
  606. if (!check)
  607. return false;
  608. excIter.setown(parent.getChildIterator(path));
  609. excEntry = excIter->first();
  610. return excEntry != nullptr;
  611. }
  612. IColumnProvider *getExceptionEntry(Owned<IColumnProviderIterator> &excIter, IColumnProvider &parent)
  613. {
  614. IColumnProvider *excEntry = nullptr;
  615. if (excPath.length())
  616. {
  617. checkGetExceptionEntry(true, excIter, parent, excEntry, excPath.str()); //set by user so don't try others
  618. return excEntry;
  619. }
  620. if (checkGetExceptionEntry((excFlags & CBExceptionPathExc)!=0, excIter, parent, excEntry, "Exception"))
  621. return excEntry;
  622. if (checkGetExceptionEntry((excFlags & CBExceptionPathExcsExc)!=0, excIter, parent, excEntry, "Exceptions/Exception")) //ESP xml array
  623. return excEntry;
  624. checkGetExceptionEntry((excFlags & CBExceptionPathExcs)!=0, excIter, parent, excEntry, "Exceptions"); //json array
  625. return excEntry;
  626. }
  627. virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
  628. {
  629. Owned<IException> e;
  630. if (tail.length()||excPath.length())
  631. {
  632. StringBuffer path(parent.getResponsePath());
  633. unsigned idx = (unsigned)entry.getInt(path.append("/@sequence").str());
  634. Owned<IColumnProviderIterator> excIter;
  635. IColumnProvider *excptEntry = getExceptionEntry(excIter, entry);
  636. if (excptEntry)
  637. {
  638. int code = (int)excptEntry->getInt("Code");
  639. size32_t len;
  640. char *message;
  641. excptEntry->getStringX(len, message, "Message");
  642. StringBuffer mstr(len, message);
  643. rtlFree(message);
  644. DBGLOG("Roxie exception: %s", mstr.str());
  645. e.setown(new ReceivedRoxieException(code, mstr.str()));
  646. parent.processException(url, parent.getInputRows().item(idx), e.getLink());
  647. }
  648. }
  649. if (parent.getMaster()->getRowTransformer() && !e)
  650. {
  651. IEngineRowAllocator * outputAllocator = parent.getOutputAllocator();
  652. Owned<IColumnProviderIterator> iter = entry.getChildIterator(tail);
  653. IColumnProvider *rowEntry = iter->first();
  654. while (rowEntry)
  655. {
  656. RtlDynamicRowBuilder rowBuilder(outputAllocator);
  657. size32_t sizeGot;
  658. if(meta)
  659. {
  660. meta->setBase(rowEntry);
  661. sizeGot = parent.getMaster()->transformRow(rowBuilder, meta);
  662. }
  663. else
  664. {
  665. sizeGot = parent.getMaster()->transformRow(rowBuilder, rowEntry);
  666. }
  667. if (sizeGot > 0)
  668. parent.getMaster()->putRow(rowBuilder.finalizeRowClear(sizeGot));
  669. rowEntry = iter->next();
  670. }
  671. }
  672. }
  673. };
  674. //=================================================================================================
  675. bool isValidHttpValue(char const * in)
  676. {
  677. for (const byte * ptr = (byte*)in; *ptr; ++ptr)
  678. if (((*ptr <= '\x1F') && (*ptr != '\x09')) || (*ptr == '\x7F'))
  679. return false;
  680. return true;
  681. }
  682. //=================================================================================================
  683. //Web Service Call helper thread
  684. class CWSCHelperThread : public Thread
  685. {
  686. private:
  687. CWSCHelper * master;
  688. virtual void outputRows(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows, const char *itemtag=NULL, bool encode_off=false, char const * itemns = NULL);
  689. virtual void createESPQuery(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows);
  690. virtual void createSOAPliteralOrEncodedQuery(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows);
  691. virtual void createXmlSoapQuery(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows);
  692. virtual void createHttpPostQuery(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows, bool appendRequestToName, bool appendEncodeFlag);
  693. virtual void processQuery(ConstPointerArray &inputRows);
  694. //Thread
  695. virtual int run();
  696. public:
  697. CWSCHelperThread(CWSCHelper * _master) : Thread("CWSCHelperThread")
  698. {
  699. master = _master;
  700. }
  701. ~CWSCHelperThread()
  702. {
  703. }
  704. };
  705. //=================================================================================================
  706. class CWSCHelper : implements IWSCHelper, public CInterface
  707. {
  708. private:
  709. SimpleInterThreadQueueOf<const void, true> outputQ;
  710. SpinLock outputQLock;
  711. CriticalSection toXmlCrit, transformCrit, onfailCrit, timeoutCrit;
  712. unsigned done;
  713. Owned<IPropertyTree> xpathHints;
  714. Linked<ClientCertificate> clientCert;
  715. static CriticalSection secureContextCrit;
  716. static Owned<ISecureSocketContext> secureContext;
  717. Owned<ISecureSocketContext> customSecureContext;
  718. CTimeMon timeLimitMon;
  719. bool complete, timeLimitExceeded;
  720. bool customClientCert = false;
  721. IRoxieAbortMonitor * roxieAbortMonitor;
  722. protected:
  723. CIArrayOf<CWSCHelperThread> threads;
  724. WSCType wscType;
  725. public:
  726. IMPLEMENT_IINTERFACE;
  727. CWSCHelper(IWSCRowProvider *_rowProvider, IEngineRowAllocator * _outputAllocator, const char *_authToken, WSCMode _wscMode, ClientCertificate *_clientCert, const IContextLogger &_logctx, IRoxieAbortMonitor *_roxieAbortMonitor, WSCType _wscType)
  728. : logctx(_logctx), outputAllocator(_outputAllocator), clientCert(_clientCert), roxieAbortMonitor(_roxieAbortMonitor)
  729. {
  730. wscMode = _wscMode;
  731. wscType = _wscType;
  732. done = 0;
  733. complete = aborted = timeLimitExceeded = false;
  734. rowProvider = _rowProvider;
  735. helper = rowProvider->queryActionHelper();
  736. callHelper = rowProvider->queryCallHelper();
  737. flags = helper->getFlags();
  738. OwnedRoxieString s;
  739. authToken.append(_authToken);
  740. if (helper->numRetries() < 0)
  741. maxRetries = 3; // default to 3 retries
  742. else
  743. maxRetries = helper->numRetries();
  744. //Allow all of these options to be specified separately. Possibly useful, and the code is cleaner.
  745. logMin = (flags & SOAPFlogmin) != 0;
  746. logXML = (flags & SOAPFlog) != 0;
  747. logUserMsg = (flags & SOAPFlogusermsg) != 0;
  748. double dval = helper->getTimeout(); // In seconds, but may include fractions of a second...
  749. if (dval < 0.0) //not provided, or out of range
  750. timeoutMS = 300*1000; // 300 second default
  751. else if (dval == 0)
  752. timeoutMS = WAIT_FOREVER;
  753. else
  754. timeoutMS = (unsigned)(dval * 1000);
  755. dval = helper->getTimeLimit();
  756. if (dval <= 0.0)
  757. timeLimitMS = WAIT_FOREVER;
  758. else
  759. timeLimitMS = (unsigned)(dval * 1000);
  760. if (flags & SOAPFhttpheaders)
  761. httpHeaders.set(s.setown(helper->getHttpHeaders()));
  762. if (flags & SOAPFxpathhints)
  763. {
  764. s.setown(helper->getXpathHintsXml());
  765. xpathHints.setown(createPTreeFromXMLString(s.get()));
  766. }
  767. if (wscType == STsoap)
  768. {
  769. soapaction.set(s.setown(helper->getSoapAction()));
  770. if(soapaction.get() && !isValidHttpValue(soapaction.get()))
  771. throw MakeStringException(-1, "SOAPAction value contained illegal characters: %s", soapaction.get());
  772. httpHeaderName.set(s.setown(helper->getHttpHeaderName()));
  773. if(httpHeaderName.get() && !isValidHttpValue(httpHeaderName.get()))
  774. throw MakeStringException(-1, "HTTPHEADER name contained illegal characters: %s", httpHeaderName.get());
  775. httpHeaderValue.set(s.setown(helper->getHttpHeaderValue()));
  776. if(httpHeaderValue.get() && !isValidHttpValue(httpHeaderValue.get()))
  777. throw MakeStringException(-1, "HTTPHEADER value contained illegal characters: %s", httpHeaderValue.get());
  778. if ((flags & SOAPFliteral) && (flags & SOAPFencoding))
  779. throw MakeStringException(0, "SOAPCALL 'LITERAL' and 'ENCODING' options are mutually exclusive");
  780. rowHeader.set(s.setown(helper->getHeader()));
  781. rowFooter.set(s.setown(helper->getFooter()));
  782. if (flags & SOAPFmarkupinfo)
  783. {
  784. rootHeader.set(s.setown(helper->getRequestHeader()));
  785. rootFooter.set(s.setown(helper->getRequestFooter()));
  786. }
  787. if(flags & SOAPFnamespace)
  788. {
  789. OwnedRoxieString ns = helper->getNamespaceName();
  790. if(ns && *ns)
  791. xmlnamespace.set(ns);
  792. }
  793. }
  794. if (wscType == SThttp)
  795. {
  796. //Check for unsupported flags
  797. if ((flags & SOAPFliteral) || (flags & SOAPFencoding))
  798. throw MakeStringException(0, "HTTPCALL 'LITERAL' and 'ENCODINGD' options not supported");
  799. }
  800. if (callHelper)
  801. {
  802. OwnedRoxieString iteratorPath(callHelper->getInputIteratorPath());
  803. char const * ipath = iteratorPath;
  804. if(ipath && (*ipath == '/'))
  805. ++ipath;
  806. inputpath.set(ipath);
  807. }
  808. service.set(s.setown(helper->getService()));
  809. service.trim();
  810. if (wscType == SThttp)
  811. {
  812. service.toUpperCase(); //GET/PUT/POST
  813. if (strcmp(service.str(), "GET") != 0)
  814. throw MakeStringException(0, "HTTPCALL Only 'GET' http method currently supported");
  815. OwnedRoxieString acceptTypeSupplied(helper->getAcceptType()); // text/html, text/xml, etc
  816. acceptType.set(acceptTypeSupplied);
  817. acceptType.trim();
  818. acceptType.toLowerCase();
  819. }
  820. if (callHelper)
  821. rowTransformer = callHelper->queryInputTransformer();
  822. else
  823. rowTransformer = NULL;
  824. StringBuffer proxyAddress;
  825. proxyAddress.set(s.setown(helper->getProxyAddress()));
  826. OwnedRoxieString hosts(helper->getHosts());
  827. if (isEmptyString(hosts))
  828. throw MakeStringException(0, "%sCALL specified no URLs",wscType == STsoap ? "SOAP" : "HTTP");
  829. if (0==strncmp(hosts, "secret:", 7))
  830. {
  831. const char *finger = hosts.get()+7;
  832. if (isEmptyString(finger))
  833. throw MakeStringException(0, "%sCALL HTTP-CONNECT SECRET specified with no name", wscType == STsoap ? "SOAP" : "HTTP");
  834. if (!proxyAddress.isEmpty())
  835. throw MakeStringException(0, "%sCALL PROXYADDRESS can't be used with HTTP-CONNECT secrets", wscType == STsoap ? "SOAP" : "HTTP");
  836. StringAttr vaultId;
  837. const char *thumb = strchr(finger, ':');
  838. if (thumb)
  839. {
  840. vaultId.set(finger, thumb-finger);
  841. finger = thumb + 1;
  842. }
  843. StringBuffer secretName("http-connect-");
  844. secretName.append(finger);
  845. Owned<IPropertyTree> secret = (vaultId.isEmpty()) ? getSecret("ecl", secretName) : getVaultSecret("ecl", vaultId, secretName, nullptr);
  846. if (!secret)
  847. throw MakeStringException(0, "%sCALL %s SECRET not found", wscType == STsoap ? "SOAP" : "HTTP", secretName.str());
  848. StringBuffer url;
  849. getSecretKeyValue(url, secret, "url");
  850. if (url.isEmpty())
  851. throw MakeStringException(0, "%sCALL %s HTTP SECRET must contain url", wscType == STsoap ? "SOAP" : "HTTP", secretName.str());
  852. UrlListParser urlListParser(url);
  853. StringBuffer auth;
  854. getSecretKeyValue(auth, secret, "username");
  855. if (auth.length())
  856. {
  857. if (strchr(auth, ':'))
  858. throw MakeStringException(0, "%sCALL HTTP-CONNECT SECRET username contains illegal colon", wscType == STsoap ? "SOAP" : "HTTP");
  859. auth.append(':');
  860. getSecretKeyValue(auth, secret, "password");
  861. }
  862. urlListParser.getUrls(urlArray, auth);
  863. proxyAddress.set(secret->queryProp("proxy"));
  864. getSecretKeyValue(proxyAddress.clear(), secret, "proxy");
  865. }
  866. else
  867. {
  868. UrlListParser urlListParser(hosts);
  869. urlListParser.getUrls(urlArray);
  870. }
  871. numUrls = urlArray.ordinality();
  872. if (numUrls == 0)
  873. throw MakeStringException(0, "%sCALL specified no URLs",wscType == STsoap ? "SOAP" : "HTTP");
  874. if (!proxyAddress.isEmpty())
  875. {
  876. UrlListParser proxyUrlListParser(proxyAddress);
  877. if (0 == proxyUrlListParser.getUrls(proxyUrlArray))
  878. throw MakeStringException(0, "%sCALL proxy address specified no URLs",wscType == STsoap ? "SOAP" : "HTTP");
  879. }
  880. if (wscMode == SCrow)
  881. {
  882. numRowThreads = 1;
  883. numUrlThreads = helper->numParallelThreads();
  884. if (numUrlThreads == 0)
  885. numUrlThreads = 1;
  886. else if (numUrlThreads > MAXWSCTHREADS)
  887. numUrlThreads = MAXWSCTHREADS;
  888. numRecordsPerBatch = 1;
  889. }
  890. else
  891. {
  892. unsigned totThreads = helper->numParallelThreads();
  893. if (totThreads < 1)
  894. totThreads = 2; // default to 2 threads
  895. else if (totThreads > MAXWSCTHREADS)
  896. totThreads = MAXWSCTHREADS;
  897. numUrlThreads = (numUrls < totThreads)? numUrls: totThreads;
  898. numRowThreads = totThreads / numUrlThreads;
  899. if (numRowThreads < 1)
  900. numRowThreads = 1;
  901. else if (numRowThreads > MAXWSCTHREADS)
  902. numRowThreads = MAXWSCTHREADS;
  903. numRecordsPerBatch = helper->numRecordsPerBatch();
  904. if (numRecordsPerBatch < 1)
  905. numRecordsPerBatch = 1;
  906. }
  907. for (unsigned i=0; i<numRowThreads; i++)
  908. threads.append(*new CWSCHelperThread(this));
  909. }
  910. ~CWSCHelper()
  911. {
  912. complete = true;
  913. waitUntilDone();
  914. threads.kill();
  915. }
  916. void waitUntilDone()
  917. {
  918. ForEachItemIn(i,threads)
  919. threads.item(i).join();
  920. for (;;)
  921. {
  922. const void *row = outputQ.dequeueNow();
  923. if (!row)
  924. break;
  925. outputAllocator->releaseRow(row);
  926. }
  927. outputQ.reset();
  928. }
  929. void start()
  930. {
  931. if (timeLimitMS != WAIT_FOREVER)
  932. timeLimitMon.reset(timeLimitMS);
  933. done = 0;
  934. complete = aborted = timeLimitExceeded = false;
  935. ForEachItemIn(i,threads)
  936. threads.item(i).start();
  937. }
  938. void abort()
  939. {
  940. aborted = true;
  941. complete = true;
  942. outputQ.stop();
  943. }
  944. const void * getRow()
  945. {
  946. if (complete)
  947. return NULL;
  948. for (;;)
  949. {
  950. const void *row = outputQ.dequeue();
  951. if (aborted)
  952. break;
  953. if (row)
  954. return row;
  955. // should only be here if setDone() triggered
  956. complete = true;
  957. Owned<IException> e = getError();
  958. if (e)
  959. throw e.getClear();
  960. break;
  961. }
  962. return NULL;
  963. }
  964. IException * getError()
  965. {
  966. SpinBlock sb(outputQLock);
  967. return error.getLink();
  968. }
  969. inline IEngineRowAllocator * queryOutputAllocator() const { return outputAllocator; }
  970. #ifdef _USE_OPENSSL
  971. ISecureSocketContext *ensureSecureContext(Owned<ISecureSocketContext> &ownedSC)
  972. {
  973. if (!ownedSC)
  974. {
  975. if (clientCert != NULL)
  976. ownedSC.setown(createSecureSocketContextEx(clientCert->certificate, clientCert->privateKey, clientCert->passphrase, ClientSocket));
  977. else
  978. ownedSC.setown(createSecureSocketContext(ClientSocket));
  979. }
  980. return ownedSC.get();
  981. }
  982. ISecureSocketContext *ensureStaticSecureContext()
  983. {
  984. CriticalBlock b(secureContextCrit);
  985. return ensureSecureContext(secureContext);
  986. }
  987. ISecureSocket *createSecureSocket(ISocket *sock)
  988. {
  989. ISecureSocketContext *sc = (customClientCert) ? ensureSecureContext(customSecureContext) : ensureStaticSecureContext();
  990. return sc->createSecureSocket(sock);
  991. }
  992. #endif
  993. bool isTimeLimitExceeded(unsigned *_remainingMS)
  994. {
  995. if (timeLimitMS != WAIT_FOREVER)
  996. {
  997. CriticalBlock block(timeoutCrit);
  998. if (timeLimitExceeded || timeLimitMon.timedout(_remainingMS))
  999. {
  1000. timeLimitExceeded = true;
  1001. return true;
  1002. }
  1003. }
  1004. else
  1005. *_remainingMS = (unsigned)-1;
  1006. return false;
  1007. }
  1008. void addUserLogMsg(const byte * row)
  1009. {
  1010. if (logUserMsg)
  1011. {
  1012. size32_t lenText;
  1013. rtlDataAttr text;
  1014. helper->getLogText(lenText, text.refstr(), row);
  1015. logctx.CTXLOG("%s: %.*s", wscCallTypeText(), lenText, text.getstr());
  1016. }
  1017. }
  1018. inline IXmlToRowTransformer * getRowTransformer() { return rowTransformer; }
  1019. inline const char * wscCallTypeText() const { return wscType == STsoap ? "SOAPCALL" : "HTTPCALL"; }
  1020. protected:
  1021. friend class CWSCHelperThread;
  1022. friend class CWSCAsyncFor;
  1023. void putRow(const void * row)
  1024. {
  1025. assertex(row);
  1026. outputQ.enqueue(row);
  1027. }
  1028. void setDone()
  1029. {
  1030. bool doStop;
  1031. {
  1032. SpinBlock sb(outputQLock);
  1033. done++;
  1034. doStop = (done == numRowThreads);
  1035. }
  1036. if (doStop)
  1037. {
  1038. // Note - Don't stop the queue - that effectively discards what's already on there,
  1039. // which is not what we want.
  1040. // Instead, push a NULL to indicate the end of the output.
  1041. outputQ.enqueue(NULL);
  1042. }
  1043. }
  1044. void setErrorOwn(IException * e)
  1045. {
  1046. SpinBlock sb(outputQLock);
  1047. if (error)
  1048. ::Release(e);
  1049. else
  1050. error.setown(e);
  1051. }
  1052. void toXML(const byte * self, IXmlWriterExt & out) { CriticalBlock block(toXmlCrit); helper->toXML(self, out); }
  1053. size32_t transformRow(ARowBuilder & rowBuilder, IColumnProvider * row)
  1054. {
  1055. CriticalBlock block(transformCrit);
  1056. NullDiskCallback callback;
  1057. return rowTransformer->transform(rowBuilder, row, &callback);
  1058. }
  1059. unsigned onFailTransform(ARowBuilder & rowBuilder, const void * left, IException * e) { CriticalBlock block(onfailCrit); return callHelper->onFailTransform(rowBuilder, left, e); }
  1060. StringBuffer authToken;
  1061. WSCMode wscMode;
  1062. IWSCRowProvider * rowProvider;
  1063. IHThorWebServiceCallActionArg * helper;
  1064. IHThorWebServiceCallArg * callHelper;
  1065. Linked<IEngineRowAllocator> outputAllocator;
  1066. Owned<IException> error;
  1067. UrlArray urlArray;
  1068. UrlArray proxyUrlArray;
  1069. unsigned numRecordsPerBatch;
  1070. unsigned numUrls;
  1071. unsigned numRowThreads;
  1072. unsigned numUrlThreads;
  1073. unsigned maxRetries;
  1074. unsigned timeoutMS;
  1075. unsigned timeLimitMS;
  1076. bool logXML;
  1077. bool logMin;
  1078. bool logUserMsg;
  1079. bool aborted;
  1080. const IContextLogger &logctx;
  1081. unsigned flags;
  1082. StringAttr soapaction;
  1083. StringAttr httpHeaderName;
  1084. StringAttr httpHeaderValue;
  1085. StringAttr httpHeaders;
  1086. StringAttr inputpath;
  1087. StringBuffer service;
  1088. StringBuffer acceptType;//for httpcall, text/plain, text/html, text/xml, etc
  1089. StringAttr rowHeader;
  1090. StringAttr rowFooter;
  1091. StringAttr rootHeader;
  1092. StringAttr rootFooter;
  1093. StringAttr xmlnamespace;
  1094. IXmlToRowTransformer * rowTransformer;
  1095. };
  1096. CriticalSection CWSCHelper::secureContextCrit;
  1097. Owned<ISecureSocketContext> CWSCHelper::secureContext; // created on first use
  1098. //=================================================================================================
  1099. void CWSCHelperThread::outputRows(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows, const char *itemtag, bool encode_off, char const * itemns)
  1100. {
  1101. ForEachItemIn(idx, inputRows)
  1102. {
  1103. if (idx!=0)
  1104. xmlWriter.checkDelimiter();
  1105. if (itemtag && *itemtag) //TAG
  1106. {
  1107. xmlWriter.outputBeginNested(itemtag, true);
  1108. if(itemns)
  1109. xmlWriter.outputXmlns("xmlns", itemns);
  1110. }
  1111. if (master->rowHeader.get()) //OPTIONAL HEADER (specified by "HEADING" option)
  1112. xmlWriter.outputInline(master->rowHeader.get());
  1113. //XML ROW CONTENT
  1114. master->toXML((const byte *)inputRows.item(idx), xmlWriter);
  1115. if (master->rowFooter.get()) //OPTION FOOTER
  1116. xmlWriter.outputInline(master->rowFooter.get());
  1117. if (encode_off) //ENCODING
  1118. xmlWriter.outputInt(0, 1, "encode_");
  1119. if (itemtag && *itemtag) //TAG
  1120. xmlWriter.outputEndNested(itemtag);
  1121. master->addUserLogMsg((const byte *)inputRows.item(idx));
  1122. }
  1123. }
  1124. void CWSCHelperThread::createHttpPostQuery(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows, bool appendRequestToName, bool appendEncodeFlag)
  1125. {
  1126. StringBuffer method_tag;
  1127. method_tag.append(master->service);
  1128. if (method_tag.length() && appendRequestToName)
  1129. method_tag.append("Request");
  1130. StringBuffer array_tag;
  1131. StringAttr method_ns;
  1132. if (master->rootHeader.get()) //OPTIONAL ROOT REQUEST HEADER
  1133. xmlWriter.outputInline(master->rootHeader.get());
  1134. if (inputRows.ordinality() > 1)
  1135. {
  1136. if (!(master->flags & SOAPFnoroot))
  1137. {
  1138. if (method_tag.length())
  1139. {
  1140. array_tag.append(method_tag).append("Array");
  1141. xmlWriter.outputBeginNested(array_tag, true);
  1142. if (master->xmlnamespace.get())
  1143. xmlWriter.outputXmlns("xmlns", master->xmlnamespace);
  1144. }
  1145. }
  1146. xmlWriter.outputBeginArray(method_tag);
  1147. }
  1148. else
  1149. {
  1150. if(master->xmlnamespace.get())
  1151. method_ns.set(master->xmlnamespace.get());
  1152. }
  1153. outputRows(xmlWriter, inputRows, method_tag.str(), appendEncodeFlag ? (inputRows.ordinality() == 1) : false, method_ns.get());
  1154. if (inputRows.ordinality() > 1)
  1155. {
  1156. xmlWriter.outputEndArray(method_tag);
  1157. if (appendEncodeFlag)
  1158. xmlWriter.outputInt(0, 1, "encode_");
  1159. if (!(master->flags & SOAPFnoroot))
  1160. {
  1161. if (method_tag.length())
  1162. xmlWriter.outputEndNested(array_tag);
  1163. }
  1164. }
  1165. if (master->rootFooter.get()) //OPTIONAL ROOT REQUEST FOOTER
  1166. xmlWriter.outputInline(master->rootFooter.get());
  1167. }
  1168. void CWSCHelperThread::createESPQuery(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows)
  1169. {
  1170. createHttpPostQuery(xmlWriter, inputRows, true, true);
  1171. }
  1172. //Create servce xml request body, with binding usage of either Literal or Encoded
  1173. //Note that Encoded usage requires type encoding for data fields
  1174. void CWSCHelperThread::createSOAPliteralOrEncodedQuery(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows)
  1175. {
  1176. xmlWriter.outputBeginNested(master->service, true);
  1177. if (master->flags & SOAPFencoding)
  1178. xmlWriter.outputCString("http://schemas.xmlsoap.org/soap/encoding/", "@soapenv:encodingStyle");
  1179. if (master->xmlnamespace.get())
  1180. xmlWriter.outputXmlns("xmlns", master->xmlnamespace.get());
  1181. outputRows(xmlWriter, inputRows);
  1182. xmlWriter.outputEndNested(master->service);
  1183. }
  1184. //Create SOAP body of http request
  1185. void CWSCHelperThread::createXmlSoapQuery(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows)
  1186. {
  1187. xmlWriter.outputQuoted("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
  1188. xmlWriter.outputBeginNested("soap:Envelope", true);
  1189. xmlWriter.outputXmlns("soap", "http://schemas.xmlsoap.org/soap/envelope/");
  1190. if (master->flags & SOAPFencoding)
  1191. { //SOAP RPC/encoded. 'Encoded' usage includes type encoding
  1192. xmlWriter.outputXmlns("xsd", "http://www.w3.org/2001/XMLSchema");
  1193. xmlWriter.outputXmlns("xsi", "http://www.w3.org/2001/XMLSchema-instance");
  1194. }
  1195. xmlWriter.outputBeginNested("soap:Body", true);
  1196. if (master->flags & SOAPFliteral || master->flags & SOAPFencoding)
  1197. createSOAPliteralOrEncodedQuery(xmlWriter, inputRows);
  1198. else
  1199. createESPQuery(xmlWriter, inputRows);
  1200. xmlWriter.outputEndNested("soap:Body");
  1201. xmlWriter.outputEndNested("soap:Envelope");
  1202. }
  1203. void CWSCHelperThread::processQuery(ConstPointerArray &inputRows)
  1204. {
  1205. unsigned xmlWriteFlags = 0;
  1206. unsigned xmlReadFlags = ptr_ignoreNameSpaces;
  1207. if (master->flags & SOAPFtrim)
  1208. xmlWriteFlags |= XWFtrim;
  1209. if ((master->flags & SOAPFpreserveSpace) == 0)
  1210. xmlReadFlags |= ptr_ignoreWhiteSpace;
  1211. bool useMarkup = (master->flags & SOAPFmarkupinfo);
  1212. XMLWriterType xmlType = WTStandard;
  1213. if (useMarkup && (master->flags & SOAPFjson))
  1214. xmlType = (master->flags & SOAPFnoroot) ? WTJSONRootless : WTJSONObject;
  1215. else if (master->flags & SOAPFencoding)
  1216. xmlType = WTEncodingData64;
  1217. Owned<IXmlWriterExt> xmlWriter = createIXmlWriterExt(xmlWriteFlags, 0, nullptr, xmlType);
  1218. if (useMarkup)
  1219. createHttpPostQuery(*xmlWriter, inputRows, false, false);
  1220. else if (master->wscType == STsoap )
  1221. createXmlSoapQuery(*xmlWriter, inputRows);
  1222. xmlWriter->finalize();
  1223. Owned<IWSCAsyncFor> casyncfor = createWSCAsyncFor(master, *xmlWriter, inputRows, (PTreeReaderOptions) xmlReadFlags);
  1224. casyncfor->For(master->numUrls, master->numUrlThreads,false,true); // shuffle URLS for poormans load balance
  1225. }
  1226. int CWSCHelperThread::run()
  1227. {
  1228. ConstPointerArray inputRows;
  1229. if (master->wscMode == SCrow)
  1230. {
  1231. inputRows.append(NULL);
  1232. try
  1233. {
  1234. processQuery(inputRows);
  1235. }
  1236. catch (IException *e)
  1237. {
  1238. master->setErrorOwn(e);
  1239. }
  1240. inputRows.pop();
  1241. }
  1242. else
  1243. {
  1244. // following a bit odd but preserving previous semantics (except fixing abort leak)
  1245. for (;;)
  1246. {
  1247. try
  1248. {
  1249. while (!master->complete && !master->error.get())
  1250. {
  1251. if (master->aborted) {
  1252. while (inputRows.ordinality() > 0)
  1253. master->rowProvider->releaseRow(inputRows.popGet());
  1254. return 0;
  1255. }
  1256. const void *r = master->rowProvider->getNextRow();
  1257. if (!r)
  1258. break;
  1259. inputRows.append(r);
  1260. if (inputRows.ordinality() >= master->numRecordsPerBatch)
  1261. break;
  1262. }
  1263. if (inputRows.ordinality() == 0)
  1264. break;
  1265. processQuery(inputRows);
  1266. }
  1267. catch (IException *e)
  1268. {
  1269. master->setErrorOwn(e); // going to exit next time round
  1270. }
  1271. while (inputRows.ordinality() > 0)
  1272. master->rowProvider->releaseRow(inputRows.popGet());
  1273. }
  1274. }
  1275. master->setDone();
  1276. return 0;
  1277. }
  1278. //=================================================================================================
  1279. IWSCHelper * createSoapCallHelper(IWSCRowProvider *r, IEngineRowAllocator * outputAllocator, const char *authToken, WSCMode wscMode, ClientCertificate *clientCert, const IContextLogger &logctx, IRoxieAbortMonitor * roxieAbortMonitor)
  1280. {
  1281. return new CWSCHelper(r, outputAllocator, authToken, wscMode, clientCert, logctx, roxieAbortMonitor, STsoap);
  1282. }
  1283. IWSCHelper * createHttpCallHelper(IWSCRowProvider *r, IEngineRowAllocator * outputAllocator, const char *authToken, WSCMode wscMode, ClientCertificate *clientCert, const IContextLogger &logctx, IRoxieAbortMonitor * roxieAbortMonitor)
  1284. {
  1285. return new CWSCHelper(r, outputAllocator, authToken, wscMode, clientCert, logctx, roxieAbortMonitor, SThttp);
  1286. }
  1287. //=================================================================================================
  1288. bool httpHeaderBlockContainsHeader(const char *httpheaders, const char *header)
  1289. {
  1290. if (!httpheaders || !*httpheaders)
  1291. return false;
  1292. VStringBuffer match("\n%s:", header);
  1293. const char *matchStart = match.str()+1;
  1294. if (!strncmp(httpheaders, matchStart, strlen(matchStart)))
  1295. return true;
  1296. if (strstr(httpheaders, match))
  1297. return true;
  1298. return false;
  1299. }
  1300. bool getHTTPHeader(const char *httpheaders, const char *header, StringBuffer& value)
  1301. {
  1302. if (!httpheaders || !*httpheaders || !header || !*header)
  1303. return false;
  1304. const char* pHeader = strstr(httpheaders, header);
  1305. if (!pHeader)
  1306. return false;
  1307. pHeader += strlen(header);
  1308. if (*pHeader != ':')
  1309. return getHTTPHeader(pHeader, header, value);
  1310. pHeader++;
  1311. const char* ppHeader = strchr(pHeader, '\n');
  1312. if (!ppHeader)
  1313. value.append(pHeader);
  1314. else
  1315. value.append(pHeader, 0, ppHeader - pHeader);
  1316. value.trim();
  1317. return true;
  1318. }
  1319. class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFor
  1320. {
  1321. class CSocketDataProvider : public CInterface
  1322. {
  1323. const char * buffer;
  1324. size32_t currLen;
  1325. size32_t curPosn;
  1326. ISocket * socket;
  1327. unsigned timeoutMS;
  1328. public:
  1329. CSocketDataProvider(const char * _buffer, size32_t _curPosn, size32_t _currLen, ISocket * _sock, unsigned _timeout )
  1330. : buffer(_buffer), currLen(_currLen), curPosn(_curPosn), socket(_sock), timeoutMS(_timeout)
  1331. {
  1332. }
  1333. size32_t getBytes(char * buf, size32_t len)
  1334. {
  1335. size32_t count;
  1336. if ( len <= (currLen-curPosn))
  1337. { //its already in the buffer
  1338. memcpy(buf, (buffer + curPosn), len);
  1339. curPosn += len;
  1340. count = len;
  1341. }
  1342. else if (curPosn >= currLen)
  1343. { //nothing in buffer, read from socket
  1344. size32_t bytesRead=0;
  1345. count = 0;
  1346. do
  1347. {
  1348. socket->readtms(buf + count, 0, len - count, bytesRead, timeoutMS);
  1349. count += bytesRead;
  1350. } while (count != len);
  1351. currLen = curPosn = 0;
  1352. }
  1353. else
  1354. { //only some is in buffer, read rest from socket
  1355. size32_t bytesRead=0;
  1356. size32_t avail = currLen - curPosn;
  1357. memcpy(buf, (buffer + curPosn), avail);
  1358. count = avail;
  1359. do
  1360. {
  1361. size32_t read;
  1362. socket->readtms(buf+avail+bytesRead, 0, len-avail-bytesRead, read, timeoutMS);
  1363. bytesRead += read;
  1364. } while (len != (bytesRead + avail));
  1365. count += bytesRead;
  1366. currLen = curPosn = 0;
  1367. }
  1368. return count;
  1369. }
  1370. };
  1371. private:
  1372. CWSCHelper * master;
  1373. ConstPointerArray &inputRows;
  1374. IXmlWriterExt &xmlWriter;
  1375. IEngineRowAllocator * outputAllocator;
  1376. CriticalSection processExceptionCrit;
  1377. StringBuffer responsePath;
  1378. Owned<CSocketDataProvider> dataProvider;
  1379. PTreeReaderOptions options;
  1380. unsigned remainingMS;
  1381. CCycleTimer mTimer;
  1382. inline void checkRoxieAbortMonitor(IRoxieAbortMonitor * roxieAbortMonitor)
  1383. {
  1384. if (roxieAbortMonitor)
  1385. {
  1386. try
  1387. {
  1388. roxieAbortMonitor->checkForAbort();//throws
  1389. }
  1390. catch (IException *e)
  1391. {
  1392. StringBuffer s;
  1393. throw MakeStringException(ROXIE_ABORT_EVENT, "%s", e->errorMessage(s).str());
  1394. }
  1395. }
  1396. }
  1397. bool checkContentEncodingSupported(const char* encoding)
  1398. {
  1399. if (strieq(encoding, "gzip"))
  1400. return true;
  1401. if (strieq(encoding, "deflate"))
  1402. return true;
  1403. if (strieq(encoding, "x-deflate"))
  1404. return true;
  1405. return false;
  1406. }
  1407. bool checkContentDecoding(const StringBuffer& headers, StringBuffer& content, StringBuffer& contentEncoding)
  1408. {
  1409. if ((headers.length() == 0) || (content.length() == 0))
  1410. return false;
  1411. getHTTPHeader(headers.str(), CONTENT_ENCODING, contentEncoding);
  1412. if (contentEncoding.isEmpty())
  1413. return false;
  1414. if (!checkContentEncodingSupported(contentEncoding.str()))
  1415. throw MakeStringException(-1, "Content-Encoding:%s not supported", contentEncoding.str());
  1416. return true;
  1417. }
  1418. void decodeContent(const char* contentEncodingType, StringBuffer& content)
  1419. {
  1420. #ifdef _USE_ZLIB
  1421. unsigned contentLength = content.length();
  1422. StringBuffer contentDecoded;
  1423. httpInflate((const byte*)content.str(), contentLength, contentDecoded, strieq(contentEncodingType, "gzip"));
  1424. PROGLOG("Content decoded from %d bytes to %d bytes", contentLength, contentDecoded.length());
  1425. content = contentDecoded;
  1426. if (soapTraceLevel > 6 || master->logXML)
  1427. master->logctx.CTXLOG("Content decoded. Original " CONTENT_LENGTH " %d", contentLength);
  1428. #else
  1429. throw MakeStringException(-1, "_USE_ZLIB is required for Content-Encoding:%s", contentEncodingType);
  1430. #endif
  1431. }
  1432. bool checkContentEncoding(const char* httpheaders, StringBuffer& contentEncodingType)
  1433. {
  1434. if (xmlWriter.length() == 0)
  1435. return false;
  1436. getHTTPHeader(httpheaders, CONTENT_ENCODING, contentEncodingType);
  1437. if (contentEncodingType.isEmpty())
  1438. return false;
  1439. if (!checkContentEncodingSupported(contentEncodingType.str()))
  1440. throw MakeStringException(-1, "Content-Encoding:%s not supported", contentEncodingType.str());
  1441. return true;
  1442. }
  1443. ZlibCompressionType getEncodeFormat(const char *name)
  1444. {
  1445. if (strieq(name, "gzip"))
  1446. return ZlibCompressionType::GZIP;
  1447. if (strieq(name, "x-deflate"))
  1448. return ZlibCompressionType::ZLIB_DEFLATE;
  1449. if (strieq(name, "deflate"))
  1450. return ZlibCompressionType::DEFLATE;
  1451. return ZlibCompressionType::GZIP; //already checked above, shouldn't be here
  1452. }
  1453. void encodeContent(const char* contentEncodingType, MemoryBuffer& mb)
  1454. {
  1455. #ifdef _USE_ZLIB
  1456. zlib_deflate(mb, xmlWriter.str(), xmlWriter.length(), GZ_BEST_SPEED, getEncodeFormat(contentEncodingType));
  1457. PROGLOG("Content encoded from %d bytes to %d bytes", xmlWriter.length(), mb.length());
  1458. #else
  1459. throw MakeStringException(-1, "_USE_ZLIB is required for Content-Encoding:%s", contentEncodingType);
  1460. #endif
  1461. }
  1462. void logRequest(bool contentEncoded, StringBuffer& request)
  1463. {
  1464. if (soapTraceLevel > 6 || master->logXML)
  1465. {
  1466. if (!contentEncoded)
  1467. master->logctx.mCTXLOG("%s: request(%s)", master->wscCallTypeText(), request.str());
  1468. else
  1469. master->logctx.mCTXLOG("%s: request(%s), content encoded.", master->wscCallTypeText(), request.str());
  1470. }
  1471. }
  1472. void createHttpRequest(Url &url, StringBuffer &request)
  1473. {
  1474. // Create the HTTP POST request
  1475. if (master->wscType == STsoap)
  1476. request.clear().append("POST ").append(url.path).append(" HTTP/1.1\r\n");
  1477. else
  1478. request.clear().append(master->service).append(" ").append(url.path).append(" HTTP/1.1\r\n");
  1479. const char *httpheaders = master->httpHeaders.get();
  1480. if (httpheaders && *httpheaders)
  1481. {
  1482. if (soapTraceLevel > 6 || master->logXML)
  1483. master->logctx.mCTXLOG("%s: Adding HTTP Headers(%s)", master->wscCallTypeText(), httpheaders);
  1484. request.append(httpheaders);
  1485. }
  1486. if (!httpHeaderBlockContainsHeader(httpheaders, "Authorization"))
  1487. {
  1488. if (url.userPasswordPair.length() > 0)
  1489. {
  1490. StringBuffer authToken;
  1491. JBASE64_Encode(url.userPasswordPair.str(), url.userPasswordPair.length(), authToken, false);
  1492. request.append("Authorization: Basic ").append(authToken).append("\r\n");
  1493. }
  1494. else if (master->authToken.length() > 0)
  1495. {
  1496. request.append("Authorization: Basic ").append(master->authToken).append("\r\n");
  1497. }
  1498. }
  1499. #ifdef _USE_ZLIB
  1500. if (!httpHeaderBlockContainsHeader(httpheaders, ACCEPT_ENCODING))
  1501. request.appendf("%s: gzip, deflate\r\n", ACCEPT_ENCODING);
  1502. #endif
  1503. if (!isEmptyString(master->logctx.queryGlobalId()))
  1504. {
  1505. if (!httpHeaderBlockContainsHeader(httpheaders, master->logctx.queryGlobalIdHttpHeader()))
  1506. request.append(master->logctx.queryGlobalIdHttpHeader()).append(": ").append(master->logctx.queryGlobalId()).append("\r\n");
  1507. if (!isEmptyString(master->logctx.queryLocalId()) && !httpHeaderBlockContainsHeader(httpheaders, master->logctx.queryCallerIdHttpHeader()))
  1508. request.append(master->logctx.queryCallerIdHttpHeader()).append(": ").append(master->logctx.queryLocalId()).append("\r\n"); //our localId is reciever's callerId
  1509. }
  1510. if (master->wscType == STsoap)
  1511. {
  1512. if (master->soapaction.get())
  1513. request.append("SOAPAction: ").append(master->soapaction.get()).append("\r\n");
  1514. if (master->httpHeaders.isEmpty() && master->httpHeaderName.get() && master->httpHeaderValue.get())
  1515. {
  1516. //backward compatibility
  1517. StringBuffer hdr(master->httpHeaderName.get());
  1518. hdr.append(": ").append(master->httpHeaderValue);
  1519. if (soapTraceLevel > 6 || master->logXML)
  1520. master->logctx.mCTXLOG("SOAPCALL: Adding HTTP Header(%s)", hdr.str());
  1521. request.append(hdr.append("\r\n"));
  1522. }
  1523. if (!httpHeaderBlockContainsHeader(httpheaders, "Content-Type"))
  1524. {
  1525. bool isJson = ((master->flags & SOAPFmarkupinfo) && (master->flags & SOAPFjson));
  1526. if (isJson)
  1527. request.append("Content-Type: application/json\r\n");
  1528. else
  1529. request.append("Content-Type: text/xml\r\n");
  1530. }
  1531. }
  1532. else if(master->wscType == SThttp)
  1533. request.append("Accept: ").append(master->acceptType).append("\r\n");
  1534. else
  1535. assertex(false);
  1536. if (master->wscType == STsoap)
  1537. {
  1538. request.append("Host: ").append(url.host).append(":").append(url.port).append("\r\n");//http 1.1
  1539. StringBuffer contentEncodingType;
  1540. if (!checkContentEncoding(httpheaders, contentEncodingType))
  1541. {
  1542. request.append(CONTENT_LENGTH).append(xmlWriter.length()).append("\r\n\r\n");
  1543. request.append(xmlWriter.str());//add SOAP xml content
  1544. logRequest(false, request);
  1545. }
  1546. else
  1547. {
  1548. logRequest(true, request);
  1549. MemoryBuffer encodedContentBuf;
  1550. encodeContent(contentEncodingType.str(), encodedContentBuf);
  1551. request.append(CONTENT_LENGTH).append(encodedContentBuf.length()).append("\r\n\r\n");
  1552. request.append(encodedContentBuf.length(), encodedContentBuf.toByteArray());
  1553. }
  1554. }
  1555. else
  1556. {
  1557. request.append("Host: ").append(url.host);//http 1.1
  1558. if (url.port != 80) //default port?
  1559. request.append(":").append(url.port);
  1560. request.append("\r\n");//http 1.1
  1561. request.append("\r\n");//httpcall
  1562. logRequest(false, request);
  1563. }
  1564. if (master->logMin)
  1565. master->logctx.CTXLOG("%s: request(%s:%u)", master->wscCallTypeText(), url.host.str(), url.port);
  1566. }
  1567. int readHttpResponse(StringBuffer &response, ISocket *socket, bool &keepAlive)
  1568. {
  1569. // Read the POST reply
  1570. // not doesn't *assume* is valid HTTP post format but if it is takes advantage of
  1571. response.clear();
  1572. unsigned bytesRead;
  1573. MemoryAttr buf;
  1574. char *buffer=(char *)buf.allocate(WSCBUFFERSIZE+1);
  1575. int rval = 200;
  1576. keepAlive = false;
  1577. // first read header
  1578. size32_t payloadofs = 0;
  1579. size32_t payloadsize = 0;
  1580. StringBuffer dbgheader, contentEncoding;
  1581. bool chunked = false;
  1582. size32_t read = 0;
  1583. do {
  1584. checkTimeLimitExceeded(&remainingMS);
  1585. checkRoxieAbortMonitor(master->roxieAbortMonitor);
  1586. socket->readtms(buffer+read, 0, WSCBUFFERSIZE-read, bytesRead, MIN(master->timeoutMS,remainingMS));
  1587. checkTimeLimitExceeded(&remainingMS);
  1588. checkRoxieAbortMonitor(master->roxieAbortMonitor);
  1589. read += bytesRead;
  1590. buffer[read] = 0;
  1591. if (strncmp(buffer, "HTTP", 4) == 0) {
  1592. const char *s = strstr(buffer,"\r\n\r\n");
  1593. if (s) {
  1594. payloadofs = (size32_t)(s-buffer+4);
  1595. dbgheader.append(payloadofs,buffer);
  1596. s = strstr(buffer, " ");
  1597. if (s)
  1598. rval = atoi(s+1);
  1599. if (!strstr(buffer,"Transfer-Encoding: chunked"))
  1600. {
  1601. chunked = false;
  1602. s = strstr(buffer,CONTENT_LENGTH);
  1603. if (s) {
  1604. s += strlen(CONTENT_LENGTH);
  1605. if ((size32_t)(s-buffer) < payloadofs)
  1606. payloadsize = atoi(s);
  1607. }
  1608. }
  1609. else
  1610. {
  1611. chunked = true;
  1612. size32_t chunkSize = 0;
  1613. size32_t dataLen = 0;
  1614. char ch;
  1615. /*
  1616. //from http://www.w3.org/Protocols/rfc2616/rfc2616-sec19.html#sec19.4.4
  1617. "19.4.6 Introduction of Transfer-Encoding"
  1618. length := 0
  1619. read chunk-size, chunk-extension (if any) and CRLF
  1620. while (chunk-size > 0)
  1621. {
  1622. read chunk-data and CRLF
  1623. append chunk-data to entity-body
  1624. length := length + chunk-size
  1625. read chunk-size and CRLF
  1626. }
  1627. */
  1628. checkTimeLimitExceeded(&remainingMS);
  1629. dataProvider.setown(new CSocketDataProvider(buffer, payloadofs, read, socket, MIN(master->timeoutMS,remainingMS)));
  1630. dataProvider->getBytes(&ch, 1);
  1631. while (isalpha(ch) || isdigit(ch))
  1632. { //get chunk-size
  1633. if (isdigit(ch))
  1634. chunkSize = (chunkSize*16) + (ch - '0');
  1635. else
  1636. chunkSize = (chunkSize*16) + 10 + (toupper(ch) - 'A');
  1637. dataProvider->getBytes(&ch, 1);
  1638. }
  1639. while (chunkSize && ch != '\n')//consume chunk-extension and CRLF
  1640. dataProvider->getBytes(&ch, 1);
  1641. while (chunkSize)
  1642. {
  1643. if (chunkSize > WSCBUFFERSIZE)
  1644. DBGLOG("SOAPCALL chunk size %d", chunkSize);
  1645. //read chunk data directly into response
  1646. size32_t count = dataProvider->getBytes(response.reserve(dataLen + chunkSize), chunkSize);
  1647. assertex(count == chunkSize);
  1648. dataLen += count;
  1649. response.setLength(dataLen);
  1650. dataProvider->getBytes(&ch, 1);//consume CRLF at end of chunk
  1651. while (ch != '\n')
  1652. dataProvider->getBytes(&ch, 1);
  1653. chunkSize = 0;
  1654. dataProvider->getBytes(&ch, 1);
  1655. while (isalpha(ch) || isdigit(ch))
  1656. { //get next chunk size
  1657. if (isdigit(ch))
  1658. chunkSize = (chunkSize*16) + (ch - '0');
  1659. else
  1660. chunkSize = (chunkSize*16) + 10 + (toupper(ch) - 'A');
  1661. dataProvider->getBytes(&ch, 1);
  1662. }
  1663. while(chunkSize && ch != '\n')//consume chunk-extension and CRLF
  1664. dataProvider->getBytes(&ch, 1);
  1665. }
  1666. }
  1667. break;
  1668. }
  1669. }
  1670. if (bytesRead == 0) // socket closed
  1671. break;
  1672. } while (bytesRead&&(read<WSCBUFFERSIZE));
  1673. if (!chunked)
  1674. {
  1675. if (payloadsize)
  1676. response.ensureCapacity(payloadsize);
  1677. char *payload = buffer;
  1678. if (payloadofs) {
  1679. read -= payloadofs;
  1680. payload += payloadofs;
  1681. if (payloadsize&&(read>payloadsize))
  1682. read = payloadsize;
  1683. }
  1684. if (read)
  1685. response.append(read,payload);
  1686. if (payloadsize) { // read directly into response
  1687. while (read<payloadsize) {
  1688. checkTimeLimitExceeded(&remainingMS);
  1689. checkRoxieAbortMonitor(master->roxieAbortMonitor);
  1690. socket->readtms(response.reserve(payloadsize-read), 0, payloadsize-read, bytesRead, MIN(master->timeoutMS,remainingMS));
  1691. checkTimeLimitExceeded(&remainingMS);
  1692. checkRoxieAbortMonitor(master->roxieAbortMonitor);
  1693. read += bytesRead;
  1694. response.setLength(read);
  1695. if (bytesRead==0) {
  1696. master->logctx.CTXLOG("%sCALL: Warning %sHTTP response terminated prematurely",master->wscType == STsoap ? "SOAP" : "HTTP",chunked?"CHUNKED ":"");
  1697. break; // oops looks likesocket closed early
  1698. }
  1699. }
  1700. }
  1701. else {
  1702. for (;;) {
  1703. checkTimeLimitExceeded(&remainingMS);
  1704. checkRoxieAbortMonitor(master->roxieAbortMonitor);
  1705. socket->readtms(buffer, 0, WSCBUFFERSIZE, bytesRead, MIN(master->timeoutMS,remainingMS));
  1706. checkTimeLimitExceeded(&remainingMS);
  1707. checkRoxieAbortMonitor(master->roxieAbortMonitor);
  1708. if (bytesRead==0)
  1709. break; // rely on socket closing to terminate message
  1710. response.append(bytesRead,buffer);
  1711. }
  1712. }
  1713. }
  1714. if (rval == 200 && response.length() > 0)
  1715. keepAlive = checkKeepAlive(dbgheader);
  1716. if (checkContentDecoding(dbgheader, response, contentEncoding))
  1717. decodeContent(contentEncoding.str(), response);
  1718. if (soapTraceLevel > 6 || master->logXML)
  1719. master->logctx.mCTXLOG("%sCALL: LEN=%d %sresponse(%s%s)", master->wscType == STsoap ? "SOAP" : "HTTP",response.length(),chunked?"CHUNKED ":"", dbgheader.str(), response.str());
  1720. else if (soapTraceLevel > 8)
  1721. master->logctx.mCTXLOG("%sCALL: LEN=%d %sresponse(%s)", master->wscType == STsoap ? "SOAP" : "HTTP",response.length(),chunked?"CHUNKED ":"", response.str()); // not sure this is that useful but...
  1722. return rval;
  1723. }
  1724. inline const char *queryXpathHint(const char *name)
  1725. {
  1726. if (!master->xpathHints)
  1727. return nullptr;
  1728. return master->xpathHints->queryProp(name);
  1729. }
  1730. void processEspResponse(Url &url, StringBuffer &response, ColumnProvider * meta)
  1731. {
  1732. StringBuffer path(responsePath);
  1733. path.append("/Results/Result/");
  1734. const char *tail = nullptr;
  1735. const char *excPath = nullptr;
  1736. if (master->rowTransformer && master->inputpath.get())
  1737. {
  1738. StringBuffer ipath;
  1739. ipath.append("/Envelope/Body/").append(master->inputpath.get());
  1740. tail = queryXpathHint("rowpath");
  1741. if(!tail && (ipath.length() >= path.length()) && (0 == memcmp(ipath.str(), path.str(), path.length())))
  1742. tail = ipath.str() + path.length();
  1743. else
  1744. path.clear().append(ipath);
  1745. excPath = queryXpathHint("excpath");
  1746. }
  1747. else
  1748. tail = "Dataset/Row";
  1749. CMatchCB matchCB(*this, url, tail, meta, excPath, CBExceptionPathExc);
  1750. Owned<IXMLParse> xmlParser = createXMLParse((const void *)response.str(), (unsigned)response.length(), path.str(), matchCB, options, (master->flags&SOAPFusescontents)!=0);
  1751. while (xmlParser->next());
  1752. }
  1753. void processLiteralResponse(Url &url, StringBuffer &response, ColumnProvider * meta)
  1754. {
  1755. StringBuffer path("/Envelope/Body/");
  1756. const char *tail = nullptr;
  1757. const char *excPath = nullptr;
  1758. if(master->rowTransformer && master->inputpath.get())
  1759. {
  1760. path.append(master->inputpath.get());
  1761. tail = queryXpathHint("rowpath");
  1762. excPath = queryXpathHint("excpath");
  1763. }
  1764. CMatchCB matchCB(*this, url, tail, meta, excPath, CBExceptionPathExc);
  1765. Owned<IXMLParse> xmlParser = createXMLParse((const void *)response.str(), (unsigned)response.length(), path.str(), matchCB, options, (master->flags&SOAPFusescontents)!=0);
  1766. while (xmlParser->next());
  1767. }
  1768. void processHttpResponse(Url &url, StringBuffer &response, ColumnProvider * meta)
  1769. {
  1770. const char *path = nullptr;
  1771. const char *tail = nullptr;
  1772. const char *excPath = nullptr;
  1773. if(master->rowTransformer && master->inputpath.get())
  1774. {
  1775. path = master->inputpath.get();
  1776. tail = queryXpathHint("rowpath");
  1777. excPath = queryXpathHint("excpath");
  1778. }
  1779. CMatchCB matchCB(*this, url, tail, meta, excPath, CBExceptionPathExc | CBExceptionPathExcs | CBExceptionPathExcsExc);
  1780. Owned<IXMLParse> xmlParser;
  1781. if (strieq(master->acceptType.str(), "application/json") || (master->flags & SOAPFjson))
  1782. xmlParser.setown(createJSONParse((const void *)response.str(), (unsigned)response.length(), path, matchCB, options, (master->flags&SOAPFusescontents)!=0, true));
  1783. else
  1784. xmlParser.setown(createXMLParse((const void *)response.str(), (unsigned)response.length(), path, matchCB, options, (master->flags&SOAPFusescontents)!=0));
  1785. while (xmlParser->next());
  1786. }
  1787. void processResponse(Url &url, StringBuffer &response, ColumnProvider * meta)
  1788. {
  1789. if (master->wscType == SThttp || master->flags & SOAPFmarkupinfo)
  1790. processHttpResponse(url, response, meta);
  1791. else if (master->flags & SOAPFliteral)
  1792. processLiteralResponse(url, response, meta);
  1793. else if (master->flags & SOAPFencoding)
  1794. processLiteralResponse(url, response, meta);
  1795. else
  1796. processEspResponse(url, response, meta);
  1797. }
  1798. void processException(const Url &url, const void *row, IException *e)
  1799. {
  1800. CriticalBlock block(processExceptionCrit);
  1801. Owned<IException> ne = url.getUrlException(e);
  1802. e->Release();
  1803. if ((master->flags & SOAPFonfail) && master->callHelper)
  1804. {
  1805. try
  1806. {
  1807. RtlDynamicRowBuilder rowBuilder(outputAllocator);
  1808. size32_t sizeGot = master->onFailTransform(rowBuilder, row, ne);
  1809. if (sizeGot > 0)
  1810. master->putRow(rowBuilder.finalizeRowClear(sizeGot));
  1811. }
  1812. catch (IException *te)
  1813. {
  1814. master->setErrorOwn(te);
  1815. }
  1816. }
  1817. else
  1818. master->setErrorOwn(ne.getClear());
  1819. }
  1820. void processException(const Url &url, ConstPointerArray &inputRows, IException *e)
  1821. {
  1822. Owned<IException> ne = url.getUrlException(e);
  1823. e->Release();
  1824. if ((master->flags & SOAPFonfail) && master->callHelper)
  1825. {
  1826. ForEachItemIn(idx, inputRows)
  1827. {
  1828. try
  1829. {
  1830. RtlDynamicRowBuilder rowBuilder(outputAllocator);
  1831. size32_t sizeGot = master->onFailTransform(rowBuilder, inputRows.item(idx), ne);
  1832. if (sizeGot > 0)
  1833. master->putRow(rowBuilder.finalizeRowClear(sizeGot));
  1834. }
  1835. catch (IException *te)
  1836. {
  1837. master->setErrorOwn(te);
  1838. break;
  1839. }
  1840. }
  1841. }
  1842. else
  1843. master->setErrorOwn(ne.getClear());
  1844. }
  1845. inline void checkTimeLimitExceeded(unsigned * remainingMS)
  1846. {
  1847. if (master->isTimeLimitExceeded(remainingMS))
  1848. throw MakeStringException(TIMELIMIT_EXCEEDED, "%sCALL TIMELIMIT(%ums) exceeded", master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimitMS);
  1849. }
  1850. inline bool checkKeepAlive(StringBuffer& headers)
  1851. {
  1852. size32_t verOffset = httpVerOffset();
  1853. if (headers.length() <= verOffset)
  1854. return false;
  1855. StringBuffer httpVer;
  1856. const char* ptr = headers.str() + verOffset;
  1857. while (*ptr && !isspace(*ptr))
  1858. httpVer.append(*ptr++);
  1859. StringBuffer conHeader;
  1860. getHTTPHeader(ptr, CONNECTION, conHeader);
  1861. return isHttpPersistable(httpVer.str(), conHeader.str());
  1862. }
  1863. public:
  1864. CWSCAsyncFor(CWSCHelper * _master, IXmlWriterExt &_xmlWriter, ConstPointerArray &_inputRows, PTreeReaderOptions _options): xmlWriter(_xmlWriter), inputRows(_inputRows), options(_options)
  1865. {
  1866. master = _master;
  1867. outputAllocator = master->queryOutputAllocator();
  1868. responsePath.append("/Envelope/Body/");
  1869. if (inputRows.ordinality() > 1)
  1870. {
  1871. // can we receive a roxie exceptions for the whole RequestArray?
  1872. // if so, we need to handle them here
  1873. responsePath.append(master->service).append("ResponseArray/");
  1874. }
  1875. responsePath.append(master->service).append("Response");
  1876. remainingMS = 0;
  1877. }
  1878. ~CWSCAsyncFor()
  1879. {
  1880. master->logctx.noteStatistic(StTimeSoapcall, mTimer.elapsedNs());
  1881. }
  1882. IMPLEMENT_IINTERFACE;
  1883. void For(unsigned num,unsigned maxatonce,bool abortFollowingException, bool shuffled)
  1884. {
  1885. CAsyncFor::For(num, maxatonce, abortFollowingException, shuffled);
  1886. }
  1887. void Do(unsigned idx)
  1888. {
  1889. StringBuffer request;
  1890. StringBuffer response;
  1891. unsigned numRetries = 0;
  1892. unsigned retryInterval = 0;
  1893. Url &url = master->urlArray.item(idx);
  1894. createHttpRequest(url, request);
  1895. unsigned startidx = idx;
  1896. while (!master->aborted)
  1897. {
  1898. bool keepAlive = false;
  1899. bool isReused = false;
  1900. PersistentProtocol proto = PersistentProtocol::ProtoTCP;
  1901. SocketEndpoint ep;
  1902. Owned<ISocket> socket;
  1903. CCycleTimer timer;
  1904. for (;;)
  1905. {
  1906. try
  1907. {
  1908. checkTimeLimitExceeded(&remainingMS);
  1909. Url &connUrl = master->proxyUrlArray.empty() ? url : master->proxyUrlArray.item(0);
  1910. ep.set(connUrl.host.get(), connUrl.port);
  1911. if (strieq(url.method, "https"))
  1912. proto = PersistentProtocol::ProtoTLS;
  1913. bool shouldClose = false;
  1914. Owned<ISocket> psock = persistentHandler?persistentHandler->getAvailable(&ep, &shouldClose, proto):nullptr;
  1915. if (psock)
  1916. {
  1917. isReused = true;
  1918. keepAlive = !shouldClose;
  1919. socket.setown(psock.getClear());
  1920. }
  1921. else
  1922. {
  1923. isReused = false;
  1924. keepAlive = true;
  1925. socket.setown(blacklist->connect(connUrl.port, connUrl.host, master->logctx, (unsigned)master->maxRetries, master->timeoutMS, master->roxieAbortMonitor));
  1926. if (proto == PersistentProtocol::ProtoTLS)
  1927. {
  1928. #ifdef _USE_OPENSSL
  1929. Owned<ISecureSocket> ssock = master->createSecureSocket(socket.getClear());
  1930. if (ssock)
  1931. {
  1932. checkTimeLimitExceeded(&remainingMS);
  1933. int status = ssock->secure_connect();
  1934. if (status < 0)
  1935. {
  1936. StringBuffer err;
  1937. err.append("Failure to establish secure connection to ");
  1938. connUrl.getUrlString(err);
  1939. err.append(": returned ").append(status);
  1940. throw makeStringException(0, err.str());
  1941. }
  1942. socket.setown(ssock.getClear());
  1943. }
  1944. #else
  1945. StringBuffer err;
  1946. err.append("Failure to establish secure connection to ");
  1947. connUrl.getUrlString(err);
  1948. err.append(": OpenSSL disabled in build");
  1949. throw makeStringException(0, err.str());
  1950. #endif
  1951. }
  1952. }
  1953. break;
  1954. }
  1955. catch (IException *e)
  1956. {
  1957. if (master->timeLimitExceeded)
  1958. {
  1959. master->logctx.CTXLOG("%sCALL exiting: time limit (%ums) exceeded",master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimitMS);
  1960. processException(url, inputRows, e);
  1961. return;
  1962. }
  1963. if (e->errorCode() == ROXIE_ABORT_EVENT)
  1964. {
  1965. StringBuffer s;
  1966. master->logctx.CTXLOG("%sCALL exiting: Roxie Abort : %s",master->wscType == STsoap ? "SOAP" : "HTTP",e->errorMessage(s).str());
  1967. throw;
  1968. }
  1969. do
  1970. {
  1971. idx++; // try next socket not blacklisted
  1972. if (idx==master->urlArray.ordinality())
  1973. idx = 0;
  1974. if (idx==startidx)
  1975. {
  1976. StringBuffer s;
  1977. master->logctx.CTXLOG("Exception %s", e->errorMessage(s).str());
  1978. processException(url, inputRows, e);
  1979. return;
  1980. }
  1981. } while (blacklist->blacklisted(url.port, url.host));
  1982. }
  1983. }
  1984. try
  1985. {
  1986. checkTimeLimitExceeded(&remainingMS);
  1987. checkRoxieAbortMonitor(master->roxieAbortMonitor);
  1988. socket->write(request.str(), request.length());
  1989. if (soapTraceLevel > 4)
  1990. master->logctx.CTXLOG("%sCALL: sent request (%s) to %s:%d", master->wscType == STsoap ? "SOAP" : "HTTP",master->service.str(), url.host.str(), url.port);
  1991. checkTimeLimitExceeded(&remainingMS);
  1992. checkRoxieAbortMonitor(master->roxieAbortMonitor);
  1993. bool keepAlive2;
  1994. int rval = readHttpResponse(response, socket, keepAlive2);
  1995. keepAlive = keepAlive && keepAlive2;
  1996. if (soapTraceLevel > 4)
  1997. master->logctx.CTXLOG("%sCALL: received response (%s) from %s:%d", master->wscType == STsoap ? "SOAP" : "HTTP",master->service.str(), url.host.str(), url.port);
  1998. if (rval != 200)
  1999. {
  2000. if (rval == 503)
  2001. throw new ReceivedRoxieException(1001, "Server Too Busy");
  2002. StringBuffer text;
  2003. text.appendf("HTTP error (%d) in processQuery",rval);
  2004. rtlAddExceptionTag(text, "soapresponse", response.str());
  2005. throw MakeStringExceptionDirect(-1, text.str());
  2006. }
  2007. if (response.length() == 0)
  2008. {
  2009. throw MakeStringException(-1, "Zero length response in processQuery");
  2010. }
  2011. checkTimeLimitExceeded(&remainingMS);
  2012. ColumnProvider * meta = (ColumnProvider*)CreateColumnProvider((unsigned)nanoToMilli(timer.elapsedNs()), master->flags&SOAPFencoding?true:false);
  2013. processResponse(url, response, meta);
  2014. delete meta;
  2015. if (persistentHandler)
  2016. {
  2017. if (isReused)
  2018. persistentHandler->doneUsing(socket, keepAlive);
  2019. else if (keepAlive)
  2020. persistentHandler->add(socket, &ep, proto);
  2021. }
  2022. break;
  2023. }
  2024. catch (IReceivedRoxieException *e)
  2025. {
  2026. if (persistentHandler && isReused)
  2027. persistentHandler->doneUsing(socket, false);
  2028. // server busy ... Sleep and retry
  2029. if (e->errorCode() == 1001)
  2030. {
  2031. if (retryInterval)
  2032. {
  2033. int sleepTime = retryInterval + getRandom() % retryInterval;
  2034. master->logctx.CTXLOG("Server busy (1001), sleeping for %d milliseconds before retry", sleepTime);
  2035. Sleep(sleepTime);
  2036. retryInterval = (retryInterval*2 >= 10000)? 10000: retryInterval*2;
  2037. }
  2038. else
  2039. {
  2040. master->logctx.CTXLOG("Server busy (1001), retrying");
  2041. retryInterval = 10;
  2042. }
  2043. e->Release();
  2044. }
  2045. else
  2046. {
  2047. // other roxie exception ...
  2048. master->logctx.CTXLOG("Exiting: received Roxie exception");
  2049. if (e->errorRow())
  2050. processException(url, e->errorRow(), e);
  2051. else
  2052. processException(url, inputRows, e);
  2053. break;
  2054. }
  2055. }
  2056. catch (IException *e)
  2057. {
  2058. if (persistentHandler && isReused)
  2059. persistentHandler->doneUsing(socket, false);
  2060. if (master->timeLimitExceeded)
  2061. {
  2062. processException(url, inputRows, e);
  2063. master->logctx.CTXLOG("%sCALL exiting: time limit (%ums) exceeded", master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimitMS);
  2064. break;
  2065. }
  2066. if (e->errorCode() == ROXIE_ABORT_EVENT)
  2067. {
  2068. StringBuffer s;
  2069. master->logctx.CTXLOG("%sCALL exiting: Roxie Abort : %s",master->wscType == STsoap ? "SOAP" : "HTTP",e->errorMessage(s).str());
  2070. throw;
  2071. }
  2072. // other IException ... retry up to maxRetries
  2073. StringBuffer s;
  2074. master->logctx.CTXLOG("Exception %s", e->errorMessage(s).str());
  2075. if (numRetries >= master->maxRetries)
  2076. {
  2077. // error affects all inputRows
  2078. master->logctx.CTXLOG("Exiting: maxRetries %d exceeded", master->maxRetries);
  2079. processException(url, inputRows, e);
  2080. break;
  2081. }
  2082. numRetries++;
  2083. master->logctx.CTXLOG("Retrying: attempt %d of %d", numRetries, master->maxRetries);
  2084. e->Release();
  2085. }
  2086. catch (std::exception & es)
  2087. {
  2088. if (persistentHandler && isReused)
  2089. persistentHandler->doneUsing(socket, false);
  2090. if(dynamic_cast<std::bad_alloc *>(&es))
  2091. throw MakeStringException(-1, "std::exception: out of memory (std::bad_alloc) in CWSCAsyncFor processQuery");
  2092. throw MakeStringException(-1, "std::exception: standard library exception (%s) in CWSCAsyncFor processQuery",es.what());
  2093. }
  2094. catch (...)
  2095. {
  2096. if (persistentHandler && isReused)
  2097. persistentHandler->doneUsing(socket, false);
  2098. throw MakeStringException(-1, "Unknown exception in processQuery");
  2099. }
  2100. }
  2101. }
  2102. inline virtual const char *getResponsePath() { return responsePath; }
  2103. inline virtual ConstPointerArray & getInputRows() { return inputRows; }
  2104. inline virtual CWSCHelper * getMaster() { return master; }
  2105. inline virtual IEngineRowAllocator * getOutputAllocator() { return outputAllocator; }
  2106. };
  2107. IWSCAsyncFor * createWSCAsyncFor(CWSCHelper * _master, IXmlWriterExt &_xmlWriter, ConstPointerArray &_inputRows, PTreeReaderOptions _options)
  2108. {
  2109. if (!persistentInitDone)
  2110. initPersistentHandler();
  2111. return new CWSCAsyncFor(_master, _xmlWriter, _inputRows, _options);
  2112. }