roxiehelper.cpp 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "jexcept.hpp"
  15. #include "roxiehelper.hpp"
  16. #include "roxielmj.hpp"
  17. #include "jmisc.hpp"
  18. #include "jfile.hpp"
  19. #include "mpbase.hpp"
  20. #include "dafdesc.hpp"
  21. #include "dadfs.hpp"
// Global trace verbosity. Code in this file emits extra DBGLOG output when it is greater than 5.
unsigned traceLevel = 0;
  23. //OwnedRowArray
  24. void OwnedRowArray::clear()
  25. {
  26. ForEachItemIn(idx, buff)
  27. ReleaseRoxieRow(buff.item(idx));
  28. buff.kill();
  29. }
  30. void OwnedRowArray::clearPart(aindex_t from, aindex_t to)
  31. {
  32. aindex_t idx;
  33. for(idx = from; idx < to; idx++)
  34. ReleaseRoxieRow(buff.item(idx));
  35. buff.removen(from, to-from);
  36. }
  37. void OwnedRowArray::replace(const void * row, aindex_t pos)
  38. {
  39. ReleaseRoxieRow(buff.item(pos));
  40. buff.replace(row, pos);
  41. }
  42. //=========================================================================================
  43. //CRHRollingCacheElem copied/modified from THOR
  44. CRHRollingCacheElem::CRHRollingCacheElem()
  45. {
  46. row = NULL;
  47. cmp = INT_MIN;
  48. }
  49. CRHRollingCacheElem::~CRHRollingCacheElem()
  50. {
  51. if (row)
  52. ReleaseRoxieRow(row);
  53. }
  54. void CRHRollingCacheElem::set(const void *_row)
  55. {
  56. if (row)
  57. ReleaseRoxieRow(row);
  58. row = _row;
  59. }
  60. //CRHRollingCache copied/modified from THOR CRollingCache
  61. CRHRollingCache::~CRHRollingCache()
  62. {
  63. loop
  64. {
  65. CRHRollingCacheElem *e = cache.dequeue();
  66. if (!e)
  67. break;
  68. delete e;
  69. }
  70. }
  71. void CRHRollingCache::init(IInputBase *_in, unsigned _max)
  72. {
  73. max = _max;
  74. in =_in;
  75. cache.clear();
  76. cache.reserve(max);
  77. eos = false;
  78. while (cache.ordinality()<max/2)
  79. cache.enqueue(NULL);
  80. while (!eos && (cache.ordinality()<max))
  81. advance();
  82. }
  83. #ifdef TRACEROLLING
  84. void CRHRollingCache::PrintCache()
  85. {
  86. for (unsigned i = 0;i<max;i++) {
  87. CRHRollingCacheElem *e = cache.item(i);
  88. if (i==0)
  89. DBGLOG("RC==============================");
  90. int ii = 0;
  91. if (e && e->row)
  92. ii = isalpha(*((char*)e->row)) ? 0 : 4;
  93. chas sz[100];
  94. sprintf(sz,"%c%d: %s",(i==max/2)?'>':' ',i,e?(const char *)e->row+ii:"-----");
  95. for (int xx=0; sz[xx] != NULL; xx++)
  96. {
  97. if (!isprint(sz[xx]))
  98. {
  99. sz[xx] = NULL;
  100. break;
  101. }
  102. }
  103. DBGLOG(sz);
  104. if (i == max-1)
  105. DBGLOG("RC==============================");
  106. }
  107. }
  108. #endif
  109. CRHRollingCacheElem * CRHRollingCache::mid(int rel)
  110. {
  111. return cache.item((max/2)+rel); // relies on unsigned wrap
  112. }
  113. void CRHRollingCache::advance()
  114. {
  115. CRHRollingCacheElem *e = (cache.ordinality()==max)?cache.dequeue():NULL; //cache full, remove head element
  116. if (!eos) {
  117. if (!e)
  118. e = new CRHRollingCacheElem();
  119. const void * nextrec = in->nextInGroup();//get row from CRHCRHDualCache::cOut, which gets from CRHCRHDualCache, which gets from input
  120. if (!nextrec)
  121. nextrec = in->nextInGroup();
  122. if (nextrec) {
  123. e->set(nextrec);
  124. cache.enqueue(e);
  125. #ifdef TRACEROLLING
  126. PrintCache();
  127. #endif
  128. return;
  129. }
  130. else
  131. eos = true;
  132. }
  133. delete e;
  134. cache.enqueue(NULL);
  135. #ifdef TRACEROLLING
  136. PrintCache();
  137. #endif
  138. }
  139. //=========================================================================================
  140. //CRHDualCache copied from THOR, and modified to get input from IInputBase instead
  141. //of IReadSeqVar and to manage rows as OwnedRoxieRow types
  142. CRHDualCache::CRHDualCache()
  143. {
  144. strm1 = NULL;
  145. strm2 = NULL;
  146. }
  147. CRHDualCache::~CRHDualCache()
  148. {
  149. ::Release(strm1);
  150. ::Release(strm2);
  151. loop
  152. {
  153. CRHRollingCacheElem *e = cache.dequeue();
  154. if (!e)
  155. break;
  156. delete e;
  157. }
  158. }
  159. void CRHDualCache::init(IInputBase * _in)
  160. {
  161. in = _in;
  162. cache.clear();
  163. eos = false;
  164. base = 0;
  165. posL = 0;
  166. posR = 0;
  167. strm1 = new cOut(this,posL);
  168. strm2 = new cOut(this,posR) ;
  169. }
  170. #ifdef TRACEROLLING
  171. void CRHDualCache::PrintCache()
  172. {
  173. for (unsigned i = 0;i<cache.ordinality();i++) {
  174. CRHRollingCacheElem *e = cache.item(i);
  175. if (i==0)
  176. {
  177. DBGLOG("DC=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-BASE:%d,posL=%d,posR=%d %s", base, posL,posR, eos?"EOS":"");
  178. }
  179. DBGLOG("%c%d: %s",(i==cache.ordinality()/2)?'>':' ',i,e?(const char *)e->row:"-----");
  180. if (i == cache.ordinality()-1)
  181. DBGLOG("DC=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-");
  182. }
  183. }
  184. #endif
  185. bool CRHDualCache::get(unsigned n, CRHRollingCacheElem *&out)
  186. {
  187. // take off any no longer needed
  188. CRHRollingCacheElem *e=NULL;
  189. while ((base<posL) && (base<posR)) {
  190. delete e;
  191. e = cache.dequeue();
  192. base++;
  193. }
  194. assertex(n>=base);
  195. while (!eos && (n-base>=cache.ordinality())) //element already in cache?
  196. {
  197. if (!e)
  198. e = new CRHRollingCacheElem;
  199. const void * nextrec = in->nextInGroup(); //get from activity
  200. if (!nextrec)
  201. nextrec = in->nextInGroup();
  202. if (!nextrec) {
  203. eos = true;
  204. break;
  205. }
  206. e->set(nextrec);
  207. cache.enqueue(e);
  208. e = NULL;
  209. #ifdef TRACEROLLING
  210. PrintCache();
  211. #endif
  212. }
  213. delete e;
  214. if (n-base>=cache.ordinality())
  215. return false;
  216. out = cache.item(n-base);
  217. return true;
  218. }
  219. size32_t CRHDualCache::getRecordSize(const void *ptr)
  220. {
  221. return in->queryOutputMeta()->getRecordSize(ptr);
  222. }
  223. size32_t CRHDualCache::getFixedSize() const
  224. {
  225. return in->queryOutputMeta()->getFixedSize();
  226. }
  227. CRHDualCache::cOut::cOut(CRHDualCache *_parent, unsigned &_pos)
  228. : pos(_pos)
  229. {
  230. parent = _parent;
  231. stopped = false;
  232. }
  233. const void * CRHDualCache::cOut::nextInGroup()
  234. {
  235. CRHRollingCacheElem *e;
  236. if (stopped || !parent->get(pos,e))
  237. return NULL; //no more data
  238. LinkRoxieRow(e->row);
  239. pos++;
  240. return e->row;
  241. }
  242. IOutputMetaData * CRHDualCache::cOut::queryOutputMeta() const
  243. {
  244. return parent->input()->queryOutputMeta();
  245. }
  246. void CRHDualCache::cOut::stop()
  247. {
  248. pos = (unsigned)-1;
  249. stopped = true;
  250. }
  251. //=========================================================================================
  252. IRHLimitedCompareHelper *createRHLimitedCompareHelper()
  253. {
  254. return new CRHLimitedCompareHelper();
  255. }
  256. //CRHLimitedCompareHelper
  257. void CRHLimitedCompareHelper::init( unsigned _atmost,
  258. IInputBase *_in,
  259. ICompare * _cmp,
  260. ICompare * _limitedcmp )
  261. {
  262. atmost = _atmost;
  263. cache.setown(new CRHRollingCache());
  264. cache->init(_in,(atmost+1)*2);
  265. cmp = _cmp;
  266. limitedcmp = _limitedcmp;
  267. }
