thorcommon.cpp

  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jexcept.hpp"
  14. #include "jmisc.hpp"
  15. #include "jthread.hpp"
  16. #include "jsocket.hpp"
  17. #include "jprop.hpp"
  18. #include "jdebug.hpp"
  19. #include "jlzw.hpp"
  20. #include "junicode.hpp"
  21. #include "eclhelper.hpp"
  22. #include "thorcommon.ipp"
  23. #include "eclrtl.hpp"
  24. #include "rtlread_imp.hpp"
  25. #include "rtlcommon.hpp"
  26. #include "rtldynfield.hpp"
  27. #include "eclhelper_dyn.hpp"
  28. #include "hqlexpr.hpp"
  29. #include "hqlutil.hpp"
  30. #include <algorithm>
  31. #ifdef _USE_NUMA
  32. #include <numa.h>
  33. #endif
  34. #include "roxiemem.hpp"
  35. #include "thorstep.hpp"
  36. #include "roxiemem.hpp"
  37. #define ROWAGG_PERROWOVERHEAD (sizeof(AggregateRowBuilder))
  38. void AggregateRowBuilder::Link() const { LinkRoxieRow(this); }
  39. bool AggregateRowBuilder::Release() const { ReleaseRoxieRow(this); return false; } // MORE - return value is iffy
  40. RowAggregator::RowAggregator(IHThorHashAggregateExtra &_extra, IHThorRowAggregator & _helper) : helper(_helper)
  41. {
  42. comparer = _extra.queryCompareRowElement();
  43. hasher = _extra.queryHash();
  44. elementHasher = _extra.queryHashElement();
  45. elementComparer = _extra.queryCompareElements();
  46. cursor = NULL;
  47. eof = false;
  48. totalSize = overhead = 0;
  49. }
  50. RowAggregator::~RowAggregator()
  51. {
  52. reset();
  53. }
  54. static CClassMeta<AggregateRowBuilder> AggregateRowBuilderMeta;
  55. void RowAggregator::start(IEngineRowAllocator *_rowAllocator, ICodeContext *ctx, unsigned activityId)
  56. {
  57. rowAllocator.set(_rowAllocator);
  58. rowBuilderAllocator.setown(ctx->getRowAllocatorEx(&AggregateRowBuilderMeta, activityId, roxiemem::RHFunique|roxiemem::RHFscanning|roxiemem::RHFdelayrelease));
  59. }
  60. void RowAggregator::reset()
  61. {
  62. while (!eof)
  63. {
  64. AggregateRowBuilder *n = nextResult();
  65. if (n)
  66. ReleaseRoxieRow(n);
  67. }
  68. _releaseAll();
  69. eof = false;
  70. cursor = NULL;
  71. rowAllocator.clear();
  72. totalSize = overhead = 0;
  73. }
  74. AggregateRowBuilder &RowAggregator::addRow(const void * row)
  75. {
  76. AggregateRowBuilder *result;
  77. unsigned hash = hasher->hash(row);
  78. void * match = find(hash, row);
  79. if (match)
  80. {
  81. result = static_cast<AggregateRowBuilder *>(match);
  82. totalSize -= result->querySize();
  83. size32_t sz = helper.processNext(*result, row);
  84. result->setSize(sz);
  85. totalSize += sz;
  86. }
  87. else
  88. {
  89. Owned<AggregateRowBuilder> rowBuilder = new (rowBuilderAllocator->createRow()) AggregateRowBuilder(rowAllocator, hash);
  90. helper.clearAggregate(*rowBuilder);
  91. size32_t sz = helper.processFirst(*rowBuilder, row);
  92. rowBuilder->setSize(sz);
  93. result = rowBuilder.getClear();
  94. addNew(result, hash);
  95. totalSize += sz;
  96. overhead += ROWAGG_PERROWOVERHEAD;
  97. }
  98. return *result;
  99. }
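// addRow(): rows sharing a hash/compare key are folded into a single AggregateRowBuilder.
// On a hash-table hit the existing builder is updated via helper.processNext(); on a miss a
// new builder is allocated from rowBuilderAllocator, seeded with helper.clearAggregate() and
// helper.processFirst(), and inserted into the table. totalSize tracks the aggregated row
// payload and overhead accumulates ROWAGG_PERROWOVERHEAD per entry.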
  100. void RowAggregator::mergeElement(const void * otherElement)
  101. {
  102. unsigned hash = elementHasher->hash(otherElement);
  103. void * match = findElement(hash, otherElement);
  104. if (match)
  105. {
  106. AggregateRowBuilder *rowBuilder = static_cast<AggregateRowBuilder *>(match);
  107. totalSize -= rowBuilder->querySize();
  108. size32_t sz = helper.mergeAggregate(*rowBuilder, otherElement);
  109. rowBuilder->setSize(sz);
  110. totalSize += sz;
  111. }
  112. else
  113. {
  114. Owned<AggregateRowBuilder> rowBuilder = new (rowBuilderAllocator->createRow()) AggregateRowBuilder(rowAllocator, hash);
  115. rowBuilder->setSize(cloneRow(*rowBuilder, otherElement, rowAllocator->queryOutputMeta()));
  116. addNew(rowBuilder.getClear(), hash);
  117. }
  118. }
  119. const void * RowAggregator::getFindParam(const void *et) const
  120. {
  121. // Slightly odd name for this function... it actually gets the comparable element
  122. const AggregateRowBuilder *rb = static_cast<const AggregateRowBuilder*>(et);
  123. return rb->row();
  124. }
  125. bool RowAggregator::matchesFindParam(const void *et, const void *key, unsigned fphash) const
  126. {
  127. if (fphash != hashFromElement(et))
  128. return false;
  129. // et = element in the table (an AggregateRowBuilder) key = new row (in input row layout).
  130. return comparer->docompare(key, getFindParam(et)) == 0;
  131. }
  132. bool RowAggregator::matchesElement(const void *et, const void * searchET) const
  133. {
  134. return elementComparer->docompare(getFindParam(et), searchET) == 0;
  135. }
  136. AggregateRowBuilder *RowAggregator::nextResult()
  137. {
  138. void *ret = next(cursor);
  139. if (!ret)
  140. {
  141. eof = true;
  142. return NULL;
  143. }
  144. cursor = ret;
  145. return static_cast<AggregateRowBuilder *>(ret);
  146. }
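// Typical usage (a sketch, assuming an activity that owns the aggregator): call start() with
// the output row allocator, push every input row through addRow() (or mergeElement() when
// combining partial aggregates), then drain results with nextResult() until it returns NULL,
// releasing each returned row. reset() drains and releases anything not yet consumed, so it
// is always safe to call before reuse or destruction.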
  147. //=====================================================================================================
  148. void CStreamMerger::fillheap(const void * seek, unsigned numFields, const SmartStepExtra * stepExtra)
  149. {
  150. assertex(activeInputs == 0);
  151. for(unsigned i = 0; i < numInputs; i++)
  152. if(pullInput(i, seek, numFields, stepExtra))
  153. mergeheap[activeInputs++] = i;
  154. }
  155. void CStreamMerger::permute(const void * seek, unsigned numFields, const SmartStepExtra * stepExtra)
  156. {
  157. // the tree structure: element p has children p*2+1 and p*2+2, or element c has parent (unsigned)(c-1)/2
  158. // the heap property: no element should be smaller than its parent
  159. // the dedup variant: if(dedup), the top of the heap should also not be equal to either child
  160. // the method: establish this by starting with the parent of the bottom element and working up to the top element, sifting each down to its correct place
  161. if (activeInputs >= 2)
  162. for(unsigned p = (activeInputs-2)/2; p > 0; --p)
  163. siftDown(p);
  164. if(dedup)
  165. siftDownDedupTop(seek, numFields, stepExtra);
  166. else
  167. siftDown(0);
  168. }
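// Example of the index arithmetic above: with activeInputs == 5 the heap is laid out as
//      0
//    1   2
//   3 4
// i.e. element 1 has children 3 and 4, element 2 has none, and permute() starts sifting
// from parent (5-2)/2 == 1 before handling the (possibly deduping) top element.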
  169. const void * CStreamMerger::consumeTop()
  170. {
  171. unsigned top = mergeheap[0];
  172. if (!pullConsumes)
  173. consumeInput(top);
  174. const void *next = pending[top];
  175. pending[top] = NULL;
  176. return next;
  177. }
  178. bool CStreamMerger::ensureNext(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra * stepExtra)
  179. {
  180. //wasCompleteMatch must be initialised from the actual row returned. (See bug #30388)
  181. if (first)
  182. {
  183. fillheap(seek, numFields, stepExtra);
  184. permute(seek, numFields, stepExtra);
  185. first = false;
  186. if (activeInputs == 0)
  187. return false;
  188. unsigned top = mergeheap[0];
  189. wasCompleteMatch = pendingMatches[top];
  190. return true;
  191. }
  192. while (activeInputs)
  193. {
  194. unsigned top = mergeheap[0];
  195. const void *next = pending[top];
  196. if (next)
  197. {
  198. if (seek)
  199. {
  200. int c = rangeCompare->docompare(next, seek, numFields);
  201. if (c >= 0)
  202. {
  203. if (stepExtra->returnMismatches() && (c > 0))
  204. {
  205. wasCompleteMatch = pendingMatches[top];
  206. return true;
  207. }
  208. else
  209. {
  210. if (pendingMatches[top])
  211. return true;
  212. }
  213. }
  214. }
  215. else
  216. {
  217. if (pendingMatches[top])
  218. return true;
  219. }
  220. skipInput(top);
  221. }
  222. if(!pullInput(top, seek, numFields, stepExtra))
  223. if(!promote(0))
  224. return false;
  225. // we have changed the element at the top of the heap, so need to sift it down to maintain the heap property
  226. if(dedup)
  227. siftDownDedupTop(seek, numFields, stepExtra);
  228. else
  229. siftDown(0);
  230. }
  231. return false;
  232. }
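// ensureNext(seek, ...) implements smart-stepping: rows on top of the heap that are still
// below the seek key are skipped and their inputs re-pulled. Once the top row compares at or
// above the seek key it is returned if it was flagged as a complete match; when
// stepExtra->returnMismatches() is set and the row is strictly past the seek point it is
// returned anyway, with wasCompleteMatch reporting whether it actually matched.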
  233. bool CStreamMerger::ensureNext()
  234. {
  235. bool isCompleteMatch = true;
  236. return ensureNext(NULL, 0, isCompleteMatch, NULL);
  237. }
  238. void CStreamMerger::permute()
  239. {
  240. permute(NULL, 0, NULL);
  241. }
  242. bool CStreamMerger::promote(unsigned p)
  243. {
  244. activeInputs--;
  245. if(activeInputs == p)
  246. return false;
  247. mergeheap[p] = mergeheap[activeInputs];
  248. return true;
  249. }
  250. void CStreamMerger::siftDownDedupTop(const void * seek, unsigned numFields, const SmartStepExtra * stepExtra)
  251. {
  252. // same as siftDown(0), except that it also ensures that the top of the heap is not equal to either of its children
  253. if(activeInputs < 2)
  254. return;
  255. unsigned c = 1;
  256. int childcmp = 1;
  257. if(activeInputs >= 3)
  258. {
  259. childcmp = compare->docompare(pending[mergeheap[2]], pending[mergeheap[1]]);
  260. if(childcmp < 0)
  261. c = 2;
  262. }
  263. int cmp = compare->docompare(pending[mergeheap[c]], pending[mergeheap[0]]);
  264. if(cmp > 0)
  265. return;
  266. // the following loop ensures the correct property holds on the smaller branch, and that childcmp==0 iff the top matches the other branch
  267. while(cmp <= 0)
  268. {
  269. if(cmp == 0)
  270. {
  271. if(mergeheap[c] < mergeheap[0])
  272. {
  273. unsigned r = mergeheap[c];
  274. mergeheap[c] = mergeheap[0];
  275. mergeheap[0] = r;
  276. }
  277. unsigned top = mergeheap[c];
  278. skipInput(top);
  279. if(!pullInput(top, seek, numFields, stepExtra))
  280. if(!promote(c))
  281. break;
  282. siftDown(c);
  283. }
  284. else
  285. {
  286. unsigned r = mergeheap[c];
  287. mergeheap[c] = mergeheap[0];
  288. mergeheap[0] = r;
  289. if(siftDown(c))
  290. break;
  291. }
  292. cmp = compare->docompare(pending[mergeheap[c]], pending[mergeheap[0]]);
  293. }
  294. // the following loop ensures the uniqueness property holds on the other branch too
  295. c = 3-c;
  296. if(activeInputs <= c)
  297. return;
  298. while(childcmp == 0)
  299. {
  300. if(mergeheap[c] < mergeheap[0])
  301. {
  302. unsigned r = mergeheap[c];
  303. mergeheap[c] = mergeheap[0];
  304. mergeheap[0] = r;
  305. }
  306. unsigned top = mergeheap[c];
  307. skipInput(top);
  308. if(!pullInput(top, seek, numFields, stepExtra))
  309. if(!promote(c))
  310. break;
  311. siftDown(c);
  312. childcmp = compare->docompare(pending[mergeheap[c]], pending[mergeheap[0]]);
  313. }
  314. }
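// NB: when two heap entries compare equal, the swaps above keep the entry with the lower
// input index on top, so duplicates are skipped from the higher-numbered inputs first and
// the dedup merge stays stable with respect to input order.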
  315. void CStreamMerger::cleanup()
  316. {
  317. clearPending();
  318. delete [] pending;
  319. pending = NULL;
  320. delete [] pendingMatches;
  321. pendingMatches = NULL;
  322. delete [] mergeheap;
  323. mergeheap = NULL;
  324. }
  325. void CStreamMerger::clearPending()
  326. {
  327. if (pending && activeInputs)
  328. {
  329. for(unsigned i = 0; i < numInputs; i++)
  330. {
  331. if (pullConsumes)
  332. releaseRow(pending[i]);
  333. pending[i] = NULL;
  334. }
  335. activeInputs = 0;
  336. }
  337. first = true;
  338. }
  339. CStreamMerger::CStreamMerger(bool _pullConsumes)
  340. {
  341. pending = NULL;
  342. pendingMatches = NULL;
  343. mergeheap = NULL;
  344. compare = NULL;
  345. rangeCompare = NULL;
  346. dedup = false;
  347. activeInputs = 0;
  348. pullConsumes = _pullConsumes;
  349. numInputs = 0;
  350. first = true;
  351. }
  352. CStreamMerger::~CStreamMerger()
  353. {
  354. //can't call cleanup() because virtual releaseRow() won't be defined.
  355. // NOTE: use assert rather than assertex as exceptions from within destructors are not handled well.
  356. assert(!pending && !mergeheap);
  357. }
  358. void CStreamMerger::init(ICompare * _compare, bool _dedup, IRangeCompare * _rangeCompare)
  359. {
  360. compare = _compare;
  361. dedup = _dedup;
  362. rangeCompare = _rangeCompare;
  363. }
  364. void CStreamMerger::initInputs(unsigned _numInputs)
  365. {
  366. assertex(!pending); // cleanup should have been called before reinitializing
  367. numInputs = _numInputs;
  368. mergeheap = new unsigned[numInputs];
  369. pending = new const void *[numInputs];
  370. pendingMatches = new bool [numInputs];
  371. for (unsigned i = 0; i < numInputs; i++)
  372. pending[i] = NULL;
  373. activeInputs = 0;
  374. first = true;
  375. }
  376. void CStreamMerger::consumeInput(unsigned i)
  377. {
  378. // should be overridden if pullConsumes is false
  379. throwUnexpected();
  380. }
  381. void CStreamMerger::skipInput(unsigned i)
  382. {
  383. if (!pullConsumes)
  384. consumeInput(i);
  385. releaseRow(pending[i]);
  386. pending[i] = NULL;
  387. }
  388. void CStreamMerger::primeRows(const void * * rows)
  389. {
  390. assertex(first && (activeInputs == 0));
  391. first = false;
  392. for(unsigned i = 0; i < numInputs; i++)
  393. {
  394. if ((pending[i] = rows[i]) != NULL)
  395. {
  396. mergeheap[activeInputs++] = i;
  397. pendingMatches[i] = true;
  398. }
  399. }
  400. permute();
  401. }
  402. const void * CStreamMerger::nextRow()
  403. {
  404. if (ensureNext())
  405. return consumeTop();
  406. return NULL;
  407. }
  408. const void * CStreamMerger::queryNextRow()
  409. {
  410. if (ensureNext())
  411. return pending[mergeheap[0]];
  412. return NULL;
  413. }
  414. unsigned CStreamMerger::queryNextInput()
  415. {
  416. if (ensureNext())
  417. return mergeheap[0];
  418. return NotFound;
  419. }
  420. const void * CStreamMerger::nextRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  421. {
  422. if (ensureNext(seek, numFields, wasCompleteMatch, &stepExtra))
  423. return consumeTop();
  424. return NULL;
  425. }
  426. void CStreamMerger::skipRow()
  427. {
  428. assertex(!first);
  429. skipInput(mergeheap[0]);
  430. }
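/* Minimal driver sketch (hypothetical caller; pullInput()/releaseRow() come from a concrete
 * subclass, e.g. one wrapping an array of row streams, and processRow() is illustrative):
 *
 *   merger.init(compare, true, rangeCompare);   // dedup merge
 *   merger.initInputs(numStreams);
 *   for (const void *row; (row = merger.nextRow()) != NULL; )
 *       processRow(row);                        // caller takes ownership of the row
 *   merger.cleanup();
 *
 * queryNextRow()/queryNextInput() peek at the top of the heap without consuming it.
 */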
  431. //=====================================================================================================
  432. CThorDemoRowSerializer::CThorDemoRowSerializer(MemoryBuffer & _buffer) : buffer(_buffer)
  433. {
  434. nesting = 0;
  435. }
  436. void CThorDemoRowSerializer::put(size32_t len, const void * ptr)
  437. {
  438. buffer.append(len, ptr);
  439. //ok to flush if nesting == 0;
  440. }
  441. size32_t CThorDemoRowSerializer::beginNested(size32_t count)
  442. {
  443. nesting++;
  444. unsigned pos = buffer.length();
  445. buffer.append((size32_t)0);
  446. return pos;
  447. }
  448. void CThorDemoRowSerializer::endNested(size32_t sizePos)
  449. {
  450. unsigned pos = buffer.length();
  451. buffer.rewrite(sizePos);
  452. buffer.append((size32_t)(pos - (sizePos + sizeof(size32_t))));
  453. buffer.rewrite(pos);
  454. nesting--;
  455. }
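// Nested fields are length-prefixed: beginNested() reserves a 4-byte size32_t placeholder
// and returns its offset; endNested() rewinds to that offset, back-patches the number of
// bytes written since the placeholder, then restores the write position.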
  456. IOutputRowSerializer * CachedOutputMetaData::createDiskSerializer(ICodeContext * ctx, unsigned activityId) const
  457. {
  458. if (metaFlags & (MDFhasserialize|MDFneedserializedisk))
  459. return meta->createDiskSerializer(ctx, activityId);
  460. if (isFixedSize())
  461. return new CSimpleFixedRowSerializer(getFixedSize());
  462. return new CSimpleVariableRowSerializer(this);
  463. }
  464. IOutputRowDeserializer * CachedOutputMetaData::createDiskDeserializer(ICodeContext * ctx, unsigned activityId) const
  465. {
  466. if (metaFlags & (MDFhasserialize|MDFneedserializedisk))
  467. return meta->createDiskDeserializer(ctx, activityId);
  468. if (isFixedSize())
  469. return new CSimpleFixedRowDeserializer(getFixedSize());
  470. throwUnexpectedX("createDiskDeserializer variable meta has no serializer");
  471. }
  472. IOutputRowSerializer * CachedOutputMetaData::createInternalSerializer(ICodeContext * ctx, unsigned activityId) const
  473. {
  474. if (metaFlags & (MDFhasserialize|MDFneedserializeinternal))
  475. return meta->createInternalSerializer(ctx, activityId);
  476. if (isFixedSize())
  477. return new CSimpleFixedRowSerializer(getFixedSize());
  478. return new CSimpleVariableRowSerializer(this);
  479. }
  480. IOutputRowDeserializer * CachedOutputMetaData::createInternalDeserializer(ICodeContext * ctx, unsigned activityId) const
  481. {
  482. if (metaFlags & (MDFhasserialize|MDFneedserializeinternal))
  483. return meta->createInternalDeserializer(ctx, activityId);
  484. if (isFixedSize())
  485. return new CSimpleFixedRowDeserializer(getFixedSize());
  486. throwUnexpectedX("createInternalDeserializer variable meta has no serializer");
  487. }
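// Serializer/deserializer selection: if the meta provides its own serialization
// (MDFhasserialize) or the rows need serializing for this target (disk/internal), defer to
// the meta's implementation. Otherwise fixed-size rows use the trivial fixed-width
// serializer/deserializer, and variable-size rows can be sized/serialized generically but
// cannot be deserialized without the meta's help, hence the throwUnexpectedX() paths.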
  488. void CSizingSerializer::put(size32_t len, const void * ptr)
  489. {
  490. totalsize += len;
  491. }
  492. size32_t CSizingSerializer::beginNested(size32_t count)
  493. {
  494. totalsize += sizeof(size32_t);
  495. return totalsize;
  496. }
  497. void CSizingSerializer::endNested(size32_t position)
  498. {
  499. }
  500. void CMemoryRowSerializer::put(size32_t len, const void * ptr)
  501. {
  502. buffer.append(len, ptr);
  503. }
  504. size32_t CMemoryRowSerializer::beginNested(size32_t count)
  505. {
  506. nesting++;
  507. unsigned pos = buffer.length();
  508. buffer.append((size32_t)0);
  509. return pos;
  510. }
  511. void CMemoryRowSerializer::endNested(size32_t sizePos)
  512. {
  513. size32_t sz = buffer.length()-(sizePos + sizeof(size32_t));
  514. buffer.writeDirect(sizePos,sizeof(sz),&sz);
  515. nesting--;
  516. }
  517. static void ensureClassesAreNotAbstract()
  518. {
  519. MemoryBuffer temp;
  520. CThorStreamDeserializerSource x1(NULL);
  521. CThorContiguousRowBuffer x2(NULL);
  522. CSizingSerializer x3;
  523. CMemoryRowSerializer x4(temp);
  524. }
  525. //=====================================================================================================
  526. //the visitor callback is used to ensure link counts for children are updated.
  527. size32_t cloneRow(ARowBuilder & rowBuilder, const void * row, IOutputMetaData * meta)
  528. {
  529. size32_t rowSize = meta->getRecordSize(row); // TBD could be better?
  530. byte * self = rowBuilder.ensureCapacity(rowSize, NULL);
  531. memcpy(self, row, rowSize);
  532. if (meta->getMetaFlags() & MDFneeddestruct)
  533. {
  534. ChildRowLinkerWalker walker;
  535. meta->walkIndirectMembers(self, walker);
  536. }
  537. return rowSize;
  538. }
  539. //---------------------------------------------------------------------------------------------------
  540. extern const char * getActivityText(ThorActivityKind kind)
  541. {
  542. switch (kind)
  543. {
  544. case TAKnone: return "None";
  545. case TAKdiskwrite: return "Disk Write";
  546. case TAKsort: return "Sort";
  547. case TAKdedup: return "Dedup";
  548. case TAKfilter: return "Filter";
  549. case TAKsplit: return "Split";
  550. case TAKproject: return "Project";
  551. case TAKrollup: return "Rollup";
  552. case TAKiterate: return "Iterate";
  553. case TAKaggregate: return "Aggregate";
  554. case TAKhashaggregate: return "Hash Aggregate";
  555. case TAKfirstn: return "Firstn";
  556. case TAKsample: return "Sample";
  557. case TAKdegroup: return "Degroup";
  558. case TAKjoin: return "Join";
  559. case TAKhashjoin: return "Hash Join";
  560. case TAKlookupjoin: return "Lookup Join";
  561. case TAKselfjoin: return "Self Join";
  562. case TAKkeyedjoin: return "Keyed Join";
  563. case TAKgroup: return "Group";
  564. case TAKworkunitwrite: return "Output";
  565. case TAKfunnel: return "Funnel";
  566. case TAKapply: return "Apply";
  567. case TAKinlinetable: return "Inline Dataset";
  568. case TAKhashdistribute: return "Hash Distribute";
  569. case TAKhashdedup: return "Hash Dedup";
  570. case TAKnormalize: return "Normalize";
  571. case TAKremoteresult: return "Remote Result";
  572. case TAKpull: return "Pull";
  573. case TAKdenormalize: return "Denormalize";
  574. case TAKnormalizechild: return "Normalize Child";
  575. case TAKchilddataset: return "Child Dataset";
  576. case TAKselectn: return "Select Nth";
  577. case TAKenth: return "Enth";
  578. case TAKif: return "If";
  579. case TAKnull: return "Null";
  580. case TAKdistribution: return "Distribution";
  581. case TAKcountproject: return "Count Project";
  582. case TAKchoosesets: return "Choose Sets";
  583. case TAKpiperead: return "Pipe Read";
  584. case TAKpipewrite: return "Pipe Write";
  585. case TAKcsvwrite: return "Csv Write";
  586. case TAKpipethrough: return "Pipe Through";
  587. case TAKindexwrite: return "Index Write";
  588. case TAKchoosesetsenth: return "Choose Sets Enth";
  589. case TAKchoosesetslast: return "Choose Sets Last";
  590. case TAKfetch: return "Fetch";
  591. case TAKhashdenormalize: return "Hash Denormalize";
  592. case TAKworkunitread: return "Read";
  593. case TAKthroughaggregate: return "Through Aggregate";
  594. case TAKspill: return "Spill";
  595. case TAKcase: return "Case";
  596. case TAKlimit: return "Limit";
  597. case TAKcsvfetch: return "Csv Fetch";
  598. case TAKxmlwrite: return "Xml Write";
  599. case TAKjsonwrite: return "Json Write";
  600. case TAKparse: return "Parse";
  601. case TAKsideeffect: return "Simple Action";
  602. case TAKtopn: return "Top N";
  603. case TAKmerge: return "Merge";
  604. case TAKxmlfetch: return "Xml Fetch";
  605. case TAKjsonfetch: return "Json Fetch";
  606. case TAKxmlparse: return "Parse Xml";
  607. case TAKkeyeddistribute: return "Keyed Distribute";
  608. case TAKjoinlight: return "Lightweight Join";
  609. case TAKalljoin: return "All Join";
  610. case TAKsoap_rowdataset: return "SOAP dataset";
  611. case TAKsoap_rowaction: return "SOAP action";
  612. case TAKsoap_datasetdataset: return "SOAP dataset";
  613. case TAKsoap_datasetaction: return "SOAP action";
  614. case TAKkeydiff: return "Key Difference";
  615. case TAKkeypatch: return "Key Patch";
  616. case TAKkeyeddenormalize: return "Keyed Denormalize";
  617. case TAKsequential: return "Sequential";
  618. case TAKparallel: return "Parallel";
  619. case TAKchilditerator: return "Child Dataset";
  620. case TAKdatasetresult: return "Dataset Result";
  621. case TAKrowresult: return "Row Result";
  622. case TAKchildif: return "If";
  623. case TAKpartition: return "Partition Distribute";
  624. case TAKsubgraph: return "Sub Graph";
  625. case TAKlocalgraph: return "Local Graph";
  626. case TAKifaction: return "If Action";
  627. case TAKemptyaction: return "Empty Action";
  628. case TAKskiplimit: return "Skip Limit";
  629. case TAKdiskread: return "Disk Read";
  630. case TAKdisknormalize: return "Disk Normalize";
  631. case TAKdiskaggregate: return "Disk Aggregate";
  632. case TAKdiskcount: return "Disk Count";
  633. case TAKdiskgroupaggregate: return "Disk Grouped Aggregate";
  634. case TAKdiskexists: return "Disk Exists";
  635. case TAKindexread: return "Index Read";
  636. case TAKindexnormalize: return "Index Normalize";
  637. case TAKindexaggregate: return "Index Aggregate";
  638. case TAKindexcount: return "Index Count";
  639. case TAKindexgroupaggregate: return "Index Grouped Aggregate";
  640. case TAKindexexists: return "Index Exists";
  641. case TAKchildread: return "Child Read";
  642. case TAKchildnormalize: return "Child Normalize";
  643. case TAKchildaggregate: return "Child Aggregate";
  644. case TAKchildcount: return "Child Count";
  645. case TAKchildgroupaggregate: return "Child Grouped Aggregate";
  646. case TAKchildexists: return "Child Exists";
  647. case TAKchildthroughnormalize: return "Normalize";
  648. case TAKcsvread: return "Csv Read";
  649. case TAKxmlread: return "Xml Read";
  650. case TAKjsonread: return "Json Read";
  651. case TAKlocalresultread: return "Read Local Result";
  652. case TAKlocalresultwrite: return "Local Result";
  653. case TAKcombine: return "Combine";
  654. case TAKregroup: return "Regroup";
  655. case TAKrollupgroup: return "Rollup Group";
  656. case TAKcombinegroup: return "Combine Group";
  657. case TAKlookupdenormalize: return "Lookup Denormalize";
  658. case TAKalldenormalize: return "All Denormalize";
  659. case TAKsmartdenormalizegroup: return "Smart Denormalize Group";
  660. case TAKunknowndenormalizegroup1: return "Unknown Denormalize Group1";
  661. case TAKunknowndenormalizegroup2: return "Unknown Denormalize Group2";
  662. case TAKunknowndenormalizegroup3: return "Unknown Denormalize Group3";
  663. case TAKlastdenormalizegroup: return "Last Denormalize Group";
  664. case TAKdenormalizegroup: return "Denormalize Group";
  665. case TAKhashdenormalizegroup: return "Hash Denormalize Group";
  666. case TAKlookupdenormalizegroup: return "Lookup Denormalize Group";
  667. case TAKkeyeddenormalizegroup: return "Keyed Denormalize Group";
  668. case TAKalldenormalizegroup: return "All Denormalize Group";
  669. case TAKlocalresultspill: return "Spill Local Result";
  670. case TAKsimpleaction: return "Action";
  671. case TAKloopcount: return "Loop";
  672. case TAKlooprow: return "Loop";
  673. case TAKloopdataset: return "Loop";
  674. case TAKchildcase: return "Case";
  675. case TAKremotegraph: return "Remote";
  676. case TAKlibrarycall: return "Library Call";
  677. case TAKlocalstreamread: return "Read Input";
  678. case TAKprocess: return "Process";
  679. case TAKgraphloop: return "Graph";
  680. case TAKparallelgraphloop: return "Graph";
  681. case TAKgraphloopresultread: return "Graph Input";
  682. case TAKgraphloopresultwrite: return "Graph Result";
  683. case TAKgrouped: return "Grouped";
  684. case TAKsorted: return "Sorted";
  685. case TAKdistributed: return "Distributed";
  686. case TAKnwayjoin: return "Join";
  687. case TAKnwaymerge: return "Merge";
  688. case TAKnwaymergejoin: return "Merge Join";
  689. case TAKnwayinput: return "Nway Input";
  690. case TAKnwaygraphloopresultread: return "Nway Graph Input";
  691. case TAKnwayselect: return "Select Nway Input";
  692. case TAKnonempty: return "Non Empty";
  693. case TAKcreaterowlimit: return "OnFail Limit";
  694. case TAKexistsaggregate: return "Exists";
  695. case TAKcountaggregate: return "Count";
  696. case TAKprefetchproject: return "Prefetch Project";
  697. case TAKprefetchcountproject: return "Prefetch Count Project";
  698. case TAKfiltergroup: return "Filter Group";
  699. case TAKmemoryspillread: return "Read Spill";
  700. case TAKmemoryspillwrite: return "Write Spill";
  701. case TAKmemoryspillsplit: return "Spill";
  702. case TAKsection: return "Section";
  703. case TAKlinkedrawiterator: return "Child Dataset";
  704. case TAKnormalizelinkedchild: return "Normalize";
  705. case TAKfilterproject: return "Filtered Project";
  706. case TAKcatch: return "Catch";
  707. case TAKskipcatch: return "Skip Catch";
  708. case TAKcreaterowcatch: return "OnFail Catch";
  709. case TAKsectioninput: return "Section Input";
  710. case TAKcaseaction: return "Case Action";
  711. case TAKindexgroupcount: return "Index Grouped Count";
  712. case TAKindexgroupexists: return "Index Grouped Exists";
  713. case TAKhashdistributemerge: return "Distribute Merge";
  714. case TAKselfjoinlight: return "Lightweight Self Join";
  715. case TAKlastjoin: return "Last Join";
  716. case TAKwhen_dataset: return "When";
  717. case TAKhttp_rowdataset: return "HTTP dataset";
  718. case TAKstreamediterator: return "Streamed Dataset";
  719. case TAKexternalsource: return "User Source";
  720. case TAKexternalsink: return "User Output";
  721. case TAKexternalprocess: return "User Process";

  722. case TAKwhen_action: return "When";
  723. case TAKsubsort: return "Sub Sort";
  724. case TAKdictionaryworkunitwrite:return "Dictionary Write";
  725. case TAKdictionaryresultwrite: return "Dictionary Result";
  726. case TAKsmartjoin: return "Smart Join";
  727. case TAKunknownjoin1: return "Unknown Join1";
  728. case TAKunknownjoin2: return "Unknown Join2";
  729. case TAKunknownjoin3: return "Unknown Join3";
  730. case TAKsmartdenormalize: return "Smart Denormalize";
  731. case TAKunknowndenormalize1: return "Unknown Denormalize1";
  732. case TAKunknowndenormalize2: return "Unknown Denormalize2";
  733. case TAKunknowndenormalize3: return "Unknown Denormalize3";
  734. case TAKlastdenormalize: return "Last Denormalize";
  735. case TAKselfdenormalize: return "Self Denormalize";
  736. case TAKselfdenormalizegroup: return "Self Denormalize Group";
  737. case TAKtrace: return "Trace";
  738. case TAKquantile: return "Quantile";
  739. case TAKspillread: return "Spill Read";
  740. case TAKspillwrite: return "Spill Write";
  741. case TAKnwaydistribute: return "Nway Distribute";
  742. case TAKnewdiskread: return "Disk Read";
  743. }
  744. throwUnexpected();
  745. }
  746. extern bool isActivitySource(ThorActivityKind kind)
  747. {
  748. switch (kind)
  749. {
  750. case TAKpiperead:
  751. case TAKinlinetable:
  752. case TAKworkunitread:
  753. case TAKnull:
  754. case TAKsideeffect:
  755. case TAKsoap_rowdataset:
  756. case TAKsoap_rowaction:
  757. case TAKkeydiff:
  758. case TAKkeypatch:
  759. case TAKchilditerator:
  760. case TAKlocalgraph:
  761. case TAKemptyaction:
  762. case TAKdiskread:
  763. case TAKdisknormalize:
  764. case TAKdiskaggregate:
  765. case TAKdiskcount:
  766. case TAKdiskgroupaggregate:
  767. case TAKindexread:
  768. case TAKindexnormalize:
  769. case TAKindexaggregate:
  770. case TAKindexcount:
  771. case TAKindexgroupaggregate:
  772. case TAKchildnormalize:
  773. case TAKchildaggregate:
  774. case TAKchildgroupaggregate:
  775. case TAKcsvread:
  776. case TAKxmlread:
  777. case TAKjsonread:
  778. case TAKlocalresultread:
  779. case TAKsimpleaction:
  780. case TAKlocalstreamread:
  781. case TAKgraphloopresultread:
  782. case TAKnwaygraphloopresultread:
  783. case TAKlinkedrawiterator:
  784. case TAKindexgroupexists:
  785. case TAKindexgroupcount:
  786. case TAKstreamediterator:
  787. case TAKexternalsource:
  788. case TAKspillread:
  789. return true;
  790. }
  791. return false;
  792. }
  793. extern bool isActivitySink(ThorActivityKind kind)
  794. {
  795. switch (kind)
  796. {
  797. case TAKdiskwrite:
  798. case TAKworkunitwrite:
  799. case TAKapply:
  800. case TAKremoteresult:
  801. case TAKdistribution:
  802. case TAKpipewrite:
  803. case TAKcsvwrite:
  804. case TAKindexwrite:
  805. case TAKxmlwrite:
  806. case TAKjsonwrite:
  807. case TAKsoap_rowaction:
  808. case TAKsoap_datasetaction:
  809. case TAKkeydiff:
  810. case TAKkeypatch:
  811. case TAKdatasetresult:
  812. case TAKrowresult:
  813. case TAKemptyaction:
  814. case TAKlocalresultwrite:
  815. case TAKgraphloopresultwrite:
  816. case TAKsimpleaction:
  817. case TAKexternalsink:
  818. case TAKifaction:
  819. case TAKparallel:
  820. case TAKsequential:
  821. case TAKwhen_action:
  822. case TAKdictionaryworkunitwrite:
  823. case TAKdictionaryresultwrite:
  824. case TAKspillwrite:
  825. return true;
  826. }
  827. return false;
  828. }
  829. //=====================================================================================================
  830. // ===========================================
  831. IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, unsigned heapFlags, ICodeContext *context)
  832. {
  833. class cRowInterfaces: implements IRowInterfaces, public CSimpleInterface
  834. {
  835. unsigned actid;
  836. Linked<IOutputMetaData> meta;
  837. ICodeContext* context;
  838. Linked<IEngineRowAllocator> allocator;
  839. Linked<IOutputRowSerializer> serializer;
  840. Linked<IOutputRowDeserializer> deserializer;
  841. CSingletonLock allocatorlock;
  842. CSingletonLock serializerlock;
  843. CSingletonLock deserializerlock;
  844. unsigned heapFlags;
  845. public:
  846. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  847. cRowInterfaces(IOutputMetaData *_meta,unsigned _actid, unsigned _heapFlags, ICodeContext *_context)
  848. : meta(_meta), heapFlags(_heapFlags)
  849. {
  850. context = _context;
  851. actid = _actid;
  852. }
  853. IEngineRowAllocator * queryRowAllocator()
  854. {
  855. if (allocatorlock.lock()) {
  856. if (!allocator&&meta)
  857. allocator.setown(context->getRowAllocatorEx(meta, actid, heapFlags));
  858. allocatorlock.unlock();
  859. }
  860. return allocator;
  861. }
  862. IOutputRowSerializer * queryRowSerializer()
  863. {
  864. if (serializerlock.lock()) {
  865. if (!serializer&&meta)
  866. serializer.setown(meta->createDiskSerializer(context,actid));
  867. serializerlock.unlock();
  868. }
  869. return serializer;
  870. }
  871. IOutputRowDeserializer * queryRowDeserializer()
  872. {
  873. if (deserializerlock.lock()) {
  874. if (!deserializer&&meta)
  875. deserializer.setown(meta->createDiskDeserializer(context,actid));
  876. deserializerlock.unlock();
  877. }
  878. return deserializer;
  879. }
  880. IOutputMetaData *queryRowMetaData()
  881. {
  882. return meta;
  883. }
  884. unsigned queryActivityId() const
  885. {
  886. return actid;
  887. }
  888. ICodeContext *queryCodeContext()
  889. {
  890. return context;
  891. }
  892. };
  893. return new cRowInterfaces(meta,actid,heapFlags,context);
  894. };
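// createRowInterfaces(): bundles meta, allocator, serializer and deserializer behind one
// object. Each component is created lazily on first use and guarded by its own
// CSingletonLock, so it is constructed once even with concurrent callers.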
  895. static NullVirtualFieldCallback nullVirtualFieldCallback;
  896. class CRowStreamReader : public CSimpleInterfaceOf<IExtRowStream>
  897. {
  898. protected:
  899. Linked<IFileIO> fileio;
  900. Linked<IMemoryMappedFile> mmfile;
  901. Linked<IOutputRowDeserializer> deserializer;
  902. Linked<IEngineRowAllocator> allocator;
  903. Owned<ISerialStream> strm;
  904. Owned<ISourceRowPrefetcher> prefetcher;
  905. CThorContiguousRowBuffer prefetchBuffer;
  906. unsigned __int64 progress = 0;
  907. Linked<ITranslator> translatorContainer;
  908. MemoryBuffer translateBuf;
  909. IOutputMetaData *actualFormat = nullptr;
  910. const IDynamicTransform *translator = nullptr;
  911. IVirtualFieldCallback * fieldCallback;
  912. RowFilter actualFilter;
  913. RtlDynRow *filterRow = nullptr;
  914. EmptyRowSemantics emptyRowSemantics;
  915. offset_t currentRowOffset = 0;
  916. bool eoi = false;
  917. bool eos = false;
  918. bool eog = false;
  919. bool hadMatchInGroup = false;
  920. offset_t bufofs = 0;
  921. #ifdef TRACE_CREATE
  922. static unsigned rdnum;
  923. #endif
  924. class : implements IFileSerialStreamCallback
  925. {
  926. public:
  927. CRC32 crc;
  928. void process(offset_t ofs, size32_t sz, const void *buf)
  929. {
  930. crc.tally(sz,buf);
  931. }
  932. } crccb;
  933. inline bool fieldFilterMatch(const void * buffer)
  934. {
  935. if (actualFilter.numFilterFields())
  936. {
  937. filterRow->setRow(buffer, 0);
  938. return actualFilter.matches(*filterRow);
  939. }
  940. else
  941. return true;
  942. }
  943. inline bool checkEmptyRow()
  944. {
  945. if (ers_allow == emptyRowSemantics)
  946. {
  947. byte b;
  948. prefetchBuffer.read(1, &b);
  949. prefetchBuffer.finishedRow();
  950. if (1 == b)
  951. return true;
  952. }
  953. return false;
  954. }
  955. inline void checkEog()
  956. {
  957. if (ers_eogonly == emptyRowSemantics)
  958. {
  959. byte b;
  960. prefetchBuffer.read(1, &b);
  961. eog = 1 == b;
  962. }
  963. }
  964. inline bool checkExitConditions()
  965. {
  966. if (prefetchBuffer.eos())
  967. {
  968. eos = true;
  969. return true;
  970. }
  971. if (eog)
  972. {
  973. eog = false;
  974. if (hadMatchInGroup)
  975. {
  976. hadMatchInGroup = false;
  977. return true;
  978. }
  979. }
  980. return false;
  981. }
  982. const byte *getNextPrefetchRow()
  983. {
  984. while (true)
  985. {
  986. ++progress;
  987. if (checkEmptyRow())
  988. return nullptr;
  989. currentRowOffset = prefetchBuffer.tell();
  990. prefetcher->readAhead(prefetchBuffer);
  991. bool matched = fieldFilterMatch(prefetchBuffer.queryRow());
  992. checkEog();
  993. if (matched) // NB: prefetchDone() call must be paired with a row returned from prefetchRow()
  994. {
  995. hadMatchInGroup = true;
  996. return prefetchBuffer.queryRow(); // NB: buffer ptr could have changed due to reading eog byte
  997. }
  998. else
  999. prefetchBuffer.finishedRow();
  1000. if (checkExitConditions())
  1001. break;
  1002. }
  1003. return nullptr;
  1004. }
  1005. const void *getNextRow()
  1006. {
  1007. /* NB: this is very similar to getNextPrefetchRow() above
  1008. * with the primary difference being it is deserializing into
  1009. * a row builder and returning finalized rows.
  1010. */
  1011. while (true)
  1012. {
  1013. ++progress;
  1014. if (checkEmptyRow())
  1015. return nullptr;
  1016. currentRowOffset = prefetchBuffer.tell();
  1017. RtlDynamicRowBuilder rowBuilder(*allocator);
  1018. size32_t size = deserializer->deserialize(rowBuilder, prefetchBuffer);
  1019. bool matched = fieldFilterMatch(rowBuilder.getUnfinalized());
  1020. checkEog();
  1021. prefetchBuffer.finishedRow();
  1022. const void *row = rowBuilder.finalizeRowClear(size);
  1023. if (matched)
  1024. {
  1025. hadMatchInGroup = true;
  1026. return row;
  1027. }
  1028. ReleaseRoxieRow(row);
  1029. if (checkExitConditions())
  1030. break;
  1031. }
  1032. return nullptr;
  1033. }
  1034. public:
  1035. CRowStreamReader(IFileIO *_fileio, IMemoryMappedFile *_mmfile, IRowInterfaces *rowif, offset_t _ofs, offset_t _len, bool _tallycrc, EmptyRowSemantics _emptyRowSemantics, ITranslator *_translatorContainer, IVirtualFieldCallback * _fieldCallback)
  1036. : fileio(_fileio), mmfile(_mmfile), allocator(rowif->queryRowAllocator()), prefetchBuffer(nullptr), emptyRowSemantics(_emptyRowSemantics), translatorContainer(_translatorContainer), fieldCallback(_fieldCallback)
  1037. {
  1038. #ifdef TRACE_CREATE
  1039. PROGLOG("CRowStreamReader %d = %p",++rdnum,this);
  1040. #endif
  1041. if (fileio)
  1042. strm.setown(createFileSerialStream(fileio,_ofs,_len,(size32_t)-1, _tallycrc?&crccb:NULL));
  1043. else
  1044. strm.setown(createFileSerialStream(mmfile,_ofs,_len,_tallycrc?&crccb:NULL));
  1045. currentRowOffset = _ofs;
  1046. if (translatorContainer)
  1047. {
  1048. actualFormat = &translatorContainer->queryActualFormat();
  1049. translator = &translatorContainer->queryTranslator();
  1050. }
  1051. else
  1052. {
  1053. actualFormat = rowif->queryRowMetaData();
  1054. deserializer.set(rowif->queryRowDeserializer());
  1055. }
  1056. prefetcher.setown(actualFormat->createDiskPrefetcher());
  1057. if (prefetcher)
  1058. prefetchBuffer.setStream(strm);
  1059. if (!fieldCallback)
  1060. fieldCallback = &nullVirtualFieldCallback;
  1061. }
  1062. ~CRowStreamReader()
  1063. {
  1064. #ifdef TRACE_CREATE
  1065. PROGLOG("~CRowStreamReader %d = %p",rdnum--,this);
  1066. #endif
  1067. delete filterRow;
  1068. }
  1069. IMPLEMENT_IINTERFACE_USING(CSimpleInterfaceOf<IExtRowStream>)
  1070. virtual void reinit(offset_t _ofs,offset_t _len,unsigned __int64 _maxrows) override
  1071. {
  1072. assertex(_maxrows == 0);
  1073. eoi = false;
  1074. eos = (_len==0);
  1075. eog = false;
  1076. hadMatchInGroup = false;
  1077. bufofs = 0;
  1078. progress = 0;
  1079. strm->reset(_ofs,_len);
  1080. currentRowOffset = _ofs;
  1081. }
  1082. virtual const void *nextRow() override
  1083. {
  1084. if (eog)
  1085. {
  1086. eog = false;
  1087. hadMatchInGroup = false;
  1088. }
  1089. else if (!eos)
  1090. {
  1091. if (prefetchBuffer.eos())
  1092. eos = true;
  1093. else
  1094. {
  1095. if (translator)
  1096. {
  1097. const byte *row = getNextPrefetchRow();
  1098. if (row)
  1099. {
  1100. RtlDynamicRowBuilder rowBuilder(*allocator);
  1101. size32_t size = translator->translate(rowBuilder, *fieldCallback, row);
  1102. prefetchBuffer.finishedRow();
  1103. return rowBuilder.finalizeRowClear(size);
  1104. }
  1105. }
  1106. else
  1107. return getNextRow();
  1108. }
  1109. }
  1110. return nullptr;
  1111. }
  1112. virtual const byte *prefetchRow() override
  1113. {
  1114. // NB: prefetchDone() call must be paired with a row returned from prefetchRow()
  1115. if (eog)
  1116. {
  1117. eog = false;
  1118. hadMatchInGroup = false;
  1119. }
  1120. else if (!eos)
  1121. {
  1122. if (prefetchBuffer.eos())
  1123. eos = true;
  1124. else
  1125. {
  1126. const byte *row = getNextPrefetchRow();
  1127. if (row)
  1128. {
  1129. if (translator)
  1130. {
  1131. translateBuf.setLength(0);
  1132. MemoryBufferBuilder rowBuilder(translateBuf, 0);
  1133. translator->translate(rowBuilder, *fieldCallback, row);
  1134. row = rowBuilder.getSelf();
  1135. }
  1136. return row;
  1137. }
  1138. }
  1139. }
  1140. return nullptr;
  1141. }
  1142. virtual void prefetchDone() override
  1143. {
  1144. prefetchBuffer.finishedRow();
  1145. }
  1146. virtual void stop() override
  1147. {
  1148. stop(NULL);
  1149. }
  1150. void clear()
  1151. {
  1152. strm.clear();
  1153. fileio.clear();
  1154. }
  1155. virtual void stop(CRC32 *crcout) override
  1156. {
  1157. if (!eos) {
  1158. eos = true;
  1159. clear();
  1160. }
  1161. // NB CRC will only be right if stopped at eos
  1162. if (crcout)
  1163. *crcout = crccb.crc;
  1164. }
  1165. virtual offset_t getOffset() const override
  1166. {
  1167. return prefetchBuffer.tell();
  1168. }
  1169. virtual offset_t getLastRowOffset() const override
  1170. {
  1171. return currentRowOffset;
  1172. }
  1173. virtual unsigned __int64 getStatistic(StatisticKind kind) override
  1174. {
  1175. if (fileio)
  1176. return fileio->getStatistic(kind);
  1177. return 0;
  1178. }
  1179. virtual unsigned __int64 queryProgress() const override
  1180. {
  1181. return progress;
  1182. }
  1183. virtual void setFilters(IConstArrayOf<IFieldFilter> &filters)
  1184. {
  1185. if (filterRow)
  1186. {
  1187. delete filterRow;
  1188. filterRow = nullptr;
  1189. actualFilter.clear();
  1190. }
  1191. if (filters.ordinality())
  1192. {
  1193. actualFilter.appendFilters(filters);
  1194. if (translatorContainer)
  1195. {
  1196. const IKeyTranslator *keyedTranslator = translatorContainer->queryKeyedTranslator();
  1197. if (keyedTranslator)
  1198. keyedTranslator->translate(actualFilter);
  1199. }
  1200. const RtlRecord *actual = &actualFormat->queryRecordAccessor(true);
  1201. filterRow = new RtlDynRow(*actual);
  1202. }
  1203. }
  1204. };
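// CRowStreamReader reads rows from a file (or memory-mapped file) in one of two modes:
//  - with a translator: the raw row is prefetched in the actual on-disk format and then
//    translated into the expected layout (nextRow() builds a finalized roxie row,
//    prefetchRow() builds into translateBuf);
//  - without a translator: rows are deserialized directly via the IRowInterfaces
//    deserializer.
// emptyRowSemantics drives the extra marker bytes handled by checkEmptyRow()/checkEog(),
// and setFilters() installs optional field filters (translated to the actual layout when a
// keyed translator is available) that are applied before rows are returned.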
  1205. class CLimitedRowStreamReader : public CRowStreamReader
  1206. {
  1207. unsigned __int64 maxrows;
  1208. unsigned __int64 rownum;
  1209. public:
  1210. CLimitedRowStreamReader(IFileIO *_fileio, IMemoryMappedFile *_mmfile, IRowInterfaces *rowif, offset_t _ofs, offset_t _len, unsigned __int64 _maxrows, bool _tallycrc, EmptyRowSemantics _emptyRowSemantics, ITranslator *translatorContainer, IVirtualFieldCallback * _fieldCallback)
  1211. : CRowStreamReader(_fileio, _mmfile, rowif, _ofs, _len, _tallycrc, _emptyRowSemantics, translatorContainer, _fieldCallback)
  1212. {
  1213. maxrows = _maxrows;
  1214. rownum = 0;
  1215. eos = maxrows==0;
  1216. }
  1217. virtual void reinit(offset_t _ofs,offset_t _len,unsigned __int64 _maxrows) override
  1218. {
  1219. CRowStreamReader::reinit(_ofs, _len, 0);
  1220. if (_maxrows==0)
  1221. eos = true;
  1222. maxrows = _maxrows;
  1223. rownum = 0;
  1224. }
  1225. virtual const void *nextRow() override
  1226. {
  1227. const void * ret = CRowStreamReader::nextRow();
  1228. if (++rownum==maxrows)
  1229. eos = true;
  1230. return ret;
  1231. }
  1232. };
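// CLimitedRowStreamReader caps the stream at maxrows: the row that brings rownum up to
// maxrows is still returned, and eos is set so the following call yields no more rows.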
  1233. #ifdef TRACE_CREATE
  1234. unsigned CRowStreamReader::rdnum;
  1235. #endif
  1236. IExtRowStream *createRowStreamEx(IFileIO *fileIO, IRowInterfaces *rowIf, offset_t offset, offset_t len, unsigned __int64 maxrows, unsigned rwFlags, ITranslator *translatorContainer, IVirtualFieldCallback * fieldCallback)
  1237. {
  1238. EmptyRowSemantics emptyRowSemantics = extractESRFromRWFlags(rwFlags);
  1239. if (maxrows == (unsigned __int64)-1)
  1240. return new CRowStreamReader(fileIO, NULL, rowIf, offset, len, TestRwFlag(rwFlags, rw_crc), emptyRowSemantics, translatorContainer, fieldCallback);
  1241. else
  1242. return new CLimitedRowStreamReader(fileIO, NULL, rowIf, offset, len, maxrows, TestRwFlag(rwFlags, rw_crc), emptyRowSemantics, translatorContainer, fieldCallback);
  1243. }
  1244. bool UseMemoryMappedRead = false;
  1245. IExtRowStream *createRowStreamEx(IFile *file, IRowInterfaces *rowIf, offset_t offset, offset_t len, unsigned __int64 maxrows, unsigned rwFlags, IExpander *eexp, ITranslator *translatorContainer, IVirtualFieldCallback * fieldCallback)
  1246. {
  1247. bool compressed = TestRwFlag(rwFlags, rw_compress);
  1248. EmptyRowSemantics emptyRowSemantics = extractESRFromRWFlags(rwFlags);
  1249. if (UseMemoryMappedRead && !compressed)
  1250. {
  1251. PROGLOG("Memory Mapped read of %s",file->queryFilename());
  1252. Owned<IMemoryMappedFile> mmfile = file->openMemoryMapped();
  1253. if (!mmfile)
  1254. return NULL;
  1255. if (maxrows == (unsigned __int64)-1)
  1256. return new CRowStreamReader(NULL, mmfile, rowIf, offset, len, TestRwFlag(rwFlags, rw_crc), emptyRowSemantics, translatorContainer, fieldCallback);
  1257. else
  1258. return new CLimitedRowStreamReader(NULL, mmfile, rowIf, offset, len, maxrows, TestRwFlag(rwFlags, rw_crc), emptyRowSemantics, translatorContainer, fieldCallback);
  1259. }
  1260. else
  1261. {
  1262. Owned<IFileIO> fileio;
  1263. if (compressed)
  1264. {
  1265. // JCSMORE - should probably pass in a flag for rw_compressblkcrc; it doesn't look like this
  1266. // (or anywhere else) checks the block CRCs at the moment.
  1267. fileio.setown(createCompressedFileReader(file, eexp, UseMemoryMappedRead));
  1268. }
  1269. else
  1270. fileio.setown(file->open(IFOread));
  1271. if (!fileio)
  1272. return NULL;
  1273. if (maxrows == (unsigned __int64)-1)
  1274. return new CRowStreamReader(fileio, NULL, rowIf, offset, len, TestRwFlag(rwFlags, rw_crc), emptyRowSemantics, translatorContainer, fieldCallback);
  1275. else
  1276. return new CLimitedRowStreamReader(fileio, NULL, rowIf, offset, len, maxrows, TestRwFlag(rwFlags, rw_crc), emptyRowSemantics, translatorContainer, fieldCallback);
  1277. }
  1278. }
  1279. IExtRowStream *createRowStream(IFile *file, IRowInterfaces *rowIf, unsigned rwFlags, IExpander *eexp, ITranslator *translatorContainer, IVirtualFieldCallback * fieldCallback)
  1280. {
  1281. return createRowStreamEx(file, rowIf, 0, (offset_t)-1, (unsigned __int64)-1, rwFlags, eexp, translatorContainer, fieldCallback);
  1282. }
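/* Read-side usage sketch (hypothetical caller; rowIf would come from createRowInterfaces(),
 * and the loop assumes an ungrouped file, since a NULL from nextRow() on grouped data can
 * also mark end-of-group):
 *
 *   Owned<IExtRowStream> in = createRowStream(file, rowIf, rw_crc, NULL, NULL, NULL);
 *   for (const void *row; (row = in->nextRow()) != NULL; )
 *       ...;                     // returned rows are owned by the caller
 *   in->stop();
 */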
  1283. // Memory map sizes can be big, restrict to 64-bit platforms.
  1284. void useMemoryMappedRead(bool on)
  1285. {
  1286. #if defined(_DEBUG) || defined(__64BIT__)
  1287. UseMemoryMappedRead = on;
  1288. #endif
  1289. }
  1290. #define ROW_WRITER_BUFFERSIZE (0x100000)
  1291. class CRowStreamWriter : private IRowSerializerTarget, implements IExtRowWriter, public CSimpleInterface
  1292. {
  1293. Linked<IFileIOStream> stream;
  1294. Linked<IOutputRowSerializer> serializer;
  1295. Linked<IEngineRowAllocator> allocator;
  1296. CRC32 crc;
  1297. EmptyRowSemantics emptyRowSemantics;
  1298. bool tallycrc;
  1299. unsigned nested;
  1300. MemoryAttr ma;
  1301. MemoryBuffer extbuf; // may need to spill to disk at some point
  1302. byte *buf;
  1303. size32_t bufpos;
  1304. bool autoflush;
  1305. #ifdef TRACE_CREATE
  1306. static unsigned wrnum;
  1307. #endif
  1308. void flushBuffer(bool final)
  1309. {
  1310. try
  1311. {
  1312. if (bufpos) {
  1313. stream->write(bufpos,buf);
  1314. if (tallycrc)
  1315. crc.tally(bufpos,buf);
  1316. bufpos = 0;
  1317. }
  1318. size32_t extpos = extbuf.length();
  1319. if (!extpos)
  1320. return;
  1321. if (!final)
  1322. extpos = (extpos/ROW_WRITER_BUFFERSIZE)*ROW_WRITER_BUFFERSIZE;
  1323. if (extpos) {
  1324. stream->write(extpos,extbuf.toByteArray());
  1325. if (tallycrc)
  1326. crc.tally(extpos,extbuf.toByteArray());
  1327. }
  1328. if (extpos<extbuf.length()) {
  1329. bufpos = extbuf.length()-extpos;
  1330. memcpy(buf,extbuf.toByteArray()+extpos,bufpos);
  1331. }
  1332. extbuf.clear();
  1333. }
  1334. catch (IException *e)
  1335. {
  1336. autoflush = false; // avoid follow-on errors
  1337. EXCLOG(e, "flushBuffer");
  1338. throw;
  1339. }
  1340. }
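// Buffering scheme: rows are first written into the fixed ROW_WRITER_BUFFERSIZE buffer;
// once it is full, data written inside a nested scope overflows into extbuf (so that
// endNested() can still back-patch its length prefix) instead of being flushed.
// flushBuffer(false) writes only whole buffer-sized chunks of extbuf and copies the
// remainder back into buf; flushBuffer(true) writes everything. Both update the CRC when
// tallycrc is set.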
  1341. void streamFlush()
  1342. {
  1343. try
  1344. {
  1345. stream->flush();
  1346. }
  1347. catch (IException *e)
  1348. {
  1349. autoflush = false; // avoid follow-on errors
  1350. EXCLOG(e, "streamFlush");
  1351. throw;
  1352. }
  1353. }
  1354. public:
  1355. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1356. CRowStreamWriter(IFileIOStream *_stream, IOutputRowSerializer *_serializer, IEngineRowAllocator *_allocator, EmptyRowSemantics _emptyRowSemantics, bool _tallycrc, bool _autoflush)
  1357. : stream(_stream), serializer(_serializer), allocator(_allocator), emptyRowSemantics(_emptyRowSemantics)
  1358. {
  1359. #ifdef TRACE_CREATE
  1360. PROGLOG("createRowWriter %d = %p",++wrnum,this);
  1361. #endif
  1362. tallycrc = _tallycrc;
  1363. nested = 0;
  1364. buf = (byte *)ma.allocate(ROW_WRITER_BUFFERSIZE);
  1365. bufpos = 0;
  1366. autoflush = _autoflush;
  1367. }
  1368. ~CRowStreamWriter()
  1369. {
  1370. #ifdef TRACE_CREATE
  1371. PROGLOG("~createRowWriter %d = %p",wrnum--,this);
  1372. #endif
  1373. if (autoflush)
  1374. flush();
  1375. else if (bufpos+extbuf.length()) {
  1376. #ifdef _DEBUG
  1377. PrintStackReport();
  1378. #endif
  1379. WARNLOG("CRowStreamWriter closed with %d bytes unflushed",bufpos+extbuf.length());
  1380. }
  1381. }
  1382. void putRow(const void *row)
  1383. {
  1384. if (row)
  1385. {
  1386. if (ers_allow == emptyRowSemantics)
  1387. {
  1388. byte b = 0;
  1389. put(1, &b);
  1390. serializer->serialize(*this, (const byte *)row);
  1391. }
  1392. else
  1393. {
  1394. serializer->serialize(*this, (const byte *)row);
  1395. if (ers_eogonly == emptyRowSemantics)
  1396. {
  1397. byte b = 0;
  1398. if (bufpos<ROW_WRITER_BUFFERSIZE)
  1399. buf[bufpos++] = b;
  1400. else
  1401. extbuf.append(b);
  1402. }
  1403. }
  1404. allocator->releaseRow(row);
  1405. }
  1406. else if (ers_eogonly == emptyRowSemantics) // backpatch
  1407. {
  1408. byte b = 1;
  1409. if (extbuf.length())
  1410. extbuf.writeDirect(extbuf.length()-1, sizeof(b), &b);
  1411. else
  1412. {
  1413. assertex(bufpos);
  1414. buf[bufpos-1] = b;
  1415. }
  1416. }
  1417. else if (ers_allow == emptyRowSemantics)
  1418. {
  1419. byte b = 1;
  1420. put(1, &b);
  1421. }
  1422. }
  1423. void flush()
  1424. {
  1425. flushBuffer(true);
  1426. streamFlush();
  1427. }
  1428. void flush(CRC32 *crcout)
  1429. {
  1430. flushBuffer(true);
  1431. streamFlush();
  1432. if (crcout)
  1433. *crcout = crc;
  1434. }
  1435. offset_t getPosition()
  1436. {
  1437. return stream->tell()+bufpos+extbuf.length();
  1438. }
  1439. void put(size32_t len, const void * ptr)
  1440. {
  1441. // first fill buf
  1442. for (;;) {
  1443. if (bufpos<ROW_WRITER_BUFFERSIZE) {
  1444. size32_t wr = ROW_WRITER_BUFFERSIZE-bufpos;
  1445. if (wr>len)
  1446. wr = len;
  1447. memcpy(buf+bufpos,ptr,wr);
  1448. bufpos += wr;
  1449. len -= wr;
  1450. if (len==0)
  1451. break; // quick exit
  1452. ptr = (const byte *)ptr + wr;
  1453. }
  1454. if (nested) {
  1455. // have to append to ext buffer (will need to spill to disk here if gets *too* big)
  1456. extbuf.append(len,ptr);
  1457. break;
  1458. }
  1459. else
  1460. flushBuffer(false);
  1461. }
  1462. }
  1463. size32_t beginNested(size32_t count)
  1464. {
  1465. if (nested++==0)
  1466. if (bufpos==ROW_WRITER_BUFFERSIZE)
  1467. flushBuffer(false);
  1468. size32_t ret = bufpos+extbuf.length();
  1469. size32_t sz = 0;
  1470. put(sizeof(sz),&sz);
  1471. return ret;
  1472. }
  1473. void endNested(size32_t pos)
  1474. {
  1475. size32_t sz = bufpos+extbuf.length()-(pos + sizeof(size32_t));
  1476. size32_t wr = sizeof(size32_t);
  1477. byte *out = (byte *)&sz;
  1478. if (pos<ROW_WRITER_BUFFERSIZE) {
  1479. size32_t space = ROW_WRITER_BUFFERSIZE-pos;
  1480. if (space>wr)
  1481. space = wr;
  1482. memcpy(buf+pos,out,space);
  1483. wr -= space;
  1484. if (wr==0) {
  1485. --nested;
  1486. return; // quick exit
  1487. }
  1488. out += space;
  1489. pos += space;
  1490. }
  1491. extbuf.writeDirect(pos-ROW_WRITER_BUFFERSIZE,wr,out);
  1492. --nested;
  1493. }
  1494. };
  1495. #ifdef TRACE_CREATE
  1496. unsigned CRowStreamWriter::wrnum=0;
  1497. #endif
IExtRowWriter *createRowWriter(IFile *iFile, IRowInterfaces *rowIf, unsigned flags, ICompressor *compressor, size32_t compressorBlkSz)
{
    OwnedIFileIO iFileIO;
    if (TestRwFlag(flags, rw_compress))
    {
        size32_t fixedSize = rowIf->queryRowMetaData()->querySerializedDiskMeta()->getFixedSize();
        if (fixedSize && TestRwFlag(flags, rw_grouped))
            ++fixedSize; // row writer will include a grouping byte
        ICompressedFileIO *compressedFileIO = createCompressedFileWriter(iFile, fixedSize, TestRwFlag(flags, rw_extend), TestRwFlag(flags, rw_compressblkcrc), compressor, getCompMethod(flags));
        if (compressorBlkSz)
            compressedFileIO->setBlockSize(compressorBlkSz);
        iFileIO.setown(compressedFileIO);
    }
    else
        iFileIO.setown(iFile->open((flags & rw_extend)?IFOwrite:IFOcreate));
    if (!iFileIO)
        return NULL;
    flags &= ~COMP_MASK;
    return createRowWriter(iFileIO, rowIf, flags);
}

IExtRowWriter *createRowWriter(IFileIO *iFileIO, IRowInterfaces *rowIf, unsigned flags, size32_t compressorBlkSz)
{
    if (TestRwFlag(flags, rw_compress))
        throw MakeStringException(0, "Unsupported createRowWriter flags");
    Owned<IFileIOStream> stream;
    if (TestRwFlag(flags, rw_buffered))
        stream.setown(createBufferedIOStream(iFileIO));
    else
        stream.setown(createIOStream(iFileIO));
    if (flags & rw_extend)
        stream->seek(0, IFSend);
    flags &= ~((unsigned)(rw_extend|rw_buffered));
    return createRowWriter(stream, rowIf, flags);
}

IExtRowWriter *createRowWriter(IFileIOStream *strm, IRowInterfaces *rowIf, unsigned flags)
{
    if (0 != (flags & (rw_extend|rw_buffered|COMP_MASK)))
        throw MakeStringException(0, "Unsupported createRowWriter flags");
    EmptyRowSemantics emptyRowSemantics = extractESRFromRWFlags(flags);
    Owned<CRowStreamWriter> writer = new CRowStreamWriter(strm, rowIf->queryRowSerializer(), rowIf->queryRowAllocator(), emptyRowSemantics, TestRwFlag(flags, rw_crc), TestRwFlag(flags, rw_autoflush));
    return writer.getClear();
}
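// Illustrative sketch (not part of the original source): writing rows to a new file
// with the factory functions above. "rowIf", "rows" and "numRows" are assumed to exist
// in the caller; the path and flag combination are hypothetical.
//
//     Owned<IFile> out = createIFile("/var/lib/example.out.d00");                  // hypothetical path
//     Owned<IExtRowWriter> writer = createRowWriter(out, rowIf, rw_autoflush|rw_crc);
//     for (unsigned i=0; i<numRows; i++)
//         writer->putRow(rows[i]);   // putRow takes ownership of each row
//     CRC32 crc;
//     writer->flush(&crc);           // final flush; crc now covers everything written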
class CDiskMerger : implements IDiskMerger, public CInterface
{
    IArrayOf<IFile> tempfiles;
    IRowStream **strms;
    Linked<IRecordSize> irecsize;
    StringAttr tempnamebase;
    Linked<IRowLinkCounter> linker;
    Linked<IRowInterfaces> rowInterfaces;
public:
    IMPLEMENT_IINTERFACE;

    CDiskMerger(IRowInterfaces *_rowInterfaces, IRowLinkCounter *_linker, const char *_tempnamebase)
        : rowInterfaces(_rowInterfaces), linker(_linker), tempnamebase(_tempnamebase)
    {
        strms = NULL;
    }
    ~CDiskMerger()
    {
        for (unsigned i=0;i<tempfiles.ordinality();i++) {
            if (strms&&strms[i])
                strms[i]->Release();
            try
            {
                tempfiles.item(i).remove();
            }
            catch (IException * e)
            {
                //Exceptions inside destructors are bad.
                EXCLOG(e);
                e->Release();
            }
        }
        free(strms);
    }
    IRowWriter *createWriteBlock()
    {
        StringBuffer tempname(tempnamebase);
        tempname.append('.').append(tempfiles.ordinality()).append('_').append((__int64)GetCurrentThreadId()).append('_').append((unsigned)GetCurrentProcessId());
        IFile *file = createIFile(tempname.str());
        tempfiles.append(*file);
        return createRowWriter(file, rowInterfaces);
    }
    void put(const void **rows,unsigned numrows)
    {
        Owned<IRowWriter> out = createWriteBlock();
        for (unsigned i=0;i<numrows;i++)
            out->putRow(rows[i]);
    }
    void putIndirect(const void ***rowptrs,unsigned numrows)
    {
        Owned<IRowWriter> out = createWriteBlock();
        for (unsigned i=0;i<numrows;i++)
            out->putRow(*(rowptrs[i]));
    }
    virtual void put(ISortedRowProvider *rows)
    {
        Owned<IRowWriter> out = createWriteBlock();
        void * row;
        while((row = rows->getNextSorted()) != NULL)
            out->putRow(row);
    }
    IRowStream *merge(ICompare *icompare, bool partdedup)
    {
        unsigned numstrms = tempfiles.ordinality();
        strms = (IRowStream **)calloc(numstrms,sizeof(IRowStream *));
        unsigned i;
        for (i=0;i<numstrms;i++) {
            strms[i] = createRowStream(&tempfiles.item(i), rowInterfaces);
        }
        if (numstrms==1)
            return LINK(strms[0]);
        if (icompare)
            return createRowStreamMerger(numstrms, strms, icompare, partdedup, linker);
        return createConcatRowStream(numstrms,strms);
    }
    virtual count_t mergeTo(IRowWriter *dest, ICompare *icompare, bool partdedup)
    {
        count_t count = 0;
        Owned<IRowStream> mergedStream = merge(icompare, partdedup);
        for (;;)
        {
            const void *row = mergedStream->nextRow();
            if (!row)
                return count;
            dest->putRow(row); // takes ownership
            ++count;
        }
        return count;
    }
};

IDiskMerger *createDiskMerger(IRowInterfaces *rowInterfaces, IRowLinkCounter *linker, const char *tempnamebase)
{
    return new CDiskMerger(rowInterfaces, linker, tempnamebase);
}
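// CDiskMerger usage pattern (derived from the implementation above): each put()/putIndirect()
// call spills a block of rows to a fresh temp file via createWriteBlock(), and merge()/mergeTo()
// then reopens every temp file as a row stream and combines them, either through
// createRowStreamMerger (when an ICompare is supplied, optionally deduplicating) or through a
// simple concatenation. Temp files are removed in the destructor.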
//---------------------------------------------------------------------------------------------------------------------

void ActivityTimeAccumulator::addStatistics(IStatisticGatherer & builder) const
{
    if (totalCycles)
    {
        builder.addStatistic(StWhenFirstRow, firstRow);
        builder.addStatistic(StTimeElapsed, elapsed());
        builder.addStatistic(StTimeTotalExecute, cycle_to_nanosec(totalCycles));
        builder.addStatistic(StTimeFirstExecute, latency());
    }
}

void ActivityTimeAccumulator::addStatistics(CRuntimeStatisticCollection & merged) const
{
    if (totalCycles)
    {
        merged.mergeStatistic(StWhenFirstRow, firstRow);
        merged.mergeStatistic(StTimeElapsed, elapsed());
        merged.mergeStatistic(StTimeTotalExecute, cycle_to_nanosec(totalCycles));
        merged.mergeStatistic(StTimeFirstExecute, latency());
    }
}

void ActivityTimeAccumulator::merge(const ActivityTimeAccumulator & other)
{
    if (other.totalCycles)
    {
        if (totalCycles)
        {
            //Record the earliest start, the latest end, the longest latencies
            cycle_t thisLatency = latencyCycles();
            cycle_t otherLatency = other.latencyCycles();
            cycle_t maxLatency = std::max(thisLatency, otherLatency);
            if (startCycles > other.startCycles)
            {
                startCycles = other.startCycles;
                firstRow = other.firstRow;
            }
            firstExitCycles = startCycles + maxLatency;
            if (endCycles < other.endCycles)
                endCycles = other.endCycles;
            totalCycles += other.totalCycles;
        }
        else
            *this = other;
    }
}
//---------------------------------------------------------------------------------------------------------------------

//MORE: Not currently implemented for windows.
#ifdef CPU_SETSIZE
static unsigned getCpuId(const char * text, char * * next)
{
    unsigned cpu = (unsigned)strtoul(text, next, 10);
    if (*next == text)
        throw makeStringExceptionV(1, "Invalid CPU: %s", text);
    else if (cpu >= CPU_SETSIZE)
        throw makeStringExceptionV(1, "CPU %u is out of range 0..%u", cpu, CPU_SETSIZE);
    return cpu;
}
#endif

void setProcessAffinity(const char * cpuList)
{
    assertex(cpuList);
#ifdef CPU_ZERO
    cpu_set_t cpus;
    CPU_ZERO(&cpus);
    const char * cur = cpuList;
    for (;;)
    {
        char * next;
        unsigned cpu1 = getCpuId(cur, &next);
        if (*next == '-')
        {
            const char * range = next+1;
            unsigned cpu2 = getCpuId(range, &next);
            for (unsigned cpu = cpu1; cpu <= cpu2; cpu++)
                CPU_SET(cpu, &cpus);
        }
        else
            CPU_SET(cpu1, &cpus);
        if (*next == '\0')
            break;
        if (*next != ',')
            throw makeStringExceptionV(1, "Invalid cpu affinity list %s", cur);
        cur = next+1;
    }
    if (sched_setaffinity(0, sizeof(cpu_set_t), &cpus))
        throw makeStringException(errno, "Failed to set affinity");
    DBGLOG("Process affinity set to %s", cpuList);
#endif
    clearAffinityCache();
}
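// The list accepted by setProcessAffinity is a comma-separated set of cpu ids and inclusive
// ranges, e.g. "0-3,8,12-15" binds the process to cpus 0,1,2,3,8,12,13,14,15.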
void setAutoAffinity(unsigned curProcess, unsigned processPerMachine, const char * optNodes)
{
#if defined(CPU_ZERO) && defined(_USE_NUMA)
    if (processPerMachine <= 1)
        return;
    if (numa_available() == -1)
    {
        DBGLOG("Numa functions not available");
        return;
    }
    if (optNodes)
        throw makeStringException(1, "Numa node list not yet supported");
    unsigned numaMap[NUMA_NUM_NODES];
    unsigned numNumaNodes = 0;
#if defined(LIBNUMA_API_VERSION) && (LIBNUMA_API_VERSION>=2)
    //Create a bit mask to record which nodes are available to the system
    //numa_all_nodes_ptr contains only nodes with associated memory - which causes issues on misconfigured systems
    struct bitmask * available_nodes = numa_allocate_nodemask();
    numa_bitmask_clearall(available_nodes);
    unsigned maxcpus = numa_num_configured_cpus();
    for (unsigned cpu=0; cpu < maxcpus; cpu++)
    {
        //Check the cpu can be used by this process.
        if (numa_bitmask_isbitset(numa_all_cpus_ptr, cpu))
        {
            int node = numa_node_of_cpu(cpu);
            if (node != -1)
                numa_bitmask_setbit(available_nodes, node);
        }
    }
    for (unsigned i=0; i<=numa_max_node(); i++)
    {
        if (numa_bitmask_isbitset(available_nodes, i))
        {
            numaMap[numNumaNodes] = i;
            numNumaNodes++;
            if (!numa_bitmask_isbitset(numa_all_nodes_ptr, i))
                DBGLOG("Numa: Potential inefficiency - node %u does not have any associated memory", i);
        }
    }
    numa_bitmask_free(available_nodes);
    DBGLOG("Affinity: Max cpus(%u) nodes(%u) actual nodes(%u), processes(%u)", maxcpus, numa_max_node()+1, numNumaNodes, processPerMachine);
#else
    //On very old versions of numa assume that all nodes are present
    for (unsigned i=0; i<=numa_max_node(); i++)
    {
        numaMap[numNumaNodes] = i;
        numNumaNodes++;
    }
#endif
    if (numNumaNodes <= 1)
        return;
    unsigned firstNode = 0;
    unsigned numNodes = 1;
    if (processPerMachine >= numNumaNodes)
    {
        firstNode = curProcess % numNumaNodes;
    }
    else
    {
        firstNode = (curProcess * numNumaNodes) / processPerMachine;
        unsigned nextNode = ((curProcess+1) * numNumaNodes) / processPerMachine;
        numNodes = nextNode - firstNode;
    }
    if ((processPerMachine % numNumaNodes) != 0)
        DBGLOG("Affinity: %u processes will not be evenly balanced over %u numa nodes", processPerMachine, numNumaNodes);
#if defined(LIBNUMA_API_VERSION) && (LIBNUMA_API_VERSION>=2)
    //This code assumes the nodes are sensibly ordered (e.g., nodes on the same socket are next to each other), and
    //only works well when the number of processes is a multiple of the number of numa nodes. A full solution would
    //look at distances.
    struct bitmask * cpus = numa_allocate_cpumask();
    struct bitmask * nodeMask = numa_allocate_cpumask();
    for (unsigned node=0; node < numNodes; node++)
    {
        numa_node_to_cpus(numaMap[firstNode+node], nodeMask);
        //Shame there is no inbuilt union operation.
        for (unsigned cpu=0; cpu < maxcpus; cpu++)
        {
            if (numa_bitmask_isbitset(nodeMask, cpu))
                numa_bitmask_setbit(cpus, cpu);
        }
    }
    bool ok = (numa_sched_setaffinity(0, cpus) == 0);
    numa_bitmask_free(nodeMask);
    numa_bitmask_free(cpus);
#else
    cpu_set_t cpus;
    CPU_ZERO(&cpus);
    numa_node_to_cpus(numaMap[firstNode], (unsigned long *) &cpus, sizeof (cpus));
    bool ok = (sched_setaffinity(0, sizeof(cpus), &cpus) == 0); // sched_setaffinity returns 0 on success
#endif
    if (!ok)
        throw makeStringExceptionV(1, "Failed to set affinity to numa node %u (id:%u)", firstNode, numaMap[firstNode]);
    DBGLOG("Process bound to numa node %u..%u (id:%u) of %u", firstNode, firstNode + numNodes - 1, numaMap[firstNode], numNumaNodes);
#endif
    clearAffinityCache();
}
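// Worked example of the node assignment above: with 4 numa nodes and 2 processes per machine,
// process 0 is given nodes 0..1 and process 1 nodes 2..3; with 2 nodes and 8 processes, each
// process is pinned to the single node chosen as (curProcess % 2).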
void bindMemoryToLocalNodes()
{
#if defined(LIBNUMA_API_VERSION) && (LIBNUMA_API_VERSION>=2)
    numa_set_bind_policy(1);
    unsigned numNumaNodes = 0;
    for (unsigned i=0; i<=numa_max_node(); i++)
    {
        if (numa_bitmask_isbitset(numa_all_nodes_ptr, i))
            numNumaNodes++;
    }
    if (numNumaNodes <= 1)
        return;
    struct bitmask *nodes = numa_get_run_node_mask();
    numa_set_membind(nodes);
    DBGLOG("Process memory bound to numa nodemask 0x%x (of %u nodes total)", (unsigned)(*(nodes->maskp)), numNumaNodes);
    numa_bitmask_free(nodes);
#endif
}
static IOutputMetaData *_getDaliLayoutInfo(MemoryBuffer &layoutBin, IPropertyTree const &props)
{
    try
    {
        Owned<IException> error;
        bool isGrouped = props.getPropBool("@grouped", false);
        if (props.hasProp("_rtlType"))
        {
            props.getPropBin("_rtlType", layoutBin);
            try
            {
                return createTypeInfoOutputMetaData(layoutBin, isGrouped);
            }
            catch (IException *E)
            {
                EXCLOG(E);
                error.setown(E); // Save to throw later if we can't recover via ECL
            }
        }
        if (props.hasProp("ECL"))
        {
            const char *kind = props.queryProp("@kind");
            bool isIndex = (kind && streq(kind, "key"));
            StringBuffer layoutECL;
            props.getProp("ECL", layoutECL);
            MultiErrorReceiver errs;
            Owned<IHqlExpression> expr = parseQuery(layoutECL.str(), &errs);
            if (expr && (errs.errCount() == 0))
            {
                if (props.hasProp("_record_layout")) // Some old indexes need the payload count patched in from here
                {
                    MemoryBuffer mb;
                    props.getPropBin("_record_layout", mb);
                    expr.setown(patchEclRecordDefinitionFromRecordLayout(expr, mb));
                }
                else if (!expr->hasAttribute(_payload_Atom))
                {
                    //Very old records before _record_layout was added to the meta information (November 2006!)
                    IHqlExpression * lastField = queryLastField(expr);
                    if (lastField && lastField->queryType()->isInteger())
                        expr.setown(prependOwnedOperand(expr, createAttribute(_payload_Atom, createConstant(1))));
                }
                if (exportBinaryType(layoutBin, expr, isIndex))
                    return createTypeInfoOutputMetaData(layoutBin, isGrouped);
            }
        }
        if (error)
        {
            throw(error.getClear());
        }
    }
    catch (IException *E)
    {
        EXCLOG(E, "Cannot deserialize file metadata:");
        ::Release(E);
    }
    catch (...)
    {
        DBGLOG("Cannot deserialize file metadata: Unknown error");
    }
    return nullptr;
}
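// Recovery order used above: prefer the serialized "_rtlType" binary type info; if that is
// absent or fails to deserialize, fall back to parsing the stored "ECL" record text (patching
// payload details from "_record_layout" for old indexes); if neither route yields metadata,
// any saved error is logged and nullptr is returned.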
extern THORHELPER_API IOutputMetaData *getDaliLayoutInfo(IPropertyTree const &props)
{
    MemoryBuffer layoutBin;
    return _getDaliLayoutInfo(layoutBin, props);
}

extern THORHELPER_API bool getDaliLayoutInfo(MemoryBuffer &layoutBin, IPropertyTree const &props)
{
    Owned<IOutputMetaData> meta = _getDaliLayoutInfo(layoutBin, props);
    return nullptr != meta; // meta is only created to validate the layout; the caller just wants layoutBin
}
static bool getTranslators(Owned<const IDynamicTransform> &translator, Owned<const IKeyTranslator> *keyedTranslator, const char *tracing, unsigned expectedCrc, IOutputMetaData *expectedFormat, unsigned publishedCrc, IOutputMetaData *publishedFormat, unsigned projectedCrc, IOutputMetaData *projectedFormat, RecordTranslationMode mode)
{
    if (expectedCrc)
    {
        IOutputMetaData * sourceFormat = expectedFormat;
        unsigned sourceCrc = expectedCrc;
        if (mode != RecordTranslationMode::AlwaysECL)
        {
            if (publishedFormat)
            {
                sourceFormat = publishedFormat;
                sourceCrc = publishedCrc;
            }
            if (publishedCrc && expectedCrc && (publishedCrc != expectedCrc) && (RecordTranslationMode::None == mode))
                throwTranslationError(publishedFormat->queryRecordAccessor(true), expectedFormat->queryRecordAccessor(true), tracing);
        }
        //There is a very small chance of the format crcs accidentally matching, which could lead to a crash on an untranslated file.
        if ((projectedFormat != sourceFormat) && (projectedCrc != sourceCrc))
        {
            translator.setown(createRecordTranslator(projectedFormat->queryRecordAccessor(true), sourceFormat->queryRecordAccessor(true)));
            DBGLOG("Record layout translator created for %s", tracing);
            translator->describe();
            if (!translator->canTranslate())
                throw MakeStringException(0, "Untranslatable record layout mismatch detected for file %s", tracing);
            if (translator->needsTranslate())
            {
                if (keyedTranslator && (sourceFormat != expectedFormat))
                {
                    Owned<const IKeyTranslator> _keyedTranslator = createKeyTranslator(sourceFormat->queryRecordAccessor(true), expectedFormat->queryRecordAccessor(true));
                    //MORE: What happens if the key filters cannot be translated?
                    if (_keyedTranslator->needsTranslate())
                        keyedTranslator->swap(_keyedTranslator);
                }
            }
            else
                translator.clear();
        }
    }
    return nullptr != translator.get();
}
bool getTranslators(Owned<const IDynamicTransform> &translator, const char *tracing, unsigned expectedCrc, IOutputMetaData *expectedFormat, unsigned publishedCrc, IOutputMetaData *publishedFormat, unsigned projectedCrc, IOutputMetaData *projectedFormat, RecordTranslationMode mode)
{
    return getTranslators(translator, nullptr, tracing, expectedCrc, expectedFormat, publishedCrc, publishedFormat, projectedCrc, projectedFormat, mode);
}

bool getTranslators(Owned<const IDynamicTransform> &translator, Owned<const IKeyTranslator> &keyedTranslator, const char *tracing, unsigned expectedCrc, IOutputMetaData *expectedFormat, unsigned publishedCrc, IOutputMetaData *publishedFormat, unsigned projectedCrc, IOutputMetaData *projectedFormat, RecordTranslationMode mode)
{
    return getTranslators(translator, &keyedTranslator, tracing, expectedCrc, expectedFormat, publishedCrc, publishedFormat, projectedCrc, projectedFormat, mode);
}

ITranslator *getTranslators(const char *tracing, unsigned expectedCrc, IOutputMetaData *expectedFormat, unsigned publishedCrc, IOutputMetaData *publishedFormat, unsigned projectedCrc, IOutputMetaData *projectedFormat, RecordTranslationMode mode)
{
    Owned<const IDynamicTransform> translator;
    Owned<const IKeyTranslator> keyedTranslator;
    if (getTranslators(translator, &keyedTranslator, tracing, expectedCrc, expectedFormat, publishedCrc, publishedFormat, projectedCrc, projectedFormat, mode))
    {
        if (RecordTranslationMode::AlwaysECL == mode)
        {
            publishedFormat = expectedFormat;
            publishedCrc = expectedCrc;
        }
        else if (!publishedFormat)
            publishedFormat = expectedFormat;
        class CTranslator : public CSimpleInterfaceOf<ITranslator>
        {
            Linked<IOutputMetaData> actualFormat;
            Linked<const IDynamicTransform> translator;
            Linked<const IKeyTranslator> keyedTranslator;
        public:
            CTranslator(IOutputMetaData *_actualFormat, const IDynamicTransform *_translator, const IKeyTranslator *_keyedTranslator)
                : actualFormat(_actualFormat), translator(_translator), keyedTranslator(_keyedTranslator)
            {
            }
            virtual IOutputMetaData &queryActualFormat() const override
            {
                return *actualFormat;
            }
            virtual const IDynamicTransform &queryTranslator() const override
            {
                return *translator;
            }
            virtual const IKeyTranslator *queryKeyedTranslator() const override
            {
                return keyedTranslator;
            }
        };
        return new CTranslator(publishedFormat, translator, keyedTranslator);
    }
    else
        return nullptr;
}
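// Summary of the translator selection above (derived from getTranslators): when the expected
// (ECL) format differs from the actual source format, a dynamic record translator is built that
// produces rows in the projected layout from the source layout; with RecordTranslationMode::None
// a published/expected crc mismatch throws instead, and with AlwaysECL the published format is
// ignored in favour of the expected one. A keyed translator is only kept when the key filter
// fields genuinely need remapping between the expected and actual formats.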
#ifdef _USE_TBB
#include "tbb/task.h"
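// CPersistentTask runs the owner's threadmain() on a TBB worker thread: start() allocates an
// empty root task as a completion marker, spawns a child RunTask that executes threadmain()
// and captures any IException, and join() waits on the root task before rethrowing a captured
// exception. Note that this relies on the classic tbb::task interface
// (allocate_root/allocate_child/spawn), which newer oneTBB releases no longer provide.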
CPersistentTask::CPersistentTask(const char *name, IThreaded *_owner) : owner(_owner) {}

void CPersistentTask::start()
{
    class RunTask : public tbb::task
    {
    public:
        RunTask(CPersistentTask * _owner, tbb::task * _next) : owner(_owner), next(_next)
        {
        }
        virtual tbb::task * execute()
        {
            try
            {
                owner->owner->threadmain();
            }
            catch (IException *e)
            {
                owner->exception.setown(e);
            }
            return next;
        }
    protected:
        CPersistentTask * owner;
        tbb::task * next;
    };
    end = new (tbb::task::allocate_root()) tbb::empty_task();
    tbb::task * task = new (end->allocate_child()) RunTask(this, nullptr);
    end->set_ref_count(1+1);
    tbb::task::spawn(*task);
}

bool CPersistentTask::join(unsigned timeout, bool throwException)
{
    end->wait_for_all();
    end->destroy(*end);
    end = nullptr;
    if (throwException && exception.get())
        throw exception.getClear();
    return true;
}
#endif