// thmem.cpp
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jmisc.hpp"
  15. #include "jio.hpp"
  16. #include "jsort.hpp"
  17. #include "jsorta.hpp"
  18. #include "jvmem.hpp"
  19. #include "jflz.hpp"
  20. #include "thbufdef.hpp"
  21. #include "thor.hpp"
  22. #include "thormisc.hpp"
  23. #include "eclhelper.hpp"
  24. #include "dautils.hpp"
  25. #include "daclient.hpp"
  26. #define NO_BWD_COMPAT_MAXSIZE
  27. #include "thorcommon.ipp"
  28. #include "eclrtl.hpp"
  29. #include "roxiemem.hpp"
  30. #include "roxierow.hpp"
  31. #include "thmem.hpp"
  32. #include "thgraph.hpp"
  33. #include "thalloc.hpp"
  34. #undef ALLOCATE
  35. #undef CLONE
  36. #undef MEMACTIVITYTRACESTRING
  37. #include "thbuf.hpp"
  38. #include "thmem.hpp"
  // ASSERTEX(c): forwards to assertex(c) when _DEBUG is defined, expands to nothing otherwise.
  #ifndef _DEBUG
  #define ASSERTEX(c)
  #else
  //#define _TESTING
  #define ASSERTEX(c) assertex(c)
  #endif
  45. static memsize_t MTthreshold=0;
  46. static CriticalSection MTcritsect; // held when blocked
  47. static Owned<ILargeMemLimitNotify> MTthresholdnotify;
  48. static bool MTlocked = false;
  49. void checkMultiThorMemoryThreshold(bool inc)
  50. {
  51. if (MTthresholdnotify.get()) {
  52. CriticalBlock block(MTcritsect);
  53. memsize_t used = 0; // JCSMORE - might work via callback in new scheme
  54. if (MTlocked) {
  55. if (used<MTthreshold/2) {
  56. DBGLOG("Multi Thor threshold lock released: %"I64F"d",(offset_t)used);
  57. MTlocked = false;
  58. MTthresholdnotify->give(used);
  59. }
  60. }
  61. else if (used>MTthreshold) {
  62. DBGLOG("Multi Thor threshold exceeded: %"I64F"d",(offset_t)used);
  63. if (!MTthresholdnotify->take(used)) {
  64. throw createOutOfMemException(-9,
  65. 1024, // dummy value
  66. used);
  67. }
  68. DBGLOG("Multi Thor lock taken");
  69. MTlocked = true;
  70. }
  71. }
  72. }
  73. extern graph_decl void setMultiThorMemoryNotify(size32_t size,ILargeMemLimitNotify *notify)
  74. {
  75. CriticalBlock block(MTcritsect);
  76. if (MTthresholdnotify.get()&&!notify&&MTlocked) {
  77. MTlocked = false;
  78. MTthresholdnotify->give(0);
  79. }
  80. MTthreshold = size;
  81. MTthresholdnotify.set(notify);
  82. if (notify)
  83. checkMultiThorMemoryThreshold(true);
  84. }
  85. static memsize_t largeMemSize = 0;
  86. memsize_t setLargeMemSize(unsigned limitMB)
  87. {
  88. memsize_t prevLargeMemSize = largeMemSize;
  89. largeMemSize = 1024*1024*(memsize_t)limitMB;
  90. return prevLargeMemSize;
  91. }
  92. memsize_t queryLargeMemSize()
  93. {
  94. if (0 == largeMemSize)
  95. throwUnexpected();
  96. return largeMemSize;
  97. }
  98. // =================================
  99. StringBuffer &getRecordString(const void *key, IOutputRowSerializer *serializer, const char *prefix, StringBuffer &out)
  100. {
  101. MemoryBuffer mb;
  102. const byte *k = (const byte *)key;
  103. size32_t sz = 0;
  104. if (serializer&&k) {
  105. CMemoryRowSerializer mbsz(mb);
  106. serializer->serialize(mbsz,(const byte *)k);
  107. k = (const byte *)mb.bufferBase();
  108. sz = mb.length();
  109. }
  110. if (sz)
  111. out.appendf("%s(%d): ",prefix,sz);
  112. else {
  113. out.append(prefix).append(": ");
  114. if (k)
  115. sz = 16;
  116. else
  117. out.append("NULL");
  118. }
  119. bool first=false;
  120. while (sz) {
  121. if (first)
  122. first=false;
  123. else
  124. out.append(',');
  125. if ((sz>=3)&&isprint(k[0])&&isprint(k[1])&&isprint(k[2])) {
  126. out.append('"');
  127. do {
  128. out.append(*k);
  129. sz--;
  130. if (sz==0)
  131. break;
  132. if (out.length()>1024)
  133. break;
  134. k++;
  135. } while (isprint(*k));
  136. out.append('"');
  137. }
  138. if (out.length()>1024) {
  139. out.append("...");
  140. break;
  141. }
  142. if (sz) {
  143. out.appendf("%2x",(unsigned)*k);
  144. k++;
  145. sz--;
  146. }
  147. }
  148. return out;
  149. }
  150. //====
  151. class CSpillableStreamBase : public CSimpleInterface, implements roxiemem::IBufferedRowCallback
  152. {
  153. protected:
  154. CActivityBase &activity;
  155. IRowInterfaces *rowIf;
  156. bool preserveNulls, ownsRows, useCompression;
  157. CThorSpillableRowArray rows;
  158. OwnedIFile spillFile;
  159. Owned<IRowStream> spillStream;
  160. bool spillRows()
  161. {
  162. // NB: Should always be called whilst 'rows' is locked (with CThorArrayLockBlock)
  163. rowidx_t numRows = rows.numCommitted();
  164. if (0 == numRows)
  165. return false;
  166. StringBuffer tempName;
  167. VStringBuffer tempPrefix("streamspill_%d", activity.queryActivityId());
  168. GetTempName(tempName, tempPrefix.str(), true);
  169. spillFile.setown(createIFile(tempName.str()));
  170. VStringBuffer spillPrefixStr("SpillableStream(%d)", SPILL_PRIORITY_SPILLABLE_STREAM); // const for now
  171. rows.save(*spillFile, useCompression, spillPrefixStr.str()); // saves committed rows
  172. return true;
  173. }
  174. public:
  175. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  176. CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls)
  177. : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf, _preserveNulls), preserveNulls(_preserveNulls)
  178. {
  179. rows.swap(inRows);
  180. useCompression = false;
  181. }
  182. ~CSpillableStreamBase()
  183. {
  184. activity.queryJob().queryRowManager()->removeRowBuffer(this);
  185. spillStream.clear();
  186. if (spillFile)
  187. spillFile->remove();
  188. }
  189. // IBufferedRowCallback
  190. virtual unsigned getSpillCost() const
  191. {
  192. return SPILL_PRIORITY_SPILLABLE_STREAM;
  193. }
  194. virtual bool freeBufferedRows(bool critical)
  195. {
  196. CThorArrayLockBlock block(rows);
  197. return spillRows();
  198. }
  199. };
  200. // NB: Shared/spillable, holds all rows in mem until needs to spill.
  201. // spills all to disk, and stream continue reading from row in file
  202. class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterface
  203. {
  204. class CStream : public CSimpleInterface, implements IRowStream, implements IWritePosCallback
  205. {
  206. rowidx_t pos;
  207. offset_t outputOffset;
  208. Owned<IRowStream> spillStream;
  209. Linked<CSharedSpillableRowSet> owner;
  210. public:
  211. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  212. CStream(CSharedSpillableRowSet &_owner) : owner(&_owner)
  213. {
  214. pos = 0;
  215. outputOffset = (offset_t)-1;
  216. owner->rows.registerWriteCallback(*this);
  217. }
  218. ~CStream()
  219. {
  220. spillStream.clear(); // NB: clear stream 1st
  221. owner->rows.unregisterWriteCallback(*this);
  222. owner.clear();
  223. }
  224. // IRowStream
  225. virtual const void *nextRow()
  226. {
  227. if (spillStream)
  228. return spillStream->nextRow();
  229. CThorArrayLockBlock block(owner->rows);
  230. if (owner->spillFile) // i.e. has spilt
  231. {
  232. assertex(((offset_t)-1) != outputOffset);
  233. owner->rows.kill(); // no longer needed, frees pointer array
  234. unsigned rwFlags = DEFAULT_RWFLAGS;
  235. if (owner->preserveNulls)
  236. rwFlags |= rw_grouped;
  237. spillStream.setown(::createRowStreamEx(owner->spillFile, owner->rowIf, outputOffset, (offset_t)-1, (unsigned __int64)-1, rwFlags));
  238. owner->rows.unregisterWriteCallback(*this); // no longer needed
  239. return spillStream->nextRow();
  240. }
  241. else if (pos == owner->rows.numCommitted())
  242. return NULL;
  243. return owner->rows.get(pos++);
  244. }
  245. virtual void stop() { }
  246. // IWritePosCallback
  247. virtual rowidx_t queryRecordNumber()
  248. {
  249. return pos;
  250. }
  251. virtual void filePosition(offset_t pos)
  252. {
  253. // Called via spilling save, stream will continue reading from file @ pos
  254. outputOffset = pos;
  255. }
  256. };
  257. public:
  258. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  259. CSharedSpillableRowSet(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls)
  260. : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls)
  261. {
  262. activity.queryJob().queryRowManager()->addRowBuffer(this);
  263. }
  264. IRowStream *createRowStream()
  265. {
  266. {
  267. // already spilled?
  268. CThorArrayLockBlock block(rows);
  269. if (spillFile)
  270. {
  271. unsigned rwFlags = DEFAULT_RWFLAGS;
  272. if (preserveNulls)
  273. rwFlags |= rw_grouped;
  274. return ::createRowStream(spillFile, rowIf, rwFlags);
  275. }
  276. }
  277. return new CStream(*this);
  278. }
  279. };
  280. // NB: A single unshared spillable stream
  281. class CSpillableStream : public CSpillableStreamBase, implements IRowStream
  282. {
  283. rowidx_t pos, numReadRows, granularity;
  284. const void **readRows;
  285. public:
  286. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  287. CSpillableStream(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls)
  288. : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls)
  289. {
  290. useCompression = activity.getOptBool(THOROPT_COMPRESS_SPILLS, true);
  291. pos = numReadRows = 0;
  292. granularity = 500; // JCSMORE - rows
  293. // a small amount of rows to read from swappable rows
  294. roxiemem::IRowManager *rowManager = activity.queryJob().queryRowManager();
  295. readRows = static_cast<const void * *>(rowManager->allocate(granularity * sizeof(void*), activity.queryContainer().queryId(), inRows.queryDefaultMaxSpillCost()));
  296. activity.queryJob().queryRowManager()->addRowBuffer(this);
  297. }
  298. ~CSpillableStream()
  299. {
  300. while (pos < numReadRows)
  301. {
  302. ReleaseThorRow(readRows[pos++]);
  303. }
  304. ReleaseThorRow(readRows);
  305. }
  306. // IRowStream
  307. virtual const void *nextRow()
  308. {
  309. if (spillStream)
  310. return spillStream->nextRow();
  311. if (pos == numReadRows)
  312. {
  313. CThorArrayLockBlock block(rows);
  314. if (spillFile)
  315. {
  316. rows.kill(); // no longer needed, frees pointer array
  317. unsigned rwFlags = DEFAULT_RWFLAGS;
  318. if (preserveNulls)
  319. rwFlags |= rw_grouped;
  320. if (useCompression)
  321. rwFlags |= rw_compress;
  322. spillStream.setown(createRowStream(spillFile, rowIf, rwFlags));
  323. return spillStream->nextRow();
  324. }
  325. rowidx_t fetch = rows.numCommitted();
  326. if (0 == fetch)
  327. return NULL;
  328. if (fetch >= granularity)
  329. fetch = granularity;
  330. // consume 'fetch' rows
  331. rows.readBlock(readRows, fetch);
  332. numReadRows = fetch;
  333. pos = 0;
  334. }
  335. const void *row = readRows[pos];
  336. readRows[pos] = NULL;
  337. ++pos;
  338. return row;
  339. }
  340. virtual void stop() { }
  341. };
  342. //====
  343. class CResizeRowCallback : implements roxiemem::IRowResizeCallback
  344. {
  345. IThorArrayLock &alock;
  346. void **&rows;
  347. memsize_t &capacity;
  348. public:
  349. CResizeRowCallback(void **&_rows, memsize_t &_capacity, IThorArrayLock &_alock) : rows(_rows), capacity(_capacity), alock(_alock) { }
  350. virtual void lock() { alock.lock(); }
  351. virtual void unlock() { alock.unlock(); }
  352. virtual void update(memsize_t _capacity, void * ptr) { capacity = _capacity; rows = (void **)ptr; }
  353. virtual void atomicUpdate(memsize_t capacity, void * ptr)
  354. {
  355. CThorArrayLockBlock block(alock);
  356. update(capacity, ptr);
  357. }
  358. };
  359. //====
  360. void CThorExpandingRowArray::init(rowidx_t initialSize)
  361. {
  362. rowManager = activity.queryJob().queryRowManager();
  363. throwOnOom = false;
  364. stableTable = NULL;
  365. if (initialSize)
  366. {
  367. rows = static_cast<const void * *>(rowManager->allocate(initialSize * sizeof(void*), activity.queryContainer().queryId(), defaultMaxSpillCost));
  368. maxRows = getRowsCapacity();
  369. memset(rows, 0, maxRows * sizeof(void *));
  370. if (stableSort_earlyAlloc == stableSort)
  371. stableTable = static_cast<void **>(rowManager->allocate(maxRows * sizeof(void*), activity.queryContainer().queryId(), defaultMaxSpillCost));
  372. else
  373. stableTable = NULL;
  374. }
  375. else
  376. {
  377. rows = NULL;
  378. maxRows = 0;
  379. }
  380. numRows = 0;
  381. }
  382. const void *CThorExpandingRowArray::allocateRowTable(rowidx_t num)
  383. {
  384. return _allocateRowTable(num, defaultMaxSpillCost);
  385. }
  386. const void *CThorExpandingRowArray::allocateRowTable(rowidx_t num, unsigned maxSpillCost)
  387. {
  388. return _allocateRowTable(num, maxSpillCost);
  389. }
  390. rowidx_t CThorExpandingRowArray::getNewSize(rowidx_t requiredRows)
  391. {
  392. rowidx_t newSize = maxRows;
  393. //This condition must be <= at least 1/scaling factor below otherwise you'll get an infinite loop.
  394. if (newSize <= 4)
  395. newSize = requiredRows;
  396. else
  397. {
  398. //What algorithm should we use to increase the size? Trading memory usage against copying row pointers.
  399. // adding 50% would reduce the number of allocations.
  400. // anything below 32% would mean that blocks n,n+1 when freed have enough space for block n+3 which might
  401. // reduce fragmentation.
  402. //Use 25% for the moment. It should possibly be configurable - e.g., higher for thor global sort.
  403. while (newSize < requiredRows)
  404. newSize += newSize/4;
  405. }
  406. return newSize;
  407. }
  408. bool CThorExpandingRowArray::resizeRowTable(void **oldRows, memsize_t newCapacity, bool copy, roxiemem::IRowResizeCallback &callback, unsigned maxSpillCost)
  409. {
  410. try
  411. {
  412. if (oldRows)
  413. rowManager->resizeRow(oldRows, copy?RoxieRowCapacity(oldRows):0, newCapacity, activity.queryContainer().queryId(), maxSpillCost, callback);
  414. else
  415. {
  416. void **newRows = (void **)rowManager->allocate(newCapacity, activity.queryContainer().queryId(), maxSpillCost);
  417. callback.atomicUpdate(RoxieRowCapacity(newRows), newRows);
  418. }
  419. }
  420. catch (IException * e)
  421. {
  422. //Pathological cases - not enough memory to reallocate the target row buffer, or no contiguous pages available.
  423. unsigned code = e->errorCode();
  424. if ((code == ROXIEMM_MEMORY_LIMIT_EXCEEDED) || (code == ROXIEMM_MEMORY_POOL_EXHAUSTED))
  425. {
  426. e->Release();
  427. return false;
  428. }
  429. throw;
  430. }
  431. return true;
  432. }
  433. void CThorExpandingRowArray::doSort(rowidx_t n, void **const rows, ICompare &compare, unsigned maxCores)
  434. {
  435. // NB: will only be called if numRows>1
  436. if (stableSort_none != stableSort)
  437. {
  438. OwnedConstThorRow tmpStableTable;
  439. void **stableTablePtr;
  440. if (stableSort_lateAlloc == stableSort)
  441. {
  442. dbgassertex(NULL == stableTable);
  443. tmpStableTable.setown(rowManager->allocate(getRowsCapacity() * sizeof(void *), activity.queryContainer().queryId(), defaultMaxSpillCost));
  444. stableTablePtr = (void **)tmpStableTable.get();
  445. }
  446. else
  447. {
  448. dbgassertex(NULL != stableTable);
  449. stableTablePtr = stableTable;
  450. }
  451. void **_rows = rows;
  452. memcpy(stableTablePtr, _rows, n*sizeof(void **));
  453. parqsortvecstable(stableTablePtr, n, compare, (void ***)_rows, maxCores);
  454. while (n--)
  455. {
  456. *_rows = **((void ***)_rows);
  457. _rows++;
  458. }
  459. }
  460. else
  461. parqsortvec((void **const)rows, n, compare, maxCores);
  462. }
  463. inline const void *CThorExpandingRowArray::_allocateRowTable(rowidx_t num, unsigned maxSpillCost)
  464. {
  465. try
  466. {
  467. return rowManager->allocate(num * sizeof(void*), activity.queryContainer().queryId(), maxSpillCost);
  468. }
  469. catch (IException * e)
  470. {
  471. unsigned code = e->errorCode();
  472. if ((code == ROXIEMM_MEMORY_LIMIT_EXCEEDED) || (code == ROXIEMM_MEMORY_POOL_EXHAUSTED))
  473. {
  474. e->Release();
  475. return NULL;
  476. }
  477. throw;
  478. }
  479. }
  480. inline bool CThorExpandingRowArray::_ensure(rowidx_t requiredRows, unsigned maxSpillCost)
  481. {
  482. //Only the writer is allowed to reallocate rows (otherwise append can't be optimized), so rows is valid outside the lock
  483. if (0 == requiredRows)
  484. return true;
  485. // NB: only ensure alters row capacity, so no locking required to protect getRowsCapacity()
  486. memsize_t capacity = rows ? RoxieRowCapacity(rows) : 0;
  487. rowidx_t currentMaxRows = getRowsCapacity();
  488. if (currentMaxRows < requiredRows) // check, because may have expanded previously, but failed to allocate stableTable and set new maxRows
  489. {
  490. capacity = ((memsize_t)getNewSize(requiredRows)) * sizeof(void *);
  491. CResizeRowCallback callback(*(void ***)(&rows), capacity, queryLock());
  492. if (!resizeRowTable((void **)rows, capacity, true, callback, maxSpillCost)) // callback will reset capacity
  493. {
  494. if (throwOnOom)
  495. throw MakeActivityException(&activity, 0, "Out of memory, allocating row array, had %"RIPF"d, trying to allocate %"RIPF"d elements", ordinality(), requiredRows);
  496. return false;
  497. }
  498. }
  499. if (stableSort_earlyAlloc == stableSort)
  500. {
  501. memsize_t dummy;
  502. CResizeRowCallback callback(stableTable, dummy, queryLock());
  503. if (!resizeRowTable(stableTable, capacity, false, callback, maxSpillCost))
  504. {
  505. if (throwOnOom)
  506. throw MakeActivityException(&activity, 0, "Out of memory, resizing stable row array, trying to allocate %"RIPF"d elements", currentMaxRows);
  507. return false;
  508. }
  509. // NB: If allocation of stableTable fails, 'rows' has expanded, but maxRows has not
  510. // this means, that on a subsequent ensure() call, it will only need to [attempt] to resize the stable ptr array.
  511. // (see comment if (currentMaxRows < requiredRows) check above
  512. }
  513. // Both row tables updated, only now update maxRows
  514. CThorArrayLockBlock block(queryLock());
  515. maxRows = capacity / sizeof(void *);
  516. return true;
  517. }
  518. CThorExpandingRowArray::CThorExpandingRowArray(CActivityBase &_activity, IRowInterfaces *_rowIf, bool _allowNulls, StableSortFlag _stableSort, bool _throwOnOom, rowidx_t initialSize)
  519. : activity(_activity)
  520. {
  521. setup(_rowIf, _allowNulls, _stableSort, _throwOnOom);
  522. setDefaultMaxSpillCost(roxiemem::SpillAllCost);
  523. init(initialSize);
  524. }
  525. CThorExpandingRowArray::~CThorExpandingRowArray()
  526. {
  527. clearRows();
  528. ReleaseThorRow(rows);
  529. ReleaseThorRow(stableTable);
  530. }
  531. void CThorExpandingRowArray::setup(IRowInterfaces *_rowIf, bool _allowNulls, StableSortFlag _stableSort, bool _throwOnOom)
  532. {
  533. rowIf = _rowIf;
  534. allowNulls = _allowNulls;
  535. stableSort = _stableSort;
  536. throwOnOom = _throwOnOom;
  537. if (rowIf)
  538. {
  539. allocator = rowIf->queryRowAllocator();
  540. deserializer = rowIf->queryRowDeserializer();
  541. serializer = rowIf->queryRowSerializer();
  542. }
  543. else
  544. {
  545. allocator = NULL;
  546. deserializer = NULL;
  547. serializer = NULL;
  548. }
  549. }
  550. void CThorExpandingRowArray::clearRows()
  551. {
  552. for (rowidx_t i = 0; i < numRows; i++)
  553. ReleaseThorRow(rows[i]);
  554. numRows = 0;
  555. }
  556. void CThorExpandingRowArray::compact()
  557. {
  558. const void **freeFinger = rows;
  559. const void **filledFinger = NULL;
  560. const void **rowEnd = rows+numRows;
  561. //skip any leading filled in entries.
  562. while (freeFinger != rowEnd && *freeFinger)
  563. freeFinger++;
  564. // move any subsequent filled in entries.
  565. for (filledFinger = freeFinger; filledFinger != rowEnd; filledFinger++)
  566. {
  567. if (*filledFinger)
  568. {
  569. *freeFinger++ = *filledFinger;
  570. *filledFinger = NULL;
  571. }
  572. }
  573. numRows = freeFinger-rows;
  574. }
  575. void CThorExpandingRowArray::kill()
  576. {
  577. clearRows();
  578. maxRows = 0;
  579. ReleaseThorRow(rows);
  580. ReleaseThorRow(stableTable);
  581. rows = NULL;
  582. stableTable = NULL;
  583. }
  584. void CThorExpandingRowArray::swap(CThorExpandingRowArray &other)
  585. {
  586. roxiemem::IRowManager *otherRowManager = other.rowManager;
  587. IRowInterfaces *otherRowIf = other.rowIf;
  588. const void **otherRows = other.rows;
  589. void **otherStableTable = other.stableTable;
  590. bool otherAllowNulls = other.allowNulls;
  591. StableSortFlag otherStableSort = other.stableSort;
  592. bool otherThrowOnOom = other.throwOnOom;
  593. unsigned otherDefaultMaxSpillCost = other.defaultMaxSpillCost;
  594. rowidx_t otherMaxRows = other.maxRows;
  595. rowidx_t otherNumRows = other.numRows;
  596. other.rowManager = rowManager;
  597. other.setup(rowIf, allowNulls, stableSort, throwOnOom);
  598. other.setDefaultMaxSpillCost(defaultMaxSpillCost);
  599. other.rows = rows;
  600. other.stableTable = stableTable;
  601. other.maxRows = maxRows;
  602. other.numRows = numRows;
  603. rowManager = otherRowManager;
  604. setup(otherRowIf, otherAllowNulls, otherStableSort, otherThrowOnOom);
  605. setDefaultMaxSpillCost(otherDefaultMaxSpillCost);
  606. rows = otherRows;
  607. stableTable = otherStableTable;
  608. maxRows = otherMaxRows;
  609. numRows = otherNumRows;
  610. }
  611. void CThorExpandingRowArray::transferRows(rowidx_t & outNumRows, const void * * & outRows)
  612. {
  613. outNumRows = numRows;
  614. outRows = rows;
  615. numRows = 0;
  616. maxRows = 0;
  617. rows = NULL;
  618. ReleaseThorRow(stableTable);
  619. stableTable = NULL;
  620. }
  621. void CThorExpandingRowArray::transferRowsCopy(const void **outRows, bool takeOwnership)
  622. {
  623. if (0 == numRows)
  624. return;
  625. memcpy(outRows, rows, numRows*sizeof(void **));
  626. if (takeOwnership)
  627. numRows = 0;
  628. else
  629. {
  630. const void **lastNewRow = outRows+numRows-1;
  631. loop
  632. {
  633. LinkThorRow(*outRows);
  634. if (outRows == lastNewRow)
  635. break;
  636. outRows++;
  637. }
  638. }
  639. }
  640. void CThorExpandingRowArray::transferFrom(CThorExpandingRowArray &donor)
  641. {
  642. kill();
  643. donor.transferRows(numRows, rows);
  644. maxRows = numRows;
  645. if (maxRows && (stableSort_earlyAlloc == stableSort))
  646. ensure(maxRows);
  647. }
  648. void CThorExpandingRowArray::transferFrom(CThorSpillableRowArray &donor)
  649. {
  650. transferFrom((CThorExpandingRowArray &)donor);
  651. }
  652. void CThorExpandingRowArray::removeRows(rowidx_t start, rowidx_t n)
  653. {
  654. assertex(numRows-start >= n);
  655. assertex(!n || rows);
  656. if (rows)
  657. {
  658. rowidx_t end = start+n;
  659. for (rowidx_t i = start; i < end; i++)
  660. ReleaseThorRow(rows[i]);
  661. //firstRow = 0;
  662. const void **from = rows+start;
  663. memmove(from, from+n, (numRows-end) * sizeof(void *));
  664. numRows -= n;
  665. }
  666. }
  667. bool CThorExpandingRowArray::appendRows(CThorExpandingRowArray &inRows, bool takeOwnership)
  668. {
  669. rowidx_t num = inRows.ordinality();
  670. if (0 == num)
  671. return true;
  672. if (numRows+num >= maxRows)
  673. {
  674. if (!ensure(numRows + num))
  675. return false;
  676. }
  677. const void **newRows = rows+numRows;
  678. inRows.transferRowsCopy(newRows, takeOwnership);
  679. numRows += num;
  680. return true;
  681. }
  682. bool CThorExpandingRowArray::appendRows(CThorSpillableRowArray &inRows, bool takeOwnership)
  683. {
  684. rowidx_t num = inRows.numCommitted();
  685. if (0 == num)
  686. return true;
  687. if (numRows+num >= maxRows)
  688. {
  689. if (!ensure(numRows + num))
  690. return false;
  691. }
  692. const void **newRows = rows+numRows;
  693. inRows.transferRowsCopy(newRows, takeOwnership);
  694. numRows += num;
  695. return true;
  696. }
  697. bool CThorExpandingRowArray::binaryInsert(const void *row, ICompare &compare, bool dropLast)
  698. {
  699. dbgassertex(NULL != row);
  700. if (numRows >= maxRows)
  701. {
  702. if (!ensure(numRows+1))
  703. return false;
  704. }
  705. binary_vec_insert_stable(row, rows, numRows, compare); // takes ownership of row
  706. if (dropLast)
  707. {
  708. // last row falls out, i.e. release last row and don't increment numRows
  709. dbgassertex(numRows); // numRows must be >=1 for dropLast
  710. ReleaseThorRow(rows[numRows]);
  711. }
  712. else
  713. ++numRows;
  714. return true;
  715. }
  716. void CThorExpandingRowArray::clearUnused()
  717. {
  718. if (rows)
  719. memset(rows+numRows, 0, (maxRows-numRows) * sizeof(void *));
  720. }
  721. bool CThorExpandingRowArray::ensure(rowidx_t requiredRows)
  722. {
  723. return _ensure(requiredRows, defaultMaxSpillCost);
  724. }
  725. bool CThorExpandingRowArray::ensure(rowidx_t requiredRows, unsigned maxSpillCost)
  726. {
  727. return _ensure(requiredRows, maxSpillCost);
  728. }
  729. void CThorExpandingRowArray::sort(ICompare &compare, unsigned maxCores)
  730. {
  731. if (numRows>1)
  732. doSort(numRows, (void **const)rows, compare, maxCores);
  733. }
  734. void CThorExpandingRowArray::reorder(rowidx_t start, rowidx_t num, rowidx_t *neworder)
  735. {
  736. if (start>=numRows)
  737. return;
  738. if (start+num>numRows)
  739. num = numRows-start;
  740. if (!num)
  741. return;
  742. MemoryAttr ma;
  743. void **tmp = (void **)ma.allocate(num*sizeof(void *));
  744. const void **p = rows + start;
  745. memcpy(tmp, p, num*sizeof(void *));
  746. for (rowidx_t i=0; i<num; i++)
  747. p[i] = tmp[neworder[i]];
  748. }
  749. bool CThorExpandingRowArray::equal(ICompare *icmp, CThorExpandingRowArray &other)
  750. {
  751. // slow but better than prev!
  752. rowidx_t n = other.ordinality();
  753. if (n!=ordinality())
  754. return false;
  755. for (rowidx_t i=0;i<n;i++)
  756. {
  757. const void *p1 = rows[i];
  758. const void *p2 = other.query(i);
  759. if (0 != icmp->docompare(p1, p2))
  760. return false;
  761. }
  762. return true;
  763. }
  764. bool CThorExpandingRowArray::checkSorted(ICompare *icmp)
  765. {
  766. rowidx_t n=ordinality();
  767. for (rowidx_t i=1; i<n; i++)
  768. {
  769. if (icmp->docompare(rows[i-1], rows[i])>0)
  770. return false;
  771. }
  772. return true;
  773. }
  774. IRowStream *CThorExpandingRowArray::createRowStream(rowidx_t start, rowidx_t num, bool streamOwns)
  775. {
  776. class CRowOwningStream : public CSimpleInterface, implements IRowStream
  777. {
  778. CThorExpandingRowArray rows;
  779. rowidx_t pos, lastRow;
  780. public:
  781. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  782. CRowOwningStream(CThorExpandingRowArray &_rows, rowidx_t firstRow, rowidx_t _lastRow)
  783. : pos(firstRow), lastRow(_lastRow), rows(_rows.queryActivity(), NULL)
  784. {
  785. rows.swap(_rows);
  786. }
  787. // IRowStream
  788. virtual const void *nextRow()
  789. {
  790. if (pos >= lastRow)
  791. {
  792. rows.kill();
  793. return NULL;
  794. }
  795. return rows.getClear(pos++);
  796. }
  797. virtual void stop()
  798. {
  799. rows.kill();
  800. }
  801. };
  802. class CStream : public CSimpleInterface, implements IRowStream
  803. {
  804. CThorExpandingRowArray *parent;
  805. rowidx_t pos, lastRow;
  806. public:
  807. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  808. CStream(CThorExpandingRowArray &_parent, rowidx_t firstRow, rowidx_t _lastRow)
  809. : pos(firstRow), lastRow(_lastRow)
  810. {
  811. parent = &_parent;
  812. }
  813. // IRowStream
  814. virtual const void *nextRow()
  815. {
  816. if (pos >= lastRow)
  817. return NULL;
  818. return parent->get(pos++);
  819. }
  820. virtual void stop() { }
  821. };
  822. dbgassertex(!streamOwns || ((0 == start) && ((rowidx_t)-1 == num)));
  823. if (start>ordinality())
  824. start = ordinality();
  825. rowidx_t lastRow;
  826. if ((num==(rowidx_t)-1)||(start+num>ordinality()))
  827. lastRow = ordinality();
  828. else
  829. lastRow = start+num;
  830. if (streamOwns)
  831. return new CRowOwningStream(*this, start, lastRow);
  832. else
  833. return new CStream(*this, start, lastRow);
  834. }
  835. void CThorExpandingRowArray::partition(ICompare &compare, unsigned num, UnsignedArray &out)
  836. {
  837. rowidx_t p=0;
  838. rowidx_t n = ordinality();
  839. while (num)
  840. {
  841. out.append(p);
  842. if (p<n)
  843. {
  844. rowidx_t q = p+(n-p)/num;
  845. if (p==q) // skip to next group
  846. {
  847. while (q<n)
  848. {
  849. q++;
  850. if ((q<n)&&(compare.docompare(rows[p],rows[q])!=0)) // ensure at next group
  851. break;
  852. }
  853. }
  854. else
  855. {
  856. while ((q<n)&&(q!=p)&&(compare.docompare(rows[q-1],rows[q])==0)) // ensure at start of group
  857. q--;
  858. }
  859. p = q;
  860. }
  861. num--;
  862. }
  863. out.append(n);
  864. }
  865. offset_t CThorExpandingRowArray::serializedSize()
  866. {
  867. IOutputMetaData *meta = rowIf->queryRowMetaData();
  868. IOutputMetaData *diskMeta = meta->querySerializedDiskMeta();
  869. rowidx_t c = ordinality();
  870. offset_t total = 0;
  871. if (diskMeta->isFixedSize())
  872. total = c * diskMeta->getFixedSize();
  873. else
  874. {
  875. Owned<IOutputRowSerializer> diskSerializer = diskMeta->createDiskSerializer(rowIf->queryCodeContext(), rowIf->queryActivityId());
  876. CSizingSerializer ssz;
  877. for (rowidx_t i=0; i<c; i++)
  878. {
  879. diskSerializer->serialize(ssz, (const byte *)rows[i]);
  880. total += ssz.size();
  881. ssz.reset();
  882. }
  883. }
  884. return total;
  885. }
  886. memsize_t CThorExpandingRowArray::getMemUsage()
  887. {
  888. roxiemem::IRowManager *rM = activity.queryJob().queryRowManager();
  889. IOutputMetaData *meta = rowIf->queryRowMetaData();
  890. IOutputMetaData *diskMeta = meta->querySerializedDiskMeta(); // GH->JCS - really I want a internalMeta here.
  891. rowidx_t c = ordinality();
  892. memsize_t total = 0;
  893. if (diskMeta->isFixedSize())
  894. total = c * rM->getExpectedFootprint(diskMeta->getFixedSize(), 0);
  895. else
  896. {
  897. CSizingSerializer ssz;
  898. for (rowidx_t i=0; i<c; i++)
  899. {
  900. serializer->serialize(ssz, (const byte *)rows[i]);
  901. total += rM->getExpectedFootprint(ssz.size(), 0);
  902. ssz.reset();
  903. }
  904. }
  905. // NB: worst case, when expanding (see ensure method)
  906. memsize_t sz = rM->getExpectedFootprint(maxRows * sizeof(void *), 0);
  907. memsize_t szE = sz / 100 * 125; // don't care if sz v. small
  908. if (stableSort_none == stableSort)
  909. total += sz + szE;
  910. else
  911. total += sz + szE * 2;
  912. return total;
  913. }
  914. void CThorExpandingRowArray::serialize(IRowSerializerTarget &out)
  915. {
  916. bool warnnull = true;
  917. assertex(serializer);
  918. rowidx_t n = ordinality();
  919. if (n)
  920. {
  921. for (rowidx_t i = 0; i < n; i++)
  922. {
  923. const void *row = query(i);
  924. if (row)
  925. serializer->serialize(out, (const byte *)row);
  926. else if (warnnull)
  927. {
  928. WARNLOG("CThorExpandingRowArray::serialize ignoring NULL row");
  929. warnnull = false;
  930. }
  931. }
  932. }
  933. }
  934. void CThorExpandingRowArray::serialize(MemoryBuffer &mb)
  935. {
  936. assertex(serializer);
  937. CMemoryRowSerializer s(mb);
  938. if (!allowNulls)
  939. serialize(s);
  940. else
  941. {
  942. unsigned short guard = 0x7631;
  943. mb.append(guard);
  944. rowidx_t n = ordinality();
  945. if (n)
  946. {
  947. for (rowidx_t i = 0; i < n; i++)
  948. {
  949. const void *row = query(i);
  950. bool isnull = (row==NULL);
  951. mb.append(isnull);
  952. if (!isnull)
  953. serializer->serialize(s, (const byte *)row);
  954. }
  955. }
  956. }
  957. }
  958. void CThorExpandingRowArray::serializeCompress(MemoryBuffer &mb)
  959. {
  960. MemoryBuffer exp;
  961. serialize(exp);
  962. fastLZCompressToBuffer(mb,exp.length(), exp.toByteArray());
  963. }
  964. rowidx_t CThorExpandingRowArray::serializeBlock(MemoryBuffer &mb, rowidx_t idx, rowidx_t count, size32_t dstmax, bool hardMax)
  965. {
  966. assertex(serializer);
  967. CMemoryRowSerializer out(mb);
  968. bool warnnull = true;
  969. rowidx_t num=ordinality();
  970. if (idx>=num)
  971. return 0;
  972. if (num-idx<count)
  973. count = num-idx;
  974. rowidx_t ret = 0;
  975. for (rowidx_t i=0;i<count;i++)
  976. {
  977. size32_t ln = mb.length();
  978. const void *row = query(i+idx);
  979. if (row)
  980. serializer->serialize(out,(const byte *)row);
  981. else if (warnnull)
  982. {
  983. WARNLOG("CThorExpandingRowArray::serialize ignoring NULL row");
  984. warnnull = false;
  985. }
  986. // allows at least one
  987. if (mb.length()>dstmax)
  988. {
  989. if (hardMax && ln) // remove last if above limit
  990. mb.setLength(ln);
  991. else
  992. ++ret;
  993. break;
  994. }
  995. else
  996. ++ret;
  997. }
  998. return ret;
  999. }
  1000. void CThorExpandingRowArray::deserializeRow(IRowDeserializerSource &in)
  1001. {
  1002. RtlDynamicRowBuilder rowBuilder(allocator);
  1003. size32_t sz = deserializer->deserialize(rowBuilder,in);
  1004. append(rowBuilder.finalizeRowClear(sz));
  1005. }
  1006. void CThorExpandingRowArray::deserialize(size32_t sz, const void *buf)
  1007. {
  1008. if (allowNulls)
  1009. {
  1010. ASSERTEX((sz>=sizeof(short))&&(*(unsigned short *)buf==0x7631)); // check for mismatch
  1011. buf = (const byte *)buf+sizeof(unsigned short);
  1012. sz -= sizeof(unsigned short);
  1013. }
  1014. CThorStreamDeserializerSource d(sz,buf);
  1015. while (!d.eos())
  1016. {
  1017. if (allowNulls)
  1018. {
  1019. bool nullrow;
  1020. d.read(sizeof(bool),&nullrow);
  1021. if (nullrow)
  1022. {
  1023. append(NULL);
  1024. continue;
  1025. }
  1026. }
  1027. deserializeRow(d);
  1028. }
  1029. }
  1030. void CThorExpandingRowArray::deserializeExpand(size32_t sz, const void *data)
  1031. {
  1032. MemoryBuffer mb;
  1033. fastLZDecompressToBuffer(mb, data);
  1034. deserialize(mb.length(), mb.bufferBase());
  1035. }
  1036. //////////////////
  1037. void CThorSpillableRowArray::registerWriteCallback(IWritePosCallback &cb)
  1038. {
  1039. CThorArrayLockBlock block(*this);
  1040. writeCallbacks.append(cb); // NB not linked to avoid circular dependency
  1041. }
  1042. void CThorSpillableRowArray::unregisterWriteCallback(IWritePosCallback &cb)
  1043. {
  1044. CThorArrayLockBlock block(*this);
  1045. writeCallbacks.zap(cb);
  1046. }
  1047. CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity, IRowInterfaces *rowIf, bool allowNulls, StableSortFlag stableSort, rowidx_t initialSize, size32_t _commitDelta)
  1048. : CThorExpandingRowArray(activity, rowIf, false, stableSort, false, initialSize), commitDelta(_commitDelta)
  1049. {
  1050. commitRows = 0;
  1051. firstRow = 0;
  1052. }
  1053. CThorSpillableRowArray::~CThorSpillableRowArray()
  1054. {
  1055. clearRows();
  1056. }
  1057. void CThorSpillableRowArray::clearRows()
  1058. {
  1059. for (rowidx_t i = firstRow; i < numRows; i++)
  1060. ReleaseThorRow(rows[i]);
  1061. numRows = 0;
  1062. firstRow = 0;
  1063. commitRows = 0;
  1064. }
  1065. void CThorSpillableRowArray::compact()
  1066. {
  1067. CThorArrayLockBlock block(*this);
  1068. assertex(0 == firstRow && numRows == commitRows);
  1069. CThorExpandingRowArray::compact();
  1070. commitRows = numRows;
  1071. }
  1072. void CThorSpillableRowArray::kill()
  1073. {
  1074. clearRows();
  1075. CThorExpandingRowArray::kill();
  1076. }
  1077. void CThorSpillableRowArray::sort(ICompare &compare, unsigned maxCores)
  1078. {
  1079. // NB: only to be called inside lock
  1080. rowidx_t n = numCommitted();
  1081. if (n>1)
  1082. {
  1083. void **const rows = (void **const)getBlock(n);
  1084. doSort(n, rows, compare, maxCores);
  1085. }
  1086. }
  1087. static int callbackSortRev(IInterface **cb2, IInterface **cb1)
  1088. {
  1089. rowidx_t i2 = ((IWritePosCallback *)(*cb2))->queryRecordNumber();
  1090. rowidx_t i1 = ((IWritePosCallback *)(*cb1))->queryRecordNumber();
  1091. if (i1==i2) return 0;
  1092. if (i1<i2) return -1;
  1093. return 1;
  1094. }
  1095. rowidx_t CThorSpillableRowArray::save(IFile &iFile, bool useCompression, const char *tracingPrefix)
  1096. {
  1097. rowidx_t n = numCommitted();
  1098. if (0 == n)
  1099. return 0;
  1100. ActPrintLog(&activity, "%s: CThorSpillableRowArray::save %"RIPF"d rows", tracingPrefix, n);
  1101. if (useCompression)
  1102. assertex(0 == writeCallbacks.ordinality()); // incompatible
  1103. unsigned rwFlags = DEFAULT_RWFLAGS;
  1104. if (useCompression)
  1105. rwFlags |= rw_compress;
  1106. if (allowNulls)
  1107. rwFlags |= rw_grouped;
  1108. // NB: This is always called within a CThorArrayLockBlock, as such no writebacks are added or updating
  1109. rowidx_t nextCBI = RCIDXMAX; // indicates none
  1110. IWritePosCallback *nextCB = NULL;
  1111. ICopyArrayOf<IWritePosCallback> cbCopy;
  1112. if (writeCallbacks.ordinality())
  1113. {
  1114. ForEachItemIn(c, writeCallbacks)
  1115. cbCopy.append(writeCallbacks.item(c));
  1116. cbCopy.sort(callbackSortRev);
  1117. nextCB = &cbCopy.pop();
  1118. nextCBI = nextCB->queryRecordNumber();
  1119. }
  1120. Owned<IExtRowWriter> writer = createRowWriter(&iFile, rowIf, rwFlags);
  1121. const void **rows = getBlock(n);
  1122. for (rowidx_t i=0; i < n; i++)
  1123. {
  1124. const void *row = rows[i];
  1125. assertex(row || allowNulls);
  1126. if (i == nextCBI)
  1127. {
  1128. writer->flush();
  1129. do
  1130. {
  1131. nextCB->filePosition(writer->getPosition());
  1132. if (cbCopy.ordinality())
  1133. {
  1134. nextCB = &cbCopy.pop();
  1135. nextCBI = nextCB->queryRecordNumber();
  1136. }
  1137. else
  1138. nextCBI = RCIDXMAX; // indicating no more
  1139. }
  1140. while (i == nextCBI); // loop as may be >1 IWritePosCallback at same pos
  1141. }
  1142. writer->putRow(row);
  1143. rows[i] = NULL;
  1144. }
  1145. writer->flush();
  1146. firstRow += n;
  1147. offset_t bytesWritten = writer->getPosition();
  1148. writer.clear();
  1149. ActPrintLog(&activity, "%s: CThorSpillableRowArray::save done, bytes = %"I64F"d", tracingPrefix, (__int64)bytesWritten);
  1150. return n;
  1151. }
  1152. // JCSMORE - these methods are essentially borrowed from RoxieOutputRowArray, would be good to unify
  1153. const void **CThorSpillableRowArray::getBlock(rowidx_t readRows)
  1154. {
  1155. dbgassertex(firstRow+readRows <= commitRows);
  1156. return rows + firstRow;
  1157. }
  1158. void CThorSpillableRowArray::flush()
  1159. {
  1160. CThorArrayLockBlock block(*this);
  1161. dbgassertex(numRows >= commitRows);
  1162. // if firstRow over 50% of commitRows, meaning over half of row array is empty, then reduce
  1163. if (firstRow != 0 && (firstRow >= commitRows/2))
  1164. {
  1165. //A block of rows was removed - copy these rows to the start of the block.
  1166. memmove(rows, rows+firstRow, (numRows-firstRow) * sizeof(void *));
  1167. numRows -= firstRow;
  1168. firstRow = 0;
  1169. }
  1170. commitRows = numRows;
  1171. }
  1172. bool CThorSpillableRowArray::appendRows(CThorExpandingRowArray &inRows, bool takeOwnership)
  1173. {
  1174. rowidx_t num = inRows.ordinality();
  1175. if (0 == num)
  1176. return true;
  1177. if (numRows+num >= maxRows)
  1178. {
  1179. if (!ensure(numRows + num))
  1180. {
  1181. flush();
  1182. if (numRows+num >= maxRows)
  1183. return false;
  1184. }
  1185. }
  1186. const void **newRows = rows+numRows;
  1187. inRows.transferRowsCopy(newRows, takeOwnership);
  1188. numRows += num;
  1189. if (numRows >= commitRows + commitDelta)
  1190. flush();
  1191. return true;
  1192. }
  1193. void CThorSpillableRowArray::transferFrom(CThorExpandingRowArray &src)
  1194. {
  1195. CThorArrayLockBlock block(*this);
  1196. CThorExpandingRowArray::transferFrom(src);
  1197. commitRows = numRows;
  1198. }
  1199. void CThorSpillableRowArray::swap(CThorSpillableRowArray &other)
  1200. {
  1201. CThorArrayLockBlock block(*this);
  1202. CThorExpandingRowArray::swap(other);
  1203. rowidx_t otherFirstRow = other.firstRow;
  1204. rowidx_t otherCommitRows = other.commitRows;
  1205. other.firstRow = firstRow;
  1206. other.commitRows = commitRows;
  1207. firstRow = otherFirstRow;
  1208. commitRows = otherCommitRows;
  1209. }
  1210. void CThorSpillableRowArray::readBlock(const void **outRows, rowidx_t readRows)
  1211. {
  1212. CThorArrayLockBlock block(*this);
  1213. dbgassertex(firstRow + readRows <= commitRows);
  1214. memcpy(outRows, rows + firstRow, readRows*sizeof(void *));
  1215. firstRow += readRows;
  1216. }
  1217. void CThorSpillableRowArray::transferRowsCopy(const void **outRows, bool takeOwnership)
  1218. {
  1219. CThorArrayLockBlock block(*this);
  1220. if (0 == numRows)
  1221. return;
  1222. assertex(numRows == commitRows);
  1223. memcpy(outRows, rows, numRows*sizeof(void *));
  1224. if (takeOwnership)
  1225. firstRow = commitRows = numRows = 0;
  1226. else
  1227. {
  1228. const void **lastNewRow = outRows+numRows-1;
  1229. loop
  1230. {
  1231. LinkThorRow(*outRows);
  1232. if (outRows == lastNewRow)
  1233. break;
  1234. outRows++;
  1235. }
  1236. }
  1237. }
  1238. IRowStream *CThorSpillableRowArray::createRowStream()
  1239. {
  1240. return new CSpillableStream(activity, *this, rowIf, allowNulls);
  1241. }
  1242. class CThorRowCollectorBase : public CSimpleInterface, implements roxiemem::IBufferedRowCallback
  1243. {
  1244. protected:
  1245. CActivityBase &activity;
  1246. CThorSpillableRowArray spillableRows;
  1247. PointerIArrayOf<CFileOwner> spillFiles;
  1248. Owned<IOutputRowSerializer> serializer;
  1249. RowCollectorSpillFlags diskMemMix;
  1250. rowcount_t totalRows;
  1251. unsigned spillPriority;
  1252. unsigned overflowCount;
  1253. unsigned maxCores;
  1254. unsigned outStreams;
  1255. ICompare *iCompare;
  1256. StableSortFlag stableSort;
  1257. bool preserveGrouping;
  1258. IRowInterfaces *rowIf;
  1259. CriticalSection readerLock;
  1260. bool mmRegistered;
  1261. Owned<CSharedSpillableRowSet> spillableRowSet;
  1262. unsigned options;
  1263. bool spillRows()
  1264. {
  1265. //This must only be called while a lock is held on spillableRows()
  1266. rowidx_t numRows = spillableRows.numCommitted();
  1267. if (numRows == 0)
  1268. return false;
  1269. totalRows += numRows;
  1270. StringBuffer tempPrefix, tempName;
  1271. if (iCompare)
  1272. {
  1273. ActPrintLog(&activity, "Sorting %"RIPF"d rows", spillableRows.numCommitted());
  1274. CCycleTimer timer;
  1275. spillableRows.sort(*iCompare, maxCores); // sorts committed rows
  1276. ActPrintLog(&activity, "Sort took: %f", ((float)timer.elapsedMs())/1000);
  1277. tempPrefix.append("srt");
  1278. }
  1279. tempPrefix.appendf("spill_%d", activity.queryActivityId());
  1280. GetTempName(tempName, tempPrefix.str(), true);
  1281. Owned<IFile> iFile = createIFile(tempName.str());
  1282. spillFiles.append(new CFileOwner(iFile.getLink()));
  1283. VStringBuffer spillPrefixStr("RowCollector(%d)", spillPriority);
  1284. spillableRows.save(*iFile, activity.getOptBool(THOROPT_COMPRESS_SPILLS, true), spillPrefixStr.str()); // saves committed rows
  1285. ++overflowCount;
  1286. return true;
  1287. }
  1288. void setPreserveGrouping(bool _preserveGrouping)
  1289. {
  1290. preserveGrouping = _preserveGrouping;
  1291. spillableRows.setAllowNulls(preserveGrouping);
  1292. }
  1293. void flush()
  1294. {
  1295. spillableRows.flush();
  1296. }
  1297. void putRow(const void *row)
  1298. {
  1299. if (!spillableRows.append(row))
  1300. {
  1301. bool oom = false;
  1302. if (spillingEnabled())
  1303. {
  1304. CThorArrayLockBlock block(spillableRows);
  1305. //We should have been called back to free any committed rows, but occasionally it may not (e.g., if
  1306. //the problem is global memory is exhausted) - in which case force a spill here (but add any pending
  1307. //rows first).
  1308. if (spillableRows.numCommitted() != 0)
  1309. {
  1310. spillableRows.flush();
  1311. spillRows();
  1312. }
  1313. //Ensure new rows are written to the head of the array. It needs to be a separate call because
  1314. //spillRows() cannot shift active row pointer since it can be called from any thread
  1315. spillableRows.flush();
  1316. if (!spillableRows.append(row))
  1317. oom = true;
  1318. }
  1319. else
  1320. oom = true;
  1321. if (oom)
  1322. {
  1323. ReleaseThorRow(row);
  1324. throw MakeActivityException(&activity, ROXIEMM_MEMORY_LIMIT_EXCEEDED, "Insufficient memory to append sort row");
  1325. }
  1326. }
  1327. }
  1328. IRowStream *getStream(CThorExpandingRowArray *allMemRows, memsize_t *memUsage, bool shared)
  1329. {
  1330. CriticalBlock b(readerLock);
  1331. if (0 == outStreams)
  1332. {
  1333. spillableRows.flush();
  1334. if (spillingEnabled())
  1335. {
  1336. // i.e. all disk OR (some on disk already AND allDiskOrAllMem)
  1337. if (((rc_allDisk == diskMemMix) || ((rc_allDiskOrAllMem == diskMemMix) && overflowCount)))
  1338. {
  1339. CThorArrayLockBlock block(spillableRows);
  1340. if (spillableRows.numCommitted())
  1341. {
  1342. spillRows();
  1343. spillableRows.kill();
  1344. }
  1345. }
  1346. }
  1347. }
  1348. ++outStreams;
  1349. /* Ensure existing callback is cleared, before:
  1350. * a) instreams are built, since new spillFiles can be added to as long as existing callback is active
  1351. * b) locked CThorSpillableRowArrayLock section below, which in turn may add a new callback.
  1352. * Otherwise, once this section has the lock, the existing callback may be called by roxiemem and block,
  1353. * causing this section to deadlock inside roxiemem, if it tries to add a new callback.
  1354. */
  1355. clearSpillingCallback();
  1356. // NB: CStreamFileOwner links CFileOwner - last usage will auto delete file
  1357. // which may be one of these streams or CThorRowCollectorBase itself
  1358. unsigned rwFlags = DEFAULT_RWFLAGS;
  1359. if (activity.getOptBool(THOROPT_COMPRESS_SPILLS, true))
  1360. rwFlags |= rw_compress;
  1361. if (preserveGrouping)
  1362. rwFlags |= rw_grouped;
  1363. IArrayOf<IRowStream> instrms;
  1364. ForEachItemIn(f, spillFiles)
  1365. {
  1366. CFileOwner *fileOwner = spillFiles.item(f);
  1367. Owned<IExtRowStream> strm = createRowStream(&fileOwner->queryIFile(), rowIf, rwFlags);
  1368. instrms.append(* new CStreamFileOwner(fileOwner, strm));
  1369. }
  1370. {
  1371. if (spillableRowSet)
  1372. instrms.append(*spillableRowSet->createRowStream());
  1373. else if (spillableRows.numCommitted())
  1374. {
  1375. totalRows += spillableRows.numCommitted();
  1376. if (iCompare && (1 == outStreams))
  1377. {
  1378. // Option(rcflag_noAllInMemSort) - avoid sorting allMemRows
  1379. if ((NULL == allMemRows) || (0 == (options & rcflag_noAllInMemSort)))
  1380. spillableRows.sort(*iCompare, maxCores);
  1381. }
  1382. if ((rc_allDiskOrAllMem == diskMemMix) || // must supply allMemRows, only here if no spilling (see above)
  1383. (NULL!=allMemRows && (rc_allMem == diskMemMix)) ||
  1384. (NULL!=allMemRows && (rc_mixed == diskMemMix) && 0 == overflowCount) // if allMemRows given, only if no spilling
  1385. )
  1386. {
  1387. assertex(allMemRows);
  1388. assertex(1 == outStreams);
  1389. if (memUsage)
  1390. *memUsage = spillableRows.getMemUsage(); // a bit expensive if variable rows
  1391. allMemRows->transferFrom(spillableRows);
  1392. // stream cannot be used
  1393. return NULL;
  1394. }
  1395. if (!shared)
  1396. instrms.append(*spillableRows.createRowStream()); // NB: stream will take ownership of rows in spillableRows
  1397. else
  1398. {
  1399. spillableRowSet.setown(new CSharedSpillableRowSet(activity, spillableRows, rowIf, preserveGrouping));
  1400. instrms.append(*spillableRowSet->createRowStream());
  1401. }
  1402. }
  1403. else
  1404. {
  1405. // If 0 rows, no overflow, don't return stream, except for rc_allDisk which will never fill allMemRows
  1406. if (allMemRows && (0 == overflowCount) && (diskMemMix != rc_allDisk))
  1407. return NULL;
  1408. }
  1409. }
  1410. if (0 == instrms.ordinality())
  1411. return createNullRowStream();
  1412. else if (1 == instrms.ordinality())
  1413. return LINK(&instrms.item(0));
  1414. else if (iCompare)
  1415. {
  1416. Owned<IRowLinkCounter> linkcounter = new CThorRowLinkCounter;
  1417. return createRowStreamMerger(instrms.ordinality(), instrms.getArray(), iCompare, false, linkcounter);
  1418. }
  1419. else
  1420. return createConcatRowStream(instrms.ordinality(),instrms.getArray());
  1421. }
  1422. void reset()
  1423. {
  1424. spillableRows.kill();
  1425. spillFiles.kill();
  1426. totalRows = 0;
  1427. overflowCount = outStreams = 0;
  1428. }
  1429. inline bool spillingEnabled() const { return SPILL_PRIORITY_DISABLE != spillPriority; }
  1430. void clearSpillingCallback()
  1431. {
  1432. if (mmRegistered)
  1433. {
  1434. activity.queryJob().queryRowManager()->removeRowBuffer(this);
  1435. mmRegistered = false;
  1436. }
  1437. }
  1438. void enableSpillingCallback()
  1439. {
  1440. if (!mmRegistered && spillingEnabled())
  1441. {
  1442. activity.queryJob().queryRowManager()->addRowBuffer(this);
  1443. mmRegistered = true;
  1444. }
  1445. }
  1446. public:
  1447. CThorRowCollectorBase(CActivityBase &_activity, IRowInterfaces *_rowIf, ICompare *_iCompare, StableSortFlag _stableSort, RowCollectorSpillFlags _diskMemMix, unsigned _spillPriority)
  1448. : activity(_activity),
  1449. rowIf(_rowIf), iCompare(_iCompare), stableSort(_stableSort), diskMemMix(_diskMemMix), spillPriority(_spillPriority),
  1450. spillableRows(_activity, _rowIf)
  1451. {
  1452. preserveGrouping = false;
  1453. totalRows = 0;
  1454. overflowCount = outStreams = 0;
  1455. mmRegistered = false;
  1456. if (rc_allMem == diskMemMix)
  1457. spillPriority = SPILL_PRIORITY_DISABLE; // all mem, implies no spilling
  1458. else
  1459. enableSpillingCallback();
  1460. maxCores = activity.queryMaxCores();
  1461. options = 0;
  1462. spillableRows.setup(rowIf, false, stableSort);
  1463. }
  1464. ~CThorRowCollectorBase()
  1465. {
  1466. reset();
  1467. clearSpillingCallback();
  1468. }
  1469. void transferRowsOut(CThorExpandingRowArray &out, bool sort)
  1470. {
  1471. CThorArrayLockBlock block(spillableRows);
  1472. spillableRows.flush();
  1473. totalRows += spillableRows.numCommitted();
  1474. if (sort && iCompare)
  1475. spillableRows.sort(*iCompare, maxCores);
  1476. out.transferFrom(spillableRows);
  1477. }
  1478. // IThorRowCollectorCommon
  1479. virtual rowcount_t numRows() const
  1480. {
  1481. return totalRows+spillableRows.numCommitted();
  1482. }
  1483. virtual unsigned numOverflows() const
  1484. {
  1485. return overflowCount;
  1486. }
  1487. virtual unsigned overflowScale() const
  1488. {
  1489. // 1 if no spill
  1490. if (!overflowCount)
  1491. return 1;
  1492. return overflowCount*2+3; // bit arbitrary
  1493. }
  1494. virtual void transferRowsIn(CThorExpandingRowArray &src)
  1495. {
  1496. reset();
  1497. spillableRows.transferFrom(src);
  1498. enableSpillingCallback();
  1499. }
  1500. virtual void setup(ICompare *_iCompare, StableSortFlag _stableSort, RowCollectorSpillFlags _diskMemMix, unsigned _spillPriority)
  1501. {
  1502. iCompare = _iCompare;
  1503. stableSort = _stableSort;
  1504. diskMemMix = _diskMemMix;
  1505. spillPriority = _spillPriority;
  1506. if (rc_allMem == diskMemMix)
  1507. spillPriority = SPILL_PRIORITY_DISABLE; // all mem, implies no spilling
  1508. if (mmRegistered && !spillingEnabled())
  1509. {
  1510. mmRegistered = false;
  1511. activity.queryJob().queryRowManager()->removeRowBuffer(this);
  1512. }
  1513. spillableRows.setup(rowIf, false, stableSort);
  1514. }
  1515. virtual void ensure(rowidx_t max)
  1516. {
  1517. spillableRows.ensure(max);
  1518. }
  1519. virtual void setOptions(unsigned _options)
  1520. {
  1521. options = _options;
  1522. }
  1523. // IBufferedRowCallback
  1524. virtual unsigned getSpillCost() const
  1525. {
  1526. return spillPriority;
  1527. }
  1528. virtual bool freeBufferedRows(bool critical)
  1529. {
  1530. if (!spillingEnabled())
  1531. return false;
  1532. CThorArrayLockBlock block(spillableRows);
  1533. return spillRows();
  1534. }
  1535. };
  // How CThorRowLoader::load() treats the NULL rows returned by the input stream
  enum TRLGroupFlag
  {
      trl_ungroup,            // a single NULL is skipped, the rows that follow are loaded as well
      trl_preserveGrouping,   // a single NULL is kept: a NULL row is stored between the groups
      trl_stopAtEog           // loading stops at the first NULL
  };
  1537. class CThorRowLoader : public CThorRowCollectorBase, implements IThorRowLoader
  1538. {
  1539. IRowStream *load(IRowStream *in, const bool &abort, TRLGroupFlag grouping, CThorExpandingRowArray *allMemRows, memsize_t *memUsage)
  1540. {
  1541. reset();
  1542. enableSpillingCallback();
  1543. setPreserveGrouping(trl_preserveGrouping == grouping);
  1544. while (!abort)
  1545. {
  1546. const void *next = in->nextRow();
  1547. if (!next)
  1548. {
  1549. if (grouping == trl_stopAtEog)
  1550. break;
  1551. else
  1552. {
  1553. next = in->nextRow();
  1554. if (!next)
  1555. break;
  1556. if (grouping == trl_preserveGrouping)
  1557. putRow(NULL);
  1558. }
  1559. }
  1560. putRow(next);
  1561. }
  1562. return getStream(allMemRows, memUsage, false);
  1563. }
  1564. public:
  1565. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1566. CThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix, unsigned spillPriority)
  1567. : CThorRowCollectorBase(activity, rowIf, iCompare, stableSort, diskMemMix, spillPriority)
  1568. {
  1569. }
  1570. // IThorRowCollectorCommon
  1571. virtual rowcount_t numRows() const { return CThorRowCollectorBase::numRows(); }
  1572. virtual unsigned numOverflows() const { return CThorRowCollectorBase::numOverflows(); }
  1573. virtual unsigned overflowScale() const { return CThorRowCollectorBase::overflowScale(); }
  1574. virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort) { CThorRowCollectorBase::transferRowsOut(dst, sort); }
  1575. virtual void transferRowsIn(CThorExpandingRowArray &src) { CThorRowCollectorBase::transferRowsIn(src); }
  1576. virtual void setup(ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50)
  1577. {
  1578. CThorRowCollectorBase::setup(iCompare, stableSort, diskMemMix, spillPriority);
  1579. }
  1580. virtual void ensure(rowidx_t max) { CThorRowCollectorBase::ensure(max); }
  1581. virtual void setOptions(unsigned options) { CThorRowCollectorBase::setOptions(options); }
  1582. // IThorRowLoader
  1583. virtual IRowStream *load(IRowStream *in, const bool &abort, bool preserveGrouping, CThorExpandingRowArray *allMemRows, memsize_t *memUsage)
  1584. {
  1585. assertex(!iCompare || !preserveGrouping); // can't sort if group preserving
  1586. return load(in, abort, preserveGrouping?trl_preserveGrouping:trl_ungroup, allMemRows, memUsage);
  1587. }
  1588. virtual IRowStream *loadGroup(IRowStream *in, const bool &abort, CThorExpandingRowArray *allMemRows, memsize_t *memUsage)
  1589. {
  1590. return load(in, abort, trl_stopAtEog, allMemRows, memUsage);
  1591. }
  1592. };
  1593. IThorRowLoader *createThorRowLoader(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix, unsigned spillPriority)
  1594. {
  1595. return new CThorRowLoader(activity, rowIf, iCompare, stableSort, diskMemMix, spillPriority);
  1596. }
  1597. IThorRowLoader *createThorRowLoader(CActivityBase &activity, ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix, unsigned spillPriority)
  1598. {
  1599. return createThorRowLoader(activity, &activity, iCompare, stableSort, diskMemMix, spillPriority);
  1600. }
  1601. class CThorRowCollector : public CThorRowCollectorBase, implements IThorRowCollector
  1602. {
  1603. public:
  1604. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1605. CThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix, unsigned spillPriority)
  1606. : CThorRowCollectorBase(activity, rowIf, iCompare, stableSort, diskMemMix, spillPriority)
  1607. {
  1608. }
  1609. // IThorRowCollectorCommon
  1610. virtual void setPreserveGrouping(bool tf)
  1611. {
  1612. assertex(!iCompare || !tf); // can't sort if group preserving
  1613. CThorRowCollectorBase::setPreserveGrouping(tf);
  1614. }
  1615. virtual rowcount_t numRows() const { return CThorRowCollectorBase::numRows(); }
  1616. virtual unsigned numOverflows() const { return CThorRowCollectorBase::numOverflows(); }
  1617. virtual unsigned overflowScale() const { return CThorRowCollectorBase::overflowScale(); }
  1618. virtual void transferRowsOut(CThorExpandingRowArray &dst, bool sort) { CThorRowCollectorBase::transferRowsOut(dst, sort); }
  1619. virtual void transferRowsIn(CThorExpandingRowArray &src) { CThorRowCollectorBase::transferRowsIn(src); }
  1620. virtual void setup(ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix=rc_mixed, unsigned spillPriority=50)
  1621. {
  1622. CThorRowCollectorBase::setup(iCompare, stableSort, diskMemMix, spillPriority);
  1623. }
  1624. virtual void ensure(rowidx_t max) { CThorRowCollectorBase::ensure(max); }
  1625. virtual void setOptions(unsigned options) { CThorRowCollectorBase::setOptions(options); }
  1626. // IThorRowCollector
  1627. virtual IRowWriter *getWriter()
  1628. {
  1629. class CWriter : public CSimpleInterface, implements IRowWriter
  1630. {
  1631. Linked<CThorRowCollector> parent;
  1632. public:
  1633. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1634. CWriter(CThorRowCollector *_parent) : parent(_parent)
  1635. {
  1636. }
  1637. ~CWriter()
  1638. {
  1639. flush();
  1640. }
  1641. // IRowWriter
  1642. virtual void putRow(const void *row)
  1643. {
  1644. parent->putRow(row);
  1645. }
  1646. virtual void flush()
  1647. {
  1648. parent->flush();
  1649. }
  1650. };
  1651. return new CWriter(this);
  1652. }
  1653. virtual void reset()
  1654. {
  1655. CThorRowCollectorBase::reset();
  1656. }
  1657. virtual IRowStream *getStream(bool shared, CThorExpandingRowArray *allMemRows)
  1658. {
  1659. return CThorRowCollectorBase::getStream(allMemRows, NULL, shared);
  1660. }
  1661. };
  1662. IThorRowCollector *createThorRowCollector(CActivityBase &activity, IRowInterfaces *rowIf, ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix, unsigned spillPriority, bool preserveGrouping)
  1663. {
  1664. Owned<IThorRowCollector> collector = new CThorRowCollector(activity, rowIf, iCompare, stableSort, diskMemMix, spillPriority);
  1665. collector->setPreserveGrouping(preserveGrouping);
  1666. return collector.getClear();
  1667. }
  1668. IThorRowCollector *createThorRowCollector(CActivityBase &activity, ICompare *iCompare, StableSortFlag stableSort, RowCollectorSpillFlags diskMemMix, unsigned spillPriority, bool preserveGrouping)
  1669. {
  1670. return createThorRowCollector(activity, &activity, iCompare, stableSort, diskMemMix, spillPriority, preserveGrouping);
  1671. }
  // Intentionally a no-op: the argument is accepted and ignored.
  void setThorInABox(unsigned num)
  {
      (void)num; // unused
  }
  1675. class cMultiThorResourceMutex: public CSimpleInterface, implements ILargeMemLimitNotify, implements IDaliMutexNotifyWaiting
  1676. {
  1677. class cMultiThorResourceMutexThread: public Thread
  1678. {
  1679. cMultiThorResourceMutex &parent;
  1680. public:
  1681. cMultiThorResourceMutexThread(cMultiThorResourceMutex &_parent)
  1682. : Thread("cMultiThorResourceMutexThread"),parent(_parent)
  1683. {
  1684. }
  1685. int run()
  1686. {
  1687. parent.run();
  1688. return 0;
  1689. }
  1690. };
  1691. Owned<cMultiThorResourceMutexThread> thread;
  1692. Owned<IDaliMutex> mutex;
  1693. bool stopping;
  1694. Linked<ICommunicator> clusterComm;
  1695. CSDSServerStatus *status;
  1696. public:
  1697. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1698. cMultiThorResourceMutex(const char *groupname,CSDSServerStatus *_status)
  1699. {
  1700. status = _status;
  1701. stopping = false;
  1702. clusterComm.set(&queryClusterComm());
  1703. if (clusterComm->queryGroup().rank(queryMyNode())==0) { // master so start thread
  1704. thread.setown(new cMultiThorResourceMutexThread(*this));
  1705. thread->start();
  1706. StringBuffer mname("thorres:");
  1707. mname.append(groupname);
  1708. mutex.setown(createDaliMutex(mname.str()));
  1709. }
  1710. }
  1711. ~cMultiThorResourceMutex()
  1712. {
  1713. stopping = true;
  1714. if (thread)
  1715. stop();
  1716. }
  1717. void run() // on master
  1718. {
  1719. PROGLOG("cMultiThorResourceMutex thread run");
  1720. try {
  1721. CMessageBuffer mbuf;
  1722. while (!stopping) {
  1723. mbuf.clear();
  1724. rank_t from;
  1725. unsigned timeout = 1000*60*5;
  1726. if (clusterComm->recv(mbuf,RANK_ALL,MPTAG_THORRESOURCELOCK,&from,timeout)) {
  1727. byte req;
  1728. mbuf.read(req);
  1729. if (req==1) {
  1730. if (mutex)
  1731. mutex->enter();
  1732. }
  1733. else if (req==0) {
  1734. if (mutex)
  1735. mutex->leave();
  1736. }
  1737. clusterComm->reply(mbuf,1000*60*5);
  1738. }
  1739. }
  1740. }
  1741. catch (IException *e) {
  1742. EXCLOG(e,"cMultiThorResourceMutex::run");
  1743. }
  1744. }
  1745. void stop()
  1746. {
  1747. PROGLOG("cMultiThorResourceMutex::stop enter");
  1748. stopping = true;
  1749. if (mutex)
  1750. mutex->kill();
  1751. try {
  1752. clusterComm->cancel(RANK_ALL,MPTAG_THORRESOURCELOCK);
  1753. }
  1754. catch (IException *e) {
  1755. EXCLOG(e,"cMultiThorResourceMutex::stop");
  1756. }
  1757. if (thread)
  1758. thread->join();
  1759. mutex.clear();
  1760. PROGLOG("cMultiThorResourceMutex::stop leave");
  1761. }
  1762. bool take(memsize_t tot)
  1763. {
  1764. if (stopping)
  1765. return true;
  1766. if (mutex)
  1767. return mutex->enter();
  1768. if (stopping)
  1769. return false;
  1770. CMessageBuffer mbuf;
  1771. byte req = 1;
  1772. mbuf.append(req);
  1773. try {
  1774. if (!clusterComm->sendRecv(mbuf,0,MPTAG_THORRESOURCELOCK,(unsigned)-1))
  1775. stopping = true;
  1776. }
  1777. catch (IException *e) {
  1778. EXCLOG(e,"cMultiThorResourceMutex::take");
  1779. }
  1780. return !stopping;
  1781. }
  1782. // will raise oom exception if false returned
  1783. void give(memsize_t tot)
  1784. {
  1785. if (mutex) {
  1786. mutex->leave();
  1787. return;
  1788. }
  1789. if (stopping)
  1790. return;
  1791. CMessageBuffer mbuf;
  1792. byte req = 0;
  1793. mbuf.append(req);
  1794. try {
  1795. if (!clusterComm->sendRecv(mbuf,0,MPTAG_THORRESOURCELOCK,(unsigned)-1))
  1796. stopping = true;
  1797. }
  1798. catch (IException *e) {
  1799. EXCLOG(e,"cMultiThorResourceMutex::give");
  1800. }
  1801. }
  1802. //IDaliMutexNotifyWaiting
  1803. void startWait()
  1804. {
  1805. if (status)
  1806. status->queryProperties()->setPropInt("@memoryBlocked",1);
  1807. }
  1808. void cycleWait()
  1809. {
  1810. if (status)
  1811. status->queryProperties()->setPropInt("@memoryBlocked",status->queryProperties()->getPropInt("@memoryBlocked")+1);
  1812. }
  1813. void stopWait(bool got)
  1814. {
  1815. if (status)
  1816. status->queryProperties()->setPropInt("@memoryBlocked",0);
  1817. }
  1818. };
  1819. ILargeMemLimitNotify *createMultiThorResourceMutex(const char *grpname,CSDSServerStatus *_status)
  1820. {
  1821. return new cMultiThorResourceMutex(grpname,_status);
  1822. }
  1823. class CThorAllocator : public CSimpleInterface, implements IThorAllocator, implements IRowAllocatorMetaActIdCacheCallback
  1824. {
  1825. protected:
  1826. mutable Owned<IRowAllocatorMetaActIdCache> allocatorMetaCache;
  1827. Owned<roxiemem::IRowManager> rowManager;
  1828. roxiemem::RoxieHeapFlags defaultFlags;
  1829. IContextLogger &logctx;
  1830. public:
  1831. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1832. CThorAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &_logctx, roxiemem::RoxieHeapFlags _defaultFlags) : logctx(_logctx), defaultFlags(_defaultFlags)
  1833. {
  1834. allocatorMetaCache.setown(createRowAllocatorCache(this));
  1835. rowManager.setown(roxiemem::createRowManager(memSize, NULL, logctx, allocatorMetaCache, false));
  1836. rowManager->setMemoryLimit(memSize, 0==memorySpillAt ? 0 : memSize/100*memorySpillAt);
  1837. const bool paranoid = false;
  1838. if (paranoid)
  1839. {
  1840. //you probably want to test these options individually
  1841. rowManager->setMemoryCallbackThreshold((unsigned)-1);
  1842. rowManager->setCallbackOnThread(true);
  1843. rowManager->setMinimizeFootprint(true, true);
  1844. rowManager->setReleaseWhenModifyCallback(true, true);
  1845. }
  1846. }
  1847. ~CThorAllocator()
  1848. {
  1849. rowManager.clear();
  1850. allocatorMetaCache.clear();
  1851. }
  1852. // roxiemem::IRowAllocatorMetaActIdCacheCallback
  1853. virtual IEngineRowAllocator *createAllocator(IRowAllocatorMetaActIdCache * cache, IOutputMetaData *meta, unsigned activityId, unsigned id, roxiemem::RoxieHeapFlags flags) const
  1854. {
  1855. return createRoxieRowAllocator(cache, *rowManager, meta, activityId, id, flags);
  1856. }
  1857. // IThorAllocator
  1858. virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
  1859. {
  1860. return allocatorMetaCache->ensure(meta, activityId, flags);
  1861. }
  1862. virtual IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
  1863. {
  1864. return allocatorMetaCache->ensure(meta, activityId, defaultFlags);
  1865. }
  1866. virtual roxiemem::IRowManager *queryRowManager() const
  1867. {
  1868. return rowManager;
  1869. }
  1870. virtual roxiemem::RoxieHeapFlags queryFlags() const { return defaultFlags; }
  1871. virtual bool queryCrc() const { return false; }
  1872. };
  1873. // derived to avoid a 'crcChecking' check per getRowAllocator only
  1874. class CThorCrcCheckingAllocator : public CThorAllocator
  1875. {
  1876. public:
  1877. CThorCrcCheckingAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &logctx, roxiemem::RoxieHeapFlags flags) : CThorAllocator(memSize, memorySpillAt, logctx, flags)
  1878. {
  1879. }
  1880. // IThorAllocator
  1881. virtual bool queryCrc() const { return true; }
  1882. // roxiemem::IRowAllocatorMetaActIdCacheCallback
  1883. virtual IEngineRowAllocator *createAllocator(IRowAllocatorMetaActIdCache * cache, IOutputMetaData *meta, unsigned activityId, unsigned cacheId, roxiemem::RoxieHeapFlags flags) const
  1884. {
  1885. return createCrcRoxieRowAllocator(cache, *rowManager, meta, activityId, cacheId, flags);
  1886. }
  1887. };
  1888. IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, IContextLogger &logctx, bool crcChecking, bool usePacked)
  1889. {
  1890. PROGLOG("CRC allocator %s", crcChecking?"ON":"OFF");
  1891. PROGLOG("Packed allocator %s", usePacked?"ON":"OFF");
  1892. roxiemem::RoxieHeapFlags flags;
  1893. if (usePacked)
  1894. flags = roxiemem::RHFpacked;
  1895. else
  1896. flags = roxiemem::RHFnone;
  1897. if (crcChecking)
  1898. return new CThorCrcCheckingAllocator(memSize, memorySpillAt, logctx, flags);
  1899. else
  1900. return new CThorAllocator(memSize, memorySpillAt, logctx, flags);
  1901. }
  1902. #define OUTPUTMETACHILDROW_VERSION 2 // for now, it's only significant that non-zero
  1903. class COutputMetaWithChildRow : public CSimpleInterface, implements IOutputMetaData
  1904. {
  1905. Linked<IEngineRowAllocator> childAllocator;
  1906. IOutputMetaData *childMeta;
  1907. size32_t extraSz;
  1908. Owned<IOutputRowSerializer> diskSerializer;
  1909. Owned<IOutputRowDeserializer> diskDeserializer;
  1910. Owned<IOutputRowSerializer> internalSerializer;
  1911. Owned<IOutputRowDeserializer> internalDeserializer;
  1912. Owned<ISourceRowPrefetcher> prefetcher;
  1913. class CSerializer : public CSimpleInterface, implements IOutputRowSerializer
  1914. {
  1915. Owned<IOutputRowSerializer> childSerializer;
  1916. size32_t extraSz;
  1917. public:
  1918. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1919. CSerializer(IOutputRowSerializer *_childSerializer, size32_t _extraSz) : childSerializer(_childSerializer), extraSz(_extraSz)
  1920. {
  1921. }
  1922. virtual void serialize(IRowSerializerTarget &out, const byte *self)
  1923. {
  1924. out.put(extraSz, self);
  1925. const byte *childRow = *(const byte **)(self+extraSz);
  1926. if (childRow)
  1927. {
  1928. byte b=1;
  1929. out.put(1, &b);
  1930. childSerializer->serialize(out, childRow);
  1931. }
  1932. else
  1933. {
  1934. byte b=0;
  1935. out.put(1, &b);
  1936. }
  1937. }
  1938. };
  1939. class CDeserializer : public CSimpleInterface, implements IOutputRowDeserializer
  1940. {
  1941. Owned<IOutputRowDeserializer> childDeserializer;
  1942. Linked<IEngineRowAllocator> childAllocator;
  1943. size32_t extraSz;
  1944. public:
  1945. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1946. CDeserializer(IOutputRowDeserializer *_childDeserializer, IEngineRowAllocator *_childAllocator, size32_t _extraSz) : childDeserializer(_childDeserializer), childAllocator(_childAllocator), extraSz(_extraSz)
  1947. {
  1948. }
  1949. virtual size32_t deserialize(ARowBuilder & rowBuilder, IRowDeserializerSource &in)
  1950. {
  1951. byte * self = rowBuilder.getSelf();
  1952. in.read(extraSz, self);
  1953. byte b;
  1954. in.read(1, &b);
  1955. const void *fChildRow;
  1956. if (b)
  1957. {
  1958. RtlDynamicRowBuilder childBuilder(childAllocator);
  1959. size32_t sz = childDeserializer->deserialize(childBuilder, in);
  1960. fChildRow = childBuilder.finalizeRowClear(sz);
  1961. }
  1962. else
  1963. fChildRow = NULL;
  1964. memcpy(self+extraSz, &fChildRow, sizeof(const void *));
  1965. return extraSz + sizeof(const void *);
  1966. }
  1967. };
  1968. class CPrefetcher : public CSimpleInterface, implements ISourceRowPrefetcher
  1969. {
  1970. Owned<ISourceRowPrefetcher> childPrefetcher;
  1971. size32_t extraSz;
  1972. public:
  1973. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1974. CPrefetcher(ISourceRowPrefetcher *_childPrefetcher, size32_t _extraSz) : childPrefetcher(_childPrefetcher), extraSz(_extraSz)
  1975. {
  1976. }
  1977. virtual void readAhead(IRowDeserializerSource &in)
  1978. {
  1979. in.skip(extraSz);
  1980. byte b;
  1981. in.read(1, &b);
  1982. if (b)
  1983. childPrefetcher->readAhead(in);
  1984. }
  1985. };
  1986. public:
  1987. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1988. COutputMetaWithChildRow(IEngineRowAllocator *_childAllocator, size32_t _extraSz) : childAllocator(_childAllocator), extraSz(_extraSz)
  1989. {
  1990. childMeta = childAllocator->queryOutputMeta();
  1991. }
  1992. virtual size32_t getRecordSize(const void *) { return extraSz + sizeof(const void *); }
  1993. virtual size32_t getMinRecordSize() const { return extraSz + sizeof(const void *); }
  1994. virtual size32_t getFixedSize() const { return extraSz + sizeof(const void *); }
  1995. virtual void toXML(const byte * self, IXmlWriter & out)
  1996. {
  1997. // ignoring xml'ing extra
  1998. //GH: I think this is what it should do
  1999. childMeta->toXML(*(const byte **)(self+extraSz), out);
  2000. }
  2001. virtual unsigned getVersion() const { return OUTPUTMETACHILDROW_VERSION; }
  2002. //The following can only be called if getMetaDataVersion >= 1, may seh otherwise. Creating a different interface was too painful
  2003. virtual unsigned getMetaFlags() { return MDFneeddestruct|childMeta->getMetaFlags(); }
  2004. virtual void destruct(byte * self)
  2005. {
  2006. OwnedConstThorRow childRow = *(const void **)(self+extraSz);
  2007. }
  2008. virtual IOutputRowSerializer * createDiskSerializer(ICodeContext * ctx, unsigned activityId)
  2009. {
  2010. if (!diskSerializer)
  2011. diskSerializer.setown(new CSerializer(childMeta->createDiskSerializer(ctx, activityId), extraSz));
  2012. return LINK(diskSerializer);
  2013. }
  2014. virtual IOutputRowDeserializer * createDiskDeserializer(ICodeContext * ctx, unsigned activityId)
  2015. {
  2016. if (!diskDeserializer)
  2017. diskDeserializer.setown(new CDeserializer(childMeta->createDiskDeserializer(ctx, activityId), childAllocator, extraSz));
  2018. return LINK(diskDeserializer);
  2019. }
  2020. virtual ISourceRowPrefetcher * createDiskPrefetcher(ICodeContext * ctx, unsigned activityId)
  2021. {
  2022. if (!prefetcher)
  2023. prefetcher.setown(new CPrefetcher(childMeta->createDiskPrefetcher(ctx, activityId), extraSz));
  2024. return LINK(prefetcher);
  2025. }
  2026. virtual IOutputMetaData * querySerializedDiskMeta() { return this; }
  2027. virtual IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId)
  2028. {
  2029. if (!internalSerializer)
  2030. internalSerializer.setown(new CSerializer(childMeta->createInternalSerializer(ctx, activityId), extraSz));
  2031. return LINK(internalSerializer);
  2032. }
  2033. virtual IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId)
  2034. {
  2035. if (!internalDeserializer)
  2036. internalDeserializer.setown(new CDeserializer(childMeta->createInternalDeserializer(ctx, activityId), childAllocator, extraSz));
  2037. return LINK(internalDeserializer);
  2038. }
  2039. virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor)
  2040. {
  2041. //GH: I think this is what it should do, please check
  2042. visitor.visitRow(*(const byte **)(self+extraSz));
  2043. }
  2044. virtual IOutputMetaData * queryChildMeta(unsigned i)
  2045. {
  2046. return childMeta->queryChildMeta(i);
  2047. }
  2048. };
  2049. IOutputMetaData *createOutputMetaDataWithChildRow(IEngineRowAllocator *childAllocator, size32_t extraSz)
  2050. {
  2051. return new COutputMetaWithChildRow(childAllocator, extraSz);
  2052. }
  2053. class COutputMetaWithExtra : public CSimpleInterface, implements IOutputMetaData
  2054. {
  2055. Linked<IOutputMetaData> meta;
  2056. size32_t metaSz;
  2057. Owned<IOutputRowSerializer> diskSerializer;
  2058. Owned<IOutputRowDeserializer> diskDeserializer;
  2059. Owned<IOutputRowSerializer> internalSerializer;
  2060. Owned<IOutputRowDeserializer> internalDeserializer;
  2061. Owned<ISourceRowPrefetcher> prefetcher;
  2062. Owned<IOutputMetaData> serializedmeta;
  2063. class CSerializer : public CSimpleInterface, implements IOutputRowSerializer
  2064. {
  2065. Owned<IOutputRowSerializer> serializer;
  2066. size32_t metaSz;
  2067. public:
  2068. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  2069. CSerializer(IOutputRowSerializer *_serializer, size32_t _metaSz) : serializer(_serializer), metaSz(_metaSz)
  2070. {
  2071. }
  2072. virtual void serialize(IRowSerializerTarget &out, const byte *self)
  2073. {
  2074. out.put(metaSz, self);
  2075. serializer->serialize(out, self+metaSz);
  2076. }
  2077. };
  2078. //GH - This code is the same as CPrefixedRowDeserializer
  2079. class CDeserializer : public CSimpleInterface, implements IOutputRowDeserializer
  2080. {
  2081. Owned<IOutputRowDeserializer> deserializer;
  2082. size32_t metaSz;
  2083. public:
  2084. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  2085. CDeserializer(IOutputRowDeserializer *_deserializer, size32_t _metaSz) : deserializer(_deserializer), metaSz(_metaSz)
  2086. {
  2087. }
  2088. virtual size32_t deserialize(ARowBuilder & rowBuilder, IRowDeserializerSource &in)
  2089. {
  2090. in.read(metaSz, rowBuilder.getSelf());
  2091. CPrefixedRowBuilder prefixedBuilder(metaSz, rowBuilder);
  2092. size32_t sz = deserializer->deserialize(prefixedBuilder, in);
  2093. return sz+metaSz;
  2094. }
  2095. };
  2096. class CPrefetcher : public CSimpleInterface, implements ISourceRowPrefetcher
  2097. {
  2098. Owned<ISourceRowPrefetcher> childPrefetcher;
  2099. size32_t metaSz;
  2100. public:
  2101. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  2102. CPrefetcher(ISourceRowPrefetcher *_childPrefetcher, size32_t _metaSz) : childPrefetcher(_childPrefetcher), metaSz(_metaSz)
  2103. {
  2104. }
  2105. virtual void readAhead(IRowDeserializerSource &in)
  2106. {
  2107. in.skip(metaSz);
  2108. childPrefetcher->readAhead(in);
  2109. }
  2110. };
  2111. public:
  2112. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  2113. COutputMetaWithExtra(IOutputMetaData *_meta, size32_t _metaSz) : meta(_meta), metaSz(_metaSz)
  2114. {
  2115. }
  2116. virtual size32_t getRecordSize(const void *rec)
  2117. {
  2118. size32_t sz = meta->getRecordSize(rec?((byte *)rec)+metaSz:NULL);
  2119. return sz+metaSz;
  2120. }
  2121. virtual size32_t getMinRecordSize() const
  2122. {
  2123. return meta->getMinRecordSize() + metaSz;
  2124. }
  2125. virtual size32_t getFixedSize() const
  2126. {
  2127. size32_t sz = meta->getFixedSize();
  2128. if (!sz)
  2129. return 0;
  2130. return sz+metaSz;
  2131. }
  2132. virtual void toXML(const byte * self, IXmlWriter & out) { meta->toXML(self, out); }
  2133. virtual unsigned getVersion() const { return meta->getVersion(); }
  2134. //The following can only be called if getMetaDataVersion >= 1, may seh otherwise. Creating a different interface was too painful
  2135. virtual unsigned getMetaFlags() { return meta->getMetaFlags(); }
  2136. virtual void destruct(byte * self) { meta->destruct(self); }
  2137. virtual IOutputRowSerializer * createDiskSerializer(ICodeContext * ctx, unsigned activityId)
  2138. {
  2139. if (!diskSerializer)
  2140. diskSerializer.setown(new CSerializer(meta->createDiskSerializer(ctx, activityId), metaSz));
  2141. return LINK(diskSerializer);
  2142. }
  2143. virtual IOutputRowDeserializer * createDiskDeserializer(ICodeContext * ctx, unsigned activityId)
  2144. {
  2145. if (!diskDeserializer)
  2146. diskDeserializer.setown(new CDeserializer(meta->createDiskDeserializer(ctx, activityId), metaSz));
  2147. return LINK(diskDeserializer);
  2148. }
  2149. virtual ISourceRowPrefetcher * createDiskPrefetcher(ICodeContext * ctx, unsigned activityId)
  2150. {
  2151. if (!prefetcher)
  2152. prefetcher.setown(new CPrefetcher(meta->createDiskPrefetcher(ctx, activityId), metaSz));
  2153. return LINK(prefetcher);
  2154. }
  2155. virtual IOutputMetaData * querySerializedDiskMeta()
  2156. {
  2157. IOutputMetaData *sm = meta->querySerializedDiskMeta();
  2158. if (sm==meta.get())
  2159. return this;
  2160. if (!serializedmeta.get())
  2161. serializedmeta.setown(new COutputMetaWithExtra(sm,metaSz));
  2162. return serializedmeta.get();
  2163. }
  2164. virtual IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId)
  2165. {
  2166. if (!internalSerializer)
  2167. internalSerializer.setown(new CSerializer(meta->createInternalSerializer(ctx, activityId), metaSz));
  2168. return LINK(internalSerializer);
  2169. }
  2170. virtual IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId)
  2171. {
  2172. if (!internalDeserializer)
  2173. internalDeserializer.setown(new CDeserializer(meta->createInternalDeserializer(ctx, activityId), metaSz));
  2174. return LINK(internalDeserializer);
  2175. }
  2176. virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor)
  2177. {
  2178. meta->walkIndirectMembers(self, visitor);
  2179. }
  2180. virtual IOutputMetaData * queryChildMeta(unsigned i)
  2181. {
  2182. return meta->queryChildMeta(i);
  2183. }
  2184. };
  2185. IOutputMetaData *createOutputMetaDataWithExtra(IOutputMetaData *meta, size32_t sz)
  2186. {
  2187. return new COutputMetaWithExtra(meta, sz);
  2188. }
  2189. IPerfMonHook *createThorMemStatsPerfMonHook(IPerfMonHook *chain)
  2190. {
  2191. return LINK(chain);
  2192. }