// Collect the cached rows that match 'left', capped at 'atmost' rows.
//   group  receives a linked reference to every row in the selected range
//   left   the row to match against
// 'low' and 'high' are offsets relative to the middle of the rolling cache; the
// candidate range is [low, high). Each inspected element has its 'cmp' field
// overwritten with the comparison outcome.
// Returns false when the range is larger than atmost and both ends have cmp == 0.
// Returns true without appending anything when trimming empties the range.
// Otherwise returns whether 'group' is non-empty after appending.
// NOTE(review): the trimming step appears to treat |cmp| as a match-quality
// measure produced by limitedcmp, with +/-1 meaning "not part of the group" -
// confirm against the ICompare implementations that are passed in.
bool CRHLimitedCompareHelper::getGroup(OwnedRowArray &group, const void *left)
{
    // this could be improved!
    // first move 'mid' forwards until mid>=left
    int low = 0;
    loop
    {
        CRHRollingCacheElem * r = cache->mid(0);
        if (!r)
            break; // hit eos
        int c = cmp->docompare(left,r->row);
        if (c == 0)
        {
            // equal on the main compare: let the limited compare decide whether to stop here
            r->cmp = limitedcmp->docompare(left,r->row);
            if (r->cmp <= 0)
                break;
        }
        else if (c < 0)
        {
            r->cmp = -1;
            break;
        }
        else
            r->cmp = 1;
        cache->advance();
        if (cache->mid(low-1)) // only if haven't hit start
            low--;
    }
    // now scan back (note low should be filled even at eos)
    loop
    {
        CRHRollingCacheElem * pr = cache->mid(low-1);
        if (!pr)
            break; // hit start
        int c = cmp->docompare(left,pr->row);
        if (c == 0)
        {
            pr->cmp = limitedcmp->docompare(left,pr->row);
            if (pr->cmp==1)
                break;
        }
        else
        {
            pr->cmp = 1;
            break;
        }
        low--;
    }
    int high = 0;
    if (cache->mid(0)) // check haven't already hit end
    {
        // now scan fwd
        loop
        {
            high++;
            CRHRollingCacheElem * nr = cache->mid(high);
            if (!nr)
                break;
            int c = cmp->docompare(left,nr->row);
            if (c==0)
            {
                nr->cmp = limitedcmp->docompare(left,nr->row);
                if (nr->cmp==-1)
                    break;
            }
            else
            {
                nr->cmp = -1;
                break;
            }
        }
    }
    // range too large: repeatedly trim entries from the ends until it fits
    while (high-low>(int)atmost)
    {
        int vl = iabs(cache->mid(low)->cmp);
        int vh = iabs(cache->mid(high-1)->cmp);
        int v;
        if (vl==0)
        {
            if (vh==0) // both ends equal
                return false;
            v = vh;
        }
        else if (vh==0)
            v = vl;
        else
            v = imin(vl,vh);
        // remove worst match from either end
        while ((low<high)&&(iabs(cache->mid(low)->cmp)==v))
            low++;
        while ((low<high)&&(iabs(cache->mid(high-1)->cmp)==v))
            high--;
        if (low>=high)
            return true; // couldn't make group;
    }
    // hand the surviving rows to the caller, linking each one
    for (int i=low;i<high;i++)
    {
        CRHRollingCacheElem *r = cache->mid(i);
        LinkRoxieRow(r->row);
        group.append(r->row);
    }
    return group.ordinality()>0;
}
  371. //=========================================================================================
  372. CSafeSocket::CSafeSocket(ISocket *_sock)
  373. {
  374. httpMode = false;
  375. sent = 0;
  376. heartbeat = false;
  377. sock.setown(_sock);
  378. }
  379. CSafeSocket::~CSafeSocket()
  380. {
  381. sock.clear();
  382. ForEachItemIn(idx, queued)
  383. {
  384. free(queued.item(idx));
  385. }
  386. queued.kill();
  387. lengths.kill();
  388. }
  389. unsigned CSafeSocket::bytesOut() const
  390. {
  391. return sent;
  392. }
  393. bool CSafeSocket::checkConnection() const
  394. {
  395. if (sock)
  396. return sock->check_connection();
  397. else
  398. return false;
  399. }
  400. size32_t CSafeSocket::write(const void *buf, size32_t size, bool takeOwnership)
  401. {
  402. CriticalBlock c(crit); // NOTE: anyone needing to write multiple times without interleave should have already locked this. We lock again for the simple cases.
  403. try
  404. {
  405. if (httpMode)
  406. {
  407. if (!takeOwnership)
  408. {
  409. void *newbuf = malloc(size);
  410. if (!newbuf)
  411. throw MakeStringException(ROXIE_INTERNAL_ERROR, "Out of memory in CSafeSocket::write (requesting %d bytes)", size);
  412. memcpy(newbuf, buf, size);
  413. buf = newbuf;
  414. }
  415. queued.append((void *) buf);
  416. lengths.append(size);
  417. return size;
  418. }
  419. else
  420. {
  421. sent += size;
  422. size32_t written = sock->write(buf, size);
  423. if (takeOwnership)
  424. free((void *) buf);
  425. return written;
  426. }
  427. }
  428. catch(...)
  429. {
  430. heartbeat = false;
  431. if (takeOwnership)
  432. free((void *) buf);
  433. throw;
  434. }
  435. }
  436. bool CSafeSocket::readBlock(MemoryBuffer &ret, unsigned timeout, unsigned maxBlockSize)
  437. {
  438. // MORE - this is still not good enough as we could get someone else's block if there are multiple input datasets
  439. CriticalBlock c(crit);
  440. try
  441. {
  442. unsigned bytesRead;
  443. unsigned len;
  444. try
  445. {
  446. sock->read(&len, sizeof (len), sizeof (len), bytesRead, timeout);
  447. }
  448. catch (IJSOCK_Exception *E)
  449. {
  450. if (E->errorCode()==JSOCKERR_graceful_close)
  451. {
  452. E->Release();
  453. return false;
  454. }
  455. throw;
  456. }
  457. assertex(bytesRead == sizeof(len));
  458. _WINREV(len);
  459. if (len & 0x80000000)
  460. len ^= 0x80000000;
  461. if (len > maxBlockSize)
  462. throw MakeStringException(ROXIE_DATA_ERROR, "Maximum block size (%d bytes) exceeded (missing length prefix?)", maxBlockSize);
  463. if (len)
  464. {
  465. unsigned bytesRead;
  466. sock->read(ret.reserveTruncate(len), len, len, bytesRead, timeout);
  467. }
  468. return len != 0;
  469. }
  470. catch(...)
  471. {
  472. heartbeat = false;
  473. throw;
  474. }
  475. }
  476. bool CSafeSocket::readBlock(StringBuffer &ret, unsigned timeout, HttpHelper *pHttpHelper, bool &continuationNeeded, bool &isStatus, unsigned maxBlockSize)
  477. {
  478. continuationNeeded = false;
  479. isStatus = false;
  480. CriticalBlock c(crit);
  481. try
  482. {
  483. unsigned bytesRead;
  484. unsigned len = 0;
  485. try
  486. {
  487. sock->read(&len, sizeof (len), sizeof (len), bytesRead, timeout);
  488. }
  489. catch (IJSOCK_Exception *E)
  490. {
  491. if (E->errorCode()==JSOCKERR_graceful_close)
  492. {
  493. E->Release();
  494. return false;
  495. }
  496. throw;
  497. }
  498. assertex(bytesRead == sizeof(len));
  499. unsigned left = 0;
  500. char *buf;
  501. if (pHttpHelper != NULL && strncmp((char *)&len, "POST", 4) == 0)
  502. {
  503. #define MAX_HTTP_HEADERSIZE 8000
  504. char header[MAX_HTTP_HEADERSIZE + 1]; // allow room for \0
  505. sock->read(header, 1, MAX_HTTP_HEADERSIZE, bytesRead, timeout);
  506. header[bytesRead] = 0;
  507. char *payload = strstr(header, "\r\n\r\n");
  508. if (payload)
  509. {
  510. *payload = 0;
  511. payload += 4;
  512. char *str;
  513. // capture authentication token
  514. if ((str = strstr(header, "Authorization: Basic ")) != NULL)
  515. {
  516. char *authToken = str + strlen("Authorization: Basic ");
  517. str = strchr(authToken, '\r');
  518. if (str)
  519. {
  520. *str = 0;
  521. pHttpHelper->setAuthToken(authToken);
  522. *str = '\r'; // need to remove the 0 so other str comparisons will work
  523. }
  524. }
  525. // determine payload length
  526. str = strstr(header, "Content-Length: ");
  527. if (str)
  528. {
  529. len = atoi(str + strlen("Content-Length: "));
  530. buf = ret.reserveTruncate(len);
  531. left = len - (bytesRead - (payload - header));
  532. if (len > left)
  533. memcpy(buf, payload, len - left);
  534. }
  535. else
  536. left = len = 0;
  537. }
  538. else
  539. left = len = 0;
  540. pHttpHelper->setIsHttp(true);
  541. if (!len)
  542. throw MakeStringException(ROXIE_DATA_ERROR, "Badly formed HTTP header");
  543. }
  544. else if (strnicmp((char *)&len, "STAT", 4) == 0)
  545. isStatus = true;
  546. else
  547. {
  548. _WINREV(len);
  549. if (len & 0x80000000)
  550. {
  551. len ^= 0x80000000;
  552. continuationNeeded = true;
  553. }
  554. if (len > maxBlockSize)
  555. throw MakeStringException(ROXIE_DATA_ERROR, "Maximum block size (%d bytes) exceeded (missing length prefix?)", maxBlockSize);
  556. left = len;
  557. if (len)
  558. buf = ret.reserveTruncate(len);
  559. }
  560. if (left)
  561. {
  562. sock->read(buf + (len - left), left, left, bytesRead, timeout);
  563. }
  564. return len != 0;
  565. }
  566. catch (...)
  567. {
  568. heartbeat = false;
  569. throw;
  570. }
  571. }
  572. void CSafeSocket::setHttpMode(const char *queryName, bool arrayMode)
  573. {
  574. CriticalBlock c(crit); // Should not be needed
  575. httpMode = true;
  576. heartbeat = false;
  577. assertex(xmlhead.length()==0 && xmltail.length()==0);
  578. xmlhead.append(
  579. "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
  580. "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
  581. "<soap:Body>");
  582. if (arrayMode)
  583. {
  584. xmlhead.append("<").append(queryName).append("ResponseArray>");
  585. xmltail.append("</").append(queryName).append("ResponseArray>");
  586. }
  587. xmltail.append("</soap:Body></soap:Envelope>");
  588. }
  589. void CSafeSocket::setHeartBeat()
  590. {
  591. CriticalBlock c(crit);
  592. heartbeat = true;
  593. }
  594. bool CSafeSocket::sendHeartBeat(const IContextLogger &logctx)
  595. {
  596. if (heartbeat)
  597. {
  598. StringBuffer s;
  599. bool rval = false;
  600. unsigned replyLen = 5;
  601. unsigned rev = replyLen | 0x80000000; // make it a blocked msg
  602. _WINREV(rev);
  603. s.append(sizeof(rev), (char *) &rev);
  604. s.append('H');
  605. rev = (unsigned) time(NULL);
  606. _WINREV(rev);
  607. s.append(sizeof(rev), (char *) &rev);
  608. try
  609. {
  610. CriticalBlock c(crit);
  611. sock->write(s.str(), replyLen + sizeof(rev));
  612. rval = true;
  613. }
  614. catch (IException * E)
  615. {
  616. StringBuffer error("HeartBeat write failed with exception: ");
  617. E->errorMessage(error);
  618. logctx.CTXLOG("%s", error.str());
  619. E->Release();
  620. }
  621. catch(...)
  622. {
  623. logctx.CTXLOG("HeartBeat write failed (Unknown exception)");
  624. }
  625. return rval;
  626. }
  627. else
  628. return true;
  629. };
  630. void CSafeSocket::flush()
  631. {
  632. if (httpMode)
  633. {
  634. unsigned length = xmlhead.length() + xmltail.length();
  635. ForEachItemIn(idx, lengths)
  636. length += lengths.item(idx);
  637. StringBuffer header;
  638. header.append("HTTP/1.0 200 OK\r\n");
  639. header.append("Content-Type: text/xml\r\n");
  640. header.append("Content-Length: ").append(length).append("\r\n\r\n");
  641. CriticalBlock c(crit); // should not be anyone writing but better to be safe
  642. if (traceLevel > 5)
  643. DBGLOG("Writing HTTP header length %d to HTTP socket", header.length());
  644. sock->write(header.str(), header.length());
  645. sent += header.length();
  646. if (traceLevel > 5)
  647. DBGLOG("Writing xml head length %d to HTTP socket", xmlhead.length());
  648. sock->write(xmlhead.str(), xmlhead.length());
  649. sent += xmlhead.length();
  650. ForEachItemIn(idx2, queued)
  651. {
  652. unsigned length = lengths.item(idx2);
  653. if (traceLevel > 5)
  654. DBGLOG("Writing block length %d to HTTP socket", length);
  655. sock->write(queued.item(idx2), length);
  656. sent += length;
  657. }
  658. if (traceLevel > 5)
  659. DBGLOG("Writing xml tail length %d to HTTP socket", xmltail.length());
  660. sock->write(xmltail.str(), xmltail.length());
  661. sent += xmltail.length();
  662. if (traceLevel > 5)
  663. DBGLOG("Total written %d", sent);
  664. }
  665. }
  666. void CSafeSocket::sendException(const char *source, unsigned code, const char *message, bool isBlocked, const IContextLogger &logctx)
  667. {
  668. try
  669. {
  670. FlushingStringBuffer response(this, isBlocked, true, false, httpMode, logctx);
  671. response.startDataset("Exception", NULL, (unsigned) -1);
  672. response.appendf("<Source>%s</Source><Code>%d</Code>", source, code);
  673. response.append("<Message>");
  674. response.encodeXML(message);
  675. response.append("</Message>");
  676. }
  677. catch(IException *EE)
  678. {
  679. StringBuffer error("While reporting exception: ");
  680. EE->errorMessage(error);
  681. logctx.CTXLOG("%s", error.str());
  682. EE->Release();
  683. }
  684. #ifndef _DEBUG
  685. catch(...) {}
  686. #endif
  687. }
  688. //==============================================================================================================
