roxiehelper.cpp 81 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jexcept.hpp"
  14. #include "thorherror.h"
  15. #include "roxiehelper.hpp"
  16. #include "roxielmj.hpp"
  17. #include "roxierow.hpp"
  18. #include "roxierowbuff.hpp"
  19. #include "jmisc.hpp"
  20. #include "jfile.hpp"
  21. #include "mpbase.hpp"
  22. #include "dafdesc.hpp"
  23. #include "dadfs.hpp"
  // Global trace level; initialised to 0 here.
  unsigned int traceLevel = 0;
  25. //OwnedRowArray
  26. void OwnedRowArray::clear()
  27. {
  28. roxiemem::ReleaseRoxieRowArray(buff.ordinality(), buff.getArray());
  29. buff.kill();
  30. }
  31. void OwnedRowArray::clearPart(aindex_t from, aindex_t to)
  32. {
  33. roxiemem::ReleaseRoxieRowRange(buff.getArray(), from, to);
  34. buff.removen(from, to-from);
  35. }
  36. void OwnedRowArray::replace(const void * row, aindex_t pos)
  37. {
  38. ReleaseRoxieRow(buff.item(pos));
  39. buff.replace(row, pos);
  40. }
  41. //=========================================================================================
  42. //CRHRollingCacheElem copied/modified from THOR
  43. CRHRollingCacheElem::CRHRollingCacheElem()
  44. {
  45. row = NULL;
  46. cmp = INT_MIN;
  47. }
  48. CRHRollingCacheElem::~CRHRollingCacheElem()
  49. {
  50. if (row)
  51. ReleaseRoxieRow(row);
  52. }
  53. void CRHRollingCacheElem::set(const void *_row)
  54. {
  55. if (row)
  56. ReleaseRoxieRow(row);
  57. row = _row;
  58. }
  59. //CRHRollingCache copied/modified from THOR CRollingCache
  60. CRHRollingCache::~CRHRollingCache()
  61. {
  62. loop
  63. {
  64. CRHRollingCacheElem *e = cache.dequeue();
  65. if (!e)
  66. break;
  67. delete e;
  68. }
  69. }
  70. void CRHRollingCache::init(IInputBase *_in, unsigned _max)
  71. {
  72. max = _max;
  73. in =_in;
  74. cache.clear();
  75. cache.reserve(max);
  76. eos = false;
  77. while (cache.ordinality()<max/2)
  78. cache.enqueue(NULL);
  79. while (!eos && (cache.ordinality()<max))
  80. advance();
  81. }
  #ifdef TRACEROLLING
  // Debug-only dump of the rolling cache (compiled only when TRACEROLLING is defined).
  // Logs one line per slot, marking the middle slot with '>' and printing the
  // leading printable characters of each row ("-----" for an empty slot).
  void CRHRollingCache::PrintCache()
  {
      for (unsigned i = 0; i < max; i++)
      {
          CRHRollingCacheElem *e = cache.item(i);
          if (i == 0)
              DBGLOG("RC==============================");

          // Empty slots, and elements without a row, are shown as dashes rather
          // than dereferencing a NULL row pointer.
          const char *text = "-----";
          if (e && e->row)
          {
              const char *raw = (const char *)e->row;
              // Skip 4 bytes when the row does not start with a letter.
              int ii = isalpha((unsigned char)*raw) ? 0 : 4;
              text = raw + ii;
          }

          // "%.80s" bounds how much row data is copied, so the line always fits
          // in sz (1 + at most 10 digits + 2 + 80 + terminator <= 100) and row
          // data that is not NUL-terminated is not read beyond 80 bytes.
          char sz[100];
          sprintf(sz, "%c%u: %.80s", (i == max/2) ? '>' : ' ', i, text);

          // Truncate at the first non-printable character.
          for (int xx = 0; sz[xx] != '\0'; xx++)
          {
              if (!isprint((unsigned char)sz[xx]))
              {
                  sz[xx] = '\0';
                  break;
              }
          }

          // Never pass row data as the format string itself.
          DBGLOG("%s", sz);
          if (i == max-1)
              DBGLOG("RC==============================");
      }
  }
  #endif
  108. CRHRollingCacheElem * CRHRollingCache::mid(int rel)
  109. {
  110. return cache.item((max/2)+rel); // relies on unsigned wrap
  111. }
  112. void CRHRollingCache::advance()
  113. {
  114. CRHRollingCacheElem *e = (cache.ordinality()==max)?cache.dequeue():NULL; //cache full, remove head element
  115. if (!eos) {
  116. if (!e)
  117. e = new CRHRollingCacheElem();
  118. const void * nextrec = in->nextInGroup();//get row from CRHCRHDualCache::cOut, which gets from CRHCRHDualCache, which gets from input
  119. if (!nextrec)
  120. nextrec = in->nextInGroup();
  121. if (nextrec) {
  122. e->set(nextrec);
  123. cache.enqueue(e);
  124. #ifdef TRACEROLLING
  125. PrintCache();
  126. #endif
  127. return;
  128. }
  129. else
  130. eos = true;
  131. }
  132. delete e;
  133. cache.enqueue(NULL);
  134. #ifdef TRACEROLLING
  135. PrintCache();
  136. #endif
  137. }
  138. //=========================================================================================
  139. //CRHDualCache copied from THOR, and modified to get input from IInputBase instead
  140. //of IReadSeqVar and to manage rows as OwnedRoxieRow types
  141. CRHDualCache::CRHDualCache()
  142. {
  143. strm1 = NULL;
  144. strm2 = NULL;
  145. }
  146. CRHDualCache::~CRHDualCache()
  147. {
  148. ::Release(strm1);
  149. ::Release(strm2);
  150. loop
  151. {
  152. CRHRollingCacheElem *e = cache.dequeue();
  153. if (!e)
  154. break;
  155. delete e;
  156. }
  157. }
  158. void CRHDualCache::init(IInputBase * _in)
  159. {
  160. in = _in;
  161. cache.clear();
  162. eos = false;
  163. base = 0;
  164. posL = 0;
  165. posR = 0;
  166. strm1 = new cOut(this,posL);
  167. strm2 = new cOut(this,posR) ;
  168. }
  #ifdef TRACEROLLING
  // Debug-only dump of the dual cache (compiled only when TRACEROLLING is defined).
  // Logs a header with base/posL/posR, then one line per cached element.
  void CRHDualCache::PrintCache()
  {
      const unsigned count = cache.ordinality();
      for (unsigned i = 0; i < count; i++)
      {
          CRHRollingCacheElem *e = cache.item(i);
          if (i == 0)
              DBGLOG("DC=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-BASE:%d,posL=%d,posR=%d %s", base, posL,posR, eos?"EOS":"");

          const char marker = (i == count/2) ? '>' : ' ';
          DBGLOG("%c%d: %s", marker, i, e?(const char *)e->row:"-----");

          if (i == count-1)
              DBGLOG("DC=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-");
      }
  }
  #endif
  184. bool CRHDualCache::get(unsigned n, CRHRollingCacheElem *&out)
  185. {
  186. // take off any no longer needed
  187. CRHRollingCacheElem *e=NULL;
  188. while ((base<posL) && (base<posR)) {
  189. delete e;
  190. e = cache.dequeue();
  191. base++;
  192. }
  193. assertex(n>=base);
  194. while (!eos && (n-base>=cache.ordinality())) //element already in cache?
  195. {
  196. if (!e)
  197. e = new CRHRollingCacheElem;
  198. const void * nextrec = in->nextInGroup(); //get from activity
  199. if (!nextrec)
  200. nextrec = in->nextInGroup();
  201. if (!nextrec) {
  202. eos = true;
  203. break;
  204. }
  205. e->set(nextrec);
  206. cache.enqueue(e);
  207. e = NULL;
  208. #ifdef TRACEROLLING
  209. PrintCache();
  210. #endif
  211. }
  212. delete e;
  213. if (n-base>=cache.ordinality())
  214. return false;
  215. out = cache.item(n-base);
  216. return true;
  217. }
  218. size32_t CRHDualCache::getRecordSize(const void *ptr)
  219. {
  220. return in->queryOutputMeta()->getRecordSize(ptr);
  221. }
  222. size32_t CRHDualCache::getFixedSize() const
  223. {
  224. return in->queryOutputMeta()->getFixedSize();
  225. }
  226. size32_t CRHDualCache::getMinRecordSize() const
  227. {
  228. return in->queryOutputMeta()->getMinRecordSize();
  229. }
  230. CRHDualCache::cOut::cOut(CRHDualCache *_parent, unsigned &_pos)
  231. : pos(_pos)
  232. {
  233. parent = _parent;
  234. stopped = false;
  235. }
  236. const void * CRHDualCache::cOut::nextInGroup()
  237. {
  238. CRHRollingCacheElem *e;
  239. if (stopped || !parent->get(pos,e))
  240. return NULL; //no more data
  241. LinkRoxieRow(e->row);
  242. pos++;
  243. return e->row;
  244. }
  245. IOutputMetaData * CRHDualCache::cOut::queryOutputMeta() const
  246. {
  247. return parent->input()->queryOutputMeta();
  248. }
  249. void CRHDualCache::cOut::stop()
  250. {
  251. pos = (unsigned)-1;
  252. stopped = true;
  253. }
  254. //=========================================================================================
  255. IRHLimitedCompareHelper *createRHLimitedCompareHelper()
  256. {
  257. return new CRHLimitedCompareHelper();
  258. }
  259. //CRHLimitedCompareHelper
  260. void CRHLimitedCompareHelper::init( unsigned _atmost,
  261. IInputBase *_in,
  262. ICompare * _cmp,
  263. ICompare * _limitedcmp )
  264. {
  265. atmost = _atmost;
  266. cache.setown(new CRHRollingCache());
  267. cache->init(_in,(atmost+1)*2);
  268. cmp = _cmp;
  269. limitedcmp = _limitedcmp;
  270. }
  271. bool CRHLimitedCompareHelper::getGroup(OwnedRowArray &group, const void *left)
  272. {
  273. // this could be improved!
  274. // first move 'mid' forwards until mid>=left
  275. int low = 0;
  276. loop
  277. {
  278. CRHRollingCacheElem * r = cache->mid(0);
  279. if (!r)
  280. break; // hit eos
  281. int c = cmp->docompare(left,r->row);
  282. if (c == 0)
  283. {
  284. r->cmp = limitedcmp->docompare(left,r->row);
  285. if (r->cmp <= 0)
  286. break;
  287. }
  288. else if (c < 0)
  289. {
  290. r->cmp = -1;
  291. break;
  292. }
  293. else
  294. r->cmp = 1;
  295. cache->advance();
  296. if (cache->mid(low-1)) // only if haven't hit start
  297. low--;
  298. }
  299. // now scan back (note low should be filled even at eos)
  300. loop
  301. {
  302. CRHRollingCacheElem * pr = cache->mid(low-1);
  303. if (!pr)
  304. break; // hit start
  305. int c = cmp->docompare(left,pr->row);
  306. if (c == 0)
  307. {
  308. pr->cmp = limitedcmp->docompare(left,pr->row);
  309. if (pr->cmp==1)
  310. break;
  311. }
  312. else
  313. {
  314. pr->cmp = 1;
  315. break;
  316. }
  317. low--;
  318. }
  319. int high = 0;
  320. if (cache->mid(0)) // check haven't already hit end
  321. {
  322. // now scan fwd
  323. loop
  324. {
  325. high++;
  326. CRHRollingCacheElem * nr = cache->mid(high);
  327. if (!nr)
  328. break;
  329. int c = cmp->docompare(left,nr->row);
  330. if (c==0)
  331. {
  332. nr->cmp = limitedcmp->docompare(left,nr->row);
  333. if (nr->cmp==-1)
  334. break;
  335. }
  336. else
  337. {
  338. nr->cmp = -1;
  339. break;
  340. }
  341. }
  342. }
  343. while (high-low>(int)atmost)
  344. {
  345. int vl = iabs(cache->mid(low)->cmp);
  346. int vh = iabs(cache->mid(high-1)->cmp);
  347. int v;
  348. if (vl==0)
  349. {
  350. if (vh==0) // both ends equal
  351. return false;
  352. v = vh;
  353. }
  354. else if (vh==0)
  355. v = vl;
  356. else
  357. v = imin(vl,vh);
  358. // remove worst match from either end
  359. while ((low<high)&&(iabs(cache->mid(low)->cmp)==v))
  360. low++;
  361. while ((low<high)&&(iabs(cache->mid(high-1)->cmp)==v))
  362. high--;
  363. if (low>=high)
  364. return true; // couldn't make group;
  365. }
  366. for (int i=low;i<high;i++)
  367. {
  368. CRHRollingCacheElem *r = cache->mid(i);
  369. LinkRoxieRow(r->row);
  370. group.append(r->row);
  371. }
  372. return group.ordinality()>0;
  373. }
  374. //=========================================================================================
  375. // default implementations - can be overridden for efficiency...
  376. bool ISimpleInputBase::nextGroup(ConstPointerArray & group)
  377. {
  378. // MORE - this should be replaced with a version that reads to a builder
  379. const void * next;
  380. while ((next = nextInGroup()) != NULL)
  381. group.append(next);
  382. if (group.ordinality())
  383. return true;
  384. return false;
  385. }
  386. void ISimpleInputBase::readAll(RtlLinkedDatasetBuilder &builder)
  387. {
  388. loop
  389. {
  390. const void *nextrec = nextInGroup();
  391. if (!nextrec)
  392. {
  393. nextrec = nextInGroup();
  394. if (!nextrec)
  395. break;
  396. builder.appendEOG();
  397. }
  398. builder.appendOwn(nextrec);
  399. }
  400. }
  401. //=========================================================================================
  402. // Ability to read an input stream and group and/or sort it on-the-fly
  403. using roxiemem::OwnedConstRoxieRow;
  404. class InputReaderBase : public CInterfaceOf<IGroupedInput>
  405. {
  406. protected:
  407. IInputBase *input;
  408. public:
  409. InputReaderBase(IInputBase *_input)
  410. : input(_input)
  411. {
  412. }
  413. virtual IOutputMetaData * queryOutputMeta() const
  414. {
  415. return input->queryOutputMeta();
  416. }
  417. };
  418. class GroupedInputReader : public InputReaderBase
  419. {
  420. protected:
  421. bool firstRead;
  422. bool eof;
  423. bool endGroupPending;
  424. OwnedConstRoxieRow next;
  425. const ICompare *compare;
  426. public:
  427. GroupedInputReader(IInputBase *_input, const ICompare *_compare)
  428. : InputReaderBase(_input), compare(_compare)
  429. {
  430. firstRead = false;
  431. eof = false;
  432. endGroupPending = false;
  433. }
  434. virtual const void *nextInGroup()
  435. {
  436. if (!firstRead)
  437. {
  438. firstRead = true;
  439. next.setown(input->nextInGroup());
  440. }
  441. if (eof || endGroupPending)
  442. {
  443. endGroupPending = false;
  444. return NULL;
  445. }
  446. OwnedConstRoxieRow prev(next.getClear());
  447. next.setown(input->nextUngrouped()); // skip incoming grouping if present
  448. if (next)
  449. {
  450. dbgassertex(prev); // If this fails, you have an initial empty group. That is not legal.
  451. if (compare && compare->docompare(prev, next) != 0)
  452. endGroupPending = true;
  453. }
  454. else
  455. eof = true;
  456. return prev.getClear();
  457. }
  458. };
  459. class DegroupedInputReader : public InputReaderBase
  460. {
  461. public:
  462. DegroupedInputReader(IInputBase *_input) : InputReaderBase(_input)
  463. {
  464. }
  465. virtual const void *nextInGroup()
  466. {
  467. return input->nextUngrouped();
  468. }
  469. };
  470. class SortedInputReader : public InputReaderBase
  471. {
  472. protected:
  473. DegroupedInputReader degroupedInput;
  474. Owned<ISortAlgorithm> sorter;
  475. bool firstRead;
  476. public:
  477. SortedInputReader(IInputBase *_input, ISortAlgorithm *_sorter)
  478. : InputReaderBase(_input), degroupedInput(_input), sorter(_sorter), firstRead(false)
  479. {
  480. sorter->reset();
  481. }
  482. virtual const void *nextInGroup()
  483. {
  484. if (!firstRead)
  485. {
  486. firstRead = true;
  487. sorter->prepare(&degroupedInput);
  488. }
  489. return sorter->next();
  490. }
  491. };
  492. class SortedGroupedInputReader : public SortedInputReader
  493. {
  494. protected:
  495. bool eof;
  496. bool endGroupPending;
  497. OwnedConstRoxieRow next;
  498. const ICompare *compare;
  499. public:
  500. SortedGroupedInputReader(IInputBase *_input, const ICompare *_compare, ISortAlgorithm *_sorter)
  501. : SortedInputReader(_input, _sorter), compare(_compare), eof(false), endGroupPending(false)
  502. {
  503. }
  504. virtual const void *nextInGroup()
  505. {
  506. if (!firstRead)
  507. {
  508. firstRead = true;
  509. sorter->prepare(&degroupedInput);
  510. next.setown(sorter->next());
  511. }
  512. if (eof || endGroupPending)
  513. {
  514. endGroupPending = false;
  515. return NULL;
  516. }
  517. OwnedConstRoxieRow prev(next.getClear());
  518. next.setown(sorter->next());
  519. if (next)
  520. {
  521. dbgassertex(prev); // If this fails, you have an initial empty group. That is not legal.
  522. if (compare->docompare(prev, next) != 0) // MORE - could assert >=0, as input is supposed to be sorted
  523. endGroupPending = true;
  524. }
  525. else
  526. eof = true;
  527. return prev.getClear();
  528. }
  529. };
  530. extern IGroupedInput *createGroupedInputReader(IInputBase *_input, const ICompare *_groupCompare)
  531. {
  532. dbgassertex(_input && _groupCompare);
  533. return new GroupedInputReader(_input, _groupCompare);
  534. }
  535. extern IGroupedInput *createDegroupedInputReader(IInputBase *_input)
  536. {
  537. dbgassertex(_input);
  538. return new DegroupedInputReader(_input);
  539. }
  540. extern IGroupedInput *createSortedInputReader(IInputBase *_input, ISortAlgorithm *_sorter)
  541. {
  542. dbgassertex(_input && _sorter);
  543. return new SortedInputReader(_input, _sorter);
  544. }
  545. extern IGroupedInput *createSortedGroupedInputReader(IInputBase *_input, const ICompare *_groupCompare, ISortAlgorithm *_sorter)
  546. {
  547. dbgassertex(_input && _groupCompare && _sorter);
  548. return new SortedGroupedInputReader(_input, _groupCompare, _sorter);
  549. }
  550. //=========================================================================================
  551. class CSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>
  552. {
  553. public:
  554. CSortAlgorithm() { elapsedCycles = 0; }
  555. virtual void getSortedGroup(ConstPointerArray & result)
  556. {
  557. loop
  558. {
  559. const void * row = next();
  560. if (!row)
  561. return;
  562. result.append(row);
  563. }
  564. }
  565. virtual cycle_t getElapsedCycles(bool reset)
  566. {
  567. cycle_t ret = elapsedCycles;
  568. if (reset)
  569. elapsedCycles = 0;
  570. return ret;
  571. }
  572. protected:
  573. cycle_t elapsedCycles;
  574. };
  575. class CInplaceSortAlgorithm : public CSortAlgorithm
  576. {
  577. protected:
  578. unsigned curIndex;
  579. ConstPointerArray sorted;
  580. ICompare *compare;
  581. public:
  582. CInplaceSortAlgorithm(ICompare *_compare) : compare(_compare)
  583. {
  584. curIndex = 0;
  585. }
  586. virtual const void *next()
  587. {
  588. if (sorted.isItem(curIndex))
  589. return sorted.item(curIndex++);
  590. return NULL;
  591. }
  592. virtual void reset()
  593. {
  594. roxiemem::ReleaseRoxieRowRange(sorted.getArray(), curIndex, sorted.ordinality());
  595. curIndex = 0;
  596. sorted.kill();
  597. }
  598. virtual void getSortedGroup(ConstPointerArray & result)
  599. {
  600. sorted.swapWith(result);
  601. curIndex = 0;
  602. }
  603. };
  604. class CQuickSortAlgorithm : public CInplaceSortAlgorithm
  605. {
  606. public:
  607. CQuickSortAlgorithm(ICompare *_compare) : CInplaceSortAlgorithm(_compare) {}
  608. virtual void prepare(IInputBase *input)
  609. {
  610. curIndex = 0;
  611. if (input->nextGroup(sorted))
  612. {
  613. cycle_t startCycles = get_cycles_now();
  614. qsortvec(const_cast<void * *>(sorted.getArray()), sorted.ordinality(), *compare);
  615. elapsedCycles += (get_cycles_now() - startCycles);
  616. }
  617. }
  618. };
  619. class CTbbQuickSortAlgorithm : public CInplaceSortAlgorithm
  620. {
  621. public:
  622. CTbbQuickSortAlgorithm(ICompare *_compare) : CInplaceSortAlgorithm(_compare) {}
  623. virtual void prepare(IInputBase *input)
  624. {
  625. curIndex = 0;
  626. if (input->nextGroup(sorted))
  627. {
  628. cycle_t startCycles = get_cycles_now();
  629. tbbqsortvec(const_cast<void * *>(sorted.getArray()), sorted.ordinality(), *compare);
  630. elapsedCycles += (get_cycles_now() - startCycles);
  631. }
  632. }
  633. };
  634. class CStableInplaceSortAlgorithm : public CInplaceSortAlgorithm
  635. {
  636. public:
  637. CStableInplaceSortAlgorithm(ICompare *_compare) : CInplaceSortAlgorithm(_compare) {}
  638. virtual void sortRows(void * * rows, size_t numRows, void * * temp) = 0;
  639. virtual void prepare(IInputBase *input)
  640. {
  641. curIndex = 0;
  642. if (input->nextGroup(sorted))
  643. {
  644. unsigned numRows = sorted.ordinality();
  645. void **rows = const_cast<void * *>(sorted.getArray());
  646. MemoryAttr tempAttr(numRows*sizeof(void **)); // Temp storage for stable sort. This should probably be allocated from roxiemem
  647. void **temp = (void **) tempAttr.bufferBase();
  648. cycle_t startCycles = get_cycles_now();
  649. sortRows(rows, numRows, temp);
  650. elapsedCycles += (get_cycles_now() - startCycles);
  651. }
  652. }
  653. };
  654. class CStableQuickSortAlgorithm : public CStableInplaceSortAlgorithm
  655. {
  656. public:
  657. CStableQuickSortAlgorithm(ICompare *_compare) : CStableInplaceSortAlgorithm(_compare) {}
  658. virtual void sortRows(void * * rows, size_t numRows, void * * temp)
  659. {
  660. qsortvecstableinplace(rows, numRows, *compare, temp);
  661. }
  662. };
  663. class CMergeSortAlgorithm : public CStableInplaceSortAlgorithm
  664. {
  665. public:
  666. CMergeSortAlgorithm(ICompare *_compare) : CStableInplaceSortAlgorithm(_compare) {}
  667. virtual void sortRows(void * * rows, size_t numRows, void * * temp)
  668. {
  669. msortvecstableinplace(rows, numRows, *compare, temp);
  670. }
  671. };
  672. class CParallelMergeSortAlgorithm : public CStableInplaceSortAlgorithm
  673. {
  674. public:
  675. CParallelMergeSortAlgorithm(ICompare *_compare) : CStableInplaceSortAlgorithm(_compare) {}
  676. virtual void sortRows(void * * rows, size_t numRows, void * * temp)
  677. {
  678. parmsortvecstableinplace(rows, numRows, *compare, temp);
  679. }
  680. };
  681. class CTbbStableQuickSortAlgorithm : public CStableInplaceSortAlgorithm
  682. {
  683. public:
  684. CTbbStableQuickSortAlgorithm(ICompare *_compare) : CStableInplaceSortAlgorithm(_compare) {}
  685. virtual void sortRows(void * * rows, size_t numRows, void * * temp)
  686. {
  687. tbbqsortstable(rows, numRows, *compare, temp);
  688. }
  689. };
  690. #define INSERTION_SORT_BLOCKSIZE 1024
  691. class SortedBlock : public CInterface, implements IInterface
  692. {
  693. unsigned sequence;
  694. const void **rows;
  695. unsigned length;
  696. unsigned pos;
  697. SortedBlock(const SortedBlock &);
  698. public:
  699. IMPLEMENT_IINTERFACE;
  700. SortedBlock(unsigned _sequence, roxiemem::IRowManager *rowManager, unsigned activityId) : sequence(_sequence)
  701. {
  702. rows = (const void **) rowManager->allocate(INSERTION_SORT_BLOCKSIZE * sizeof(void *), activityId);
  703. length = 0;
  704. pos = 0;
  705. }
  706. ~SortedBlock()
  707. {
  708. roxiemem::ReleaseRoxieRowRange(rows, pos, length);
  709. ReleaseRoxieRow(rows);
  710. }
  711. int compareTo(SortedBlock *r, ICompare *compare)
  712. {
  713. int rc = compare->docompare(rows[pos], r->rows[r->pos]);
  714. if (!rc)
  715. rc = sequence - r->sequence;
  716. return rc;
  717. }
  718. const void *next()
  719. {
  720. if (pos < length)
  721. return rows[pos++];
  722. else
  723. return NULL;
  724. }
  725. inline bool eof()
  726. {
  727. return pos==length;
  728. }
  729. bool insert(const void *next, ICompare *_compare )
  730. {
  731. unsigned b = length;
  732. if (b == INSERTION_SORT_BLOCKSIZE)
  733. return false;
  734. else if (b < 7)
  735. {
  736. while (b)
  737. {
  738. if (_compare->docompare(next, rows[b-1]) >= 0)
  739. break;
  740. b--;
  741. }
  742. if (b != length)
  743. memmove(&rows[b+1], &rows[b], (length - b) * sizeof(void *));
  744. rows[b] = next;
  745. length++;
  746. return true;
  747. }
  748. else
  749. {
  750. unsigned int a = 0;
  751. while ((int)a<b)
  752. {
  753. int i = (a+b)/2;
  754. int rc = _compare->docompare(next, rows[i]);
  755. if (rc>=0)
  756. a = i+1;
  757. else
  758. b = i;
  759. }
  760. if (a != length)
  761. memmove(&rows[a+1], &rows[a], (length - a) * sizeof(void *));
  762. rows[a] = next;
  763. length++;
  764. return true;
  765. }
  766. }
  767. };
  768. class CInsertionSortAlgorithm : public CSortAlgorithm
  769. {
  770. SortedBlock *curBlock;
  771. unsigned blockNo;
  772. IArrayOf<SortedBlock> blocks;
  773. unsigned activityId;
  774. roxiemem::IRowManager *rowManager;
  775. ICompare *compare;
  776. void newBlock()
  777. {
  778. blocks.append(*curBlock);
  779. curBlock = new SortedBlock(blockNo++, rowManager, activityId);
  780. }
  781. inline static int doCompare(SortedBlock &l, SortedBlock &r, ICompare *compare)
  782. {
  783. return l.compareTo(&r, compare);
  784. }
  785. void makeHeap()
  786. {
  787. /* Permute blocks to establish the heap property
  788. For each element p, the children are p*2+1 and p*2+2 (provided these are in range)
  789. The children of p must both be greater than or equal to p
  790. The parent of a child c is given by p = (c-1)/2
  791. */
  792. unsigned i;
  793. unsigned n = blocks.length();
  794. SortedBlock **s = blocks.getArray();
  795. for (i=1; i<n; i++)
  796. {
  797. SortedBlock * r = s[i];
  798. int c = i; /* child */
  799. while (c > 0)
  800. {
  801. int p = (c-1)/2; /* parent */
  802. if ( doCompare( blocks.item(c), blocks.item(p), compare ) >= 0 )
  803. break;
  804. s[c] = s[p];
  805. s[p] = r;
  806. c = p;
  807. }
  808. }
  809. }
  810. void remakeHeap()
  811. {
  812. /* The row associated with block[0] will have changed
  813. This code restores the heap property
  814. */
  815. unsigned p = 0; /* parent */
  816. unsigned n = blocks.length();
  817. SortedBlock **s = blocks.getArray();
  818. while (1)
  819. {
  820. unsigned c = p*2 + 1; /* child */
  821. if ( c >= n )
  822. break;
  823. /* Select smaller child */
  824. if ( c+1 < n && doCompare( blocks.item(c+1), blocks.item(c), compare ) < 0 ) c += 1;
  825. /* If child is greater or equal than parent then we are done */
  826. if ( doCompare( blocks.item(c), blocks.item(p), compare ) >= 0 )
  827. break;
  828. /* Swap parent and child */
  829. SortedBlock *r = s[c];
  830. s[c] = s[p];
  831. s[p] = r;
  832. /* child becomes parent */
  833. p = c;
  834. }
  835. }
  836. public:
  837. CInsertionSortAlgorithm(ICompare *_compare, roxiemem::IRowManager *_rowManager, unsigned _activityId)
  838. : compare(_compare)
  839. {
  840. rowManager = _rowManager;
  841. activityId = _activityId;
  842. curBlock = NULL;
  843. blockNo = 0;
  844. }
  845. virtual void reset()
  846. {
  847. blocks.kill();
  848. delete curBlock;
  849. curBlock = NULL;
  850. blockNo = 0;
  851. }
  852. virtual void prepare(IInputBase *input)
  853. {
  854. blockNo = 0;
  855. curBlock = new SortedBlock(blockNo++, rowManager, activityId);
  856. loop
  857. {
  858. const void *next = input->nextInGroup();
  859. if (!next)
  860. break;
  861. if (!curBlock->insert(next, compare))
  862. {
  863. newBlock();
  864. curBlock->insert(next, compare);
  865. }
  866. }
  867. if (blockNo > 1)
  868. {
  869. blocks.append(*curBlock);
  870. curBlock = NULL;
  871. makeHeap();
  872. }
  873. }
  874. virtual const void * next()
  875. {
  876. const void *ret;
  877. if (blockNo==1) // single block case..
  878. {
  879. ret = curBlock->next();
  880. }
  881. else if (blocks.length())
  882. {
  883. SortedBlock &top = blocks.item(0);
  884. ret = top.next();
  885. if (top.eof())
  886. blocks.replace(blocks.popGet(), 0);
  887. remakeHeap();
  888. }
  889. else
  890. ret = NULL;
  891. return ret;
  892. }
  893. };
  894. class CHeapSortAlgorithm : public CSortAlgorithm
  895. {
  896. unsigned curIndex;
  897. ConstPointerArray sorted;
  898. bool inputAlreadySorted;
  899. IntArray sequences;
  900. bool eof;
  901. ICompare *compare;
  902. #ifdef _CHECK_HEAPSORT
  903. void checkHeap() const
  904. {
  905. unsigned n = sorted.ordinality();
  906. if (n)
  907. {
  908. ICompare *_compare = compare;
  909. void **s = sorted.getArray();
  910. int *sq = sequences.getArray();
  911. unsigned p;
  912. #if 0
  913. CTXLOG("------------------------%d entries-----------------", n);
  914. for (p = 0; p < n; p++)
  915. {
  916. CTXLOG("HEAP %d: %d %.10s", p, sq[p], s[p] ? s[p] : "..");
  917. }
  918. #endif
  919. for (p = 0; p < n; p++)
  920. {
  921. unsigned c = p*2+1;
  922. if (c<n)
  923. assertex(!s[c] || (docompare(p, c, _compare, s, sq) <= 0));
  924. c++;
  925. if (c<n)
  926. assertex(!s[c] || (docompare(p, c, _compare, s, sq) <= 0));
  927. }
  928. }
  929. }
  930. #else
  931. inline void checkHeap() const {}
  932. #endif
  933. const void *removeHeap()
  934. {
  935. unsigned n = sorted.ordinality();
  936. if (n)
  937. {
  938. const void *ret = sorted.item(0);
  939. if (n > 1 && ret)
  940. {
  941. ICompare *_compare = compare;
  942. const void **s = sorted.getArray();
  943. int *sq = sequences.getArray();
  944. unsigned v = 0; // vacancy
  945. loop
  946. {
  947. unsigned c = 2*v + 1;
  948. if (c < n)
  949. {
  950. unsigned f = c; // favourite to fill it
  951. c++;
  952. if (c < n && s[c] && (!s[f] || (docompare(f, c, _compare, s, sq) > 0))) // is the smaller of the children
  953. f = c;
  954. sq[v] = sq[f];
  955. if ((s[v] = s[f]) != NULL)
  956. v = f;
  957. else
  958. break;
  959. }
  960. else
  961. {
  962. s[v] = NULL;
  963. break;
  964. }
  965. }
  966. }
  967. checkHeap();
  968. return ret;
  969. }
  970. else
  971. return NULL;
  972. }
  973. static inline int docompare(unsigned l, unsigned r, ICompare *_compare, const void **s, int *sq)
  974. {
  975. int rc = _compare->docompare(s[l], s[r]);
  976. if (!rc)
  977. rc = sq[l] - sq[r];
  978. return rc;
  979. }
  980. void insertHeap(const void *next)
  981. {
  982. // Upside-down heap sort
  983. // Maintain a heap where every parent is lower than each of its children
  984. // Root (at node 0) is lowest record seen, nodes 2n+1, 2n+2 are the children
  985. // To insert a row, add it at end then keep swapping with parent as long as parent is greater
  986. // To remove a row, take row 0, then recreate heap by replacing it with smaller of two children and so on down the tree
  987. // Nice features:
  988. // 1. Deterministic
  989. // 2. Sort time can be overlapped with upstream/downstream processes - there is no delay between receiving last record from input and deliveriing first to output
  990. // 3. Already sorted case can be spotted at zero cost while reading.
  991. // 4. If you don't read all the results, you don't have to complete the sort
  992. // BUT it is NOT stable, so we have to use a parallel array of sequence numbers
  993. unsigned n = sorted.ordinality();
  994. sorted.append(next);
  995. sequences.append(n);
  996. if (!n)
  997. return;
  998. ICompare *_compare = compare;
  999. const void **s = sorted.getArray();
  1000. if (inputAlreadySorted)
  1001. {
  1002. if (_compare->docompare(next, s[n-1]) >= 0)
  1003. return;
  1004. else
  1005. {
  1006. // MORE - could delay creating sequences until now...
  1007. inputAlreadySorted = false;
  1008. }
  1009. }
  1010. int *sq = sequences.getArray();
  1011. unsigned q = n;
  1012. while (n)
  1013. {
  1014. unsigned parent = (n-1) / 2;
  1015. const void *p = s[parent];
  1016. if (_compare->docompare(p, next) <= 0)
  1017. break;
  1018. s[n] = p;
  1019. sq[n] = sq[parent];
  1020. s[parent] = next;
  1021. sq[parent] = q;
  1022. n = parent;
  1023. }
  1024. }
  1025. public:
  1026. CHeapSortAlgorithm(ICompare *_compare) : compare(_compare)
  1027. {
  1028. inputAlreadySorted = true;
  1029. curIndex = 0;
  1030. eof = false;
  1031. }
  1032. virtual void reset()
  1033. {
  1034. eof = false;
  1035. if (inputAlreadySorted)
  1036. {
  1037. roxiemem::ReleaseRoxieRowRange(sorted.getArray(), curIndex, sorted.ordinality());
  1038. sorted.kill();
  1039. }
  1040. else
  1041. {
  1042. roxiemem::ReleaseRoxieRows(sorted);
  1043. }
  1044. curIndex = 0;
  1045. inputAlreadySorted = true;
  1046. sequences.kill();
  1047. }
  1048. virtual void prepare(IInputBase *input)
  1049. {
  1050. inputAlreadySorted = true;
  1051. curIndex = 0;
  1052. eof = false;
  1053. assertex(sorted.ordinality()==0);
  1054. const void *next = input->nextInGroup();
  1055. if (!next)
  1056. {
  1057. eof = true;
  1058. return;
  1059. }
  1060. loop
  1061. {
  1062. insertHeap(next);
  1063. next = input->nextInGroup();
  1064. if (!next)
  1065. break;
  1066. }
  1067. checkHeap();
  1068. }
  1069. virtual const void * next()
  1070. {
  1071. if (inputAlreadySorted)
  1072. {
  1073. if (sorted.isItem(curIndex))
  1074. {
  1075. return sorted.item(curIndex++);
  1076. }
  1077. else
  1078. return NULL;
  1079. }
  1080. else
  1081. return removeHeap();
  1082. }
  1083. };
  1084. class CSpillingSortAlgorithm : public CSortAlgorithm, implements roxiemem::IBufferedRowCallback
  1085. {
  1086. enum {
  1087. InitialSortElements = 0,
  1088. //The number of rows that can be added without entering a critical section, and therefore also the number
  1089. //of rows that might not get freed when memory gets tight.
  1090. CommitStep=32
  1091. };
  1092. roxiemem::DynamicRoxieOutputRowArray rowsToSort;
  1093. roxiemem::RoxieSimpleInputRowArray sorted;
  1094. ICompare *compare;
  1095. roxiemem::IRowManager &rowManager;
  1096. Owned<IDiskMerger> diskMerger;
  1097. Owned<IRowStream> diskReader;
  1098. IOutputMetaData *rowMeta;
  1099. StringAttr tempDirectory;
  1100. ICodeContext *ctx;
  1101. unsigned activityId;
  1102. bool stable;
  1103. public:
  1104. CSpillingSortAlgorithm(ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId, bool _stable)
  1105. : rowsToSort(&_rowManager, InitialSortElements, CommitStep, _activityId),
  1106. rowManager(_rowManager), compare(_compare), rowMeta(_rowMeta), ctx(_ctx), tempDirectory(_tempDirectory), activityId(_activityId), stable(_stable)
  1107. {
  1108. rowManager.addRowBuffer(this);
  1109. }
  1110. ~CSpillingSortAlgorithm()
  1111. {
  1112. rowManager.removeRowBuffer(this);
  1113. diskReader.clear();
  1114. }
  1115. virtual void sortRows(void * * rows, size_t numRows, ICompare & compare, void * * stableTemp) = 0;
  1116. virtual void prepare(IInputBase *input)
  1117. {
  1118. loop
  1119. {
  1120. const void * next = input->nextInGroup();
  1121. if (!next)
  1122. break;
  1123. if (!rowsToSort.append(next))
  1124. {
  1125. {
  1126. roxiemem::RoxieOutputRowArrayLock block(rowsToSort);
  1127. //We should have been called back to free any committed rows, but occasionally it may not (e.g., if
  1128. //the problem is global memory is exhausted) - in which case force a spill here (but add any pending
  1129. //rows first).
  1130. if (rowsToSort.numCommitted() != 0)
  1131. {
  1132. rowsToSort.flush();
  1133. spillRows();
  1134. }
  1135. //Ensure new rows are written to the head of the array. It needs to be a separate call because
  1136. //spillRows() cannot shift active row pointer since it can be called from any thread
  1137. rowsToSort.flush();
  1138. }
  1139. if (!rowsToSort.append(next))
  1140. {
  1141. ReleaseRoxieRow(next);
  1142. throw MakeStringException(ROXIEMM_MEMORY_LIMIT_EXCEEDED, "Insufficient memory to append sort row");
  1143. }
  1144. }
  1145. }
  1146. rowsToSort.flush();
  1147. roxiemem::RoxieOutputRowArrayLock block(rowsToSort);
  1148. if (diskMerger)
  1149. {
  1150. spillRows();
  1151. rowsToSort.kill();
  1152. diskReader.setown(diskMerger->merge(compare));
  1153. }
  1154. else
  1155. {
  1156. sortCommitted();
  1157. sorted.transferFrom(rowsToSort);
  1158. }
  1159. }
  1160. virtual const void *next()
  1161. {
  1162. if(diskReader)
  1163. return diskReader->nextRow();
  1164. return sorted.dequeue();
  1165. }
  1166. virtual void reset()
  1167. {
  1168. //MORE: This could transfer any row pointer from sorted back to rowsToSort. It would trade
  1169. //fewer heap allocations with not freeing up the memory from large group sorts.
  1170. rowsToSort.clearRows();
  1171. sorted.kill();
  1172. //Disk reader must be cleared before the merger - or the files may still be locked.
  1173. diskReader.clear();
  1174. diskMerger.clear();
  1175. }
  1176. //interface roxiemem::IBufferedRowCallback
  1177. virtual unsigned getSpillCost() const
  1178. {
  1179. //Spill global sorts before grouped sorts
  1180. if (rowMeta->isGrouped())
  1181. return 20;
  1182. return 10;
  1183. }
  1184. virtual unsigned getActivityId() const
  1185. {
  1186. return activityId;
  1187. }
  1188. virtual bool freeBufferedRows(bool critical)
  1189. {
  1190. roxiemem::RoxieOutputRowArrayLock block(rowsToSort);
  1191. return spillRows();
  1192. }
  1193. protected:
  1194. void sortCommitted()
  1195. {
  1196. unsigned numRows = rowsToSort.numCommitted();
  1197. if (numRows)
  1198. {
  1199. cycle_t startCycles = get_cycles_now();
  1200. void ** rows = const_cast<void * *>(rowsToSort.getBlock(numRows));
  1201. //MORE: Should this be parallel? Should that be dependent on whether it is grouped? Should be a hint.
  1202. if (stable)
  1203. {
  1204. MemoryAttr tempAttr(numRows*sizeof(void **)); // Temp storage for stable sort. This should probably be allocated from roxiemem
  1205. void **temp = (void **) tempAttr.bufferBase();
  1206. sortRows(rows, numRows, *compare, temp);
  1207. }
  1208. else
  1209. sortRows(rows, numRows, *compare, NULL);
  1210. elapsedCycles += (get_cycles_now() - startCycles);
  1211. }
  1212. }
  1213. bool spillRows()
  1214. {
  1215. unsigned numRows = rowsToSort.numCommitted();
  1216. if (numRows == 0)
  1217. return false;
  1218. sortCommitted();
  1219. const void * * rows = rowsToSort.getBlock(numRows);
  1220. Owned<IRowWriter> out = queryMerger()->createWriteBlock();
  1221. for (unsigned i= 0; i < numRows; i++)
  1222. {
  1223. out->putRow(rows[i]);
  1224. }
  1225. rowsToSort.noteSpilled(numRows);
  1226. return true;
  1227. }
  1228. IDiskMerger * queryMerger()
  1229. {
  1230. if (!diskMerger)
  1231. {
  1232. unsigned __int64 seq = (memsize_t)this ^ get_cycles_now();
  1233. StringBuffer spillBasename;
  1234. spillBasename.append(tempDirectory).append(PATHSEPCHAR).appendf("spill_sort_%" I64F "u", seq);
  1235. Owned<IRowLinkCounter> linker = new RoxieRowLinkCounter();
  1236. Owned<IRowInterfaces> rowInterfaces = createRowInterfaces(rowMeta, activityId, ctx);
  1237. diskMerger.setown(createDiskMerger(rowInterfaces, linker, spillBasename));
  1238. }
  1239. return diskMerger;
  1240. }
  1241. };
  1242. class CSpillingQuickSortAlgorithm : public CSpillingSortAlgorithm
  1243. {
  1244. public:
  1245. CSpillingQuickSortAlgorithm(ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId, bool _stable)
  1246. : CSpillingSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, _stable)
  1247. {
  1248. }
  1249. virtual void sortRows(void * * rows, size_t numRows, ICompare & compare, void * * stableTemp)
  1250. {
  1251. if (stableTemp)
  1252. qsortvecstableinplace(rows, numRows, compare, stableTemp);
  1253. else
  1254. qsortvec(rows, numRows, compare);
  1255. }
  1256. };
  1257. class CSpillingMergeSortAlgorithm : public CSpillingSortAlgorithm
  1258. {
  1259. public:
  1260. CSpillingMergeSortAlgorithm(ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId, bool _parallel)
  1261. : CSpillingSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, true)
  1262. {
  1263. parallel = _parallel;
  1264. }
  1265. virtual void sortRows(void * * rows, size_t numRows, ICompare & compare, void * * stableTemp)
  1266. {
  1267. if (parallel)
  1268. parmsortvecstableinplace(rows, numRows, compare, stableTemp);
  1269. else
  1270. msortvecstableinplace(rows, numRows, compare, stableTemp);
  1271. }
  1272. protected:
  1273. bool parallel;
  1274. };
  1275. extern ISortAlgorithm *createQuickSortAlgorithm(ICompare *_compare)
  1276. {
  1277. return new CQuickSortAlgorithm(_compare);
  1278. }
  1279. extern ISortAlgorithm *createStableQuickSortAlgorithm(ICompare *_compare)
  1280. {
  1281. return new CStableQuickSortAlgorithm(_compare);
  1282. }
  1283. extern ISortAlgorithm *createTbbQuickSortAlgorithm(ICompare *_compare)
  1284. {
  1285. return new CTbbQuickSortAlgorithm(_compare);
  1286. }
  1287. extern ISortAlgorithm *createTbbStableQuickSortAlgorithm(ICompare *_compare)
  1288. {
  1289. return new CTbbStableQuickSortAlgorithm(_compare);
  1290. }
  1291. extern ISortAlgorithm *createInsertionSortAlgorithm(ICompare *_compare, roxiemem::IRowManager *_rowManager, unsigned _activityId)
  1292. {
  1293. return new CInsertionSortAlgorithm(_compare, _rowManager, _activityId);
  1294. }
  1295. extern ISortAlgorithm *createHeapSortAlgorithm(ICompare *_compare)
  1296. {
  1297. return new CHeapSortAlgorithm(_compare);
  1298. }
  1299. extern ISortAlgorithm *createMergeSortAlgorithm(ICompare *_compare)
  1300. {
  1301. return new CMergeSortAlgorithm(_compare);
  1302. }
  1303. extern ISortAlgorithm *createParallelMergeSortAlgorithm(ICompare *_compare)
  1304. {
  1305. return new CParallelMergeSortAlgorithm(_compare);
  1306. }
  1307. extern ISortAlgorithm *createSpillingQuickSortAlgorithm(ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId, bool _stable)
  1308. {
  1309. return new CSpillingQuickSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, _stable);
  1310. }
  1311. extern ISortAlgorithm *createSortAlgorithm(RoxieSortAlgorithm _algorithm, ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId)
  1312. {
  1313. switch (_algorithm)
  1314. {
  1315. case heapSortAlgorithm:
  1316. return createHeapSortAlgorithm(_compare);
  1317. case insertionSortAlgorithm:
  1318. return createInsertionSortAlgorithm(_compare, &_rowManager, _activityId);
  1319. case quickSortAlgorithm:
  1320. return createQuickSortAlgorithm(_compare);
  1321. case stableQuickSortAlgorithm:
  1322. return createStableQuickSortAlgorithm(_compare);
  1323. case spillingQuickSortAlgorithm:
  1324. case stableSpillingQuickSortAlgorithm:
  1325. return createSpillingQuickSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, _algorithm==stableSpillingQuickSortAlgorithm);
  1326. case mergeSortAlgorithm:
  1327. return new CMergeSortAlgorithm(_compare);
  1328. case parallelMergeSortAlgorithm:
  1329. return new CParallelMergeSortAlgorithm(_compare);
  1330. case spillingMergeSortAlgorithm:
  1331. return new CSpillingMergeSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, false);
  1332. case spillingParallelMergeSortAlgorithm:
  1333. return new CSpillingMergeSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, true);
  1334. case tbbQuickSortAlgorithm:
  1335. return createTbbQuickSortAlgorithm(_compare);
  1336. case tbbStableQuickSortAlgorithm:
  1337. return createTbbStableQuickSortAlgorithm(_compare);
  1338. default:
  1339. break;
  1340. }
  1341. throwUnexpected();
  1342. }
  1343. //===================================================
  1344. CSafeSocket::CSafeSocket(ISocket *_sock)
  1345. {
  1346. httpMode = false;
  1347. mlFmt = MarkupFmt_Unknown;
  1348. sent = 0;
  1349. heartbeat = false;
  1350. sock.setown(_sock);
  1351. }
  1352. CSafeSocket::~CSafeSocket()
  1353. {
  1354. sock.clear();
  1355. ForEachItemIn(idx, queued)
  1356. {
  1357. free(queued.item(idx));
  1358. }
  1359. queued.kill();
  1360. lengths.kill();
  1361. }
  1362. unsigned CSafeSocket::bytesOut() const
  1363. {
  1364. return sent;
  1365. }
  1366. bool CSafeSocket::checkConnection() const
  1367. {
  1368. if (sock)
  1369. return sock->check_connection();
  1370. else
  1371. return false;
  1372. }
  1373. size32_t CSafeSocket::write(const void *buf, size32_t size, bool takeOwnership)
  1374. {
  1375. CriticalBlock c(crit); // NOTE: anyone needing to write multiple times without interleave should have already locked this. We lock again for the simple cases.
  1376. OwnedMalloc<void> ownedBuffer;
  1377. if (takeOwnership)
  1378. ownedBuffer.setown((void *) buf);
  1379. if (!size)
  1380. return 0;
  1381. try
  1382. {
  1383. if (httpMode)
  1384. {
  1385. if (!takeOwnership)
  1386. {
  1387. ownedBuffer.setown(malloc(size));
  1388. if (!ownedBuffer)
  1389. throw MakeStringException(THORHELPER_INTERNAL_ERROR, "Out of memory in CSafeSocket::write (requesting %d bytes)", size);
  1390. memcpy(ownedBuffer, buf, size);
  1391. }
  1392. queued.append(ownedBuffer.getClear());
  1393. lengths.append(size);
  1394. return size;
  1395. }
  1396. else
  1397. {
  1398. sent += size;
  1399. size32_t written = sock->write(buf, size);
  1400. return written;
  1401. }
  1402. }
  1403. catch(...)
  1404. {
  1405. heartbeat = false;
  1406. throw;
  1407. }
  1408. }
  1409. bool CSafeSocket::readBlock(MemoryBuffer &ret, unsigned timeout, unsigned maxBlockSize)
  1410. {
  1411. // MORE - this is still not good enough as we could get someone else's block if there are multiple input datasets
  1412. CriticalBlock c(crit);
  1413. try
  1414. {
  1415. unsigned bytesRead;
  1416. unsigned len;
  1417. try
  1418. {
  1419. sock->read(&len, sizeof (len), sizeof (len), bytesRead, timeout);
  1420. }
  1421. catch (IJSOCK_Exception *E)
  1422. {
  1423. if (E->errorCode()==JSOCKERR_graceful_close)
  1424. {
  1425. E->Release();
  1426. return false;
  1427. }
  1428. throw;
  1429. }
  1430. assertex(bytesRead == sizeof(len));
  1431. _WINREV(len);
  1432. if (len & 0x80000000)
  1433. len ^= 0x80000000;
  1434. if (len > maxBlockSize)
  1435. throw MakeStringException(THORHELPER_DATA_ERROR, "Maximum block size (%d bytes) exceeded (missing length prefix?)", maxBlockSize);
  1436. if (len)
  1437. {
  1438. unsigned bytesRead;
  1439. sock->read(ret.reserveTruncate(len), len, len, bytesRead, timeout);
  1440. }
  1441. return len != 0;
  1442. }
  1443. catch(...)
  1444. {
  1445. heartbeat = false;
  1446. throw;
  1447. }
  1448. }
  1449. int readHttpHeaderLine(IBufferedSocket *linereader, char *headerline, unsigned maxlen)
  1450. {
  1451. Owned<IMultiException> me = makeMultiException("roxie");
  1452. int bytesread = linereader->readline(headerline, maxlen, true, me);
  1453. if (me->ordinality())
  1454. throw me.getClear();
  1455. if(bytesread <= 0 || bytesread > maxlen)
  1456. throw MakeStringException(THORHELPER_DATA_ERROR, "HTTP-GET Bad Request");
  1457. return bytesread;
  1458. }
  1459. bool CSafeSocket::readBlock(StringBuffer &ret, unsigned timeout, HttpHelper *pHttpHelper, bool &continuationNeeded, bool &isStatus, unsigned maxBlockSize)
  1460. {
  1461. continuationNeeded = false;
  1462. isStatus = false;
  1463. CriticalBlock c(crit);
  1464. try
  1465. {
  1466. unsigned bytesRead;
  1467. unsigned len = 0;
  1468. try
  1469. {
  1470. sock->read(&len, sizeof (len), sizeof (len), bytesRead, timeout);
  1471. }
  1472. catch (IJSOCK_Exception *E)
  1473. {
  1474. if (E->errorCode()==JSOCKERR_graceful_close)
  1475. {
  1476. E->Release();
  1477. return false;
  1478. }
  1479. throw;
  1480. }
  1481. assertex(bytesRead == sizeof(len));
  1482. unsigned left = 0;
  1483. char *buf;
  1484. if (pHttpHelper != NULL && strncmp((char *)&len, "POST", 4) == 0)
  1485. {
  1486. #define MAX_HTTP_HEADERSIZE 8000
  1487. pHttpHelper->setIsHttp(true);
  1488. char header[MAX_HTTP_HEADERSIZE + 1]; // allow room for \0
  1489. sock->read(header, 1, MAX_HTTP_HEADERSIZE, bytesRead, timeout);
  1490. header[bytesRead] = 0;
  1491. char *payload = strstr(header, "\r\n\r\n");
  1492. if (payload)
  1493. {
  1494. *payload = 0;
  1495. payload += 4;
  1496. char *str;
  1497. pHttpHelper->parseHTTPRequestLine(header);
  1498. // capture authentication token
  1499. if ((str = strstr(header, "Authorization: Basic ")) != NULL)
  1500. pHttpHelper->setAuthToken(str+21);
  1501. // capture content type
  1502. if ((str = strstr(header, "Content-Type: ")) != NULL)
  1503. pHttpHelper->setContentType(str+14);
  1504. // determine payload length
  1505. str = strstr(header, "Content-Length: ");
  1506. if (str)
  1507. {
  1508. len = atoi(str + strlen("Content-Length: "));
  1509. buf = ret.reserveTruncate(len);
  1510. left = len - (bytesRead - (payload - header));
  1511. if (len > left)
  1512. memcpy(buf, payload, len - left);
  1513. }
  1514. else
  1515. left = len = 0;
  1516. }
  1517. else
  1518. left = len = 0;
  1519. if (!len)
  1520. throw MakeStringException(THORHELPER_DATA_ERROR, "Badly formed HTTP header");
  1521. }
  1522. else if (pHttpHelper != NULL && strncmp((char *)&len, "GET", 3) == 0)
  1523. {
  1524. #define MAX_HTTP_GET_LINE 16000 //arbitrary per line limit, most web servers are lower, but urls for queries can be complex..
  1525. pHttpHelper->setIsHttp(true);
  1526. char headerline[MAX_HTTP_GET_LINE + 1];
  1527. Owned<IBufferedSocket> linereader = createBufferedSocket(sock);
  1528. int bytesread = readHttpHeaderLine(linereader, headerline, MAX_HTTP_GET_LINE);
  1529. pHttpHelper->parseHTTPRequestLine(headerline);
  1530. bytesread = readHttpHeaderLine(linereader, headerline, MAX_HTTP_GET_LINE);
  1531. while(bytesread >= 0 && *headerline && *headerline!='\r')
  1532. {
  1533. // capture authentication token
  1534. if (!strnicmp(headerline, "Authorization: Basic ", 21))
  1535. pHttpHelper->setAuthToken(headerline+21);
  1536. bytesread = readHttpHeaderLine(linereader, headerline, MAX_HTTP_GET_LINE);
  1537. }
  1538. StringBuffer queryName;
  1539. const char *target = pHttpHelper->queryTarget();
  1540. if (!target || !*target)
  1541. throw MakeStringException(THORHELPER_DATA_ERROR, "HTTP-GET Target not specified");
  1542. else if (!pHttpHelper->validateTarget(target))
  1543. throw MakeStringException(THORHELPER_DATA_ERROR, "HTTP-GET Target not found");
  1544. const char *query = pHttpHelper->queryQueryName();
  1545. if (!query || !*query)
  1546. throw MakeStringException(THORHELPER_DATA_ERROR, "HTTP-GET Query not specified");
  1547. queryName.append(query);
  1548. Owned<IPropertyTree> req = createPTreeFromHttpParameters(queryName, pHttpHelper->queryUrlParameters(), true, pHttpHelper->queryContentFormat()==MarkupFmt_JSON);
  1549. if (pHttpHelper->queryContentFormat()==MarkupFmt_JSON)
  1550. toJSON(req, ret);
  1551. else
  1552. toXML(req, ret);
  1553. return true;
  1554. }
  1555. else if (strnicmp((char *)&len, "STAT", 4) == 0)
  1556. isStatus = true;
  1557. else
  1558. {
  1559. _WINREV(len);
  1560. if (len & 0x80000000)
  1561. {
  1562. len ^= 0x80000000;
  1563. continuationNeeded = true;
  1564. }
  1565. if (len > maxBlockSize)
  1566. throw MakeStringException(THORHELPER_DATA_ERROR, "Maximum block size (%d bytes) exceeded (missing length prefix?)", maxBlockSize);
  1567. left = len;
  1568. if (len)
  1569. buf = ret.reserveTruncate(len);
  1570. }
  1571. if (left)
  1572. {
  1573. sock->read(buf + (len - left), left, left, bytesRead, timeout);
  1574. }
  1575. return len != 0;
  1576. }
  1577. catch (IException *E)
  1578. {
  1579. if (pHttpHelper)
  1580. checkSendHttpException(*pHttpHelper, E, NULL);
  1581. heartbeat = false;
  1582. throw;
  1583. }
  1584. catch (...)
  1585. {
  1586. heartbeat = false;
  1587. throw;
  1588. }
  1589. }
  1590. void CSafeSocket::setHttpMode(const char *queryName, bool arrayMode, HttpHelper &httphelper)
  1591. {
  1592. CriticalBlock c(crit); // Should not be needed
  1593. httpMode = true;
  1594. mlFmt = httphelper.queryContentFormat();
  1595. heartbeat = false;
  1596. assertex(contentHead.length()==0 && contentTail.length()==0);
  1597. if (mlFmt==MarkupFmt_JSON)
  1598. {
  1599. contentHead.set("{");
  1600. contentTail.set("}");
  1601. }
  1602. else
  1603. {
  1604. StringAttrBuilder headText(contentHead), tailText(contentTail);
  1605. if (httphelper.getUseEnvelope())
  1606. headText.append(
  1607. "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
  1608. "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
  1609. "<soap:Body>");
  1610. if (arrayMode)
  1611. {
  1612. headText.append("<").append(queryName).append("ResponseArray>");
  1613. tailText.append("</").append(queryName).append("ResponseArray>");
  1614. }
  1615. if (httphelper.getUseEnvelope())
  1616. tailText.append("</soap:Body></soap:Envelope>");
  1617. }
  1618. }
  1619. void CSafeSocket::checkSendHttpException(HttpHelper &httphelper, IException *E, const char *queryName)
  1620. {
  1621. if (!httphelper.isHttp())
  1622. return;
  1623. if (httphelper.queryContentFormat()==MarkupFmt_JSON)
  1624. sendJsonException(E, queryName);
  1625. else
  1626. sendSoapException(E, queryName);
  1627. }
  1628. void CSafeSocket::sendSoapException(IException *E, const char *queryName)
  1629. {
  1630. try
  1631. {
  1632. if (!queryName)
  1633. queryName = "Unknown"; // Exceptions when parsing query XML can leave queryName unset/unknowable....
  1634. StringBuffer response;
  1635. response.append("<").append(queryName).append("Response");
  1636. response.append(" xmlns=\"urn:hpccsystems:ecl:").appendLower(strlen(queryName), queryName).append("\">");
  1637. response.appendf("<Results><Result><Exception><Source>Roxie</Source><Code>%d</Code>", E->errorCode());
  1638. response.append("<Message>");
  1639. StringBuffer s;
  1640. E->errorMessage(s);
  1641. encodeXML(s.str(), response);
  1642. response.append("</Message></Exception></Result></Results>");
  1643. response.append("</").append(queryName).append("Response>");
  1644. write(response.str(), response.length());
  1645. }
  1646. catch(IException *EE)
  1647. {
  1648. StringBuffer error("While reporting exception: ");
  1649. EE->errorMessage(error);
  1650. DBGLOG("%s", error.str());
  1651. EE->Release();
  1652. }
  1653. #ifndef _DEBUG
  1654. catch(...) {}
  1655. #endif
  1656. }
  1657. void CSafeSocket::sendJsonException(IException *E, const char *queryName)
  1658. {
  1659. try
  1660. {
  1661. if (!queryName)
  1662. queryName = "Unknown"; // Exceptions when parsing query XML can leave queryName unset/unknowable....
  1663. StringBuffer response;
  1664. appendfJSONName(response, "%sResponse", queryName).append(" {");
  1665. appendJSONName(response, "Results").append(" {");
  1666. appendJSONName(response, "Exception").append(" [{");
  1667. appendJSONValue(response, "Source", "Roxie");
  1668. appendJSONValue(response, "Code", E->errorCode());
  1669. StringBuffer s;
  1670. appendJSONValue(response, "Message", E->errorMessage(s).str());
  1671. response.append("}]}}");
  1672. write(response.str(), response.length());
  1673. }
  1674. catch(IException *EE)
  1675. {
  1676. StringBuffer error("While reporting exception: ");
  1677. DBGLOG("%s", EE->errorMessage(error).str());
  1678. EE->Release();
  1679. }
  1680. #ifndef _DEBUG
  1681. catch(...) {}
  1682. #endif
  1683. }
  1684. void CSafeSocket::setHeartBeat()
  1685. {
  1686. CriticalBlock c(crit);
  1687. heartbeat = true;
  1688. }
  1689. bool CSafeSocket::sendHeartBeat(const IContextLogger &logctx)
  1690. {
  1691. if (heartbeat)
  1692. {
  1693. StringBuffer s;
  1694. bool rval = false;
  1695. unsigned replyLen = 5;
  1696. unsigned rev = replyLen | 0x80000000; // make it a blocked msg
  1697. _WINREV(rev);
  1698. s.append(sizeof(rev), (char *) &rev);
  1699. s.append('H');
  1700. rev = (unsigned) time(NULL);
  1701. _WINREV(rev);
  1702. s.append(sizeof(rev), (char *) &rev);
  1703. try
  1704. {
  1705. CriticalBlock c(crit);
  1706. sock->write(s.str(), replyLen + sizeof(rev));
  1707. rval = true;
  1708. }
  1709. catch (IException * E)
  1710. {
  1711. StringBuffer error("HeartBeat write failed with exception: ");
  1712. E->errorMessage(error);
  1713. logctx.CTXLOG("%s", error.str());
  1714. E->Release();
  1715. }
  1716. catch(...)
  1717. {
  1718. logctx.CTXLOG("HeartBeat write failed (Unknown exception)");
  1719. }
  1720. return rval;
  1721. }
  1722. else
  1723. return true;
  1724. };
  1725. void CSafeSocket::flush()
  1726. {
  1727. if (httpMode)
  1728. {
  1729. unsigned length = contentHead.length() + contentTail.length();
  1730. ForEachItemIn(idx, lengths)
  1731. length += lengths.item(idx);
  1732. StringBuffer header;
  1733. header.append("HTTP/1.0 200 OK\r\n");
  1734. header.append("Content-Type: ").append(mlFmt == MarkupFmt_JSON ? "application/json" : "text/xml").append("\r\n");
  1735. header.append("Content-Length: ").append(length).append("\r\n\r\n");
  1736. CriticalBlock c(crit); // should not be anyone writing but better to be safe
  1737. if (traceLevel > 5)
  1738. DBGLOG("Writing HTTP header length %d to HTTP socket", header.length());
  1739. sock->write(header.str(), header.length());
  1740. sent += header.length();
  1741. if (traceLevel > 5)
  1742. DBGLOG("Writing content head length %d to HTTP socket", contentHead.length());
  1743. sock->write(contentHead.str(), contentHead.length());
  1744. sent += contentHead.length();
  1745. ForEachItemIn(idx2, queued)
  1746. {
  1747. unsigned length = lengths.item(idx2);
  1748. if (traceLevel > 5)
  1749. DBGLOG("Writing block length %d to HTTP socket", length);
  1750. sock->write(queued.item(idx2), length);
  1751. sent += length;
  1752. }
  1753. if (traceLevel > 5)
  1754. DBGLOG("Writing content tail length %d to HTTP socket", contentTail.length());
  1755. sock->write(contentTail.str(), contentTail.length());
  1756. sent += contentTail.length();
  1757. if (traceLevel > 5)
  1758. DBGLOG("Total written %d", sent);
  1759. }
  1760. }
  1761. void CSafeSocket::sendException(const char *source, unsigned code, const char *message, bool isBlocked, const IContextLogger &logctx)
  1762. {
  1763. try
  1764. {
  1765. FlushingStringBuffer response(this, isBlocked, MarkupFmt_XML, false, httpMode, logctx);
  1766. response.startDataset("Exception", NULL, (unsigned) -1);
  1767. response.appendf("<Source>%s</Source><Code>%d</Code>", source, code);
  1768. response.append("<Message>");
  1769. response.encodeString(message, strlen(message));
  1770. response.append("</Message>");
  1771. }
  1772. catch(IException *EE)
  1773. {
  1774. StringBuffer error("While reporting exception: ");
  1775. EE->errorMessage(error);
  1776. logctx.CTXLOG("%s", error.str());
  1777. EE->Release();
  1778. }
  1779. #ifndef _DEBUG
  1780. catch(...) {}
  1781. #endif
  1782. }
  1783. //==============================================================================================================
  1784. #define RESULT_FLUSH_THRESHOLD 10000u
  1785. #ifdef _DEBUG
  1786. #define HTTP_SPLIT_THRESHOLD 100u
  1787. #define HTTP_SPLIT_RESERVE 200u
  1788. #else
  1789. #define HTTP_SPLIT_THRESHOLD 64000u
  1790. #define HTTP_SPLIT_RESERVE 65535u
  1791. #endif
  1792. interface IXmlStreamFlusher;
  1793. //==============================================================================================================
  1794. bool FlushingStringBuffer::needsFlush(bool closing)
  1795. {
  1796. if (isBlocked || closing) // can't flush unblocked. MORE - may need to break it up though....
  1797. {
  1798. size32_t len = s.length() - emptyLength;
  1799. return len > (closing ? 0 : RESULT_FLUSH_THRESHOLD);
  1800. }
  1801. else
  1802. return false; // MORE - if there is a single result, it can be flushed (actually, can flush anytime all prior results have been closed)
  1803. }
  1804. void FlushingStringBuffer::startBlock()
  1805. {
  1806. size32_t len = 0;
  1807. s.clear();
  1808. if (!isHttp)
  1809. append(sizeof(size32_t), (char *) &len);
  1810. rowCount = 0;
  1811. if (isBlocked)
  1812. {
  1813. s.append('R');
  1814. unsigned rev = sequenceNumber++;
  1815. _WINREV(rev);
  1816. s.append(sizeof(rev), (char *) &rev);
  1817. rev = rowCount;
  1818. _WINREV(rev);
  1819. s.append(sizeof(rev), (char *) &rev); // NOTE - need to patch up later. At this point it is 0.
  1820. s.append(strlen(name)+1, name);
  1821. }
  1822. emptyLength = s.length();
  1823. // MORE - should probably pre-reserve string at RESULT_FLUSH_THRESHOLD plus a bit
  1824. }
  1825. FlushingStringBuffer::FlushingStringBuffer(SafeSocket *_sock, bool _isBlocked, TextMarkupFormat _mlFmt, bool _isRaw, bool _isHttp, const IContextLogger &_logctx)
  1826. : sock(_sock), isBlocked(_isBlocked), mlFmt(_mlFmt), isRaw(_isRaw), isHttp(_isHttp), logctx(_logctx)
  1827. {
  1828. sequenceNumber = 0;
  1829. rowCount = 0;
  1830. isSoap = false;
  1831. isEmpty = true;
  1832. extend = false;
  1833. trim = false;
  1834. emptyLength = 0;
  1835. tagClosed = true;
  1836. }
  1837. FlushingStringBuffer::~FlushingStringBuffer()
  1838. {
  1839. try
  1840. {
  1841. flush(true);
  1842. }
  1843. catch (IException *E)
  1844. {
  1845. // Ignore any socket errors that we get at termination - nothing we can do about them anyway...
  1846. E->Release();
  1847. }
  1848. catch(...)
  1849. {
  1850. }
  1851. ForEachItemIn(idx, queued)
  1852. {
  1853. free(queued.item(idx));
  1854. }
  1855. }
  1856. //void FlushingStringBuffer::append(char data)
  1857. //{
  1858. //append(1, &data);
  1859. //}
  1860. void FlushingStringBuffer::append(const char *data)
  1861. {
  1862. append(strlen(data), data);
  1863. }
  1864. void FlushingStringBuffer::append(double data)
  1865. {
  1866. if (isRaw)
  1867. append(sizeof(data), (char *)&data);
  1868. else
  1869. {
  1870. StringBuffer v;
  1871. v.append(data);
  1872. append(v.length(), v.str());
  1873. }
  1874. }
  1875. void FlushingStringBuffer::append(unsigned len, const char *data)
  1876. {
  1877. try
  1878. {
  1879. CriticalBlock b(crit);
  1880. s.append(len, data);
  1881. }
  1882. catch (IException *E)
  1883. {
  1884. logctx.logOperatorException(E, __FILE__, __LINE__, "FlushingStringBuffer::append");
  1885. throw;
  1886. }
  1887. }
  1888. void FlushingStringBuffer::appendf(const char *format, ...)
  1889. {
  1890. StringBuffer t;
  1891. va_list args;
  1892. va_start(args, format);
  1893. t.valist_appendf(format, args);
  1894. va_end(args);
  1895. append(t.length(), t.str());
  1896. }
  1897. void FlushingStringBuffer::encodeString(const char *x, unsigned len, bool utf8)
  1898. {
  1899. if (mlFmt==MarkupFmt_XML)
  1900. {
  1901. StringBuffer t;
  1902. ::encodeXML(x, t, 0, len, utf8);
  1903. append(t.length(), t.str());
  1904. }
  1905. else
  1906. append(len, x);
  1907. }
  1908. void FlushingStringBuffer::encodeData(const void *data, unsigned len)
  1909. {
  1910. static char hexchar[] = "0123456789ABCDEF";
  1911. if (isRaw)
  1912. append(len, (const char *) data);
  1913. else
  1914. {
  1915. const byte *field = (const byte *) data;
  1916. for (unsigned i = 0; i < len; i++)
  1917. {
  1918. append(hexchar[field[i] >> 4]);
  1919. append(hexchar[field[i] & 0x0f]);
  1920. }
  1921. }
  1922. }
  1923. void FlushingStringBuffer::addPayload(StringBuffer &s, unsigned int reserve)
  1924. {
  1925. if (!s.length())
  1926. return;
  1927. lengths.append(s.length());
  1928. queued.append(s.detach());
  1929. if (reserve)
  1930. s.ensureCapacity(reserve);
  1931. }
  1932. void FlushingStringBuffer::flushXML(StringBuffer &current, bool isClosing)
  1933. {
  1934. CriticalBlock b(crit);
  1935. if (isHttp) // we don't do any chunking for non-HTTP yet
  1936. {
  1937. if (isClosing || current.length() > HTTP_SPLIT_THRESHOLD)
  1938. {
  1939. addPayload(s, HTTP_SPLIT_RESERVE);
  1940. addPayload(current, isClosing ? 0 : HTTP_SPLIT_RESERVE);
  1941. }
  1942. }
  1943. else if (isClosing)
  1944. append(current.length(), current.str());
  1945. }
  1946. void FlushingStringBuffer::flush(bool closing)
  1947. {
  1948. CriticalBlock b(crit);
  1949. if (closing && tail.length())
  1950. {
  1951. s.append(tail);
  1952. tail.clear();
  1953. }
  1954. if (isHttp)
  1955. {
  1956. if (!closing && s.length() > HTTP_SPLIT_THRESHOLD)
  1957. addPayload(s, HTTP_SPLIT_RESERVE);
  1958. }
  1959. else if (needsFlush(closing))
  1960. {
  1961. // MORE - if not blocked we can get very large blocks.
  1962. assertex(s.length() > sizeof(size32_t));
  1963. unsigned replyLen = s.length() - sizeof(size32_t);
  1964. unsigned revLen = replyLen | ((isBlocked)?0x80000000:0);
  1965. _WINREV(revLen);
  1966. if (logctx.queryTraceLevel() > 1)
  1967. {
  1968. if (isBlocked)
  1969. logctx.CTXLOG("Sending reply: Sending blocked %s data", getFormatName(mlFmt));
  1970. else
  1971. #ifdef _DEBUG
  1972. logctx.CTXLOG("Sending reply length %d: %.1024s", (unsigned) (s.length() - sizeof(size32_t)), s.str()+sizeof(size32_t));
  1973. #else
  1974. logctx.CTXLOG("Sending reply length %d: %.40s", (unsigned) (s.length() - sizeof(size32_t)), s.str()+sizeof(size32_t));
  1975. #endif
  1976. }
  1977. *(size32_t *) s.str() = revLen;
  1978. if (isBlocked)
  1979. {
  1980. unsigned revRowCount = rowCount;
  1981. _WINREV(revRowCount);
  1982. *(size32_t *) (s.str()+9) = revRowCount;
  1983. }
  1984. if (logctx.queryTraceLevel() > 9)
  1985. logctx.CTXLOG("writing block size %d to socket", replyLen);
  1986. try
  1987. {
  1988. if (sock)
  1989. {
  1990. if (isHttp)
  1991. sock->write(s.str()+sizeof(revLen), replyLen);
  1992. else
  1993. sock->write(s.str(), replyLen + sizeof(revLen));
  1994. }
  1995. else
  1996. fwrite(s.str()+sizeof(revLen), replyLen, 1, stdout);
  1997. }
  1998. catch (...)
  1999. {
  2000. if (logctx.queryTraceLevel() > 9)
  2001. logctx.CTXLOG("Exception caught FlushingStringBuffer::flush");
  2002. s.clear();
  2003. emptyLength = 0;
  2004. throw;
  2005. }
  2006. if (logctx.queryTraceLevel() > 9)
  2007. logctx.CTXLOG("wrote block size %d to socket", replyLen);
  2008. if (closing)
  2009. {
  2010. s.clear();
  2011. emptyLength = 0;
  2012. }
  2013. else
  2014. startBlock();
  2015. }
  2016. }
  2017. void *FlushingStringBuffer::getPayload(size32_t &length)
  2018. {
  2019. assertex(isHttp);
  2020. CriticalBlock b(crit);
  2021. if (queued.ordinality())
  2022. {
  2023. length = lengths.item(0);
  2024. void *ret = queued.item(0);
  2025. queued.remove(0);
  2026. lengths.remove(0);
  2027. return ret;
  2028. }
  2029. length = s.length();
  2030. return length ? s.detach() : NULL;
  2031. }
  2032. void FlushingStringBuffer::startDataset(const char *elementName, const char *resultName, unsigned sequence, bool _extend, const IProperties *xmlns)
  2033. {
  2034. CriticalBlock b(crit);
  2035. extend = _extend;
  2036. if (isEmpty || !extend)
  2037. {
  2038. name.clear().append(resultName ? resultName : elementName);
  2039. sequenceNumber = 0;
  2040. startBlock();
  2041. if (!isBlocked)
  2042. {
  2043. if (mlFmt==MarkupFmt_XML)
  2044. {
  2045. s.append('<').append(elementName);
  2046. if (isSoap && (resultName || (sequence != (unsigned) -1)))
  2047. {
  2048. s.append(" xmlns=\'urn:hpccsystems:ecl:").appendLower(queryName.length(), queryName.str()).append(":result:");
  2049. if (resultName && *resultName)
  2050. s.appendLower(strlen(resultName), resultName).append('\'');
  2051. else
  2052. s.append("result_").append(sequence+1).append('\'');
  2053. if (xmlns)
  2054. {
  2055. Owned<IPropertyIterator> it = const_cast<IProperties*>(xmlns)->getIterator(); //should fix IProperties to be const friendly
  2056. ForEach(*it)
  2057. {
  2058. const char *name = it->getPropKey();
  2059. s.append(' ');
  2060. if (!streq(name, "xmlns"))
  2061. s.append("xmlns:");
  2062. s.append(name).append("='");
  2063. encodeUtf8XML(const_cast<IProperties*>(xmlns)->queryProp(name), s);
  2064. s.append("'");
  2065. }
  2066. }
  2067. }
  2068. if (resultName && *resultName)
  2069. s.appendf(" name='%s'",resultName);
  2070. else if (sequence != (unsigned) -1)
  2071. s.appendf(" name='Result %d'",sequence+1);
  2072. s.append(">\n");
  2073. tail.clear().appendf("</%s>\n", elementName);
  2074. }
  2075. }
  2076. isEmpty = false;
  2077. }
  2078. }
  2079. void FlushingStringBuffer::startScalar(const char *resultName, unsigned sequence)
  2080. {
  2081. if (s.length())
  2082. throw MakeStringException(0, "Attempt to output scalar ('%s',%d) multiple times", resultName ? resultName : "", (int)sequence);
  2083. CriticalBlock b(crit);
  2084. name.clear().append(resultName ? resultName : "Dataset");
  2085. sequenceNumber = 0;
  2086. startBlock();
  2087. if (!isBlocked)
  2088. {
  2089. if (mlFmt==MarkupFmt_XML)
  2090. {
  2091. tail.clear();
  2092. s.append("<Dataset");
  2093. if (isSoap && (resultName || (sequence != (unsigned) -1)))
  2094. {
  2095. s.append(" xmlns=\'urn:hpccsystems:ecl:").appendLower(queryName.length(), queryName.str()).append(":result:");
  2096. if (resultName && *resultName)
  2097. s.appendLower(strlen(resultName), resultName).append('\'');
  2098. else
  2099. s.append("result_").append(sequence+1).append('\'');
  2100. }
  2101. if (resultName && *resultName)
  2102. s.appendf(" name='%s'>\n",resultName);
  2103. else
  2104. s.appendf(" name='Result %d'>\n",sequence+1);
  2105. s.append(" <Row>");
  2106. if (resultName && *resultName)
  2107. {
  2108. s.appendf("<%s>", resultName);
  2109. tail.appendf("</%s>", resultName);
  2110. }
  2111. else
  2112. {
  2113. s.appendf("<Result_%d>", sequence+1);
  2114. tail.appendf("</Result_%d>", sequence+1);
  2115. }
  2116. tail.appendf("</Row>\n</Dataset>\n");
  2117. }
  2118. else if (!isRaw)
  2119. {
  2120. tail.clear().append('\n');
  2121. }
  2122. }
  2123. }
  2124. void FlushingStringBuffer::setScalarInt(const char *resultName, unsigned sequence, __int64 value, unsigned size)
  2125. {
  2126. startScalar(resultName, sequence);
  2127. s.append(value);
  2128. }
  2129. void FlushingStringBuffer::setScalarUInt(const char *resultName, unsigned sequence, unsigned __int64 value, unsigned size)
  2130. {
  2131. startScalar(resultName, sequence);
  2132. s.append(value);
  2133. }
  2134. void FlushingStringBuffer::incrementRowCount()
  2135. {
  2136. CriticalBlock b(crit);
  2137. rowCount++;
  2138. }
  2139. void FlushingJsonBuffer::append(double data)
  2140. {
  2141. CriticalBlock b(crit);
  2142. appendJSONRealValue(s, NULL, data);
  2143. }
  2144. void FlushingJsonBuffer::encodeString(const char *x, unsigned len, bool utf8)
  2145. {
  2146. CriticalBlock b(crit);
  2147. appendJSONStringValue(s, NULL, len, x, true);
  2148. }
  2149. void FlushingJsonBuffer::encodeData(const void *data, unsigned len)
  2150. {
  2151. CriticalBlock b(crit);
  2152. appendJSONDataValue(s, NULL, len, data);
  2153. }
  2154. void FlushingJsonBuffer::startDataset(const char *elementName, const char *resultName, unsigned sequence, bool _extend, const IProperties *xmlns)
  2155. {
  2156. CriticalBlock b(crit);
  2157. extend = _extend;
  2158. if (isEmpty || !extend)
  2159. {
  2160. name.clear().append(resultName ? resultName : elementName);
  2161. sequenceNumber = 0;
  2162. startBlock();
  2163. if (!isBlocked)
  2164. {
  2165. StringBuffer seqName;
  2166. if (!resultName || !*resultName)
  2167. resultName = seqName.appendf("result_%d", sequence+1).str();
  2168. appendJSONName(s, resultName).append('{');
  2169. tail.set("}");
  2170. }
  2171. isEmpty = false;
  2172. }
  2173. }
  2174. void FlushingJsonBuffer::startScalar(const char *resultName, unsigned sequence)
  2175. {
  2176. if (s.length())
  2177. throw MakeStringException(0, "Attempt to output scalar ('%s',%d) multiple times", resultName ? resultName : "", (int)sequence);
  2178. CriticalBlock b(crit);
  2179. name.set(resultName ? resultName : "Dataset");
  2180. sequenceNumber = 0;
  2181. startBlock();
  2182. if (!isBlocked)
  2183. {
  2184. StringBuffer seqName;
  2185. if (!resultName || !*resultName)
  2186. resultName = seqName.appendf("Result_%d", sequence+1).str();
  2187. appendJSONName(s, resultName).append('{');
  2188. appendJSONName(s, "Row").append("[{");
  2189. appendJSONName(s, resultName);
  2190. tail.set("}]}");
  2191. }
  2192. }
  2193. void FlushingJsonBuffer::setScalarInt(const char *resultName, unsigned sequence, __int64 value, unsigned size)
  2194. {
  2195. startScalar(resultName, sequence);
  2196. if (size < 7) //JavaScript only supports 53 significant bits
  2197. s.append(value);
  2198. else
  2199. s.append('"').append(value).append('"');
  2200. }
  2201. void FlushingJsonBuffer::setScalarUInt(const char *resultName, unsigned sequence, unsigned __int64 value, unsigned size)
  2202. {
  2203. startScalar(resultName, sequence);
  2204. if (size < 7) //JavaScript doesn't support unsigned, and only supports 53 significant bits
  2205. s.append(value);
  2206. else
  2207. s.append('"').append(value).append('"');
  2208. }
  2209. //=====================================================================================================
  2210. ClusterWriteHandler::ClusterWriteHandler(char const * _logicalName, char const * _activityType)
  2211. : logicalName(_logicalName), activityType(_activityType)
  2212. {
  2213. makePhysicalPartName(logicalName.get(), 1, 1, physicalName, false);
  2214. splitFilename(physicalName, &physicalDir, &physicalDir, &physicalBase, &physicalBase);
  2215. }
  2216. void ClusterWriteHandler::addCluster(char const * cluster)
  2217. {
  2218. Owned<IGroup> group = queryNamedGroupStore().lookup(cluster);
  2219. if (!group)
  2220. throw MakeStringException(0, "Unknown cluster %s while writing file %s", cluster, logicalName.get());
  2221. if (group->isMember())
  2222. {
  2223. if (localCluster)
  2224. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s", cluster,
  2225. logicalName.get());
  2226. localClusterName.set(cluster);
  2227. localCluster.set(group);
  2228. }
  2229. else
  2230. {
  2231. ForEachItemIn(idx, remoteNodes)
  2232. {
  2233. Owned<INode> other = remoteNodes.item(idx).getNode(0);
  2234. if (group->isMember(other))
  2235. throw MakeStringException(0, "Cluster %s occupies node already specified while writing file %s",
  2236. cluster, logicalName.get());
  2237. }
  2238. remoteNodes.append(*group.getClear());
  2239. remoteClusters.append(cluster);
  2240. }
  2241. }
  2242. void ClusterWriteHandler::getLocalPhysicalFilename(StringAttr & out) const
  2243. {
  2244. if(localCluster.get())
  2245. out.set(physicalName.str());
  2246. else
  2247. getTempFilename(out);
  2248. PROGLOG("%s(CLUSTER) for logical filename %s writing to local file %s", activityType.get(), logicalName.get(), out.get());
  2249. }
  2250. void ClusterWriteHandler::splitPhysicalFilename(StringBuffer & dir, StringBuffer & base) const
  2251. {
  2252. dir.append(physicalDir);
  2253. base.append(physicalBase);
  2254. }
  2255. void ClusterWriteHandler::getTempFilename(StringAttr & out) const
  2256. {
  2257. // Should be implemented by more derived (platform-specific) class, if needed
  2258. throwUnexpected();
  2259. }
  2260. void ClusterWriteHandler::copyPhysical(IFile * source, bool noCopy) const
  2261. {
  2262. RemoteFilename rdn, rfn;
  2263. rdn.setLocalPath(physicalDir.str());
  2264. rfn.setLocalPath(physicalName.str());
  2265. ForEachItemIn(idx, remoteNodes)
  2266. {
  2267. rdn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  2268. rfn.setEp(remoteNodes.item(idx).queryNode(0).endpoint());
  2269. Owned<IFile> targetdir = createIFile(rdn);
  2270. Owned<IFile> target = createIFile(rfn);
  2271. PROGLOG("%s(CLUSTER) for logical filename %s copying %s to %s", activityType.get(), logicalName.get(), source->queryFilename(), target->queryFilename());
  2272. if(noCopy)
  2273. {
  2274. WARNLOG("Skipping remote copy due to debug option");
  2275. }
  2276. else
  2277. {
  2278. targetdir->createDirectory();
  2279. copyFile(target, source);
  2280. }
  2281. }
  2282. }
  2283. void ClusterWriteHandler::setDescriptorParts(IFileDescriptor * desc, char const * basename, IPropertyTree * attrs) const
  2284. {
  2285. if(!localCluster.get()&&(remoteNodes.ordinality()==0))
  2286. throw MakeStringException(0, "Attempting to write file to no clusters");
  2287. ClusterPartDiskMapSpec partmap; // will get this from group at some point
  2288. desc->setNumParts(1);
  2289. desc->setPartMask(basename);
  2290. if (localCluster)
  2291. desc->addCluster(localClusterName,localCluster, partmap);
  2292. ForEachItemIn(idx,remoteNodes)
  2293. desc->addCluster(remoteClusters.item(idx),&remoteNodes.item(idx), partmap);
  2294. if (attrs) {
  2295. // need to set part attr
  2296. IPartDescriptor *partdesc = desc->queryPart(0);
  2297. IPropertyTree &pprop = partdesc->queryProperties();
  2298. // bit of a kludge (should really set properties *after* creating part rather than passing prop tree in)
  2299. Owned<IAttributeIterator> ai = attrs->getAttributes();
  2300. ForEach(*ai)
  2301. pprop.setProp(ai->queryName(),ai->queryValue());
  2302. }
  2303. }
  2304. void ClusterWriteHandler::finish(IFile * file) const
  2305. {
  2306. if(!localCluster.get())
  2307. {
  2308. PROGLOG("%s(CLUSTER) for logical filename %s removing temporary file %s", activityType.get(), logicalName.get(), file->queryFilename());
  2309. file->remove();
  2310. }
  2311. }
  2312. void ClusterWriteHandler::getClusters(StringArray &clusters) const
  2313. {
  2314. if(localCluster)
  2315. clusters.append(localClusterName);
  2316. ForEachItemIn(c, remoteClusters)
  2317. clusters.append(remoteClusters.item(c));
  2318. }
  2319. //=====================================================================================================
  2320. class COrderedOutputSerializer : implements IOrderedOutputSerializer, public CInterface
  2321. {
  2322. class COrderedResult : public CInterface
  2323. {
  2324. bool closed;
  2325. StringBuffer sb;
  2326. public:
  2327. IMPLEMENT_IINTERFACE;
  2328. COrderedResult() : closed(false) {}
  2329. bool flush(FILE * outFile, bool onlyClosed)
  2330. {
  2331. if (closed || !onlyClosed)
  2332. {
  2333. if (sb.length())
  2334. {
  2335. ::fwrite(sb.str(), sb.length(), 1, outFile);
  2336. sb.clear();
  2337. }
  2338. }
  2339. return closed;
  2340. }
  2341. size32_t printf(const char *format, va_list args) __attribute__((format(printf,2,0)))
  2342. {
  2343. if (closed)
  2344. throw MakeStringException(0, "Attempting to append to previously closed result in COrderedResult::printf");
  2345. int prevLen = sb.length();
  2346. sb.valist_appendf(format, args);
  2347. return sb.length() - prevLen;
  2348. }
  2349. size32_t fwrite(const void * data, size32_t size, size32_t count)
  2350. {
  2351. if (closed)
  2352. throw MakeStringException(0, "Attempting to append to previously closed result in COrderedResult::fwrite");
  2353. size32_t len = size * count;
  2354. sb.append(len, (const char *)data);
  2355. return len;
  2356. }
  2357. void close(bool nl)
  2358. {
  2359. if (closed)
  2360. throw MakeStringException(0, "Attempting to reclose result in COrderedResult::close");
  2361. if (nl)
  2362. sb.append('\n');
  2363. closed = true;
  2364. }
  2365. };
  2366. CIArrayOf<COrderedResult> COrderedResultArr;
  2367. int lastSeqFlushed;
  2368. FILE * outFile;
  2369. CriticalSection crit;
  2370. COrderedResult * getResult(size32_t seq)
  2371. {
  2372. while ((int)COrderedResultArr.ordinality() < (seq+1))
  2373. COrderedResultArr.append(*(new COrderedResult()));
  2374. return &COrderedResultArr.item(seq);
  2375. }
  2376. void flushCurrent()//stream current sequence
  2377. {
  2378. COrderedResult &res = COrderedResultArr.item(lastSeqFlushed + 1);
  2379. res.flush(outFile,false);
  2380. fflush(outFile);
  2381. }
  2382. void flushCompleted(bool onlyClosed)//flush completed sequence(s)
  2383. {
  2384. int lastSeq = (int)COrderedResultArr.ordinality()-1;
  2385. for (; lastSeqFlushed < lastSeq; lastSeqFlushed++)
  2386. {
  2387. COrderedResult &res = COrderedResultArr.item(lastSeqFlushed + 1);
  2388. if (!res.flush(outFile,onlyClosed) && onlyClosed)
  2389. break;
  2390. }
  2391. fflush(outFile);
  2392. }
  2393. public:
  2394. IMPLEMENT_IINTERFACE;
  2395. COrderedOutputSerializer(FILE* _outFile) : lastSeqFlushed(-1), outFile(_outFile) {}
  2396. ~COrderedOutputSerializer()
  2397. {
  2398. if (lastSeqFlushed != (COrderedResultArr.ordinality()-1))
  2399. flushCompleted(false);
  2400. COrderedResultArr.kill();
  2401. }
  2402. //IOrderedOutputSerializer
  2403. size32_t fwrite(int seq, const void * data, size32_t size, size32_t count)
  2404. {
  2405. CriticalBlock c(crit);
  2406. size32_t ret = getResult(seq)->fwrite(data,size, count);
  2407. if (seq == (lastSeqFlushed + 1))
  2408. flushCurrent();
  2409. return ret;
  2410. }
  2411. size32_t printf(int seq, const char *format, ...) __attribute__((format(printf, 3, 4)))
  2412. {
  2413. CriticalBlock c(crit);
  2414. va_list args;
  2415. va_start(args, format);
  2416. int ret = getResult(seq)->printf(format, args);
  2417. va_end(args);
  2418. if (seq == (lastSeqFlushed + 1))
  2419. flushCurrent();
  2420. return ret;
  2421. }
  2422. void close(int seq, bool nl)
  2423. {
  2424. CriticalBlock c(crit);
  2425. getResult(seq)->close(nl);
  2426. if ( seq == (lastSeqFlushed+1) )
  2427. flushCompleted(true);
  2428. }
  2429. };
  2430. IOrderedOutputSerializer * createOrderedOutputSerializer(FILE * _outFile)
  2431. {
  2432. return new COrderedOutputSerializer(_outFile);
  2433. }
  2434. //=====================================================================================================
  2435. StringBuffer & mangleHelperFileName(StringBuffer & out, const char * in, const char * wuid, unsigned int flags)
  2436. {
  2437. out = in;
  2438. if (flags & (TDXtemporary | TDXjobtemp))
  2439. out.append("__").append(wuid);
  2440. return out;
  2441. }
  2442. StringBuffer & mangleLocalTempFilename(StringBuffer & out, char const * in)
  2443. {
  2444. char const * start = in;
  2445. while(true)
  2446. {
  2447. char const * end = strstr(start, "::");
  2448. if(end)
  2449. {
  2450. out.append(end-start, start).append("__scope__");
  2451. start = end + 2;
  2452. }
  2453. else
  2454. {
  2455. out.append(start);
  2456. break;
  2457. }
  2458. }
  2459. return out;
  2460. }
  2461. static const char *skipLfnForeign(const char *lfn)
  2462. {
  2463. // NOTE: The leading ~ and any leading spaces have already been stripped at this point
  2464. const char *finger = lfn;
  2465. if (strnicmp(finger, "foreign", 7)==0)
  2466. {
  2467. finger += 7;
  2468. while (*finger == ' ')
  2469. finger++;
  2470. if (finger[0] == ':' && finger[1] == ':')
  2471. {
  2472. // foreign scope - need to strip off the ip and port (i.e. from here to the next ::)
  2473. finger += 2; // skip ::
  2474. finger = strstr(finger, "::");
  2475. if (finger)
  2476. {
  2477. finger += 2;
  2478. while (*finger == ' ')
  2479. finger++;
  2480. return finger;
  2481. }
  2482. }
  2483. }
  2484. return lfn;
  2485. }
  2486. StringBuffer & expandLogicalFilename(StringBuffer & logicalName, const char * fname, IConstWorkUnit * wu, bool resolveLocally, bool ignoreForeignPrefix)
  2487. {
  2488. if (fname[0]=='~')
  2489. {
  2490. while (*fname=='~' || *fname==' ')
  2491. fname++;
  2492. if (ignoreForeignPrefix)
  2493. fname = skipLfnForeign(fname);
  2494. logicalName.append(fname);
  2495. }
  2496. else if (resolveLocally)
  2497. {
  2498. StringBuffer sb(fname);
  2499. sb.replaceString("::",PATHSEPSTR);
  2500. makeAbsolutePath(sb.str(), logicalName.clear());
  2501. }
  2502. else
  2503. {
  2504. SCMStringBuffer lfn;
  2505. if (wu)
  2506. {
  2507. wu->getScope(lfn);
  2508. if(lfn.length())
  2509. logicalName.append(lfn.s).append("::");
  2510. }
  2511. logicalName.append(fname);
  2512. }
  2513. return logicalName;
  2514. }
  2515. //----------------------------------------------------------------------------------
  2516. void IRoxieContextLogger::CTXLOGae(IException *E, const char *file, unsigned line, const char *prefix, const char *format, ...) const
  2517. {
  2518. va_list args;
  2519. va_start(args, format);
  2520. CTXLOGaeva(E, file, line, prefix, format, args);
  2521. va_end(args);
  2522. }
  2523. void HttpHelper::parseURL()
  2524. {
  2525. const char *start = url.str();
  2526. while (isspace(*start))
  2527. start++;
  2528. if (*start=='/')
  2529. start++;
  2530. StringAttr path;
  2531. const char *finger = strpbrk(start, "?");
  2532. if (finger)
  2533. path.set(start, finger-start);
  2534. else
  2535. path.set(start);
  2536. if (path.length())
  2537. pathNodes.appendList(path, "/");
  2538. if (!finger)
  2539. return;
  2540. finger++;
  2541. while (*finger)
  2542. {
  2543. StringBuffer s, prop, val;
  2544. while (*finger && *finger != '&' && *finger != '=')
  2545. s.append(*finger++);
  2546. appendDecodedURL(prop, s.trim());
  2547. if (!*finger || *finger == '&')
  2548. val.set("1");
  2549. else
  2550. {
  2551. s.clear();
  2552. finger++;
  2553. while (*finger && *finger != '&')
  2554. s.append(*finger++);
  2555. appendDecodedURL(val, s.trim());
  2556. }
  2557. if (prop.length())
  2558. parameters->setProp(prop, val);
  2559. if (*finger)
  2560. finger++;
  2561. }
  2562. }