roxiehelper.cpp 45 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jexcept.hpp"
  14. #include "thorherror.h"
  15. #include "roxiehelper.hpp"
  16. #include "roxielmj.hpp"
  17. #include "jmisc.hpp"
  18. #include "jfile.hpp"
  19. #include "mpbase.hpp"
  20. #include "dafdesc.hpp"
  21. #include "dadfs.hpp"
  22. unsigned traceLevel = 0;
  23. //OwnedRowArray
  24. void OwnedRowArray::clear()
  25. {
  26. ForEachItemIn(idx, buff)
  27. ReleaseRoxieRow(buff.item(idx));
  28. buff.kill();
  29. }
  30. void OwnedRowArray::clearPart(aindex_t from, aindex_t to)
  31. {
  32. aindex_t idx;
  33. for(idx = from; idx < to; idx++)
  34. ReleaseRoxieRow(buff.item(idx));
  35. buff.removen(from, to-from);
  36. }
  37. void OwnedRowArray::replace(const void * row, aindex_t pos)
  38. {
  39. ReleaseRoxieRow(buff.item(pos));
  40. buff.replace(row, pos);
  41. }
  42. //=========================================================================================
  43. //CRHRollingCacheElem copied/modified from THOR
  44. CRHRollingCacheElem::CRHRollingCacheElem()
  45. {
  46. row = NULL;
  47. cmp = INT_MIN;
  48. }
  49. CRHRollingCacheElem::~CRHRollingCacheElem()
  50. {
  51. if (row)
  52. ReleaseRoxieRow(row);
  53. }
  54. void CRHRollingCacheElem::set(const void *_row)
  55. {
  56. if (row)
  57. ReleaseRoxieRow(row);
  58. row = _row;
  59. }
  60. //CRHRollingCache copied/modified from THOR CRollingCache
  61. CRHRollingCache::~CRHRollingCache()
  62. {
  63. loop
  64. {
  65. CRHRollingCacheElem *e = cache.dequeue();
  66. if (!e)
  67. break;
  68. delete e;
  69. }
  70. }
  71. void CRHRollingCache::init(IInputBase *_in, unsigned _max)
  72. {
  73. max = _max;
  74. in =_in;
  75. cache.clear();
  76. cache.reserve(max);
  77. eos = false;
  78. while (cache.ordinality()<max/2)
  79. cache.enqueue(NULL);
  80. while (!eos && (cache.ordinality()<max))
  81. advance();
  82. }
  83. #ifdef TRACEROLLING
  84. void CRHRollingCache::PrintCache()
  85. {
  86. for (unsigned i = 0;i<max;i++) {
  87. CRHRollingCacheElem *e = cache.item(i);
  88. if (i==0)
  89. DBGLOG("RC==============================");
  90. int ii = 0;
  91. if (e && e->row)
  92. ii = isalpha(*((char*)e->row)) ? 0 : 4;
  93. chas sz[100];
  94. sprintf(sz,"%c%d: %s",(i==max/2)?'>':' ',i,e?(const char *)e->row+ii:"-----");
  95. for (int xx=0; sz[xx] != NULL; xx++)
  96. {
  97. if (!isprint(sz[xx]))
  98. {
  99. sz[xx] = NULL;
  100. break;
  101. }
  102. }
  103. DBGLOG(sz);
  104. if (i == max-1)
  105. DBGLOG("RC==============================");
  106. }
  107. }
  108. #endif
  109. CRHRollingCacheElem * CRHRollingCache::mid(int rel)
  110. {
  111. return cache.item((max/2)+rel); // relies on unsigned wrap
  112. }
  113. void CRHRollingCache::advance()
  114. {
  115. CRHRollingCacheElem *e = (cache.ordinality()==max)?cache.dequeue():NULL; //cache full, remove head element
  116. if (!eos) {
  117. if (!e)
  118. e = new CRHRollingCacheElem();
  119. const void * nextrec = in->nextInGroup();//get row from CRHCRHDualCache::cOut, which gets from CRHCRHDualCache, which gets from input
  120. if (!nextrec)
  121. nextrec = in->nextInGroup();
  122. if (nextrec) {
  123. e->set(nextrec);
  124. cache.enqueue(e);
  125. #ifdef TRACEROLLING
  126. PrintCache();
  127. #endif
  128. return;
  129. }
  130. else
  131. eos = true;
  132. }
  133. delete e;
  134. cache.enqueue(NULL);
  135. #ifdef TRACEROLLING
  136. PrintCache();
  137. #endif
  138. }
  139. //=========================================================================================
  140. //CRHDualCache copied from THOR, and modified to get input from IInputBase instead
  141. //of IReadSeqVar and to manage rows as OwnedRoxieRow types
  142. CRHDualCache::CRHDualCache()
  143. {
  144. strm1 = NULL;
  145. strm2 = NULL;
  146. }
  147. CRHDualCache::~CRHDualCache()
  148. {
  149. ::Release(strm1);
  150. ::Release(strm2);
  151. loop
  152. {
  153. CRHRollingCacheElem *e = cache.dequeue();
  154. if (!e)
  155. break;
  156. delete e;
  157. }
  158. }
  159. void CRHDualCache::init(IInputBase * _in)
  160. {
  161. in = _in;
  162. cache.clear();
  163. eos = false;
  164. base = 0;
  165. posL = 0;
  166. posR = 0;
  167. strm1 = new cOut(this,posL);
  168. strm2 = new cOut(this,posR) ;
  169. }
  170. #ifdef TRACEROLLING
  171. void CRHDualCache::PrintCache()
  172. {
  173. for (unsigned i = 0;i<cache.ordinality();i++) {
  174. CRHRollingCacheElem *e = cache.item(i);
  175. if (i==0)
  176. {
  177. DBGLOG("DC=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-BASE:%d,posL=%d,posR=%d %s", base, posL,posR, eos?"EOS":"");
  178. }
  179. DBGLOG("%c%d: %s",(i==cache.ordinality()/2)?'>':' ',i,e?(const char *)e->row:"-----");
  180. if (i == cache.ordinality()-1)
  181. DBGLOG("DC=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-");
  182. }
  183. }
  184. #endif
  185. bool CRHDualCache::get(unsigned n, CRHRollingCacheElem *&out)
  186. {
  187. // take off any no longer needed
  188. CRHRollingCacheElem *e=NULL;
  189. while ((base<posL) && (base<posR)) {
  190. delete e;
  191. e = cache.dequeue();
  192. base++;
  193. }
  194. assertex(n>=base);
  195. while (!eos && (n-base>=cache.ordinality())) //element already in cache?
  196. {
  197. if (!e)
  198. e = new CRHRollingCacheElem;
  199. const void * nextrec = in->nextInGroup(); //get from activity
  200. if (!nextrec)
  201. nextrec = in->nextInGroup();
  202. if (!nextrec) {
  203. eos = true;
  204. break;
  205. }
  206. e->set(nextrec);
  207. cache.enqueue(e);
  208. e = NULL;
  209. #ifdef TRACEROLLING
  210. PrintCache();
  211. #endif
  212. }
  213. delete e;
  214. if (n-base>=cache.ordinality())
  215. return false;
  216. out = cache.item(n-base);
  217. return true;
  218. }
  219. size32_t CRHDualCache::getRecordSize(const void *ptr)
  220. {
  221. return in->queryOutputMeta()->getRecordSize(ptr);
  222. }
  223. size32_t CRHDualCache::getFixedSize() const
  224. {
  225. return in->queryOutputMeta()->getFixedSize();
  226. }
  227. size32_t CRHDualCache::getMinRecordSize() const
  228. {
  229. return in->queryOutputMeta()->getMinRecordSize();
  230. }
  231. CRHDualCache::cOut::cOut(CRHDualCache *_parent, unsigned &_pos)
  232. : pos(_pos)
  233. {
  234. parent = _parent;
  235. stopped = false;
  236. }
  237. const void * CRHDualCache::cOut::nextInGroup()
  238. {
  239. CRHRollingCacheElem *e;
  240. if (stopped || !parent->get(pos,e))
  241. return NULL; //no more data
  242. LinkRoxieRow(e->row);
  243. pos++;
  244. return e->row;
  245. }
  246. IOutputMetaData * CRHDualCache::cOut::queryOutputMeta() const
  247. {
  248. return parent->input()->queryOutputMeta();
  249. }
  250. void CRHDualCache::cOut::stop()
  251. {
  252. pos = (unsigned)-1;
  253. stopped = true;
  254. }
  255. //=========================================================================================
  256. IRHLimitedCompareHelper *createRHLimitedCompareHelper()
  257. {
  258. return new CRHLimitedCompareHelper();
  259. }
  260. //CRHLimitedCompareHelper
  261. void CRHLimitedCompareHelper::init( unsigned _atmost,
  262. IInputBase *_in,
  263. ICompare * _cmp,
  264. ICompare * _limitedcmp )
  265. {
  266. atmost = _atmost;
  267. cache.setown(new CRHRollingCache());
  268. cache->init(_in,(atmost+1)*2);
  269. cmp = _cmp;
  270. limitedcmp = _limitedcmp;
  271. }
  272. bool CRHLimitedCompareHelper::getGroup(OwnedRowArray &group, const void *left)
  273. {
  274. // this could be improved!
  275. // first move 'mid' forwards until mid>=left
  276. int low = 0;
  277. loop
  278. {
  279. CRHRollingCacheElem * r = cache->mid(0);
  280. if (!r)
  281. break; // hit eos
  282. int c = cmp->docompare(left,r->row);
  283. if (c == 0)
  284. {
  285. r->cmp = limitedcmp->docompare(left,r->row);
  286. if (r->cmp <= 0)
  287. break;
  288. }
  289. else if (c < 0)
  290. {
  291. r->cmp = -1;
  292. break;
  293. }
  294. else
  295. r->cmp = 1;
  296. cache->advance();
  297. if (cache->mid(low-1)) // only if haven't hit start
  298. low--;
  299. }
  300. // now scan back (note low should be filled even at eos)
  301. loop
  302. {
  303. CRHRollingCacheElem * pr = cache->mid(low-1);
  304. if (!pr)
  305. break; // hit start
  306. int c = cmp->docompare(left,pr->row);
  307. if (c == 0)
  308. {
  309. pr->cmp = limitedcmp->docompare(left,pr->row);
  310. if (pr->cmp==1)
  311. break;
  312. }
  313. else
  314. {
  315. pr->cmp = 1;
  316. break;
  317. }
  318. low--;
  319. }
  320. int high = 0;
  321. if (cache->mid(0)) // check haven't already hit end
  322. {
  323. // now scan fwd
  324. loop
  325. {
  326. high++;
  327. CRHRollingCacheElem * nr = cache->mid(high);
  328. if (!nr)
  329. break;
  330. int c = cmp->docompare(left,nr->row);
  331. if (c==0)
  332. {
  333. nr->cmp = limitedcmp->docompare(left,nr->row);
  334. if (nr->cmp==-1)
  335. break;
  336. }
  337. else
  338. {
  339. nr->cmp = -1;
  340. break;
  341. }
  342. }
  343. }
  344. while (high-low>(int)atmost)
  345. {
  346. int vl = iabs(cache->mid(low)->cmp);
  347. int vh = iabs(cache->mid(high-1)->cmp);
  348. int v;
  349. if (vl==0)
  350. {
  351. if (vh==0) // both ends equal
  352. return false;
  353. v = vh;
  354. }
  355. else if (vh==0)
  356. v = vl;
  357. else
  358. v = imin(vl,vh);
  359. // remove worst match from either end
  360. while ((low<high)&&(iabs(cache->mid(low)->cmp)==v))
  361. low++;
  362. while ((low<high)&&(iabs(cache->mid(high-1)->cmp)==v))
  363. high--;
  364. if (low>=high)
  365. return true; // couldn't make group;
  366. }
  367. for (int i=low;i<high;i++)
  368. {
  369. CRHRollingCacheElem *r = cache->mid(i);
  370. LinkRoxieRow(r->row);
  371. group.append(r->row);
  372. }
  373. return group.ordinality()>0;
  374. }
  375. //=========================================================================================
  376. CSafeSocket::CSafeSocket(ISocket *_sock)
  377. {
  378. httpMode = false;
  379. mlFmt = MarkupFmt_Unknown;
  380. sent = 0;
  381. heartbeat = false;
  382. sock.setown(_sock);
  383. }
  384. CSafeSocket::~CSafeSocket()
  385. {
  386. sock.clear();
  387. ForEachItemIn(idx, queued)
  388. {
  389. free(queued.item(idx));
  390. }
  391. queued.kill();
  392. lengths.kill();
  393. }
  394. unsigned CSafeSocket::bytesOut() const
  395. {
  396. return sent;
  397. }
  398. bool CSafeSocket::checkConnection() const
  399. {
  400. if (sock)
  401. return sock->check_connection();
  402. else
  403. return false;
  404. }
  405. size32_t CSafeSocket::write(const void *buf, size32_t size, bool takeOwnership)
  406. {
  407. CriticalBlock c(crit); // NOTE: anyone needing to write multiple times without interleave should have already locked this. We lock again for the simple cases.
  408. OwnedMalloc<void> ownedBuffer;
  409. if (takeOwnership)
  410. ownedBuffer.setown((void *) buf);
  411. if (!size)
  412. return 0;
  413. try
  414. {
  415. if (httpMode)
  416. {
  417. if (!takeOwnership)
  418. {
  419. ownedBuffer.setown(malloc(size));
  420. if (!ownedBuffer)
  421. throw MakeStringException(THORHELPER_INTERNAL_ERROR, "Out of memory in CSafeSocket::write (requesting %d bytes)", size);
  422. memcpy(ownedBuffer, buf, size);
  423. }
  424. queued.append(ownedBuffer.getClear());
  425. lengths.append(size);
  426. return size;
  427. }
  428. else
  429. {
  430. sent += size;
  431. size32_t written = sock->write(buf, size);
  432. return written;
  433. }
  434. }
  435. catch(...)
  436. {
  437. heartbeat = false;
  438. throw;
  439. }
  440. }
  441. bool CSafeSocket::readBlock(MemoryBuffer &ret, unsigned timeout, unsigned maxBlockSize)
  442. {
  443. // MORE - this is still not good enough as we could get someone else's block if there are multiple input datasets
  444. CriticalBlock c(crit);
  445. try
  446. {
  447. unsigned bytesRead;
  448. unsigned len;
  449. try
  450. {
  451. sock->read(&len, sizeof (len), sizeof (len), bytesRead, timeout);
  452. }
  453. catch (IJSOCK_Exception *E)
  454. {
  455. if (E->errorCode()==JSOCKERR_graceful_close)
  456. {
  457. E->Release();
  458. return false;
  459. }
  460. throw;
  461. }
  462. assertex(bytesRead == sizeof(len));
  463. _WINREV(len);
  464. if (len & 0x80000000)
  465. len ^= 0x80000000;
  466. if (len > maxBlockSize)
  467. throw MakeStringException(THORHELPER_DATA_ERROR, "Maximum block size (%d bytes) exceeded (missing length prefix?)", maxBlockSize);
  468. if (len)
  469. {
  470. unsigned bytesRead;
  471. sock->read(ret.reserveTruncate(len), len, len, bytesRead, timeout);
  472. }
  473. return len != 0;
  474. }
  475. catch(...)
  476. {
  477. heartbeat = false;
  478. throw;
  479. }
  480. }
  481. bool CSafeSocket::readBlock(StringBuffer &ret, unsigned timeout, HttpHelper *pHttpHelper, bool &continuationNeeded, bool &isStatus, unsigned maxBlockSize)
  482. {
  483. continuationNeeded = false;
  484. isStatus = false;
  485. CriticalBlock c(crit);
  486. try
  487. {
  488. unsigned bytesRead;
  489. unsigned len = 0;
  490. try
  491. {
  492. sock->read(&len, sizeof (len), sizeof (len), bytesRead, timeout);
  493. }
  494. catch (IJSOCK_Exception *E)
  495. {
  496. if (E->errorCode()==JSOCKERR_graceful_close)
  497. {
  498. E->Release();
  499. return false;
  500. }
  501. throw;
  502. }
  503. assertex(bytesRead == sizeof(len));
  504. unsigned left = 0;
  505. char *buf;
  506. if (pHttpHelper != NULL && strncmp((char *)&len, "POST", 4) == 0)
  507. {
  508. #define MAX_HTTP_HEADERSIZE 8000
  509. char header[MAX_HTTP_HEADERSIZE + 1]; // allow room for \0
  510. sock->read(header, 1, MAX_HTTP_HEADERSIZE, bytesRead, timeout);
  511. header[bytesRead] = 0;
  512. char *payload = strstr(header, "\r\n\r\n");
  513. if (payload)
  514. {
  515. *payload = 0;
  516. payload += 4;
  517. char *str;
  518. pHttpHelper->parseHTTPRequestLine(header);
  519. // capture authentication token
  520. if ((str = strstr(header, "Authorization: Basic ")) != NULL)
  521. pHttpHelper->setAuthToken(str+21);
  522. // capture content type
  523. if ((str = strstr(header, "Content-Type: ")) != NULL)
  524. pHttpHelper->setContentType(str+14);
  525. // determine payload length
  526. str = strstr(header, "Content-Length: ");
  527. if (str)
  528. {
  529. len = atoi(str + strlen("Content-Length: "));
  530. buf = ret.reserveTruncate(len);
  531. left = len - (bytesRead - (payload - header));
  532. if (len > left)
  533. memcpy(buf, payload, len - left);
  534. }
  535. else
  536. left = len = 0;
  537. }
  538. else
  539. left = len = 0;
  540. pHttpHelper->setIsHttp(true);
  541. if (!len)
  542. throw MakeStringException(THORHELPER_DATA_ERROR, "Badly formed HTTP header");
  543. }
  544. else if (strnicmp((char *)&len, "STAT", 4) == 0)
  545. isStatus = true;
  546. else
  547. {
  548. _WINREV(len);
  549. if (len & 0x80000000)
  550. {
  551. len ^= 0x80000000;
  552. continuationNeeded = true;
  553. }
  554. if (len > maxBlockSize)
  555. throw MakeStringException(THORHELPER_DATA_ERROR, "Maximum block size (%d bytes) exceeded (missing length prefix?)", maxBlockSize);
  556. left = len;
  557. if (len)
  558. buf = ret.reserveTruncate(len);
  559. }
  560. if (left)
  561. {
  562. sock->read(buf + (len - left), left, left, bytesRead, timeout);
  563. }
  564. return len != 0;
  565. }
  566. catch (...)
  567. {
  568. heartbeat = false;
  569. throw;
  570. }
  571. }
  572. void CSafeSocket::setHttpMode(const char *queryName, bool arrayMode, TextMarkupFormat _mlfmt)
  573. {
  574. CriticalBlock c(crit); // Should not be needed
  575. httpMode = true;
  576. mlFmt = _mlfmt;
  577. heartbeat = false;
  578. assertex(contentHead.length()==0 && contentTail.length()==0);
  579. if (mlFmt==MarkupFmt_JSON)
  580. {
  581. contentHead.set("{");
  582. contentTail.set("}");
  583. }
  584. else
  585. {
  586. StringAttrBuilder headText(contentHead), tailText(contentTail);
  587. headText.append(
  588. "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
  589. "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
  590. "<soap:Body>");
  591. if (arrayMode)
  592. {
  593. headText.append("<").append(queryName).append("ResponseArray>");
  594. tailText.append("</").append(queryName).append("ResponseArray>");
  595. }
  596. tailText.append("</soap:Body></soap:Envelope>");
  597. }
  598. }
  599. void CSafeSocket::setHeartBeat()
  600. {
  601. CriticalBlock c(crit);
  602. heartbeat = true;
  603. }
  604. bool CSafeSocket::sendHeartBeat(const IContextLogger &logctx)
  605. {
  606. if (heartbeat)
  607. {
  608. StringBuffer s;
  609. bool rval = false;
  610. unsigned replyLen = 5;
  611. unsigned rev = replyLen | 0x80000000; // make it a blocked msg
  612. _WINREV(rev);
  613. s.append(sizeof(rev), (char *) &rev);
  614. s.append('H');
  615. rev = (unsigned) time(NULL);
  616. _WINREV(rev);
  617. s.append(sizeof(rev), (char *) &rev);
  618. try
  619. {
  620. CriticalBlock c(crit);
  621. sock->write(s.str(), replyLen + sizeof(rev));
  622. rval = true;
  623. }
  624. catch (IException * E)
  625. {
  626. StringBuffer error("HeartBeat write failed with exception: ");
  627. E->errorMessage(error);
  628. logctx.CTXLOG("%s", error.str());
  629. E->Release();
  630. }
  631. catch(...)
  632. {
  633. logctx.CTXLOG("HeartBeat write failed (Unknown exception)");
  634. }
  635. return rval;
  636. }
  637. else
  638. return true;
  639. };
  640. void CSafeSocket::flush()
  641. {
  642. if (httpMode)
  643. {
  644. unsigned length = contentHead.length() + contentTail.length();
  645. ForEachItemIn(idx, lengths)
  646. length += lengths.item(idx);
  647. StringBuffer header;
  648. header.append("HTTP/1.0 200 OK\r\n");
  649. header.append("Content-Type: ").append(mlFmt == MarkupFmt_JSON ? "application/json" : "text/xml").append("\r\n");
  650. header.append("Content-Length: ").append(length).append("\r\n\r\n");
  651. CriticalBlock c(crit); // should not be anyone writing but better to be safe
  652. if (traceLevel > 5)
  653. DBGLOG("Writing HTTP header length %d to HTTP socket", header.length());
  654. sock->write(header.str(), header.length());
  655. sent += header.length();
  656. if (traceLevel > 5)
  657. DBGLOG("Writing content head length %d to HTTP socket", contentHead.length());
  658. sock->write(contentHead.str(), contentHead.length());
  659. sent += contentHead.length();
  660. ForEachItemIn(idx2, queued)
  661. {
  662. unsigned length = lengths.item(idx2);
  663. if (traceLevel > 5)
  664. DBGLOG("Writing block length %d to HTTP socket", length);
  665. sock->write(queued.item(idx2), length);
  666. sent += length;
  667. }
  668. if (traceLevel > 5)
  669. DBGLOG("Writing content tail length %d to HTTP socket", contentTail.length());
  670. sock->write(contentTail.str(), contentTail.length());
  671. sent += contentTail.length();
  672. if (traceLevel > 5)
  673. DBGLOG("Total written %d", sent);
  674. }
  675. }
  676. void CSafeSocket::sendException(const char *source, unsigned code, const char *message, bool isBlocked, const IContextLogger &logctx)
  677. {
  678. try
  679. {
  680. FlushingStringBuffer response(this, isBlocked, MarkupFmt_XML, false, httpMode, logctx);
  681. response.startDataset("Exception", NULL, (unsigned) -1);
  682. response.appendf("<Source>%s</Source><Code>%d</Code>", source, code);
  683. response.append("<Message>");
  684. response.encodeString(message, strlen(message));
  685. response.append("</Message>");
  686. }
  687. catch(IException *EE)
  688. {
  689. StringBuffer error("While reporting exception: ");
  690. EE->errorMessage(error);
  691. logctx.CTXLOG("%s", error.str());
  692. EE->Release();
  693. }
  694. #ifndef _DEBUG
  695. catch(...) {}
  696. #endif
  697. }
  698. //==============================================================================================================
  699. #define RESULT_FLUSH_THRESHOLD 10000u
  700. #ifdef _DEBUG
  701. #define HTTP_SPLIT_THRESHOLD 100u
  702. #define HTTP_SPLIT_RESERVE 200u
  703. #else
  704. #define HTTP_SPLIT_THRESHOLD 64000u
  705. #define HTTP_SPLIT_RESERVE 65535u
  706. #endif
  707. interface IXmlStreamFlusher;
  708. //==============================================================================================================
  709. bool FlushingStringBuffer::needsFlush(bool closing)
  710. {
  711. if (isBlocked || closing) // can't flush unblocked. MORE - may need to break it up though....
  712. {
  713. size32_t len = s.length() - emptyLength;
  714. return len > (closing ? 0 : RESULT_FLUSH_THRESHOLD);
  715. }
  716. else
  717. return false; // MORE - if there is a single result, it can be flushed (actually, can flush anytime all prior results have been closed)
  718. }
  719. void FlushingStringBuffer::startBlock()
  720. {
  721. size32_t len = 0;
  722. s.clear();
  723. if (!isHttp)
  724. append(sizeof(size32_t), (char *) &len);
  725. rowCount = 0;
  726. if (isBlocked)
  727. {
  728. s.append('R');
  729. unsigned rev = sequenceNumber++;
  730. _WINREV(rev);
  731. s.append(sizeof(rev), (char *) &rev);
  732. rev = rowCount;
  733. _WINREV(rev);
  734. s.append(sizeof(rev), (char *) &rev); // NOTE - need to patch up later. At this point it is 0.
  735. s.append(strlen(name)+1, name);
  736. }
  737. emptyLength = s.length();
  738. // MORE - should probably pre-reserve string at RESULT_FLUSH_THRESHOLD plus a bit
  739. }
  740. FlushingStringBuffer::FlushingStringBuffer(SafeSocket *_sock, bool _isBlocked, TextMarkupFormat _mlFmt, bool _isRaw, bool _isHttp, const IContextLogger &_logctx)
  741. : sock(_sock), isBlocked(_isBlocked), mlFmt(_mlFmt), isRaw(_isRaw), isHttp(_isHttp), logctx(_logctx)
  742. {
  743. sequenceNumber = 0;
  744. rowCount = 0;
  745. isSoap = false;
  746. isEmpty = true;
  747. extend = false;
  748. trim = false;
  749. emptyLength = 0;
  750. tagClosed = true;
  751. }
  752. FlushingStringBuffer::~FlushingStringBuffer()
  753. {
  754. try
  755. {
  756. flush(true);
  757. }
  758. catch (IException *E)
  759. {
  760. // Ignore any socket errors that we get at termination - nothing we can do about them anyway...
  761. E->Release();
  762. }
  763. catch(...)
  764. {
  765. }
  766. ForEachItemIn(idx, queued)
  767. {
  768. free(queued.item(idx));
  769. }
  770. }
  771. //void FlushingStringBuffer::append(char data)
  772. //{
  773. //append(1, &data);
  774. //}
  775. void FlushingStringBuffer::append(const char *data)
  776. {
  777. append(strlen(data), data);
  778. }
  779. void FlushingStringBuffer::append(double data)
  780. {
  781. if (isRaw)
  782. append(sizeof(data), (char *)&data);
  783. else
  784. {
  785. StringBuffer v;
  786. v.append(data);
  787. append(v.length(), v.str());
  788. }
  789. }
  790. void FlushingStringBuffer::append(unsigned len, const char *data)
  791. {
  792. try
  793. {
  794. CriticalBlock b(crit);
  795. s.append(len, data);
  796. }
  797. catch (IException *E)
  798. {
  799. logctx.logOperatorException(E, __FILE__, __LINE__, "FlushingStringBuffer::append");
  800. throw;
  801. }
  802. }
  803. void FlushingStringBuffer::appendf(const char *format, ...)
  804. {
  805. StringBuffer t;
  806. va_list args;
  807. va_start(args, format);
  808. t.valist_appendf(format, args);
  809. va_end(args);
  810. append(t.length(), t.str());
  811. }
  812. void FlushingStringBuffer::encodeString(const char *x, unsigned len, bool utf8)
  813. {
  814. if (mlFmt==MarkupFmt_XML)
  815. {
  816. StringBuffer t;
  817. ::encodeXML(x, t, 0, len, utf8);
  818. append(t.length(), t.str());
  819. }
  820. else
  821. append(len, x);
  822. }
  823. void FlushingStringBuffer::encodeData(const void *data, unsigned len)
  824. {
  825. static char hexchar[] = "0123456789ABCDEF";
  826. if (isRaw)
  827. append(len, (const char *) data);
  828. else
  829. {
  830. const byte *field = (const byte *) data;
  831. for (unsigned i = 0; i < len; i++)
  832. {
  833. append(hexchar[field[i] >> 4]);
  834. append(hexchar[field[i] & 0x0f]);
  835. }
  836. }
  837. }
  838. void FlushingStringBuffer::addPayload(StringBuffer &s, unsigned int reserve)
  839. {
  840. if (!s.length())
  841. return;
  842. lengths.append(s.length());
  843. queued.append(s.detach());
  844. if (reserve)
  845. s.ensureCapacity(reserve);
  846. }
  847. void FlushingStringBuffer::flushXML(StringBuffer &current, bool isClosing)
  848. {
  849. CriticalBlock b(crit);
  850. if (isHttp) // we don't do any chunking for non-HTTP yet
  851. {
  852. if (isClosing || current.length() > HTTP_SPLIT_THRESHOLD)
  853. {
  854. addPayload(s, HTTP_SPLIT_RESERVE);
  855. addPayload(current, isClosing ? 0 : HTTP_SPLIT_RESERVE);
  856. }
  857. }
  858. else if (isClosing)
  859. append(current.length(), current.str());
  860. }
  861. void FlushingStringBuffer::flush(bool closing)
  862. {
  863. CriticalBlock b(crit);
  864. if (closing && tail.length())
  865. {
  866. s.append(tail);
  867. tail.clear();
  868. }
  869. if (isHttp)
  870. {
  871. if (!closing && s.length() > HTTP_SPLIT_THRESHOLD)
  872. addPayload(s, HTTP_SPLIT_RESERVE);
  873. }
  874. else if (needsFlush(closing))
  875. {
  876. // MORE - if not blocked we can get very large blocks.
  877. assertex(s.length() > sizeof(size32_t));
  878. unsigned replyLen = s.length() - sizeof(size32_t);
  879. unsigned revLen = replyLen | ((isBlocked)?0x80000000:0);
  880. _WINREV(revLen);
  881. if (logctx.queryTraceLevel() > 1)
  882. {
  883. if (isBlocked)
  884. logctx.CTXLOG("Sending reply: Sending blocked %s data", getFormatName(mlFmt));
  885. else
  886. #ifdef _DEBUG
  887. logctx.CTXLOG("Sending reply length %d: %.1024s", (unsigned) (s.length() - sizeof(size32_t)), s.str()+sizeof(size32_t));
  888. #else
  889. logctx.CTXLOG("Sending reply length %d: %.40s", (unsigned) (s.length() - sizeof(size32_t)), s.str()+sizeof(size32_t));
  890. #endif
  891. }
  892. *(size32_t *) s.str() = revLen;
  893. if (isBlocked)
  894. {
  895. unsigned revRowCount = rowCount;
  896. _WINREV(revRowCount);
  897. *(size32_t *) (s.str()+9) = revRowCount;
  898. }
  899. if (logctx.queryTraceLevel() > 9)
  900. logctx.CTXLOG("writing block size %d to socket", replyLen);
  901. try
  902. {
  903. if (sock)
  904. {
  905. if (isHttp)
  906. sock->write(s.str()+sizeof(revLen), replyLen);
  907. else
  908. sock->write(s.str(), replyLen + sizeof(revLen));
  909. }
  910. else
  911. fwrite(s.str()+sizeof(revLen), replyLen, 1, stdout);
  912. }
  913. catch (...)
  914. {
  915. if (logctx.queryTraceLevel() > 9)
  916. logctx.CTXLOG("Exception caught FlushingStringBuffer::flush");
  917. s.clear();
  918. emptyLength = 0;
  919. throw;
  920. }
  921. if (logctx.queryTraceLevel() > 9)
  922. logctx.CTXLOG("wrote block size %d to socket", replyLen);
  923. if (closing)
  924. {
  925. s.clear();
  926. emptyLength = 0;
  927. }
  928. else
  929. startBlock();
  930. }
  931. }
  932. void *FlushingStringBuffer::getPayload(size32_t &length)
  933. {
  934. assertex(isHttp);
  935. CriticalBlock b(crit);
  936. if (queued.ordinality())
  937. {
  938. length = lengths.item(0);
  939. void *ret = queued.item(0);
  940. queued.remove(0);
  941. lengths.remove(0);
  942. return ret;
  943. }
  944. length = s.length();
  945. return length ? s.detach() : NULL;
  946. }
  947. void FlushingStringBuffer::startDataset(const char *elementName, const char *resultName, unsigned sequence, bool _extend, const IProperties *xmlns)
  948. {
  949. CriticalBlock b(crit);
  950. extend = _extend;
  951. if (isEmpty || !extend)
  952. {
  953. name.clear().append(resultName ? resultName : elementName);
  954. sequenceNumber = 0;
  955. startBlock();
  956. if (!isBlocked)
  957. {
  958. if (mlFmt==MarkupFmt_XML)
  959. {
  960. s.append('<').append(elementName);
  961. if (isSoap && (resultName || (sequence != (unsigned) -1)))
  962. {
  963. s.append(" xmlns=\'urn:hpccsystems:ecl:").appendLower(queryName.length(), queryName.sget()).append(":result:");
  964. if (resultName && *resultName)
  965. s.appendLower(strlen(resultName), resultName).append('\'');
  966. else
  967. s.append("result_").append(sequence+1).append('\'');
  968. if (xmlns)
  969. {
  970. Owned<IPropertyIterator> it = const_cast<IProperties*>(xmlns)->getIterator(); //should fix IProperties to be const friendly
  971. ForEach(*it)
  972. {
  973. const char *name = it->getPropKey();
  974. s.append(' ');
  975. if (!streq(name, "xmlns"))
  976. s.append("xmlns:");
  977. s.append(name).append("='");
  978. encodeUtf8XML(const_cast<IProperties*>(xmlns)->queryProp(name), s);
  979. s.append("'");
  980. }
  981. }
  982. }
  983. if (resultName && *resultName)
  984. s.appendf(" name='%s'",resultName);
  985. else if (sequence != (unsigned) -1)
  986. s.appendf(" name='Result %d'",sequence+1);
  987. s.append(">\n");
  988. tail.clear().appendf("</%s>\n", elementName);
  989. }
  990. }
  991. isEmpty = false;
  992. }
  993. }
  994. void FlushingStringBuffer::startScalar(const char *resultName, unsigned sequence)
  995. {
  996. CriticalBlock b(crit);
  997. assertex(!s.length());
  998. name.clear().append(resultName ? resultName : "Dataset");
  999. sequenceNumber = 0;
  1000. startBlock();
  1001. if (!isBlocked)
  1002. {
  1003. if (mlFmt==MarkupFmt_XML)
  1004. {
  1005. tail.clear();
  1006. s.append("<Dataset");
  1007. if (isSoap && (resultName || (sequence != (unsigned) -1)))
  1008. {
  1009. s.append(" xmlns=\'urn:hpccsystems:ecl:").appendLower(queryName.length(), queryName.sget()).append(":result:");
  1010. if (resultName && *resultName)
  1011. s.appendLower(strlen(resultName), resultName).append('\'');
  1012. else
  1013. s.append("result_").append(sequence+1).append('\'');
  1014. }
  1015. if (resultName && *resultName)
  1016. s.appendf(" name='%s'>\n",resultName);
  1017. else
  1018. s.appendf(" name='Result %d'>\n",sequence+1);
  1019. s.append(" <Row>");
  1020. if (resultName && *resultName)
  1021. {
  1022. s.appendf("<%s>", resultName);
  1023. tail.appendf("</%s>", resultName);
  1024. }
  1025. else
  1026. {
  1027. s.appendf("<Result_%d>", sequence+1);
  1028. tail.appendf("</Result_%d>", sequence+1);
  1029. }
  1030. tail.appendf("</Row>\n</Dataset>\n");
  1031. }
  1032. else if (!isRaw)
  1033. {
  1034. tail.clear().append('\n');
  1035. }
  1036. }
  1037. }
  1038. void FlushingStringBuffer::setScalarInt(const char *resultName, unsigned sequence, __int64 value, unsigned size)
  1039. {
  1040. startScalar(resultName, sequence);
  1041. s.append(value);
  1042. }
  1043. void FlushingStringBuffer::setScalarUInt(const char *resultName, unsigned sequence, unsigned __int64 value, unsigned size)
  1044. {
  1045. startScalar(resultName, sequence);
  1046. s.append(value);
  1047. }
  1048. void FlushingStringBuffer::incrementRowCount()
  1049. {
  1050. CriticalBlock b(crit);
  1051. rowCount++;
  1052. }
  1053. void FlushingJsonBuffer::append(double data)
  1054. {
  1055. CriticalBlock b(crit);
  1056. appendJSONRealValue(s, NULL, data);
  1057. }
  1058. void FlushingJsonBuffer::encodeString(const char *x, unsigned len, bool utf8)
  1059. {
  1060. CriticalBlock b(crit);
  1061. appendJSONStringValue(s, NULL, len, x, true);
  1062. }
  1063. void FlushingJsonBuffer::encodeData(const void *data, unsigned len)
  1064. {
  1065. CriticalBlock b(crit);
  1066. appendJSONDataValue(s, NULL, len, data);
  1067. }
  1068. void FlushingJsonBuffer::startDataset(const char *elementName, const char *resultName, unsigned sequence, bool _extend, const IProperties *xmlns)
  1069. {
  1070. CriticalBlock b(crit);
  1071. extend = _extend;
  1072. if (isEmpty || !extend)
  1073. {
  1074. name.clear().append(resultName ? resultName : elementName);
  1075. sequenceNumber = 0;
  1076. startBlock();
  1077. if (!isBlocked)
  1078. {
  1079. StringBuffer seqName;
  1080. if (!resultName || !*resultName)
  1081. resultName = seqName.appendf("result_%d", sequence+1).str();
  1082. appendJSONName(s, resultName).append('{');
  1083. tail.set("}");
  1084. }
  1085. isEmpty = false;
  1086. }
  1087. }
  1088. void FlushingJsonBuffer::startScalar(const char *resultName, unsigned sequence)
  1089. {
  1090. CriticalBlock b(crit);
  1091. assertex(!s.length());
  1092. name.set(resultName ? resultName : "Dataset");
  1093. sequenceNumber = 0;
  1094. startBlock();
  1095. if (!isBlocked)
  1096. {
  1097. StringBuffer seqName;
  1098. if (!resultName || !*resultName)
  1099. resultName = seqName.appendf("Result_%d", sequence+1).str();
  1100. appendJSONName(s, resultName).append('{');
  1101. appendJSONName(s, "Row").append("[{");
  1102. appendJSONName(s, resultName);
  1103. tail.set("}]}");
  1104. }
  1105. }
  1106. void FlushingJsonBuffer::setScalarInt(const char *resultName, unsigned sequence, __int64 value, unsigned size)
  1107. {
  1108. startScalar(resultName, sequence);
  1109. if (size < 7) //JavaScript only supports 53 significant bits
  1110. s.append(value);
  1111. else
  1112. s.append('"').append(value).append('"');
  1113. }
  1114. void FlushingJsonBuffer::setScalarUInt(const char *resultName, unsigned sequence, unsigned __int64 value, unsigned size)
  1115. {
  1116. startScalar(resultName, sequence);
  1117. if (size < 7) //JavaScript doesn't support unsigned, and only supports 53 significant bits
  1118. s.append(value);
  1119. else
  1120. s.append('"').append(value).append('"');
  1121. }
  1122. //=====================================================================================================
  1123. ClusterWriteHandler::ClusterWriteHandler(char const * _logicalName, char const * _activityType)
  1124. : logicalName(_logicalName), activityType(_activityType)
  1125. {
  1126. makePhysicalPartName(logicalName.get(), 1, 1, physicalName, false);
  1127. splitFilename(physicalName, &physicalDir, &physicalDir, &physicalBase, &physicalBase);
  1128. }
  1129. void ClusterWriteHandler::addCluster(char const * cluster)
  1130. {
  1131. Owned<IGroup> group = queryNamedGroupStore().lookup(cluster);
  1132. if (!group)
  1133. throw MakeStringException(0, "Unknown cluster %s while writing file %s", cluster, logicalName.get());
  1134. if (group->isMember())
  1135. {
  1136. if (localCluster)
  1137. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s", cluster,
  1138. logicalName.get());
  1139. localClusterName.set(cluster);
  1140. localCluster.set(group);
  1141. }
  1142. else
  1143. {
  1144. ForEachItemIn(idx, remoteNodes)
  1145. {
  1146. Owned<INode> other = remoteNodes.item(idx).getNode(0);
  1147. if (group->isMember(other))
  1148. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s",
  1149. cluster, logicalName.get());
  1150. }
  1151. remoteNodes.append(*group.getClear());
  1152. remoteClusters.append(cluster);
  1153. }
  1154. }
  1155. void ClusterWriteHandler::getLocalPhysicalFilename(StringAttr & out) const
  1156. {
  1157. if(localCluster.get())
  1158. out.set(physicalName.str());
  1159. else
  1160. getTempFilename(out);
  1161. PROGLOG("%s(CLUSTER) for logical filename %s writing to local file %s", activityType.get(), logicalName.get(), out.get());
  1162. }
  1163. void ClusterWriteHandler::splitPhysicalFilename(StringBuffer & dir, StringBuffer & base) const
  1164. {
  1165. dir.append(physicalDir);
  1166. base.append(physicalBase);
  1167. }
  1168. void ClusterWriteHandler::getTempFilename(StringAttr & out) const
  1169. {
  1170. // Should be implemented by more derived (platform-specific) class, if needed
  1171. throwUnexpected();
  1172. }
  1173. void ClusterWriteHandler::copyPhysical(IFile * source, bool noCopy) const
  1174. {
  1175. RemoteFilename rdn, rfn;
  1176. rdn.setLocalPath(physicalDir.str());
  1177. rfn.setLocalPath(physicalName.str());
  1178. ForEachItemIn(idx, remoteNodes)
  1179. {
  1180. rdn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  1181. rfn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  1182. Owned<IFile> targetdir = createIFile(rdn);
  1183. Owned<IFile> target = createIFile(rfn);
  1184. PROGLOG("%s(CLUSTER) for logical filename %s copying %s to %s", activityType.get(), logicalName.get(), source->queryFilename(), target->queryFilename());
  1185. if(noCopy)
  1186. {
  1187. WARNLOG("Skipping remote copy due to debug option");
  1188. }
  1189. else
  1190. {
  1191. targetdir->createDirectory();
  1192. copyFile(target, source);
  1193. }
  1194. }
  1195. }
  1196. void ClusterWriteHandler::setDescriptorParts(IFileDescriptor * desc, char const * basename, IPropertyTree * attrs) const
  1197. {
  1198. if(!localCluster.get()&&(remoteNodes.ordinality()==0))
  1199. throw MakeStringException(0, "Attempting to write file to no clusters");
  1200. ClusterPartDiskMapSpec partmap; // will get this from group at some point
  1201. desc->setNumParts(1);
  1202. desc->setPartMask(basename);
  1203. if (localCluster)
  1204. desc->addCluster(localClusterName,localCluster, partmap);
  1205. ForEachItemIn(idx,remoteNodes)
  1206. desc->addCluster(remoteClusters.item(idx),&remoteNodes.item(idx), partmap);
  1207. if (attrs) {
  1208. // need to set part attr
  1209. IPartDescriptor *partdesc = desc->queryPart(0);
  1210. IPropertyTree &pprop = partdesc->queryProperties();
  1211. // bit of a kludge (should really set properties *after* creating part rather than passing prop tree in)
  1212. Owned<IAttributeIterator> ai = attrs->getAttributes();
  1213. ForEach(*ai)
  1214. pprop.setProp(ai->queryName(),ai->queryValue());
  1215. }
  1216. }
  1217. void ClusterWriteHandler::finish(IFile * file) const
  1218. {
  1219. if(!localCluster.get())
  1220. {
  1221. PROGLOG("%s(CLUSTER) for logical filename %s removing temporary file %s", activityType.get(), logicalName.get(), file->queryFilename());
  1222. file->remove();
  1223. }
  1224. }
  1225. void ClusterWriteHandler::getClusters(StringArray &clusters) const
  1226. {
  1227. if(localCluster)
  1228. clusters.append(localClusterName);
  1229. ForEachItemIn(c, remoteClusters)
  1230. clusters.append(remoteClusters.item(c));
  1231. }
  1232. //=====================================================================================================
  1233. class COrderedOutputSerializer : implements IOrderedOutputSerializer, public CInterface
  1234. {
  1235. class COrderedResult : public CInterface
  1236. {
  1237. bool closed;
  1238. StringBuffer sb;
  1239. public:
  1240. IMPLEMENT_IINTERFACE;
  1241. COrderedResult() : closed(false) {}
  1242. bool flush(FILE * outFile, bool onlyClosed)
  1243. {
  1244. if (closed || !onlyClosed)
  1245. {
  1246. if (sb.length())
  1247. {
  1248. ::fwrite(sb.str(), sb.length(), 1, outFile);
  1249. sb.clear();
  1250. }
  1251. }
  1252. return closed;
  1253. }
  1254. size32_t printf(const char *format, va_list args)
  1255. {
  1256. if (closed)
  1257. throw MakeStringException(0, "Attempting to append to previously closed result in COrderedResult::printf");
  1258. int prevLen = sb.length();
  1259. sb.valist_appendf(format, args);
  1260. return sb.length() - prevLen;
  1261. }
  1262. size32_t fwrite(const void * data, size32_t size, size32_t count)
  1263. {
  1264. if (closed)
  1265. throw MakeStringException(0, "Attempting to append to previously closed result in COrderedResult::fwrite");
  1266. size32_t len = size * count;
  1267. sb.append(len, (const char *)data);
  1268. return len;
  1269. }
  1270. void close(bool nl)
  1271. {
  1272. if (closed)
  1273. throw MakeStringException(0, "Attempting to reclose result in COrderedResult::close");
  1274. if (nl)
  1275. sb.append('\n');
  1276. closed = true;
  1277. }
  1278. };
  1279. CIArrayOf<COrderedResult> COrderedResultArr;
  1280. int lastSeqFlushed;
  1281. FILE * outFile;
  1282. CriticalSection crit;
  1283. COrderedResult * getResult(size32_t seq)
  1284. {
  1285. while ((int)COrderedResultArr.ordinality() < (seq+1))
  1286. COrderedResultArr.append(*(new COrderedResult()));
  1287. return &COrderedResultArr.item(seq);
  1288. }
  1289. void flushCurrent()//stream current sequence
  1290. {
  1291. COrderedResult &res = COrderedResultArr.item(lastSeqFlushed + 1);
  1292. res.flush(outFile,false);
  1293. fflush(outFile);
  1294. }
  1295. void flushCompleted(bool onlyClosed)//flush completed sequence(s)
  1296. {
  1297. int lastSeq = (int)COrderedResultArr.ordinality()-1;
  1298. for (; lastSeqFlushed < lastSeq; lastSeqFlushed++)
  1299. {
  1300. COrderedResult &res = COrderedResultArr.item(lastSeqFlushed + 1);
  1301. if (!res.flush(outFile,onlyClosed) && onlyClosed)
  1302. break;
  1303. }
  1304. fflush(outFile);
  1305. }
  1306. public:
  1307. IMPLEMENT_IINTERFACE;
  1308. COrderedOutputSerializer(FILE* _outFile) : lastSeqFlushed(-1), outFile(_outFile) {}
  1309. ~COrderedOutputSerializer()
  1310. {
  1311. if (lastSeqFlushed != (COrderedResultArr.ordinality()-1))
  1312. flushCompleted(false);
  1313. COrderedResultArr.kill();
  1314. }
  1315. //IOrderedOutputSerializer
  1316. size32_t fwrite(int seq, const void * data, size32_t size, size32_t count)
  1317. {
  1318. CriticalBlock c(crit);
  1319. size32_t ret = getResult(seq)->fwrite(data,size, count);
  1320. if (seq == (lastSeqFlushed + 1))
  1321. flushCurrent();
  1322. return ret;
  1323. }
  1324. size32_t printf(int seq, const char *format, ...) __attribute__((format(printf, 3, 4)))
  1325. {
  1326. CriticalBlock c(crit);
  1327. va_list args;
  1328. va_start(args, format);
  1329. int ret = getResult(seq)->printf(format, args);
  1330. va_end(args);
  1331. if (seq == (lastSeqFlushed + 1))
  1332. flushCurrent();
  1333. return ret;
  1334. }
  1335. void close(int seq, bool nl)
  1336. {
  1337. CriticalBlock c(crit);
  1338. getResult(seq)->close(nl);
  1339. if ( seq == (lastSeqFlushed+1) )
  1340. flushCompleted(true);
  1341. }
  1342. };
  1343. IOrderedOutputSerializer * createOrderedOutputSerializer(FILE * _outFile)
  1344. {
  1345. return new COrderedOutputSerializer(_outFile);
  1346. }
  1347. //=====================================================================================================
  1348. StringBuffer & mangleHelperFileName(StringBuffer & out, const char * in, const char * wuid, unsigned int flags)
  1349. {
  1350. out = in;
  1351. if (flags & (TDXtemporary | TDXjobtemp))
  1352. out.append("__").append(wuid);
  1353. return out;
  1354. }
  1355. StringBuffer & mangleLocalTempFilename(StringBuffer & out, char const * in)
  1356. {
  1357. char const * start = in;
  1358. while(true)
  1359. {
  1360. char const * end = strstr(start, "::");
  1361. if(end)
  1362. {
  1363. out.append(end-start, start).append("__scope__");
  1364. start = end + 2;
  1365. }
  1366. else
  1367. {
  1368. out.append(start);
  1369. break;
  1370. }
  1371. }
  1372. return out;
  1373. }
  1374. static const char *skipLfnForeign(const char *lfn)
  1375. {
  1376. while (*lfn=='~')
  1377. lfn++;
  1378. const char *finger = lfn;
  1379. const char *scope = strstr(finger, "::");
  1380. if (scope)
  1381. {
  1382. StringBuffer cmp;
  1383. if (strieq("foreign", cmp.append(scope-finger, finger).trim()))
  1384. {
  1385. // foreign scope - need to strip off the ip and port
  1386. scope += 2; // skip ::
  1387. finger = strstr(scope,"::");
  1388. if (finger)
  1389. {
  1390. finger += 2;
  1391. while (*finger == ' ')
  1392. finger++;
  1393. return finger;
  1394. }
  1395. }
  1396. }
  1397. return lfn;
  1398. }
  1399. StringBuffer & expandLogicalFilename(StringBuffer & logicalName, const char * fname, IConstWorkUnit * wu, bool resolveLocally, bool ignoreForeignPrefix)
  1400. {
  1401. const char *native = (ignoreForeignPrefix) ? skipLfnForeign(fname) : fname; //for published roxie queries foreign location already reflected in local dfs meta data
  1402. if (fname[0]=='~')
  1403. logicalName.append(native);
  1404. else if (resolveLocally)
  1405. {
  1406. StringBuffer sb(native);
  1407. sb.replaceString("::",PATHSEPSTR);
  1408. makeAbsolutePath(sb.str(), logicalName.clear());
  1409. }
  1410. else
  1411. {
  1412. SCMStringBuffer lfn;
  1413. if (wu)
  1414. {
  1415. wu->getScope(lfn);
  1416. if(lfn.length())
  1417. logicalName.append(lfn.s).append("::");
  1418. }
  1419. logicalName.append(native);
  1420. }
  1421. return logicalName;
  1422. }
  1423. //----------------------------------------------------------------------------------
  1424. void IRoxieContextLogger::CTXLOGae(IException *E, const char *file, unsigned line, const char *prefix, const char *format, ...) const
  1425. {
  1426. va_list args;
  1427. va_start(args, format);
  1428. CTXLOGaeva(E, file, line, prefix, format, args);
  1429. va_end(args);
  1430. }
  1431. void HttpHelper::parseURL()
  1432. {
  1433. const char *start = url.str();
  1434. while (isspace(*start))
  1435. start++;
  1436. if (*start=='/')
  1437. start++;
  1438. StringAttr path;
  1439. const char *finger = strpbrk(start, "?");
  1440. if (finger)
  1441. path.set(start, finger-start);
  1442. else
  1443. path.set(start);
  1444. if (path.length())
  1445. pathNodes.appendList(path, "/");
  1446. if (!finger)
  1447. return;
  1448. finger++;
  1449. while (*finger)
  1450. {
  1451. StringBuffer s, prop, val;
  1452. while (*finger && *finger != '&' && *finger != '=')
  1453. s.append(*finger++);
  1454. appendDecodedURL(prop, s.trim());
  1455. if (!*finger || *finger == '&')
  1456. val.set("1");
  1457. else
  1458. {
  1459. s.clear();
  1460. finger++;
  1461. while (*finger && *finger != '&')
  1462. s.append(*finger++);
  1463. appendDecodedURL(val, s.trim());
  1464. }
  1465. if (prop.length())
  1466. parameters->setProp(prop, val);
  1467. if (*finger)
  1468. finger++;
  1469. }
  1470. }