// A result buffer is flushed once it holds more than this many bytes beyond its
// block header (see FlushingStringBuffer::needsFlush).
#define RESULT_FLUSH_THRESHOLD 10000u
// SOAP output is split into queued chunks once a buffer grows past
// SOAP_SPLIT_THRESHOLD bytes; SOAP_SPLIT_RESERVE is the capacity reserved for the
// replacement buffer. Debug builds use small values.
#ifdef _DEBUG
#define SOAP_SPLIT_THRESHOLD 100u
#define SOAP_SPLIT_RESERVE 200u
#else
#define SOAP_SPLIT_THRESHOLD 64000u
#define SOAP_SPLIT_RESERVE 65535u
#endif
// Forward declaration only.
interface IXmlStreamFlusher;
  698. //==============================================================================================================
  699. bool FlushingStringBuffer::needsFlush(bool closing)
  700. {
  701. if (isBlocked || closing) // can't flush unblocked. MORE - may need to break it up though....
  702. {
  703. size32_t len = s.length() - emptyLength;
  704. return len > (closing ? 0 : RESULT_FLUSH_THRESHOLD);
  705. }
  706. else
  707. return false; // MORE - if there is a single result, it can be flushed (actually, can flush anytime all prior results have been closed)
  708. }
  709. void FlushingStringBuffer::startBlock()
  710. {
  711. size32_t len = 0;
  712. s.clear();
  713. if (!isSoap)
  714. append(sizeof(size32_t), (char *) &len);
  715. rowCount = 0;
  716. if (isBlocked)
  717. {
  718. s.append('R');
  719. unsigned rev = sequenceNumber++;
  720. _WINREV(rev);
  721. s.append(sizeof(rev), (char *) &rev);
  722. rev = rowCount;
  723. _WINREV(rev);
  724. s.append(sizeof(rev), (char *) &rev); // NOTE - need to patch up later. At this point it is 0.
  725. s.append(strlen(name)+1, name);
  726. }
  727. emptyLength = s.length();
  728. // MORE - should probably pre-reserve string at RESULT_FLUSH_THRESHOLD plus a bit
  729. }
  730. FlushingStringBuffer::FlushingStringBuffer(SafeSocket *_sock, bool _isBlocked, bool _isXml, bool _isRaw, bool _isHttp, const IContextLogger &_logctx)
  731. : sock(_sock), isBlocked(_isBlocked), isXml(_isXml), isRaw(_isRaw), isHttp(_isHttp), logctx(_logctx)
  732. {
  733. sequenceNumber = 0;
  734. rowCount = 0;
  735. isSoap = false;
  736. isEmpty = true;
  737. extend = false;
  738. trim = false;
  739. emptyLength = 0;
  740. tagClosed = true;
  741. }
  742. FlushingStringBuffer::~FlushingStringBuffer()
  743. {
  744. try
  745. {
  746. flush(true);
  747. }
  748. catch (IException *E)
  749. {
  750. // Ignore any socket errors that we get at termination - nothing we can do about them anyway...
  751. E->Release();
  752. }
  753. catch(...)
  754. {
  755. }
  756. ForEachItemIn(idx, queued)
  757. {
  758. free(queued.item(idx));
  759. }
  760. }
  761. //void FlushingStringBuffer::append(char data)
  762. //{
  763. //append(1, &data);
  764. //}
  765. void FlushingStringBuffer::append(const char *data)
  766. {
  767. append(strlen(data), data);
  768. }
  769. void FlushingStringBuffer::append(unsigned len, const char *data)
  770. {
  771. try
  772. {
  773. CriticalBlock b(crit);
  774. s.append(len, data);
  775. }
  776. catch (IException *E)
  777. {
  778. logctx.logOperatorException(E, __FILE__, __LINE__, NULL);
  779. throw;
  780. }
  781. }
  782. void FlushingStringBuffer::appendf(const char *format, ...)
  783. {
  784. StringBuffer t;
  785. va_list args;
  786. va_start(args, format);
  787. t.valist_appendf(format, args);
  788. va_end(args);
  789. append(t.length(), t.str());
  790. }
  791. void FlushingStringBuffer::encodeXML(const char *x, unsigned flags, unsigned len, bool utf8)
  792. {
  793. StringBuffer t;
  794. ::encodeXML(x, t, flags, len, utf8);
  795. append(t.length(), t.str());
  796. }
  797. void FlushingStringBuffer::flushXML(StringBuffer &current, bool isClosing)
  798. {
  799. CriticalBlock b(crit);
  800. if (isSoap) // we don't do any chunking for non-SOAP yet
  801. {
  802. if (isClosing || current.length() > SOAP_SPLIT_THRESHOLD)
  803. {
  804. if (s.length())
  805. {
  806. lengths.append(s.length());
  807. queued.append(s.detach());
  808. s.ensureCapacity(SOAP_SPLIT_RESERVE);
  809. }
  810. lengths.append(current.length());
  811. queued.append(current.detach());
  812. if (!isClosing)
  813. current.ensureCapacity(SOAP_SPLIT_RESERVE);
  814. }
  815. }
  816. else if (isClosing)
  817. append(current.length(), current.str());
  818. }
