/*##############################################################################
    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */

#include "platform.h"
#include "jmisc.hpp"
#include "jio.hpp"
#include "jsort.hpp"
#include "jsorta.hpp"
#include "jvmem.hpp"
#include "jflz.hpp"
#include "thbufdef.hpp"
#include "thor.hpp"
#include "thormisc.hpp"
#include "eclhelper.hpp"
#include "dautils.hpp"
#include "daclient.hpp"
#define NO_BWD_COMPAT_MAXSIZE
#include "thorcommon.ipp"
#include "eclrtl.hpp"
#include "roxiemem.hpp"
#include "roxierow.hpp"

#include "thmem.hpp"
#include "thgraph.hpp"

#include "thalloc.hpp"

#undef ALLOCATE
#undef CLONE
#undef MEMACTIVITYTRACESTRING

#include "thbuf.hpp"
#include "thmem.hpp"

#ifdef _DEBUG
//#define _TESTING
#define ASSERTEX(c) assertex(c)
#else
#define ASSERTEX(c)
#endif
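
// Multi-Thor memory coordination: when a notifier is registered via
// setMultiThorMemoryNotify(), checkMultiThorMemoryThreshold() takes the shared
// limit (notify->take) once usage exceeds MTthreshold and releases it
// (notify->give) once usage falls back below half the threshold.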
static memsize_t MTthreshold=0;
static CriticalSection MTcritsect;  // held when blocked
static Owned<ILargeMemLimitNotify> MTthresholdnotify;
static bool MTlocked = false;

void checkMultiThorMemoryThreshold(bool inc)
{
    if (MTthresholdnotify.get()) {
        CriticalBlock block(MTcritsect);
        memsize_t used = 0; // JCSMORE - might work via callback in new scheme
        if (MTlocked) {
            if (used<MTthreshold/2) {
                DBGLOG("Multi Thor threshold lock released: %"I64F"d",(offset_t)used);
                MTlocked = false;
                MTthresholdnotify->give(used);
            }
        }
        else if (used>MTthreshold) {
            DBGLOG("Multi Thor threshold exceeded: %"I64F"d",(offset_t)used);
            if (!MTthresholdnotify->take(used)) {
                throw createOutOfMemException(-9,
                    1024,  // dummy value
                    used);
            }
            DBGLOG("Multi Thor lock taken");
            MTlocked = true;
        }
    }
}

extern graph_decl void setMultiThorMemoryNotify(size32_t size,ILargeMemLimitNotify *notify)
{
    CriticalBlock block(MTcritsect);
    if (MTthresholdnotify.get()&&!notify&&MTlocked) {
        MTlocked = false;
        MTthresholdnotify->give(0);
    }
    MTthreshold = size;
    MTthresholdnotify.set(notify);
    if (notify)
        checkMultiThorMemoryThreshold(true);
}

static memsize_t largeMemSize = 0;
memsize_t setLargeMemSize(unsigned limitMB)
{
    memsize_t prevLargeMemSize = largeMemSize;
    largeMemSize = 1024*1024*(memsize_t)limitMB;
    return prevLargeMemSize;
}

memsize_t queryLargeMemSize()
{
    if (0 == largeMemSize)
        throwUnexpected();
    return largeMemSize;
}

// =================================
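
// Debug helper: formats a row as "<prefix>(<size>): " followed by quoted runs
// of printable characters and comma-separated hex bytes, truncating the output
// once it exceeds ~1KB. If a serializer is supplied the row is serialized
// first; otherwise up to 16 raw bytes are dumped.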
StringBuffer &getRecordString(const void *key, IOutputRowSerializer *serializer, const char *prefix, StringBuffer &out)
{
    MemoryBuffer mb;
    const byte *k = (const byte *)key;
    size32_t sz = 0;
    if (serializer&&k) {
        CMemoryRowSerializer mbsz(mb);
        serializer->serialize(mbsz,(const byte *)k);
        k = (const byte *)mb.bufferBase();
        sz = mb.length();
    }
    if (sz)
        out.appendf("%s(%d): ",prefix,sz);
    else {
        out.append(prefix).append(": ");
        if (k)
            sz = 16;
        else
            out.append("NULL");
    }
    bool first=true; // NB: no separator before the first item
    while (sz) {
        if (first)
            first=false;
        else
            out.append(',');
        if ((sz>=3)&&isprint(k[0])&&isprint(k[1])&&isprint(k[2])) {
            out.append('"');
            do {
                out.append(*k);
                sz--;
                if (sz==0)
                    break;
                if (out.length()>1024)
                    break;
                k++;
            } while (isprint(*k));
            out.append('"');
        }
        if (out.length()>1024) {
            out.append("...");
            break;
        }
        if (sz) {
            out.appendf("%2x",(unsigned)*k);
            k++;
            sz--;
        }
    }
    return out;
}
//====

class CSpillableStreamBase : public CSimpleInterface, implements roxiemem::IBufferedRowCallback
{
protected:
    CActivityBase &activity;
    IRowInterfaces *rowIf;
    bool preserveNulls, ownsRows, useCompression;
    CThorSpillableRowArray rows;
    OwnedIFile spillFile;
    Owned<IRowStream> spillStream;

    bool spillRows()
    {
        // NB: Should always be called whilst 'rows' is locked (with CThorArrayLockBlock)
        rowidx_t numRows = rows.numCommitted();
        if (0 == numRows)
            return false;
        StringBuffer tempname;
        GetTempName(tempname,"streamspill", true);
        spillFile.setown(createIFile(tempname.str()));

        rows.save(*spillFile, useCompression); // saves committed rows
        rows.noteSpilled(numRows);
        return true;
    }
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

    CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls)
        : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf, _preserveNulls), preserveNulls(_preserveNulls)
    {
        rows.swap(inRows);
        useCompression = false;
        activity.queryJob().queryRowManager()->addRowBuffer(this);
    }
    ~CSpillableStreamBase()
    {
        activity.queryJob().queryRowManager()->removeRowBuffer(this);
        spillStream.clear();
        if (spillFile)
            spillFile->remove();
    }
// IBufferedRowCallback
    virtual unsigned getPriority() const
    {
        return SPILL_PRIORITY_SPILLABLE_STREAM;
    }
    virtual bool freeBufferedRows(bool critical)
    {
        CThorArrayLockBlock block(rows);
        return spillRows();
    }
};
// NB: Shared and spillable. Holds all rows in memory until it needs to spill;
// it then spills everything to disk, and each stream continues reading from
// its current row position within the file.
class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterface
{
    class CStream : public CSimpleInterface, implements IRowStream, implements IWritePosCallback
    {
        rowidx_t pos;
        offset_t outputOffset;
        Owned<IRowStream> spillStream;
        Linked<CSharedSpillableRowSet> owner;
    public:
        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

        CStream(CSharedSpillableRowSet &_owner) : owner(&_owner)
        {
            pos = 0;
            outputOffset = (offset_t)-1;
            owner->rows.registerWriteCallback(*this);
        }
        ~CStream()
        {
            spillStream.clear(); // NB: clear stream 1st
            owner->rows.unregisterWriteCallback(*this);
            owner.clear();
        }
    // IRowStream
        virtual const void *nextRow()
        {
            if (spillStream)
                return spillStream->nextRow();
            CThorArrayLockBlock block(owner->rows);
            if (pos == owner->rows.numCommitted())
                return NULL;
            else if (owner->spillFile) // i.e. has spilt
            {
                assertex(((offset_t)-1) != outputOffset);
                unsigned rwFlags = DEFAULT_RWFLAGS;
                if (owner->preserveNulls)
                    rwFlags |= rw_grouped;
                spillStream.setown(::createRowStreamEx(owner->spillFile, owner->rowIf, outputOffset, (offset_t)-1, (unsigned __int64)-1, rwFlags));
                return spillStream->nextRow();
            }
            return owner->rows.get(pos++);
        }
        virtual void stop() { }
    // IWritePosCallback
        virtual rowidx_t queryRecordNumber()
        {
            return pos;
        }
        virtual void filePosition(offset_t pos)
        {
            // Called via spilling save, stream will continue reading from file @ pos
            outputOffset = pos;
        }
    };
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

    CSharedSpillableRowSet(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls)
        : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls)
    {
    }
    IRowStream *createRowStream()
    {
        {
            // already spilled?
            CThorArrayLockBlock block(rows);
            if (spillFile)
            {
                unsigned rwFlags = DEFAULT_RWFLAGS;
                if (preserveNulls)
                    rwFlags |= rw_grouped;
                return ::createRowStream(spillFile, rowIf, rwFlags);
            }
        }
        return new CStream(*this);
    }
};
// NB: A single unshared spillable stream
class CSpillableStream : public CSpillableStreamBase, implements IRowStream
{
    rowidx_t pos, numReadRows, granularity;
    const void **readRows;
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

    CSpillableStream(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls)
        : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls)
    {
        useCompression = activity.getOptBool(THOROPT_COMPRESS_SPILLS, true);
        pos = numReadRows = 0;
        granularity = 500; // JCSMORE - a small number of rows to read at a time from the swappable rows

        roxiemem::IRowManager *rowManager = activity.queryJob().queryRowManager();
        readRows = static_cast<const void * *>(rowManager->allocate(granularity * sizeof(void*), activity.queryContainer().queryId()));
    }
    ~CSpillableStream()
    {
        while (pos < numReadRows)
        {
            ReleaseThorRow(readRows[pos++]);
        }
        ReleaseThorRow(readRows);
    }
// IRowStream
    virtual const void *nextRow()
    {
        if (spillStream)
            return spillStream->nextRow();
        if (pos == numReadRows)
        {
            CThorArrayLockBlock block(rows);
            if (spillFile)
            {
                unsigned rwFlags = DEFAULT_RWFLAGS;
                if (preserveNulls)
                    rwFlags |= rw_grouped;
                if (useCompression)
                    rwFlags |= rw_compress;
                spillStream.setown(createRowStream(spillFile, rowIf, rwFlags));
                return spillStream->nextRow();
            }
            rowidx_t fetch = rows.numCommitted();
            if (0 == fetch)
                return NULL;
            if (fetch >= granularity)
                fetch = granularity;
            // consume 'fetch' rows
            const void **toRead = rows.getBlock(fetch);
            memcpy(readRows, toRead, fetch * sizeof(void *));
            rows.noteSpilled(fetch);
            numReadRows = fetch;
            pos = 0;
        }
        const void *row = readRows[pos];
        readRows[pos] = NULL;
        ++pos;
        return row;
    }
    virtual void stop() { }
};

//====
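
// Allows roxiemem to update the row table pointer and capacity (under the
// array lock) when it moves or resizes the underlying allocation.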
class CResizeRowCallback : implements roxiemem::IRowResizeCallback
{
    IThorArrayLock &alock;
    void **&rows;
    memsize_t &capacity;
public:
    CResizeRowCallback(void **&_rows, memsize_t &_capacity, IThorArrayLock &_alock) : rows(_rows), capacity(_capacity), alock(_alock) { }
    virtual void lock() { alock.lock(); }
    virtual void unlock() { alock.unlock(); }
    virtual void update(memsize_t _capacity, void * ptr) { capacity = _capacity; rows = (void **)ptr; }
    virtual void atomicUpdate(memsize_t capacity, void * ptr)
    {
        CThorArrayLockBlock block(alock);
        update(capacity, ptr);
    }
};

//====
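
// CThorExpandingRowArray: an expanding array of row pointers allocated from
// the roxiemem row manager. It grows geometrically (see getNewSize) and
// optionally maintains a parallel 'stableTable' of pointers for stable sorts.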
void CThorExpandingRowArray::init(rowidx_t initialSize, StableSortFlag _stableSort)
{
    rowManager = activity.queryJob().queryRowManager();
    stableSort = _stableSort;
    throwOnOom = false;
    stableTable = NULL;
    if (initialSize)
    {
        rows = static_cast<const void * *>(rowManager->allocate(initialSize * sizeof(void*), activity.queryContainer().queryId()));
        maxRows = getRowsCapacity();
        memset(rows, 0, maxRows * sizeof(void *));
        if (stableSort_earlyAlloc == stableSort)
            stableTable = static_cast<void **>(rowManager->allocate(maxRows * sizeof(void*), activity.queryContainer().queryId()));
        else
            stableTable = NULL;
    }
    else
    {
        rows = NULL;
        maxRows = 0;
    }
    numRows = 0;
}
const void *CThorExpandingRowArray::allocateRowTable(rowidx_t num)
{
    try
    {
        return rowManager->allocate(num * sizeof(void*), activity.queryContainer().queryId());
    }
    catch (IException * e)
    {
        //Pathological cases - not enough memory to reallocate the target row buffer, or no contiguous pages available.
        unsigned code = e->errorCode();
        if ((code == ROXIEMM_MEMORY_LIMIT_EXCEEDED) || (code == ROXIEMM_MEMORY_POOL_EXHAUSTED))
        {
            e->Release();
            return NULL;
        }
        throw;
    }
}

rowidx_t CThorExpandingRowArray::getNewSize(rowidx_t requiredRows)
{
    rowidx_t newSize = maxRows;
    //This condition must be <= at least 1/scaling factor below otherwise you'll get an infinite loop.
    if (newSize <= 4)
        newSize = requiredRows;
    else
    {
        //What algorithm should we use to increase the size? Trading memory usage against copying row pointers.
        // adding 50% would reduce the number of allocations.
        // anything below 32% would mean that blocks n,n+1 when freed have enough space for block n+3 which might
        // reduce fragmentation.
        //Use 25% for the moment. It should possibly be configurable - e.g., higher for thor global sort.
        while (newSize < requiredRows)
            newSize += newSize/4;
    }
    return newSize;
}
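
// e.g. with the 25% growth policy, a table of 1024 rows asked to hold 2000
// grows 1024 -> 1280 -> 1600 -> 2000.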
bool CThorExpandingRowArray::resizeRowTable(void **oldRows, memsize_t newCapacity, bool copy, roxiemem::IRowResizeCallback &callback)
{
    try
    {
        if (oldRows)
            rowManager->resizeRow(oldRows, copy?RoxieRowCapacity(oldRows):0, newCapacity, activity.queryContainer().queryId(), callback);
        else
        {
            void **newRows = (void **)rowManager->allocate(newCapacity, activity.queryContainer().queryId());
            callback.atomicUpdate(RoxieRowCapacity(newRows), newRows);
        }
    }
    catch (IException * e)
    {
        //Pathological cases - not enough memory to reallocate the target row buffer, or no contiguous pages available.
        unsigned code = e->errorCode();
        if ((code == ROXIEMM_MEMORY_LIMIT_EXCEEDED) || (code == ROXIEMM_MEMORY_POOL_EXHAUSTED))
        {
            e->Release();
            return false;
        }
        throw;
    }
    return true;
}
const void *CThorExpandingRowArray::allocateNewRows(rowidx_t requiredRows)
{
    rowidx_t newSize = maxRows;
    //This condition must be <= at least 1/scaling factor below otherwise you'll get an infinite loop.
    if (newSize <= 4)
        newSize = requiredRows;
    else
    {
        //What algorithm should we use to increase the size? Trading memory usage against copying row pointers.
        // adding 50% would reduce the number of allocations.
        // anything below 32% would mean that blocks n,n+1 when freed have enough space for block n+3 which might
        // reduce fragmentation.
        //Use 25% for the moment. It should possibly be configurable - e.g., higher for thor global sort.
        while (newSize < requiredRows)
            newSize += newSize/4;
    }
    return allocateRowTable(newSize);
}
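
// Stable sorts order an auxiliary table of row pointers (pre-allocated for
// stableSort_earlyAlloc, allocated on demand for stableSort_lateAlloc) and
// then rewrite 'rows' in the resulting order; the unstable path sorts the
// row pointers in place with a parallel quicksort.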
void CThorExpandingRowArray::doSort(rowidx_t n, void **const rows, ICompare &compare, unsigned maxCores)
{
    // NB: will only be called if numRows>1
    if (stableSort_none != stableSort)
    {
        OwnedConstThorRow tmpStableTable;
        void **stableTablePtr;
        if (stableSort_lateAlloc == stableSort)
        {
            dbgassertex(NULL == stableTable);
            tmpStableTable.setown(rowManager->allocate(getRowsCapacity() * sizeof(void *), activity.queryContainer().queryId()));
            stableTablePtr = (void **)tmpStableTable.get();
        }
        else
        {
            dbgassertex(NULL != stableTable);
            stableTablePtr = stableTable;
        }
        void **_rows = rows;
        memcpy(stableTablePtr, _rows, n*sizeof(void **)); // NB: stableTablePtr, not stableTable, which is NULL for stableSort_lateAlloc
        parqsortvecstable(stableTablePtr, n, compare, (void ***)_rows, maxCores);
        while (n--)
        {
            *_rows = **((void ***)_rows);
            _rows++;
        }
    }
    else
        parqsortvec((void **const)rows, n, compare, maxCores);
}
CThorExpandingRowArray::CThorExpandingRowArray(CActivityBase &_activity, IRowInterfaces *_rowIf, bool _allowNulls, StableSortFlag _stableSort, bool _throwOnOom, rowidx_t initialSize) : activity(_activity)
{
    init(initialSize, _stableSort);
    setup(_rowIf, _allowNulls, _stableSort, _throwOnOom);
}

CThorExpandingRowArray::~CThorExpandingRowArray()
{
    clearRows();
    ReleaseThorRow(rows);
    ReleaseThorRow(stableTable);
}

void CThorExpandingRowArray::setup(IRowInterfaces *_rowIf, bool _allowNulls, StableSortFlag _stableSort, bool _throwOnOom)
{
    rowIf = _rowIf;
    stableSort = _stableSort;
    throwOnOom = _throwOnOom;
    allowNulls = _allowNulls;
    if (rowIf)
    {
        allocator = rowIf->queryRowAllocator();
        deserializer = rowIf->queryRowDeserializer();
        serializer = rowIf->queryRowSerializer();
    }
    else
    {
        allocator = NULL;
        deserializer = NULL;
        serializer = NULL;
    }
}
void CThorExpandingRowArray::clearRows()
{
    for (rowidx_t i = 0; i < numRows; i++)
        ReleaseThorRow(rows[i]);
    numRows = 0;
}

void CThorExpandingRowArray::kill()
{
    clearRows();
    maxRows = 0;
    ReleaseThorRow(rows);
    ReleaseThorRow(stableTable);
    rows = NULL;
    stableTable = NULL;
}

void CThorExpandingRowArray::swap(CThorExpandingRowArray &other)
{
    roxiemem::IRowManager *otherRowManager = other.rowManager;
    IRowInterfaces *otherRowIf = other.rowIf;
    const void **otherRows = other.rows;
    void **otherStableTable = other.stableTable;
    bool otherAllowNulls = other.allowNulls;
    StableSortFlag otherStableSort = other.stableSort;
    bool otherThrowOnOom = other.throwOnOom;
    rowidx_t otherMaxRows = other.maxRows;
    rowidx_t otherNumRows = other.numRows;

    other.rowManager = rowManager;
    other.setup(rowIf, allowNulls, stableSort, throwOnOom);
    other.rows = rows;
    other.stableTable = stableTable;
    other.maxRows = maxRows;
    other.numRows = numRows;

    rowManager = otherRowManager;
    setup(otherRowIf, otherAllowNulls, otherStableSort, otherThrowOnOom);
    rows = otherRows;
    stableTable = otherStableTable;
    maxRows = otherMaxRows;
    numRows = otherNumRows;
}
void CThorExpandingRowArray::transferRows(rowidx_t & outNumRows, const void * * & outRows)
{
    outNumRows = numRows;
    outRows = rows;
    numRows = 0;
    maxRows = 0;
    rows = NULL;
    ReleaseThorRow(stableTable);
    stableTable = NULL;
}

void CThorExpandingRowArray::transferFrom(CThorExpandingRowArray &donor)
{
    kill();
    donor.transferRows(numRows, rows);
    maxRows = numRows;
    if (maxRows && (stableSort_earlyAlloc == stableSort))
        ensure(maxRows);
}

void CThorExpandingRowArray::transferFrom(CThorSpillableRowArray &donor)
{
    transferFrom((CThorExpandingRowArray &)donor);
}

void CThorExpandingRowArray::removeRows(rowidx_t start, rowidx_t n)
{
    assertex(numRows-start >= n);
    assertex(!n || rows);
    if (rows)
    {
        rowidx_t end = start+n;
        for (rowidx_t i = start; i < end; i++)
            ReleaseThorRow(rows[i]);
        //firstRow = 0;
        const void **from = rows+start;
        memmove(from, from+n, (numRows-end) * sizeof(void *));
        numRows -= n;
    }
}

void CThorExpandingRowArray::clearUnused()
{
    if (rows)
        memset(rows+numRows, 0, (maxRows-numRows) * sizeof(void *));
}
bool CThorExpandingRowArray::ensure(rowidx_t requiredRows)
{
    //Only the writer is allowed to reallocate rows (otherwise append can't be optimized), so rows is valid outside the lock
    if (0 == requiredRows)
        return true;

    // NB: only ensure alters row capacity, so no locking required to protect getRowsCapacity()
    memsize_t capacity = rows ? RoxieRowCapacity(rows) : 0;
    rowidx_t currentMaxRows = getRowsCapacity();
    if (currentMaxRows < requiredRows) // check, because may have expanded previously, but failed to allocate stableTable and set new maxRows
    {
        capacity = ((memsize_t)getNewSize(requiredRows)) * sizeof(void *);

        CResizeRowCallback callback(*(void ***)(&rows), capacity, *this);
        if (!resizeRowTable((void **)rows, capacity, true, callback)) // callback will reset capacity
        {
            if (throwOnOom)
                throw MakeActivityException(&activity, 0, "Out of memory, allocating row array, had %"RIPF"d, trying to allocate %"RIPF"d elements", ordinality(), requiredRows);
            return false;
        }
    }
    if (stableSort_earlyAlloc == stableSort)
    {
        memsize_t dummy;
        CResizeRowCallback callback(stableTable, dummy, *this);
        if (!resizeRowTable(stableTable, capacity, false, callback))
        {
            if (throwOnOom)
                throw MakeActivityException(&activity, 0, "Out of memory, resizing stable row array, trying to allocate %"RIPF"d elements", currentMaxRows);
            return false;
        }
        // NB: If allocation of stableTable fails, 'rows' has expanded, but maxRows has not,
        // meaning a subsequent ensure() call will only need to [attempt to] resize the stable ptr array
        // (see the (currentMaxRows < requiredRows) check above).
    }

    // Both row tables updated, only now update maxRows
    CThorArrayLockBlock block(*this);
    maxRows = capacity / sizeof(void *);
    return true;
}
void CThorExpandingRowArray::sort(ICompare &compare, unsigned maxCores)
{
    if (numRows>1)
        doSort(numRows, (void **const)rows, compare, maxCores);
}

void CThorExpandingRowArray::reorder(rowidx_t start, rowidx_t num, rowidx_t *neworder)
{
    if (start>=numRows)
        return;
    if (start+num>numRows)
        num = numRows-start;
    if (!num)
        return;
    MemoryAttr ma;
    void **tmp = (void **)ma.allocate(num*sizeof(void *));
    const void **p = rows + start;
    memcpy(tmp, p, num*sizeof(void *));
    for (rowidx_t i=0; i<num; i++)
        p[i] = tmp[neworder[i]];
}
bool CThorExpandingRowArray::equal(ICompare *icmp, CThorExpandingRowArray &other)
{
    // slow but better than prev!
    rowidx_t n = other.ordinality();
    if (n!=ordinality())
        return false;
    for (rowidx_t i=0;i<n;i++)
    {
        const void *p1 = rows[i];
        const void *p2 = other.query(i);
        if (0 != icmp->docompare(p1, p2))
            return false;
    }
    return true;
}

bool CThorExpandingRowArray::checkSorted(ICompare *icmp)
{
    rowidx_t n=ordinality();
    for (rowidx_t i=1; i<n; i++)
    {
        if (icmp->docompare(rows[i-1], rows[i])>0)
            return false;
    }
    return true;
}
IRowStream *CThorExpandingRowArray::createRowStream(rowidx_t start, rowidx_t num, bool streamOwns)
{
    class CStream : public CSimpleInterface, implements IRowStream
    {
        CThorExpandingRowArray &parent;
        rowidx_t pos, lastRow;
        bool owns;
    public:
        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

        CStream(CThorExpandingRowArray &_parent, rowidx_t firstRow, rowidx_t _lastRow, bool _owns)
            : parent(_parent), pos(firstRow), lastRow(_lastRow), owns(_owns)
        {
        }
    // IRowStream
        virtual const void *nextRow()
        {
            if (pos >= lastRow)
                return NULL;
            if (owns)
                return parent.getClear(pos++);
            else
                return parent.get(pos++);
        }
        virtual void stop() { }
    };

    if (start>ordinality())
        start = ordinality();
    rowidx_t lastRow;
    if ((num==(rowidx_t)-1)||(start+num>ordinality()))
        lastRow = ordinality();
    else
        lastRow = start+num;
    return new CStream(*this, start, lastRow, streamOwns);
}
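
// Appends num+1 ascending split points to 'out' (the last is ordinality()),
// nudging each internal boundary to a group boundary per 'compare' so that
// equal rows never straddle two partitions.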
void CThorExpandingRowArray::partition(ICompare &compare, unsigned num, UnsignedArray &out)
{
    rowidx_t p=0;
    rowidx_t n = ordinality();
    while (num)
    {
        out.append(p);
        if (p<n)
        {
            rowidx_t q = p+(n-p)/num;
            if (p==q) // skip to next group
            {
                while (q<n)
                {
                    q++;
                    if ((q<n)&&(compare.docompare(rows[p],rows[q])!=0)) // ensure at next group
                        break;
                }
            }
            else
            {
                while ((q<n)&&(q!=p)&&(compare.docompare(rows[q-1],rows[q])==0)) // ensure at start of group
                    q--;
            }
            p = q;
        }
        num--;
    }
    out.append(n);
}
offset_t CThorExpandingRowArray::serializedSize()
{
    IOutputMetaData *meta = rowIf->queryRowMetaData();
    IOutputMetaData *diskMeta = meta->querySerializedDiskMeta();
    rowidx_t c = ordinality();
    offset_t total = 0;
    if (diskMeta->isFixedSize())
        total = c * diskMeta->getFixedSize();
    else
    {
        Owned<IOutputRowSerializer> diskSerializer = diskMeta->createDiskSerializer(rowIf->queryCodeContext(), rowIf->queryActivityId());
        CSizingSerializer ssz;
        for (rowidx_t i=0; i<c; i++)
        {
            diskSerializer->serialize(ssz, (const byte *)rows[i]);
            total += ssz.size();
            ssz.reset();
        }
    }
    return total;
}

memsize_t CThorExpandingRowArray::getMemUsage()
{
    roxiemem::IRowManager *rM = activity.queryJob().queryRowManager();
    IOutputMetaData *meta = rowIf->queryRowMetaData();
    IOutputMetaData *diskMeta = meta->querySerializedDiskMeta(); // GH->JCS - really I want an internalMeta here.
    rowidx_t c = ordinality();
    memsize_t total = 0;
    if (diskMeta->isFixedSize())
        total = c * rM->getExpectedFootprint(diskMeta->getFixedSize(), 0);
    else
    {
        CSizingSerializer ssz;
        for (rowidx_t i=0; i<c; i++)
        {
            serializer->serialize(ssz, (const byte *)rows[i]);
            total += rM->getExpectedFootprint(ssz.size(), 0);
            ssz.reset();
        }
    }
    // NB: worst case, when expanding (see ensure method)
    memsize_t sz = rM->getExpectedFootprint(maxRows * sizeof(void *), 0);
    memsize_t szE = sz / 100 * 125; // don't care if sz v. small
    if (stableSort_none == stableSort)
        total += sz + szE;
    else
        total += sz + szE * 2;
    return total;
}
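
// Expansion worst case: the old row table (sz) is held alongside the ~25%
// larger new table (szE); a stable sort additionally keeps a stable pointer
// table of the same expanded size, hence sz + szE*2.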
void CThorExpandingRowArray::serialize(IRowSerializerTarget &out)
{
    bool warnnull = true;
    assertex(serializer);
    rowidx_t n = ordinality();
    if (n)
    {
        for (rowidx_t i = 0; i < n; i++)
        {
            const void *row = query(i);
            if (row)
                serializer->serialize(out, (const byte *)row);
            else if (warnnull)
            {
                WARNLOG("CThorExpandingRowArray::serialize ignoring NULL row");
                warnnull = false;
            }
        }
    }
}
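
// When nulls are allowed, the buffer is prefixed with the 0x7631 guard word,
// then each row is written as a bool null-flag followed (if not null) by the
// serialized row; deserialize() below expects the same layout.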
void CThorExpandingRowArray::serialize(MemoryBuffer &mb)
{
    assertex(serializer);
    CMemoryRowSerializer s(mb);
    if (!allowNulls)
        serialize(s);
    else
    {
        unsigned short guard = 0x7631;
        mb.append(guard);
        rowidx_t n = ordinality();
        if (n)
        {
            for (rowidx_t i = 0; i < n; i++)
            {
                const void *row = query(i);
                bool isnull = (row==NULL);
                mb.append(isnull);
                if (!isnull)
                    serializer->serialize(s, (const byte *)row);
            }
        }
    }
}
void CThorExpandingRowArray::serializeCompress(MemoryBuffer &mb)
{
    MemoryBuffer exp;
    serialize(exp);
    fastLZCompressToBuffer(mb, exp.length(), exp.toByteArray());
}

rowidx_t CThorExpandingRowArray::serializeBlock(MemoryBuffer &mb, rowidx_t idx, rowidx_t count, size32_t dstmax, bool hardMax)
{
    assertex(serializer);
    CMemoryRowSerializer out(mb);
    bool warnnull = true;
    rowidx_t num=ordinality();
    if (idx>=num)
        return 0;
    if (num-idx<count)
        count = num-idx;
    rowidx_t ret = 0;
    for (rowidx_t i=0;i<count;i++)
    {
        size32_t ln = mb.length();
        const void *row = query(i+idx);
        if (row)
            serializer->serialize(out,(const byte *)row);
        else if (warnnull)
        {
            WARNLOG("CThorExpandingRowArray::serializeBlock ignoring NULL row");
            warnnull = false;
        }
        // allows at least one
        if (mb.length()>dstmax)
        {
            if (hardMax && ln) // remove last if above limit
                mb.setLength(ln);
            else
                ++ret;
            break;
        }
        else
            ++ret;
    }
    return ret;
}
void CThorExpandingRowArray::deserializeRow(IRowDeserializerSource &in)
{
    RtlDynamicRowBuilder rowBuilder(allocator);
    size32_t sz = deserializer->deserialize(rowBuilder,in);
    append(rowBuilder.finalizeRowClear(sz));
}

void CThorExpandingRowArray::deserialize(size32_t sz, const void *buf)
{
    if (allowNulls)
    {
        ASSERTEX((sz>=sizeof(short))&&(*(unsigned short *)buf==0x7631)); // check for mismatch
        buf = (const byte *)buf+sizeof(unsigned short);
        sz -= sizeof(unsigned short);
    }
    CThorStreamDeserializerSource d(sz,buf);
    while (!d.eos())
    {
        if (allowNulls)
        {
            bool nullrow;
            d.read(sizeof(bool),&nullrow);
            if (nullrow)
            {
                append(NULL);
                continue;
            }
        }
        deserializeRow(d);
    }
}

void CThorExpandingRowArray::deserializeExpand(size32_t sz, const void *data)
{
    MemoryBuffer mb;
    fastLZDecompressToBuffer(mb, data);
    deserialize(mb.length(), mb.bufferBase());
}

//////////////////
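
// CThorSpillableRowArray adds a read/spill window to the expanding array:
// rows[0..firstRow) have been spilled or consumed, rows[firstRow..commitRows)
// are committed (visible to readers and spilling), and rows beyond commitRows
// are uncommitted appends that only become visible after flush().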
void CThorSpillableRowArray::registerWriteCallback(IWritePosCallback &cb)
{
    CThorArrayLockBlock block(*this);
    writeCallbacks.append(cb); // NB not linked to avoid circular dependency
}

void CThorSpillableRowArray::unregisterWriteCallback(IWritePosCallback &cb)
{
    CThorArrayLockBlock block(*this);
    writeCallbacks.zap(cb);
}

CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity, IRowInterfaces *rowIf, bool allowNulls, StableSortFlag stableSort, rowidx_t initialSize, size32_t _commitDelta)
    : CThorExpandingRowArray(activity, rowIf, false, stableSort, false, initialSize), commitDelta(_commitDelta)
{
    commitRows = 0;
    firstRow = 0;
}

CThorSpillableRowArray::~CThorSpillableRowArray()
{
    clearRows();
}

void CThorSpillableRowArray::clearRows()
{
    for (rowidx_t i = firstRow; i < numRows; i++)
        ReleaseThorRow(rows[i]);
    numRows = 0;
    firstRow = 0;
    commitRows = 0;
}

void CThorSpillableRowArray::kill()
{
    clearRows();
    CThorExpandingRowArray::kill();
}

void CThorSpillableRowArray::sort(ICompare &compare, unsigned maxCores)
{
    // NB: only to be called inside lock
    rowidx_t n = numCommitted();
    if (n>1)
    {
        void **const rows = (void **const)getBlock(n);
        doSort(n, rows, compare, maxCores);
    }
}

rowidx_t CThorSpillableRowArray::save(IFile &iFile, bool useCompression)
{
    rowidx_t n = numCommitted();
    if (0 == n)
        return 0;
    ActPrintLog(&activity, "CThorSpillableRowArray::save %"RIPF"d rows", n);

    if (useCompression)
        assertex(0 == writeCallbacks.ordinality()); // incompatible

    unsigned rwFlags = DEFAULT_RWFLAGS;
    if (useCompression)
        rwFlags |= rw_compress;
    if (allowNulls)
        rwFlags |= rw_grouped;
    Owned<IExtRowWriter> writer = createRowWriter(&iFile, rowIf, rwFlags);

    const void **rows = getBlock(n);
    for (rowidx_t i=0; i < n; i++)
    {
        const void *row = rows[i];
        assertex(row || allowNulls);
        writer->putRow(row);
        rows[i] = NULL;
        ForEachItemIn(c, writeCallbacks)
        {
            IWritePosCallback &callback = writeCallbacks.item(c);
            if (i == callback.queryRecordNumber())
            {
                writer->flush();
                callback.filePosition(writer->getPosition());
            }
        }
    }
    writer->flush();
    offset_t bytesWritten = writer->getPosition();
    writer.clear();
    ActPrintLog(&activity, "CThorSpillableRowArray::save done, bytes = %"I64F"d", (__int64)bytesWritten);
    return n;
}
// JCSMORE - these methods are essentially borrowed from RoxieOutputRowArray, would be good to unify
const void **CThorSpillableRowArray::getBlock(rowidx_t readRows)
{
    dbgassertex(firstRow+readRows <= commitRows);
    return rows + firstRow;
}

void CThorSpillableRowArray::flush()
{
    CThorArrayLockBlock block(*this);
    dbgassertex(numRows >= commitRows);
    //This test could be improved...
    if (firstRow != 0 && firstRow == commitRows)
    {
        //A block of rows was removed - copy these rows to the start of the block.
        memmove(rows, rows+firstRow, (numRows-firstRow) * sizeof(void *));
        numRows -= firstRow;
        firstRow = 0;
    }
    commitRows = numRows;
}

void CThorSpillableRowArray::transferFrom(CThorExpandingRowArray &src)
{
    CThorArrayLockBlock block(*this);
    CThorExpandingRowArray::transferFrom(src);
    commitRows = numRows;
}

void CThorSpillableRowArray::swap(CThorSpillableRowArray &other)
{
    CThorArrayLockBlock block(*this);
    CThorExpandingRowArray::swap(other);
    rowidx_t otherFirstRow = other.firstRow;
    rowidx_t otherCommitRows = other.commitRows;

    other.firstRow = firstRow;
    other.commitRows = commitRows;

    firstRow = otherFirstRow;
    commitRows = otherCommitRows;
}

IRowStream *CThorSpillableRowArray::createRowStream()
{
    return new CSpillableStream(activity, *this, rowIf, allowNulls);
}
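
// CThorRowCollectorBase accumulates rows in a spillable array, registering
// itself as a roxiemem buffered-row callback so that committed rows can be
// sorted (if a comparator is supplied) and spilt to temp files under memory
// pressure; getStream() then merges or concatenates the spill files with any
// remaining in-memory rows.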
class CThorRowCollectorBase : public CSimpleInterface, implements roxiemem::IBufferedRowCallback
{
protected:
    CActivityBase &activity;
    CThorSpillableRowArray spillableRows;
    PointerIArrayOf<CFileOwner> spillFiles;
    Owned<IOutputRowSerializer> serializer;
    RowCollectorFlags diskMemMix;
    rowcount_t totalRows;
    unsigned spillPriority;
    unsigned overflowCount;
    unsigned maxCores;
    unsigned outStreams;
    ICompare *iCompare;
    bool isStable, preserveGrouping;
    IRowInterfaces *rowIf;
    SpinLock readerLock;
    bool mmRegistered;
    Owned<CSharedSpillableRowSet> spillableRowSet;

    bool spillRows()
    {
        rowidx_t numRows = spillableRows.numCommitted();
        if (numRows == 0)
            return false;

        totalRows += numRows;
        if (iCompare)
        {
            ActPrintLog(&activity, "Sorting %"RIPF"d rows", spillableRows.numCommitted());
            CCycleTimer timer;
            spillableRows.sort(*iCompare, maxCores); // sorts committed rows
            ActPrintLog(&activity, "Sort took: %f", ((float)timer.elapsedMs())/1000);
        }
        StringBuffer tempname;
        GetTempName(tempname,"srtspill",true);
        Owned<IFile> iFile = createIFile(tempname.str());
        spillFiles.append(new CFileOwner(iFile.getLink()));
        spillableRows.save(*iFile, activity.getOptBool(THOROPT_COMPRESS_SPILLS, true)); // saves committed rows
        spillableRows.noteSpilled(numRows);

        ++overflowCount;

        return true;
    }
    void setPreserveGrouping(bool _preserveGrouping)
    {
        preserveGrouping = _preserveGrouping;
        spillableRows.setAllowNulls(preserveGrouping);
    }
    void flush()
    {
        spillableRows.flush();
    }
    void putRow(const void *row)
    {
        if (!spillableRows.append(row))
        {
            bool oom = false;
            if (spillingEnabled())
            {
                CThorArrayLockBlock block(spillableRows);
                //We should have been called back to free any committed rows, but occasionally it may not (e.g., if
                //the problem is global memory is exhausted) - in which case force a spill here (but add any pending
                //rows first).
                if (spillableRows.numCommitted() != 0)
                {
                    spillableRows.flush();
                    spillRows();
                }
                //Ensure new rows are written to the head of the array. It needs to be a separate call because
                //spillRows() cannot shift active row pointer since it can be called from any thread
                spillableRows.flush();
                if (!spillableRows.append(row))
                    oom = true;
            }
            else
                oom = true;
            if (oom)
            {
                ReleaseThorRow(row);
                throw MakeActivityException(&activity, ROXIEMM_MEMORY_LIMIT_EXCEEDED, "Insufficient memory to append sort row");
            }
        }
    }
    IRowStream *getStream(CThorExpandingRowArray *allMemRows, memsize_t *memUsage, bool shared)
    {
        SpinBlock b(readerLock);
        if (0 == outStreams)
        {
            spillableRows.flush();
            if (spillingEnabled())
            {
                // i.e. all disk OR (some on disk already AND allDiskOrAllMem)
                if (((rc_allDisk == diskMemMix) || ((rc_allDiskOrAllMem == diskMemMix) && overflowCount)))
                {
                    CThorArrayLockBlock block(spillableRows);
                    if (spillableRows.numCommitted())
                    {
                        spillRows();
                        spillableRows.kill();
                    }
                }
            }
        }
        ++outStreams;

        /* Ensure existing callback is cleared, before:
         * a) instreams are built, since new spillFiles can be added to as long as existing callback is active
         * b) locked CThorSpillableRowArrayLock section below, which in turn may add a new callback.
         * Otherwise, once this section has the lock, the existing callback may be called by roxiemem and block,
         * causing this section to deadlock inside roxiemem, if it tries to add a new callback.
         */
        clearSpillingCallback();

        // NB: CStreamFileOwner links CFileOwner - last usage will auto delete file
        // which may be one of these streams or CThorRowCollectorBase itself
        IArrayOf<IRowStream> instrms;
        ForEachItemIn(f, spillFiles)
        {
            CFileOwner *fileOwner = spillFiles.item(f);
            unsigned rwFlags = DEFAULT_RWFLAGS;
            if (activity.getOptBool(THOROPT_COMPRESS_SPILLS, true))
                rwFlags |= rw_compress;
            if (preserveGrouping)
                rwFlags |= rw_grouped;
            Owned<IExtRowStream> strm = createRowStream(&fileOwner->queryIFile(), rowIf, rwFlags);
            instrms.append(* new CStreamFileOwner(fileOwner, strm));
        }

        {
            if (spillableRowSet)
                instrms.append(*spillableRowSet->createRowStream());
            else if (spillableRows.numCommitted())
            {
                totalRows += spillableRows.numCommitted();
                if (iCompare && (1 == outStreams))
                    spillableRows.sort(*iCompare, maxCores);

                // NB: if rc_allDiskOrAllMem and some disk already, will have been spilt already (see above) and not reach here
                if (rc_allDiskOrAllMem == diskMemMix || (NULL!=allMemRows && (rc_allMem == diskMemMix)))
                {
                    assertex(allMemRows);
                    assertex(1 == outStreams);
                    if (memUsage)
                        *memUsage = spillableRows.getMemUsage(); // a bit expensive if variable rows
                    allMemRows->transferFrom(spillableRows);
                    // stream cannot be used
                    return NULL;
                }
                if (!shared)
                    instrms.append(*spillableRows.createRowStream()); // NB: stream will take ownership of rows in spillableRows
                else
                {
                    spillableRowSet.setown(new CSharedSpillableRowSet(activity, spillableRows, rowIf, preserveGrouping));
                    instrms.append(*spillableRowSet->createRowStream());
                }
            }
        }

        if (0 == instrms.ordinality())
            return createNullRowStream();
        else if (1 == instrms.ordinality())
            return LINK(&instrms.item(0));
        else if (iCompare)
        {
            Owned<IRowLinkCounter> linkcounter = new CThorRowLinkCounter;
            return createRowStreamMerger(instrms.ordinality(), instrms.getArray(), iCompare, false, linkcounter);
        }
        else
            return createConcatRowStream(instrms.ordinality(),instrms.getArray());
    }
    void reset()
    {
        spillableRows.kill();
        spillFiles.kill();
        totalRows = 0;
        overflowCount = outStreams = 0;
    }
    inline bool spillingEnabled() const { return SPILL_PRIORITY_DISABLE != spillPriority; }
    void clearSpillingCallback()
    {
        if (mmRegistered)
        {
            activity.queryJob().queryRowManager()->removeRowBuffer(this);
            mmRegistered = false;
        }
    }
public:
    CThorRowCollectorBase(CActivityBase &_activity, IRowInterfaces *_rowIf, ICompare *_iCompare, bool _isStable, RowCollectorFlags _diskMemMix, unsigned _spillPriority)
        : activity(_activity),
          rowIf(_rowIf), iCompare(_iCompare), isStable(_isStable), diskMemMix(_diskMemMix), spillPriority(_spillPriority),
          spillableRows(_activity, _rowIf)
    {
        preserveGrouping = false;
        totalRows = 0;
        overflowCount = outStreams = 0;
        mmRegistered = false;
        if (rc_allMem == diskMemMix)
            spillPriority = SPILL_PRIORITY_DISABLE; // all mem, implies no spilling
        else if (spillingEnabled())
        {
            activity.queryJob().queryRowManager()->addRowBuffer(this);
            mmRegistered = true;
        }
        maxCores = activity.queryMaxCores();

        spillableRows.setup(rowIf, false, isStable?stableSort_earlyAlloc:stableSort_none);
    }
    ~CThorRowCollectorBase()
    {
        reset();
        clearSpillingCallback();
    }
    void transferRowsOut(CThorExpandingRowArray &out, bool sort)
    {
        CThorArrayLockBlock block(spillableRows);
        spillableRows.flush();
        totalRows += spillableRows.numCommitted();
        if (sort && iCompare)
            spillableRows.sort(*iCompare, maxCores);
        out.transferFrom(spillableRows);
    }
// IThorRowCollectorCommon
    virtual rowcount_t numRows() const
    {
        return totalRows+spillableRows.numCommitted();
    }
    virtual unsigned numOverflows() const
    {
        return overflowCount;
    }
    virtual unsigned overflowScale() const
    {
        // 1 if no spill
        if (!overflowCount)
            return 1;
        return overflowCount*2+3; // bit arbitrary
    }
    virtual void transferRowsIn(CThorExpandingRowArray &src)
    {
        reset();
        spillableRows.transferFrom(src);
    }
    virtual void setup(ICompare *_iCompare, bool _isStable, RowCollectorFlags _diskMemMix, unsigned _spillPriority)
    {
        iCompare = _iCompare;
        isStable = _isStable;
        diskMemMix = _diskMemMix;
        spillPriority = _spillPriority;
        if (rc_allMem == diskMemMix)
            spillPriority = SPILL_PRIORITY_DISABLE; // all mem, implies no spilling
        if (mmRegistered && !spillingEnabled())
        {
            mmRegistered = false;
            activity.queryJob().queryRowManager()->removeRowBuffer(this);
        }
        spillableRows.setup(rowIf, false, isStable?stableSort_earlyAlloc:stableSort_none);
    }
    virtual void ensure(rowidx_t max)
    {
        spillableRows.ensure(max);
    }
// IBufferedRowCallback
    virtual unsigned getPriority() const
    {
        return spillPriority;
    }
    virtual bool freeBufferedRows(bool critical)
    {
        if (!spillingEnabled())
            return false;
        CThorArrayLockBlock block(spillableRows);
        return spillRows();
    }
};
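
// trl_ungroup: drop end-of-group markers; trl_preserveGrouping: record them
// as NULL rows; trl_stopAtEog: stop reading at the first end-of-group.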
enum TRLGroupFlag { trl_ungroup, trl_preserveGrouping, trl_stopAtEog };
class CThorRowLoader : public CThorRowCollectorBase, implements IThorRowLoader
{
    IRowStream *load(IRowStream *in, const bool &abort, TRLGroupFlag grouping, CThorExpandingRowArray *allMemRows, memsize_t *memUsage)
    {
        reset();
        setPreserveGrouping(trl_preserveGrouping == grouping);
        while (!abort)
        {
            const void *next = in->nextRow();
            if (!next)
            {
                if (grouping == trl_stopAtEog)
                    break;
                else
                {
                    next = in->nextRow();
                    if (!next)
                        break;
                    if (grouping == trl_preserveGrouping)
                        putRow(NULL);
                }
            }
            putRow(next);
        }
        return getStream(allMemRows, memUsage, false);
    }
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

    CThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, bool isStable, RowCollectorFlags diskMemMix, unsigned spillPriority)
        : CThorRowCollectorBase(activity, rowIf, iCompare, isStable, diskMemMix, spillPriority)
    {
    }
// IThorRowCollectorCommon
    virtual rowcount_t numRows() const { return CThorRowCollectorBase::numRows(); }
    virtual unsigned numOverflows() const { return CThorRowCollectorBase::numOverflows(); }
    virtual unsigned overflowScale() const { return CThorRowCollectorBase::overflowScale(); }
    virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort) { CThorRowCollectorBase::transferRowsOut(dst, sort); }
    virtual void transferRowsIn(CThorExpandingRowArray &src) { CThorRowCollectorBase::transferRowsIn(src); }
    virtual void setup(ICompare *iCompare, bool isStable=false, RowCollectorFlags diskMemMix=rc_mixed, unsigned spillPriority=50)
    {
        CThorRowCollectorBase::setup(iCompare, isStable, diskMemMix, spillPriority);
    }
    virtual void ensure(rowidx_t max) { CThorRowCollectorBase::ensure(max); }
// IThorRowLoader
    virtual IRowStream *load(IRowStream *in, const bool &abort, bool preserveGrouping, CThorExpandingRowArray *allMemRows, memsize_t *memUsage)
    {
        assertex(!iCompare || !preserveGrouping); // can't sort if group preserving
        return load(in, abort, preserveGrouping?trl_preserveGrouping:trl_ungroup, allMemRows, memUsage);
    }
    virtual IRowStream *loadGroup(IRowStream *in, const bool &abort, CThorExpandingRowArray *allMemRows, memsize_t *memUsage)
    {
        return load(in, abort, trl_stopAtEog, allMemRows, memUsage);
    }
};
IThorRowLoader *createThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, bool isStable, RowCollectorFlags diskMemMix, unsigned spillPriority)
{
    return new CThorRowLoader(activity, rowIf, iCompare, isStable, diskMemMix, spillPriority);
}

IThorRowLoader *createThorRowLoader(CActivityBase &activity, ICompare *iCompare, bool isStable, RowCollectorFlags diskMemMix, unsigned spillPriority)
{
    return createThorRowLoader(activity, &activity, iCompare, isStable, diskMemMix, spillPriority);
}
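
// Illustrative use from an activity (hypothetical sketch; 'input' and
// 'abortSoon' are assumed caller members, not names from this file):
//   Owned<IThorRowLoader> loader = createThorRowLoader(*this, NULL, false, rc_mixed, 50);
//   Owned<IRowStream> out = loader->load(input, abortSoon, false, NULL, NULL);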
class CThorRowCollector : public CThorRowCollectorBase, implements IThorRowCollector
{
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
    CThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, bool isStable, RowCollectorFlags diskMemMix, unsigned spillPriority)
        : CThorRowCollectorBase(activity, rowIf, iCompare, isStable, diskMemMix, spillPriority)
    {
    }
    // IThorRowCollectorCommon
    virtual void setPreserveGrouping(bool tf)
    {
        assertex(!iCompare || !tf); // can't sort if group preserving
        CThorRowCollectorBase::setPreserveGrouping(tf);
    }
    virtual rowcount_t numRows() const { return CThorRowCollectorBase::numRows(); }
    virtual unsigned numOverflows() const { return CThorRowCollectorBase::numOverflows(); }
    virtual unsigned overflowScale() const { return CThorRowCollectorBase::overflowScale(); }
    virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort) { CThorRowCollectorBase::transferRowsOut(dst, sort); }
    virtual void transferRowsIn(CThorExpandingRowArray &src) { CThorRowCollectorBase::transferRowsIn(src); }
    virtual void setup(ICompare *iCompare, bool isStable=false, RowCollectorFlags diskMemMix=rc_mixed, unsigned spillPriority=50)
    {
        CThorRowCollectorBase::setup(iCompare, isStable, diskMemMix, spillPriority);
    }
    virtual void ensure(rowidx_t max) { CThorRowCollectorBase::ensure(max); }
    // IThorRowCollector
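    // Returns a writer that holds a link to the collector; its destructor flushes,
    // so any pending rows are committed when the last writer reference is released.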
    virtual IRowWriter *getWriter()
    {
        class CWriter : public CSimpleInterface, implements IRowWriter
        {
            Linked<CThorRowCollector> parent;
        public:
            IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
            CWriter(CThorRowCollector *_parent) : parent(_parent)
            {
            }
            ~CWriter()
            {
                flush();
            }
            // IRowWriter
            virtual void putRow(const void *row)
            {
                parent->putRow(row);
            }
            virtual void flush()
            {
                parent->flush();
            }
        };
        return new CWriter(this);
    }
    virtual void reset()
    {
        CThorRowCollectorBase::reset();
    }
    virtual IRowStream *getStream(bool shared)
    {
        return CThorRowCollectorBase::getStream(NULL, NULL, shared);
    }
};
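// Collector factories; preserveGrouping is applied post-construction since it is
// a property of the collector rather than a common base constructor argument.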
IThorRowCollector *createThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, bool isStable, RowCollectorFlags diskMemMix, unsigned spillPriority, bool preserveGrouping)
{
    Owned<IThorRowCollector> collector = new CThorRowCollector(activity, rowIf, iCompare, isStable, diskMemMix, spillPriority);
    collector->setPreserveGrouping(preserveGrouping);
    return collector.getClear();
}
IThorRowCollector *createThorRowCollector(CActivityBase &activity, ICompare *iCompare, bool isStable, RowCollectorFlags diskMemMix, unsigned spillPriority, bool preserveGrouping)
{
    return createThorRowCollector(activity, &activity, iCompare, isStable, diskMemMix, spillPriority, preserveGrouping);
}
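// Deliberately a no-op in this implementation (presumably retained so callers that
// expect the hook to exist still link).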
void setThorInABox(unsigned num)
{
}
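// Coordinates memory usage across multiple Thor instances sharing a node group.
// The group master owns a Dali mutex named "thorres:<groupname>" and runs a thread
// servicing MPTAG_THORRESOURCELOCK requests from the other nodes: a single request
// byte of 1 means enter the mutex, 0 means leave it. Non-master nodes implement
// take()/give() by sending those request bytes to rank 0.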
class cMultiThorResourceMutex: public CSimpleInterface, implements ILargeMemLimitNotify, implements IDaliMutexNotifyWaiting
{
    class cMultiThorResourceMutexThread: public Thread
    {
        cMultiThorResourceMutex &parent;
    public:
        cMultiThorResourceMutexThread(cMultiThorResourceMutex &_parent)
            : Thread("cMultiThorResourceMutexThread"), parent(_parent)
        {
        }
        int run()
        {
            parent.run();
            return 0;
        }
    };
    Owned<cMultiThorResourceMutexThread> thread;
    Owned<IDaliMutex> mutex;
    bool stopping;
    Linked<ICommunicator> clusterComm;
    CSDSServerStatus *status;
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
    cMultiThorResourceMutex(const char *groupname, CSDSServerStatus *_status)
    {
        status = _status;
        stopping = false;
        clusterComm.set(&queryClusterComm());
        if (clusterComm->queryGroup().rank(queryMyNode())==0) { // master so start thread
            thread.setown(new cMultiThorResourceMutexThread(*this));
            thread->start();
            StringBuffer mname("thorres:");
            mname.append(groupname);
            mutex.setown(createDaliMutex(mname.str()));
        }
    }
    ~cMultiThorResourceMutex()
    {
        stopping = true;
        if (thread)
            stop();
    }
    void run() // on master
    {
        PROGLOG("cMultiThorResourceMutex thread run");
        try {
            CMessageBuffer mbuf;
            while (!stopping) {
                mbuf.clear();
                rank_t from;
                unsigned timeout = 1000*60*5;
                if (clusterComm->recv(mbuf, RANK_ALL, MPTAG_THORRESOURCELOCK, &from, timeout)) {
                    byte req;
                    mbuf.read(req);
                    if (req==1) {
                        if (mutex)
                            mutex->enter();
                    }
                    else if (req==0) {
                        if (mutex)
                            mutex->leave();
                    }
                    clusterComm->reply(mbuf, 1000*60*5);
                }
            }
        }
        catch (IException *e) {
            EXCLOG(e, "cMultiThorResourceMutex::run");
        }
    }
    void stop()
    {
        PROGLOG("cMultiThorResourceMutex::stop enter");
        stopping = true;
        if (mutex)
            mutex->kill();
        try {
            clusterComm->cancel(RANK_ALL, MPTAG_THORRESOURCELOCK);
        }
        catch (IException *e) {
            EXCLOG(e, "cMultiThorResourceMutex::stop");
        }
        if (thread)
            thread->join();
        mutex.clear();
        PROGLOG("cMultiThorResourceMutex::stop leave");
    }
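    // ILargeMemLimitNotify: take() acquires the shared resource before a large memory
    // operation. On the master it enters the Dali mutex directly; elsewhere it
    // round-trips a request byte of 1 to rank 0 and treats a failed sendRecv as stopping.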
    bool take(memsize_t tot)
    {
        if (stopping)
            return true;
        if (mutex)
            return mutex->enter();
        if (stopping)
            return false;
        CMessageBuffer mbuf;
        byte req = 1;
        mbuf.append(req);
        try {
            if (!clusterComm->sendRecv(mbuf, 0, MPTAG_THORRESOURCELOCK, (unsigned)-1))
                stopping = true;
        }
        catch (IException *e) {
            EXCLOG(e, "cMultiThorResourceMutex::take");
        }
        return !stopping;
    }
    // NB: the caller raises an OOM exception if take() returns false
    void give(memsize_t tot)
    {
        if (mutex) {
            mutex->leave();
            return;
        }
        if (stopping)
            return;
        CMessageBuffer mbuf;
        byte req = 0;
        mbuf.append(req);
        try {
            if (!clusterComm->sendRecv(mbuf, 0, MPTAG_THORRESOURCELOCK, (unsigned)-1))
                stopping = true;
        }
        catch (IException *e) {
            EXCLOG(e, "cMultiThorResourceMutex::give");
        }
    }
    // IDaliMutexNotifyWaiting
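    // Publish wait-state to this Thor's SDS status entry so blocked-on-memory time is
    // externally visible: @memoryBlocked counts wait cycles and is reset to 0 when the
    // wait ends (whether or not the mutex was obtained).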
    void startWait()
    {
        if (status)
            status->queryProperties()->setPropInt("@memoryBlocked", 1);
    }
    void cycleWait()
    {
        if (status)
            status->queryProperties()->setPropInt("@memoryBlocked", status->queryProperties()->getPropInt("@memoryBlocked")+1);
    }
    void stopWait(bool got)
    {
        if (status)
            status->queryProperties()->setPropInt("@memoryBlocked", 0);
    }
};
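// Factory for the cross-Thor resource mutex; returned via the ILargeMemLimitNotify
// interface it implements.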
ILargeMemLimitNotify *createMultiThorResourceMutex(const char *grpname, CSDSServerStatus *_status)
{
    return new cMultiThorResourceMutex(grpname, _status);
}
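// Per-Thor row allocator. Wraps a roxiemem row manager plus a cache of
// IEngineRowAllocators keyed by (meta, activityId), so repeated getRowAllocator()
// calls for the same activity reuse one allocator. memorySpillAt is a percentage:
// e.g. with memSize = 2GB and memorySpillAt = 80, the spill threshold is set at
// 2GB/100*80 = 1.6GB; a value of 0 disables the threshold.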
class CThorAllocator : public CSimpleInterface, implements IThorAllocator, implements IRowAllocatorMetaActIdCacheCallback
{
protected:
    mutable Owned<IRowAllocatorMetaActIdCache> allocatorMetaCache;
    Owned<roxiemem::IRowManager> rowManager;
    roxiemem::RoxieHeapFlags flags;
    IContextLogger &logctx;
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
    CThorAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &_logctx, roxiemem::RoxieHeapFlags _flags) : logctx(_logctx), flags(_flags)
    {
        allocatorMetaCache.setown(createRowAllocatorCache(this));
        rowManager.setown(roxiemem::createRowManager(memSize, NULL, logctx, allocatorMetaCache, false));
        rowManager->setMemoryLimit(memSize, 0==memorySpillAt ? 0 : memSize/100*memorySpillAt);
    }
    ~CThorAllocator()
    {
        rowManager.clear();
        allocatorMetaCache.clear();
    }
    // roxiemem::IRowAllocatorMetaActIdCacheCallback
    virtual IEngineRowAllocator *createAllocator(IOutputMetaData *meta, unsigned activityId, unsigned id) const
    {
        return createRoxieRowAllocator(*rowManager, meta, activityId, id, flags);
    }
    // IThorAllocator
    virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
    {
        return allocatorMetaCache->ensure(meta, activityId);
    }
    virtual roxiemem::IRowManager *queryRowManager() const
    {
        return rowManager;
    }
    virtual roxiemem::RoxieHeapFlags queryFlags() const { return flags; }
    virtual bool queryCrc() const { return false; }
};
// Derived class so that getRowAllocator avoids a per-call 'crcChecking' test
class CThorCrcCheckingAllocator : public CThorAllocator
{
public:
    CThorCrcCheckingAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &logctx, roxiemem::RoxieHeapFlags flags) : CThorAllocator(memSize, memorySpillAt, logctx, flags)
    {
    }
    // IThorAllocator
    virtual bool queryCrc() const { return true; }
    // roxiemem::IRowAllocatorMetaActIdCacheCallback
    virtual IEngineRowAllocator *createAllocator(IOutputMetaData *meta, unsigned activityId, unsigned cacheId) const
    {
        return createCrcRoxieRowAllocator(*rowManager, meta, activityId, cacheId, flags);
    }
};
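// Chooses the concrete allocator class up front so the CRC decision is made once,
// at construction, rather than on every allocation. Packing and CRC checking are
// independent: packed heaps come from the RHFpacked flag, CRC checking from the
// derived class above.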
IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &logctx, bool crcChecking, bool usePacked)
{
    PROGLOG("CRC allocator %s", crcChecking?"ON":"OFF");
    PROGLOG("Packed allocator %s", usePacked?"ON":"OFF");
    roxiemem::RoxieHeapFlags flags;
    if (usePacked)
        flags = roxiemem::RHFpacked;
    else
        flags = roxiemem::RHFnone;
    if (crcChecking)
        return new CThorCrcCheckingAllocator(memSize, memorySpillAt, logctx, flags);
    else
        return new CThorAllocator(memSize, memorySpillAt, logctx, flags);
}
#define OUTPUTMETACHILDROW_VERSION 2 // for now, all that matters is that it is non-zero
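// Meta for rows laid out in memory as [extraSz opaque bytes][pointer to child row].
// The serialized form is the extra bytes, then a single presence byte (1/0), then
// the child row itself when present; a NULL child therefore costs one byte on disk.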
class COutputMetaWithChildRow : public CSimpleInterface, implements IOutputMetaData
{
    Linked<IEngineRowAllocator> childAllocator;
    IOutputMetaData *childMeta;
    size32_t extraSz;
    Owned<IOutputRowSerializer> diskSerializer;
    Owned<IOutputRowDeserializer> diskDeserializer;
    Owned<IOutputRowSerializer> internalSerializer;
    Owned<IOutputRowDeserializer> internalDeserializer;
    Owned<ISourceRowPrefetcher> prefetcher;
    class CSerializer : public CSimpleInterface, implements IOutputRowSerializer
    {
        Owned<IOutputRowSerializer> childSerializer;
        size32_t extraSz;
    public:
        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
        CSerializer(IOutputRowSerializer *_childSerializer, size32_t _extraSz) : childSerializer(_childSerializer), extraSz(_extraSz)
        {
        }
        virtual void serialize(IRowSerializerTarget &out, const byte *self)
        {
            out.put(extraSz, self);
            const byte *childRow = *(const byte **)(self+extraSz);
            if (childRow)
            {
                byte b=1;
                out.put(1, &b);
                childSerializer->serialize(out, childRow);
            }
            else
            {
                byte b=0;
                out.put(1, &b);
            }
        }
    };
    class CDeserializer : public CSimpleInterface, implements IOutputRowDeserializer
    {
        Owned<IOutputRowDeserializer> childDeserializer;
        Linked<IEngineRowAllocator> childAllocator;
        size32_t extraSz;
    public:
        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
        CDeserializer(IOutputRowDeserializer *_childDeserializer, IEngineRowAllocator *_childAllocator, size32_t _extraSz) : childDeserializer(_childDeserializer), childAllocator(_childAllocator), extraSz(_extraSz)
        {
        }
        virtual size32_t deserialize(ARowBuilder & rowBuilder, IRowDeserializerSource &in)
        {
            byte * self = rowBuilder.getSelf();
            in.read(extraSz, self);
            byte b;
            in.read(1, &b);
            const void *fChildRow;
            if (b)
            {
                RtlDynamicRowBuilder childBuilder(childAllocator);
                size32_t sz = childDeserializer->deserialize(childBuilder, in);
                fChildRow = childBuilder.finalizeRowClear(sz);
            }
            else
                fChildRow = NULL;
            memcpy(self+extraSz, &fChildRow, sizeof(const void *));
            return extraSz + sizeof(const void *);
        }
    };
    class CPrefetcher : public CSimpleInterface, implements ISourceRowPrefetcher
    {
        Owned<ISourceRowPrefetcher> childPrefetcher;
        size32_t extraSz;
    public:
        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
        CPrefetcher(ISourceRowPrefetcher *_childPrefetcher, size32_t _extraSz) : childPrefetcher(_childPrefetcher), extraSz(_extraSz)
        {
        }
        virtual void readAhead(IRowDeserializerSource &in)
        {
            in.skip(extraSz);
            byte b;
            in.read(1, &b);
            if (b)
                childPrefetcher->readAhead(in);
        }
    };
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
    COutputMetaWithChildRow(IEngineRowAllocator *_childAllocator, size32_t _extraSz) : childAllocator(_childAllocator), extraSz(_extraSz)
    {
        childMeta = childAllocator->queryOutputMeta();
    }
    virtual size32_t getRecordSize(const void *) { return extraSz + sizeof(const void *); }
    virtual size32_t getMinRecordSize() const { return extraSz + sizeof(const void *); }
    virtual size32_t getFixedSize() const { return extraSz + sizeof(const void *); }
    virtual void toXML(const byte * self, IXmlWriter & out)
    {
        // ignoring xml'ing extra
        //GH: I think this is what it should do
        childMeta->toXML(*(const byte **)(self+extraSz), out);
    }
    virtual unsigned getVersion() const { return OUTPUTMETACHILDROW_VERSION; }
    // The following can only be called if getMetaDataVersion >= 1; may SEH (crash) otherwise. Creating a different interface was too painful.
    virtual unsigned getMetaFlags() { return MDFneeddestruct|childMeta->getMetaFlags(); }
    virtual void destruct(byte * self)
    {
        OwnedConstThorRow childRow = *(const void **)(self+extraSz);
    }
    virtual IOutputRowSerializer * createDiskSerializer(ICodeContext * ctx, unsigned activityId)
    {
        if (!diskSerializer)
            diskSerializer.setown(new CSerializer(childMeta->createDiskSerializer(ctx, activityId), extraSz));
        return LINK(diskSerializer);
    }
    virtual IOutputRowDeserializer * createDiskDeserializer(ICodeContext * ctx, unsigned activityId)
    {
        if (!diskDeserializer)
            diskDeserializer.setown(new CDeserializer(childMeta->createDiskDeserializer(ctx, activityId), childAllocator, extraSz));
        return LINK(diskDeserializer);
    }
    virtual ISourceRowPrefetcher * createDiskPrefetcher(ICodeContext * ctx, unsigned activityId)
    {
        if (!prefetcher)
            prefetcher.setown(new CPrefetcher(childMeta->createDiskPrefetcher(ctx, activityId), extraSz));
        return LINK(prefetcher);
    }
    virtual IOutputMetaData * querySerializedDiskMeta() { return this; }
    virtual IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId)
    {
        if (!internalSerializer)
            internalSerializer.setown(new CSerializer(childMeta->createInternalSerializer(ctx, activityId), extraSz));
        return LINK(internalSerializer);
    }
    virtual IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId)
    {
        if (!internalDeserializer)
            internalDeserializer.setown(new CDeserializer(childMeta->createInternalDeserializer(ctx, activityId), childAllocator, extraSz));
        return LINK(internalDeserializer);
    }
    virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor)
    {
        //GH: I think this is what it should do, please check
        visitor.visitRow(*(const byte **)(self+extraSz));
    }
};
IOutputMetaData *createOutputMetaDataWithChildRow(IEngineRowAllocator *childAllocator, size32_t extraSz)
{
    return new COutputMetaWithChildRow(childAllocator, extraSz);
}
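// Meta for rows that carry metaSz bytes of fixed prefix ahead of a wrapped row.
// Sizes are the wrapped meta's answers plus metaSz (except getFixedSize(), which
// stays 0 for variable-width rows), and serialization copies the prefix verbatim
// before delegating to the wrapped serializer.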
class COutputMetaWithExtra : public CSimpleInterface, implements IOutputMetaData
{
    Linked<IOutputMetaData> meta;
    size32_t metaSz;
    Owned<IOutputRowSerializer> diskSerializer;
    Owned<IOutputRowDeserializer> diskDeserializer;
    Owned<IOutputRowSerializer> internalSerializer;
    Owned<IOutputRowDeserializer> internalDeserializer;
    Owned<ISourceRowPrefetcher> prefetcher;
    Owned<IOutputMetaData> serializedmeta;
    class CSerializer : public CSimpleInterface, implements IOutputRowSerializer
    {
        Owned<IOutputRowSerializer> serializer;
        size32_t metaSz;
    public:
        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
        CSerializer(IOutputRowSerializer *_serializer, size32_t _metaSz) : serializer(_serializer), metaSz(_metaSz)
        {
        }
        virtual void serialize(IRowSerializerTarget &out, const byte *self)
        {
            out.put(metaSz, self);
            serializer->serialize(out, self+metaSz);
        }
    };
    //GH - This code is the same as CPrefixedRowDeserializer
    class CDeserializer : public CSimpleInterface, implements IOutputRowDeserializer
    {
        Owned<IOutputRowDeserializer> deserializer;
        size32_t metaSz;
    public:
        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
        CDeserializer(IOutputRowDeserializer *_deserializer, size32_t _metaSz) : deserializer(_deserializer), metaSz(_metaSz)
        {
        }
        virtual size32_t deserialize(ARowBuilder & rowBuilder, IRowDeserializerSource &in)
        {
            in.read(metaSz, rowBuilder.getSelf());
            CPrefixedRowBuilder prefixedBuilder(metaSz, rowBuilder);
            size32_t sz = deserializer->deserialize(prefixedBuilder, in);
            return sz+metaSz;
        }
    };
    class CPrefetcher : public CSimpleInterface, implements ISourceRowPrefetcher
    {
        Owned<ISourceRowPrefetcher> childPrefetcher;
        size32_t metaSz;
    public:
        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
        CPrefetcher(ISourceRowPrefetcher *_childPrefetcher, size32_t _metaSz) : childPrefetcher(_childPrefetcher), metaSz(_metaSz)
        {
        }
        virtual void readAhead(IRowDeserializerSource &in)
        {
            in.skip(metaSz);
            childPrefetcher->readAhead(in);
        }
    };
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
    COutputMetaWithExtra(IOutputMetaData *_meta, size32_t _metaSz) : meta(_meta), metaSz(_metaSz)
    {
    }
    virtual size32_t getRecordSize(const void *rec)
    {
        size32_t sz = meta->getRecordSize(rec?((byte *)rec)+metaSz:NULL);
        return sz+metaSz;
    }
    virtual size32_t getMinRecordSize() const
    {
        return meta->getMinRecordSize() + metaSz;
    }
    virtual size32_t getFixedSize() const
    {
        size32_t sz = meta->getFixedSize();
        if (!sz)
            return 0;
        return sz+metaSz;
    }
    virtual void toXML(const byte * self, IXmlWriter & out) { meta->toXML(self, out); }
    virtual unsigned getVersion() const { return meta->getVersion(); }
    // The following can only be called if getMetaDataVersion >= 1; may SEH (crash) otherwise. Creating a different interface was too painful.
    virtual unsigned getMetaFlags() { return meta->getMetaFlags(); }
    virtual void destruct(byte * self) { meta->destruct(self); }
    virtual IOutputRowSerializer * createDiskSerializer(ICodeContext * ctx, unsigned activityId)
    {
        if (!diskSerializer)
            diskSerializer.setown(new CSerializer(meta->createDiskSerializer(ctx, activityId), metaSz));
        return LINK(diskSerializer);
    }
    virtual IOutputRowDeserializer * createDiskDeserializer(ICodeContext * ctx, unsigned activityId)
    {
        if (!diskDeserializer)
            diskDeserializer.setown(new CDeserializer(meta->createDiskDeserializer(ctx, activityId), metaSz));
        return LINK(diskDeserializer);
    }
    virtual ISourceRowPrefetcher * createDiskPrefetcher(ICodeContext * ctx, unsigned activityId)
    {
        if (!prefetcher)
            prefetcher.setown(new CPrefetcher(meta->createDiskPrefetcher(ctx, activityId), metaSz));
        return LINK(prefetcher);
    }
    virtual IOutputMetaData * querySerializedDiskMeta()
    {
        IOutputMetaData *sm = meta->querySerializedDiskMeta();
        if (sm==meta.get())
            return this;
        if (!serializedmeta.get())
            serializedmeta.setown(new COutputMetaWithExtra(sm, metaSz));
        return serializedmeta.get();
    }
    virtual IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId)
    {
        if (!internalSerializer)
            internalSerializer.setown(new CSerializer(meta->createInternalSerializer(ctx, activityId), metaSz));
        return LINK(internalSerializer);
    }
    virtual IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId)
    {
        if (!internalDeserializer)
            internalDeserializer.setown(new CDeserializer(meta->createInternalDeserializer(ctx, activityId), metaSz));
        return LINK(internalDeserializer);
    }
    virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor)
    {
        meta->walkIndirectMembers(self, visitor);
    }
};
IOutputMetaData *createOutputMetaDataWithExtra(IOutputMetaData *meta, size32_t sz)
{
    return new COutputMetaWithExtra(meta, sz);
}
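// Currently a pass-through: no Thor-specific memory stats are added to the chain.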
IPerfMonHook *createThorMemStatsPerfMonHook(IPerfMonHook *chain)
{
    return LINK(chain);
}