// Write buffered output to its destination.
//   closing  true when no more output will follow; the pending tail text is
//            appended first and the buffer is left empty afterwards
// SOAP mode: nothing is written here. When not closing, a buffer larger than
// SOAP_SPLIT_THRESHOLD is detached and queued.
// Otherwise, when needsFlush() agrees, the block header is patched in place and
// the block is written to the socket, or to stdout when there is no socket.
// A write failure clears the buffer and rethrows.
void FlushingStringBuffer::flush(bool closing)
{
    CriticalBlock b(crit);
    if (closing && tail.length())
    {
        s.append(tail);
        tail.clear();
    }
    if (isSoap)
    {
        if (!closing)
        {
            unsigned length = s.length();
            if (length > SOAP_SPLIT_THRESHOLD)
            {
                queued.append(s.detach());
                lengths.append(length);
                s.ensureCapacity(SOAP_SPLIT_RESERVE);
            }
        }
    }
    else if (needsFlush(closing))
    {
        // MORE - if not blocked we can get very large blocks.
        assertex(s.length() > sizeof(size32_t));
        // The payload length excludes the 4-byte length prefix itself.
        unsigned replyLen = s.length() - sizeof(size32_t);
        // The top bit of the length word marks a blocked message.
        unsigned revLen = replyLen | ((isBlocked)?0x80000000:0);
        _WINREV(revLen);
        if (logctx.queryTraceLevel() > 1)
        {
            if (isBlocked)
                logctx.CTXLOG("Sending reply: Sending blocked %s data", (isXml)?"xml":"raw");
            else
#ifdef _DEBUG
                logctx.CTXLOG("Sending reply length %d: %.1024s", (unsigned) (s.length() - sizeof(size32_t)), s.str()+sizeof(size32_t));
#else
                logctx.CTXLOG("Sending reply length %d: %.40s", (unsigned) (s.length() - sizeof(size32_t)), s.str()+sizeof(size32_t));
#endif
        }
        // Patch the length placeholder that startBlock() wrote at the start of the buffer.
        *(size32_t *) s.str() = revLen;
        if (isBlocked)
        {
            // Patch the row-count placeholder: offset 9 = 4 (length) + 1 ('R') + 4 (sequence number).
            unsigned revRowCount = rowCount;
            _WINREV(revRowCount);
            *(size32_t *) (s.str()+9) = revRowCount;
        }
        if (logctx.queryTraceLevel() > 9)
            logctx.CTXLOG("writing block size %d to socket", replyLen);
        try
        {
            if (sock)
            {
                // HTTP output omits the length prefix.
                if (isHttp)
                    sock->write(s.str()+sizeof(revLen), replyLen);
                else
                    sock->write(s.str(), replyLen + sizeof(revLen));
            }
            else
                fwrite(s.str()+sizeof(revLen), replyLen, 1, stdout);
        }
        catch (...)
        {
            if (logctx.queryTraceLevel() > 9)
                logctx.CTXLOG("Exception caught FlushingStringBuffer::flush");
            s.clear();
            emptyLength = 0;
            throw;
        }
        if (logctx.queryTraceLevel() > 9)
            logctx.CTXLOG("wrote block size %d to socket", replyLen);
        if (closing)
        {
            s.clear();
            emptyLength = 0;
        }
        else
            startBlock(); // begin the next block with a fresh header
    }
}
  898. void *FlushingStringBuffer::getPayload(size32_t &length)
  899. {
  900. assertex(isSoap);
  901. CriticalBlock b(crit);
  902. if (queued.ordinality())
  903. {
  904. length = lengths.item(0);
  905. void *ret = queued.item(0);
  906. queued.remove(0);
  907. lengths.remove(0);
  908. return ret;
  909. }
  910. length = s.length();
  911. return length ? s.detach() : NULL;
  912. }
  913. void FlushingStringBuffer::startDataset(const char *elementName, const char *resultName, unsigned sequence, bool _extend)
  914. {
  915. CriticalBlock b(crit);
  916. extend = _extend;
  917. if (isEmpty || !extend)
  918. {
  919. name.clear().append(resultName ? resultName : elementName);
  920. sequenceNumber = 0;
  921. startBlock();
  922. if (!isBlocked)
  923. {
  924. if (isXml)
  925. {
  926. s.append('<').append(elementName);
  927. if (isSoap && (resultName || (sequence != (unsigned) -1)))
  928. {
  929. s.append(" xmlns=\'urn:hpccsystems:ecl:").appendLower(queryName.length(), queryName.sget()).append(":result:");
  930. if (resultName && *resultName)
  931. s.appendLower(strlen(resultName), resultName).append('\'');
  932. else
  933. s.append("result_").append(sequence+1).append('\'');
  934. }
  935. if (resultName && *resultName)
  936. s.appendf(" name='%s'",resultName);
  937. else if (sequence != (unsigned) -1)
  938. s.appendf(" name='Result %d'",sequence+1);
  939. s.append(">\n");
  940. tail.clear().appendf("</%s>\n", elementName);
  941. }
  942. }
  943. isEmpty = false;
  944. }
  945. }
  946. void FlushingStringBuffer::startScalar(const char *resultName, unsigned sequence)
  947. {
  948. CriticalBlock b(crit);
  949. assertex(!s.length());
  950. name.clear().append(resultName ? resultName : "Dataset");
  951. sequenceNumber = 0;
  952. startBlock();
  953. if (!isBlocked)
  954. {
  955. if (isXml)
  956. {
  957. tail.clear();
  958. s.append("<Dataset");
  959. if (isSoap && (resultName || (sequence != (unsigned) -1)))
  960. {
  961. s.append(" xmlns=\'urn:hpccsystems:ecl:").appendLower(queryName.length(), queryName.sget()).append(":result:");
  962. if (resultName && *resultName)
  963. s.appendLower(strlen(resultName), resultName).append('\'');
  964. else
  965. s.append("result_").append(sequence+1).append('\'');
  966. }
  967. if (resultName && *resultName)
  968. s.appendf(" name='%s'>\n",resultName);
  969. else
  970. s.appendf(" name='Result %d'>\n",sequence+1);
  971. s.append(" <Row>");
  972. if (resultName && *resultName)
  973. {
  974. s.appendf("<%s>", resultName);
  975. tail.appendf("</%s>", resultName);
  976. }
  977. else
  978. {
  979. s.appendf("<Result_%d>", sequence+1);
  980. tail.appendf("</Result_%d>", sequence+1);
  981. }
  982. tail.appendf("</Row>\n</Dataset>\n");
  983. }
  984. else if (!isRaw)
  985. {
  986. tail.clear().append('\n');
  987. }
  988. }
  989. }
  990. void FlushingStringBuffer::incrementRowCount()
  991. {
  992. CriticalBlock b(crit);
  993. rowCount++;
  994. }
  995. //=====================================================================================================
  996. ClusterWriteHandler::ClusterWriteHandler(char const * _logicalName, char const * _activityType)
  997. : logicalName(_logicalName), activityType(_activityType)
  998. {
  999. makePhysicalPartName(logicalName.get(), 1, 1, physicalName, false);
  1000. splitFilename(physicalName, &physicalDir, &physicalDir, &physicalBase, &physicalBase);
  1001. }
  1002. void ClusterWriteHandler::addCluster(char const * cluster)
  1003. {
  1004. Owned<IGroup> group = queryNamedGroupStore().lookup(cluster);
  1005. if (!group)
  1006. throw MakeStringException(0, "Unknown cluster %s while writing file %s", cluster, logicalName.get());
  1007. if (group->isMember())
  1008. {
  1009. if (localCluster)
  1010. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s", cluster,
  1011. logicalName.get());
  1012. localClusterName.set(cluster);
  1013. localCluster.set(group);
  1014. }
  1015. else
  1016. {
  1017. ForEachItemIn(idx, remoteNodes)
  1018. {
  1019. Owned<INode> other = remoteNodes.item(idx).getNode(0);
  1020. if (group->isMember(other))
  1021. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s",
  1022. cluster, logicalName.get());
  1023. }
  1024. remoteNodes.append(*group.getClear());
  1025. remoteClusters.append(cluster);
  1026. }
  1027. }
  1028. void ClusterWriteHandler::getLocalPhysicalFilename(StringAttr & out) const
  1029. {
  1030. if(localCluster.get())
  1031. out.set(physicalName.str());
  1032. else
  1033. getTempFilename(out);
  1034. PROGLOG("%s(CLUSTER) for logical filename %s writing to local file %s", activityType.get(), logicalName.get(), out.get());
  1035. }
  1036. void ClusterWriteHandler::splitPhysicalFilename(StringBuffer & dir, StringBuffer & base) const
  1037. {
  1038. dir.append(physicalDir);
  1039. base.append(physicalBase);
  1040. }
  1041. void ClusterWriteHandler::getTempFilename(StringAttr & out) const
  1042. {
  1043. // Should be implemented by more derived (platform-specific) class, if needed
  1044. throwUnexpected();
  1045. }
  1046. void ClusterWriteHandler::copyPhysical(IFile * source, bool noCopy) const
  1047. {
  1048. RemoteFilename rdn, rfn;
  1049. rdn.setLocalPath(physicalDir.str());
  1050. rfn.setLocalPath(physicalName.str());
  1051. ForEachItemIn(idx, remoteNodes)
  1052. {
  1053. rdn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  1054. rfn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  1055. Owned<IFile> targetdir = createIFile(rdn);
  1056. Owned<IFile> target = createIFile(rfn);
  1057. PROGLOG("%s(CLUSTER) for logical filename %s copying %s to %s", activityType.get(), logicalName.get(), source->queryFilename(), target->queryFilename());
  1058. if(noCopy)
  1059. {
  1060. WARNLOG("Skipping remote copy due to debug option");
  1061. }
  1062. else
  1063. {
  1064. targetdir->createDirectory();
  1065. copyFile(target, source);
  1066. }
  1067. }
  1068. }
  1069. void ClusterWriteHandler::setDescriptorParts(IFileDescriptor * desc, char const * basename, IPropertyTree * attrs) const
  1070. {
  1071. if(!localCluster.get()&&(remoteNodes.ordinality()==0))
  1072. throw MakeStringException(0, "Attempting to write file to no clusters");
  1073. ClusterPartDiskMapSpec partmap; // will get this from group at some point
  1074. desc->setNumParts(1);
  1075. desc->setPartMask(basename);
  1076. if (localCluster)
  1077. desc->addCluster(localClusterName,localCluster, partmap);
  1078. ForEachItemIn(idx,remoteNodes)
  1079. desc->addCluster(remoteClusters.item(idx),&remoteNodes.item(idx), partmap);
  1080. if (attrs) {
  1081. // need to set part attr
  1082. IPartDescriptor *partdesc = desc->queryPart(0);
  1083. IPropertyTree &pprop = partdesc->queryProperties();
  1084. // bit of a kludge (should really set properties *after* creating part rather than passing prop tree in)
  1085. Owned<IAttributeIterator> ai = attrs->getAttributes();
  1086. ForEach(*ai)
  1087. pprop.setProp(ai->queryName(),ai->queryValue());
  1088. }
  1089. }
  1090. void ClusterWriteHandler::finish(IFile * file) const
  1091. {
  1092. if(!localCluster.get())
  1093. {
  1094. PROGLOG("%s(CLUSTER) for logical filename %s removing temporary file %s", activityType.get(), logicalName.get(), file->queryFilename());
  1095. file->remove();
  1096. }
  1097. }
  1098. void ClusterWriteHandler::getClusters(StringArray &clusters) const
  1099. {
  1100. if(localCluster)
  1101. clusters.append(localClusterName);
  1102. ForEachItemIn(c, remoteClusters)
  1103. clusters.append(remoteClusters.item(c));
  1104. }
  1105. //=====================================================================================================
  1106. class COrderedOutputSerializer : implements IOrderedOutputSerializer, public CInterface
  1107. {
  1108. class COrderedResult : public CInterface
  1109. {
  1110. bool closed;
  1111. StringBuffer sb;
  1112. public:
  1113. IMPLEMENT_IINTERFACE;
  1114. COrderedResult() : closed(false) {}
  1115. bool flush(FILE * outFile, bool onlyClosed)
  1116. {
  1117. if (closed || !onlyClosed)
  1118. {
  1119. if (sb.length())
  1120. {
  1121. ::fwrite(sb.str(), sb.length(), 1, outFile);
  1122. sb.clear();
  1123. }
  1124. }
  1125. return closed;
  1126. }
  1127. size32_t printf(const char *format, va_list args)
  1128. {
  1129. if (closed)
  1130. throw MakeStringException(0, "Attempting to append to previously closed result in COrderedResult::printf");
  1131. int prevLen = sb.length();
  1132. sb.valist_appendf(format, args);
  1133. return sb.length() - prevLen;
  1134. }
  1135. size32_t fwrite(const void * data, size32_t size, size32_t count)
  1136. {
  1137. if (closed)
  1138. throw MakeStringException(0, "Attempting to append to previously closed result in COrderedResult::fwrite");
  1139. size32_t len = size * count;
  1140. sb.append(len, (const char *)data);
  1141. return len;
  1142. }
  1143. void close(bool nl)
  1144. {
  1145. if (closed)
  1146. throw MakeStringException(0, "Attempting to reclose result in COrderedResult::close");
  1147. if (nl)
  1148. sb.append('\n');
  1149. closed = true;
  1150. }
  1151. };
  1152. CIArrayOf<COrderedResult> COrderedResultArr;
  1153. int lastSeqFlushed;
  1154. FILE * outFile;
  1155. CriticalSection crit;
  1156. COrderedResult * getResult(size32_t seq)
  1157. {
  1158. while ((int)COrderedResultArr.ordinality() < (seq+1))
  1159. COrderedResultArr.append(*(new COrderedResult()));
  1160. return &COrderedResultArr.item(seq);
  1161. }
  1162. void flushCurrent()//stream current sequence
  1163. {
  1164. COrderedResult &res = COrderedResultArr.item(lastSeqFlushed + 1);
  1165. res.flush(outFile,false);
  1166. fflush(outFile);
  1167. }
  1168. void flushCompleted(bool onlyClosed)//flush completed sequence(s)
  1169. {
  1170. int lastSeq = (int)COrderedResultArr.ordinality()-1;
  1171. for (; lastSeqFlushed < lastSeq; lastSeqFlushed++)
  1172. {
  1173. COrderedResult &res = COrderedResultArr.item(lastSeqFlushed + 1);
  1174. if (!res.flush(outFile,onlyClosed) && onlyClosed)
  1175. break;
  1176. }
  1177. fflush(outFile);
  1178. }
  1179. public:
  1180. IMPLEMENT_IINTERFACE;
  1181. COrderedOutputSerializer(FILE* _outFile) : lastSeqFlushed(-1), outFile(_outFile) {}
  1182. ~COrderedOutputSerializer()
  1183. {
  1184. if (lastSeqFlushed != (COrderedResultArr.ordinality()-1))
  1185. flushCompleted(false);
  1186. COrderedResultArr.kill();
  1187. }
  1188. //IOrderedOutputSerializer
  1189. size32_t fwrite(int seq, const void * data, size32_t size, size32_t count)
  1190. {
  1191. CriticalBlock c(crit);
  1192. size32_t ret = getResult(seq)->fwrite(data,size, count);
  1193. if (seq == (lastSeqFlushed + 1))
  1194. flushCurrent();
  1195. return ret;
  1196. }
  1197. size32_t printf(int seq, const char *format, ...) __attribute__((format(printf, 3, 4)))
  1198. {
  1199. CriticalBlock c(crit);
  1200. va_list args;
  1201. va_start(args, format);
  1202. int ret = getResult(seq)->printf(format, args);
  1203. va_end(args);
  1204. if (seq == (lastSeqFlushed + 1))
  1205. flushCurrent();
  1206. return ret;
  1207. }
  1208. void close(int seq, bool nl)
  1209. {
  1210. CriticalBlock c(crit);
  1211. getResult(seq)->close(nl);
  1212. if ( seq == (lastSeqFlushed+1) )
  1213. flushCompleted(true);
  1214. }
  1215. };
  1216. ROXIEHELPER_API IOrderedOutputSerializer * createOrderedOutputSerializer(FILE * _outFile)
  1217. {
  1218. return new COrderedOutputSerializer(_outFile);
  1219. }
  1220. //=====================================================================================================
  1221. ROXIEHELPER_API StringBuffer & mangleHelperFileName(StringBuffer & out, const char * in, const char * wuid, unsigned int flags)
  1222. {
  1223. out = in;
  1224. if (flags & (TDXtemporary | TDXjobtemp))
  1225. out.append("__").append(wuid);
  1226. return out;
  1227. }
  1228. ROXIEHELPER_API StringBuffer & mangleLocalTempFilename(StringBuffer & out, char const * in)
  1229. {
  1230. char const * start = in;
  1231. while(true)
  1232. {
  1233. char const * end = strstr(start, "::");
  1234. if(end)
  1235. {
  1236. out.append(end-start, start).append("__scope__");
  1237. start = end + 2;
  1238. }
  1239. else
  1240. {
  1241. out.append(start);
  1242. break;
  1243. }
  1244. }
  1245. return out;
  1246. }
  1247. ROXIEHELPER_API StringBuffer & expandLogicalFilename(StringBuffer & logicalName, const char * fname, IConstWorkUnit * wu, bool resolveLocally)
  1248. {
  1249. if (fname[0]=='~')
  1250. logicalName.append(fname+1);
  1251. else if (resolveLocally)
  1252. {
  1253. StringBuffer sb(fname);
  1254. sb.replaceString("::",PATHSEPSTR);
  1255. makeAbsolutePath(sb.str(), logicalName.clear());
  1256. }
  1257. else
  1258. {
  1259. SCMStringBuffer lfn;
  1260. if (wu)
  1261. {
  1262. wu->getScope(lfn);
  1263. if(lfn.length())
  1264. logicalName.append(lfn.s).append("::");
  1265. }
  1266. logicalName.append(fname);
  1267. }
  1268. return logicalName;
  1269. }