// thbuf.cpp

/*##############################################################################
    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */

#include "platform.h"
#include <limits.h>
#include <stddef.h>
#include "jlib.hpp"
#include "jmisc.hpp"
#include "jio.hpp"
#include "jlzw.hpp"
#include "jsem.hpp"
#include "jthread.hpp"
#include "jarray.hpp"
#include "jiface.hpp"
#include "jfile.hpp"
#include "jset.hpp"
#include "jqueue.tpp"

#include "thmem.hpp"
#include "thalloc.hpp"
#include "thbuf.hpp"
#include "eclrtl.hpp"
#include "thgraph.hpp"

#ifdef _DEBUG
//#define _FULL_TRACE
//#define TRACE_CREATE
struct SmartBufReenterCheck
{
    bool &check;
    SmartBufReenterCheck(bool &_check)
        : check(_check)
    {
        assertex(!check);
        check = true;
    }
    ~SmartBufReenterCheck()
    {
        assertex(check);
        check = false;
    }
};
#define REENTRANCY_CHECK(n) SmartBufReenterCheck recheck(n);
#else
#define REENTRANCY_CHECK(n)
#endif
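
// Illustrative note (not in the original source): REENTRANCY_CHECK is a
// debug-only RAII guard. A minimal sketch of the intended usage, assuming a
// hypothetical class with a member flag:
//
//     class Example
//     {
//         bool putrecheck = false;
//     public:
//         void putRow(const void *row)
//         {
//             REENTRANCY_CHECK(putrecheck) // trips assertex if putRow re-enters
//             // ... body ...
//         }
//     };
//
// The guard sets the flag on entry and clears it on scope exit, so recursive
// or overlapped calls assert in debug builds and compile away in release.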
class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, implements IRowWriter
{
    CActivityBase *activity;
    ThorRowQueue *in;
    size32_t insz;
    ThorRowQueue *out;
    Linked<IFile> file;
    Owned<IFileIO> fileio;
    SpinLock lock;
    bool waiting;
    Semaphore waitsem;
    bool waitflush;
    Semaphore waitflushsem;
    size32_t blocksize;
    unsigned numblocks;
    Owned<IBitSet> diskfree;
    UnsignedArray diskin;
    UnsignedArray diskinlen;
    Linked<IOutputRowSerializer> serializer;
    Linked<IOutputRowDeserializer> deserializer;
    Linked<IEngineRowAllocator> allocator;
    bool eoi;
#ifdef _DEBUG
    bool putrecheck;
    bool getrecheck;
#endif

    unsigned allocblk(unsigned nb)
    {
        if (nb<=numblocks) {
            for (unsigned i=0;i<=numblocks-nb;i++) {
                unsigned j;
                for (j=i;j<i+nb;j++)
                    if (!diskfree->test(j))
                        break;
                if (j==i+nb) {
                    while (j>i)
                        diskfree->set(--j,false);
                    return i;
                }
            }
        }
        unsigned ret = numblocks;
        numblocks += nb;
        return ret;
    }

    void freeblk(unsigned b,unsigned nb)
    {
        for (unsigned i=0;i<nb;i++)
            diskfree->set(i+b,true);
    }
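
    // Illustrative note (not in the original source): allocblk is a first-fit
    // scan over the diskfree bit set for a run of nb contiguous free blocks,
    // and only extends the file (numblocks += nb) when no run is found;
    // freeblk marks the run free again. For example, if blocks 0-1 and 4-6
    // are free, allocblk(2) returns 0 and clears bits 0-1, allocblk(3) then
    // returns 4 and clears bits 4-6, and a further allocblk(4) appends four
    // new blocks at the end of the file.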
    void diskflush()
    {
        struct cSaveIn {
            ThorRowQueue *rq;
            ThorRowQueue *&in;
            cSaveIn(ThorRowQueue *&_in)
                : in(_in)
            {
                rq = in;
                in = NULL;
            }
            ~cSaveIn()
            {
                if (!in)
                    in = rq;
            }
        } csavein(in); // mark as not there so consumer will pause
        if (out && (out->ordinality()==0) && (diskin.ordinality()==0)) {
            in = out;
            out = csavein.rq;
            insz = 0;
            return;
        }
        if (!fileio) {
            SpinUnblock unblock(lock);
            fileio.setown(file->open(IFOcreaterw));
            if (!fileio)
            {
                throw MakeStringException(-1,"CSmartRowBuffer::flush cannot write file %s",file->queryFilename());
            }
        }
        MemoryBuffer mb;
        CMemoryRowSerializer mbs(mb);
        unsigned nb = 0;
        unsigned blk = 0;
        if (!waiting) {
            mb.ensureCapacity(blocksize);
            {
                SpinUnblock unblock(lock);
                unsigned p = 0;
                byte b;
                for (unsigned i=0;i<csavein.rq->ordinality();i++) {
                    if (waiting)
                        break;
                    const void *row = csavein.rq->item(i);
                    if (row) {
                        b = 1;
                        mb.append(b);
                        serializer->serialize(mbs,(const byte *)row);
                    }
                    else {
                        b = 2; // eog
                        mb.append(b);
                    }
                }
                b = 0;
                mb.append(b);
            }
        }
        if (!waiting) {
            nb = (mb.length()+blocksize-1)/blocksize;
            blk = allocblk(nb);
            SpinUnblock unblock(lock);
            if (mb.length()<nb*blocksize) { // bit overkill!
                size32_t left = nb*blocksize-mb.length();
                memset(mb.reserve(left),0,left);
            }
            fileio->write(blk*(offset_t)blocksize,mb.length(),mb.bufferBase());
            mb.clear();
        }
        if (waiting) {
            // if eoi, stopped while serializing/writing, putRow may put final row to existing 'in'
            if (!eoi)
            {
                // consumer caught up
                freeblk(blk,nb);
                assertex(out->ordinality()==0);
                assertex(diskin.ordinality()==0);
                in = out;
                out = csavein.rq;
            }
        }
        else {
            diskin.append(blk);
            diskinlen.append(nb);
            while (csavein.rq->ordinality())
                ReleaseThorRow(csavein.rq->dequeue());
        }
        insz = 0;
    }
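
    // Illustrative note (not in the original source): each spilled chunk is a
    // byte stream of tagged records: 1 = a serialized row follows, 2 = an
    // end-of-group marker (a NULL row), 0 = end of chunk. The buffer is padded
    // with zeroes up to a whole number of blocksize blocks before being written
    // at block 'blk', which is why load() below can deserialize until it reads
    // a 0 tag without tracking an exact byte length.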
    void load()
    {
        ThorRowQueue *rq = out;
        out = NULL; // mark as not there so producer won't fill
        unsigned blk = diskin.item(0);
        unsigned nb = diskinlen.item(0);
        diskin.remove(0); // better as q but given reading from disk...
        diskinlen.remove(0);
        {
            SpinUnblock unblock(lock);
            MemoryAttr ma;
            size32_t readBlockSize = nb*blocksize;
            byte *buf = (byte *)ma.allocate(readBlockSize);
            CThorStreamDeserializerSource ds(readBlockSize,buf);
            assertex(fileio.get());
            size32_t rd = fileio->read(blk*(offset_t)blocksize,readBlockSize,buf);
            assertex(rd==readBlockSize);
            unsigned p = 0;
            loop {
                byte b;
                ds.read(sizeof(b),&b);
                if (!b)
                    break;
                if (b==1) {
                    RtlDynamicRowBuilder rowBuilder(allocator);
                    size32_t sz = deserializer->deserialize(rowBuilder,ds);
                    rq->enqueue(rowBuilder.finalizeRowClear(sz));
                }
                else if (b==2)
                    rq->enqueue(NULL);
            }
        }
        freeblk(blk,nb);
        out = rq;
    }

public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

    CSmartRowBuffer(CActivityBase *_activity, IFile *_file,size32_t bufsize,IRowInterfaces *rowif)
        : activity(_activity), file(_file), allocator(rowif->queryRowAllocator()), serializer(rowif->queryRowSerializer()), deserializer(rowif->queryRowDeserializer())
    {
#ifdef _DEBUG
        putrecheck = false;
        getrecheck = false;
#endif
        in = new ThorRowQueue;
        out = new ThorRowQueue;
        waiting = false;
        waitflush = false;
        blocksize = ((bufsize/2+0xfffff)/0x100000)*0x100000;
        numblocks = 0;
        insz = 0;
        eoi = false;
        diskfree.setown(createBitSet());
#ifdef _FULL_TRACE
        ActPrintLog(activity, "SmartBuffer create %x",(unsigned)(memsize_t)this);
#endif
    }

    ~CSmartRowBuffer()
    {
#ifdef _FULL_TRACE
        ActPrintLog(activity, "SmartBuffer destroy %x",(unsigned)(memsize_t)this);
#endif
        assertex(!waiting);
        assertex(!waitflush);
        // clear in/out contents
        while (in->ordinality())
            ReleaseThorRow(in->dequeue());
        delete in;
        while (out->ordinality())
            ReleaseThorRow(out->dequeue());
        delete out;
    }

    void putRow(const void *row)
    {
        REENTRANCY_CHECK(putrecheck)
        size32_t sz = thorRowMemoryFootprint(serializer, row);
        SpinBlock block(lock);
        if (eoi) {
            ReleaseThorRow(row);
            return;
        }
        assertex(in); // reentry check
        if (sz+insz+(in->ordinality()+2)*sizeof(byte)>blocksize) // byte extra per row + end byte
            diskflush();
        in->enqueue(row);
        insz += sz;
        if (waiting) {
            waitsem.signal();
            waiting = false;
        }
    }

    void stop()
    {
#ifdef _FULL_TRACE
        ActPrintLog(activity, "SmartBuffer stop %x",(unsigned)(memsize_t)this);
#endif
        SpinBlock block(lock);
#ifdef _DEBUG
        if (waiting)
        {
            ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "CSmartRowBuffer::stop while nextRow waiting");
            PrintStackReport();
        }
#endif
        eoi = true;
        while (out&&out->ordinality())
            ReleaseThorRow(out->dequeue());
        while (NULL == in)
        {
            waiting = true;
            SpinUnblock unblock(lock);
            waitsem.wait();
        }
        while (out&&out->ordinality())
            ReleaseThorRow(out->dequeue());
        while (in&&in->ordinality())
            ReleaseThorRow(in->dequeue());
        diskin.kill();
        if (waiting) {
            waitsem.signal();
            waiting = false;
        }
        if (waitflush) {
            waitflushsem.signal();
            waitflush = false;
        }
    }

    const void *nextRow()
    {
        REENTRANCY_CHECK(getrecheck)
        const void * ret;
        assertex(out);
        assertex(!waiting); // reentrancy checks
        loop {
            {
                SpinBlock block(lock);
                if (out->ordinality()) {
                    ret = out->dequeue();
                    break;
                }
                if (diskin.ordinality()) {
                    load();
                    ret = out->dequeue();
                    break;
                }
                if (in) {
                    if (in->ordinality()) {
                        ret = in->dequeue();
                        if (ret) {
                            size32_t sz = thorRowMemoryFootprint(serializer, ret);
                            assertex(insz>=sz);
                            insz -= sz;
                        }
                        break;
                    }
                    else {
                        if (waitflush) {
                            waitflushsem.signal();
                            waitflush = false;
                        }
                        if (eoi)
                            return NULL;
                    }
                }
                assertex(!waiting); // reentrancy check
                waiting = true;
            }
            waitsem.wait();
        }
        return ret;
    }

    void flush()
    {
        // I think flush should wait til all rows read or stopped
#ifdef _FULL_TRACE
        ActPrintLog(activity, "SmartBuffer flush %x",(unsigned)(memsize_t)this);
#endif
        SpinBlock block(lock);
        if (eoi) return;
        loop {
            assertex(in); // reentry check
            diskflush();
            eoi = true;
            if (waiting) {
                waitsem.signal();
                waiting = false;
            }
            if (out&&!out->ordinality()&&!diskin.ordinality()&&!in->ordinality())
                break;
            waitflush = true;
            SpinUnblock unblock(lock);
            while (!waitflushsem.wait(1000*60))
                ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "CSmartRowBuffer::flush stalled");
        }
    }

    IRowWriter *queryWriter()
    {
        return this;
    }
};
class CSmartRowInMemoryBuffer: public CSimpleInterface, implements ISmartRowBuffer, implements IRowWriter
{
    // NB must *not* call LinkThorRow or ReleaseThorRow (or Owned*ThorRow) if deallocator set
    CActivityBase *activity;
    IRowInterfaces *rowIf;
    ThorRowQueue *in;
    size32_t insz;
    SpinLock lock;
    bool waitingin;
    Semaphore waitinsem;
    bool waitingout;
    Semaphore waitoutsem;
    size32_t blocksize;
    bool eoi;
#ifdef _DEBUG
    bool putrecheck;
    bool getrecheck;
#endif
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

    CSmartRowInMemoryBuffer(CActivityBase *_activity, IRowInterfaces *_rowIf, size32_t bufsize)
        : activity(_activity), rowIf(_rowIf)
    {
#ifdef _DEBUG
        putrecheck = false;
        getrecheck = false;
#endif
        in = new ThorRowQueue;
        waitingin = false;
        waitingout = false;
        blocksize = ((bufsize/2+0xfffff)/0x100000)*0x100000;
        insz = 0;
        eoi = false;
    }

    ~CSmartRowInMemoryBuffer()
    {
        // clear in contents
        while (in->ordinality())
            ReleaseThorRow(in->dequeue());
        delete in;
    }

    void putRow(const void *row)
    {
        REENTRANCY_CHECK(putrecheck)
        size32_t sz = 0;
        if (row) {
            sz = thorRowMemoryFootprint(rowIf->queryRowSerializer(), row);
#ifdef _DEBUG
            assertex(sz<0x1000000);
#endif
            assertex((int)sz>=0); // trap invalid sizes a bit earlier
#ifdef _TRACE_SMART_PUTGET
            ActPrintLog(activity, "***putRow(%x) %d {%x}",(unsigned)row,sz,*(const unsigned *)row);
#endif
        }
        {
            SpinBlock block(lock);
            if (!eoi) {
                assertex(in); // reentry check
                while ((sz+insz>blocksize)&&insz) {
                    waitingin = true;
                    {
                        SpinUnblock unblock(lock);
                        waitinsem.wait();
                    }
                    if (eoi)
                        break;
                }
            }
            if (!eoi) {
                in->enqueue(row);
                insz += sz;
#ifdef _TRACE_SMART_PUTGET
                ActPrintLog(activity, "***putRow2 insize = %d",insz); // NB: was "(%x) %d" with a single argument, a format/argument mismatch
#endif
                if (waitingout) {
                    waitoutsem.signal();
                    waitingout = false;
                }
                return;
            }
        }
        // cancelled
        ReleaseThorRow(row);
    }

    const void *nextRow()
    {
        REENTRANCY_CHECK(getrecheck)
        const void * ret;
        SpinBlock block(lock);
        assertex(!waitingout); // reentrancy checks
        loop {
            if (in->ordinality()) {
                ret = in->dequeue();
                if (ret) {
                    size32_t sz = thorRowMemoryFootprint(rowIf->queryRowSerializer(), ret);
#ifdef _TRACE_SMART_PUTGET
                    ActPrintLog(activity, "***dequeueRow(%x) %d insize=%d {%x}",(unsigned)ret,sz,insz,*(const unsigned *)ret);
#endif
                    if (insz<sz) {
                        ActPrintLog(activity, "CSmartRowInMemoryBuffer::nextRow(%x) %d insize=%d {%x} ord = %d",(unsigned)(memsize_t)ret,sz,insz,*(const unsigned *)ret,in->ordinality());
                        assertex(insz>=sz);
                    }
                    insz -= sz;
                }
                break;
            }
            else if (eoi) {
                ret = NULL;
                break;
            }
            assertex(!waitingout); // reentrancy check
            waitingout = true;
            SpinUnblock unblock(lock);
            waitoutsem.wait();
        }
        if (waitingin) {
            waitinsem.signal();
            waitingin = false;
        }
        return ret;
    }

    void stop()
    {
#ifdef _FULL_TRACE
        ActPrintLog(activity, "CSmartRowInMemoryBuffer stop %x",(unsigned)(memsize_t)this);
#endif
        SpinBlock block(lock);
#ifdef _DEBUG
        if (waitingout) {
            ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "CSmartRowInMemoryBuffer::stop while nextRow waiting");
            PrintStackReport();
        }
#endif
        eoi = true;
        while (in->ordinality())
            ReleaseThorRow(in->dequeue());
        in->clear();
        if (waitingin) {
            waitinsem.signal();
            waitingin = false;
        }
    }

//  offset_t getPosition()
//  {
//      SpinBlock block(lock);
//      return pos;
//  }

    void flush()
    {
        // I think flush should wait til all rows read
        SpinBlock block(lock);
        eoi = true;
        loop {
            assertex(in); // reentry check
            if (waitingout) {
                waitoutsem.signal();
                waitingout = false;
            }
            if (!in->ordinality())
                break;
            waitingin = true;
            SpinUnblock unblock(lock);
            while (!waitinsem.wait(1000*60))
                ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "CSmartRowInMemoryBuffer::flush stalled");
        }
    }

    IRowWriter *queryWriter()
    {
        return this;
    }
};
ISmartRowBuffer * createSmartBuffer(CActivityBase *activity, const char * tempname, size32_t buffsize, IRowInterfaces *rowif)
{
    Owned<IFile> file = createIFile(tempname);
    return new CSmartRowBuffer(activity,file,buffsize,rowif);
}

ISmartRowBuffer * createSmartInMemoryBuffer(CActivityBase *activity, IRowInterfaces *rowIf, size32_t buffsize)
{
    return new CSmartRowInMemoryBuffer(activity, rowIf, buffsize);
}
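
// Illustrative sketch (not in the original source): typical single-producer /
// single-consumer use of an ISmartRowBuffer, assuming 'rowif' and a temp file
// name obtained elsewhere:
//
//     Owned<ISmartRowBuffer> buf = createSmartBuffer(activity, tempname, 0x100000, rowif);
//     IRowWriter *writer = buf->queryWriter();
//     // producer thread:
//     //     writer->putRow(row); ... buf->flush(); // flush blocks until drained or stopped
//     // consumer thread:
//     //     loop { OwnedConstThorRow row = buf->nextRow(); if (!row) break; ... }
//     // either side may call buf->stop() to abort early.
//
// Rows overflow to the temp file only once roughly half of 'buffsize' (rounded
// up to a 1MB multiple) is queued in memory.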
class COverflowableBuffer : public CSimpleInterface, implements IRowWriterMultiReader
{
    CActivityBase &activity;
    IRowInterfaces *rowIf;
    Owned<IThorRowCollector> collector;
    Owned<IRowWriter> writer;
    bool eoi, grouped, shared;
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

    COverflowableBuffer(CActivityBase &_activity, IRowInterfaces *_rowIf, bool _grouped, bool _shared, unsigned spillPriority)
        : activity(_activity), rowIf(_rowIf), grouped(_grouped), shared(_shared)
    {
        collector.setown(createThorRowCollector(activity, rowIf, NULL, stableSort_none, rc_mixed, spillPriority, grouped));
        writer.setown(collector->getWriter());
        eoi = false;
    }
    ~COverflowableBuffer()
    {
        writer.clear();
        collector.clear();
    }

// IRowWriterMultiReader
    virtual IRowStream *getReader()
    {
        flush();
        return collector->getStream(shared);
    }
// IRowWriter
    virtual void putRow(const void *row)
    {
        assertex(!eoi);
        writer->putRow(row);
    }
    virtual void flush()
    {
        eoi = true;
    }
};

IRowWriterMultiReader *createOverflowableBuffer(CActivityBase &activity, IRowInterfaces *rowIf, bool grouped, bool shared, unsigned spillPriority)
{
    return new COverflowableBuffer(activity, rowIf, grouped, shared, spillPriority);
}
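
// Illustrative sketch (not in the original source): COverflowableBuffer is a
// write-then-read buffer; all rows are written first, then readers are created.
// 'somePriority' stands in for whatever spill priority the caller would pass:
//
//     Owned<IRowWriterMultiReader> buf = createOverflowableBuffer(activity, rowIf, false, true, somePriority);
//     buf->putRow(row1);
//     buf->putRow(row2);
//     buf->flush();                                 // marks eoi; no further putRow allowed
//     Owned<IRowStream> reader = buf->getReader();  // row collector spills to disk under memory pressure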
#define VALIDATEEQ(LHS, RHS) if ((LHS)!=(RHS)) { StringBuffer s("FAIL(EQ) - LHS="); s.append(LHS).append(", RHS=").append(RHS); PROGLOG("%s", s.str()); throwUnexpected(); }
#define VALIDATELT(LHS, RHS) if ((LHS)>=(RHS)) { StringBuffer s("FAIL(LT) - LHS="); s.append(LHS).append(", RHS=").append(RHS); PROGLOG("%s", s.str()); throwUnexpected(); }

//#define TRACE_WRITEAHEAD

class CRowSet : public CSimpleInterface
{
    unsigned chunk;
    CThorExpandingRowArray rows;
public:
    CRowSet(CActivityBase &activity, unsigned _chunk) : rows(activity, &activity, true), chunk(_chunk)
    {
    }
    void reset(unsigned _chunk)
    {
        chunk = _chunk;
        rows.kill();
    }
    inline unsigned queryChunk() const { return chunk; }
    inline unsigned getRowCount() const { return rows.ordinality(); }
    inline void addRow(const void *row) { rows.append(row); }
    inline const void *getRow(unsigned r)
    {
        return rows.get(r);
    }
};

class Chunk : public CInterface
{
public:
    offset_t offset;
    size32_t size;
    Chunk(offset_t _offset, size_t _size) : offset(_offset), size(_size) { }
    Chunk(const Chunk &other) : offset(other.offset), size(other.size) { }
    bool operator==(Chunk const &other) const { return size==other.size && offset==other.offset; }
    Chunk &operator=(Chunk const &other) { offset = other.offset; size = other.size; return *this; }
    offset_t endOffset() { return offset+size; }
};

typedef int (*ChunkCompareFunc)(Chunk *, Chunk *);
int chunkOffsetCompare(Chunk *lhs, Chunk *rhs)
{
    if (lhs->offset>rhs->offset)
        return 1;
    else if (lhs->offset<rhs->offset)
        return -1;
    return 0;
}
int chunkSizeCompare(Chunk **_lhs, Chunk **_rhs)
{
    Chunk *lhs = *(Chunk **)_lhs;
    Chunk *rhs = *(Chunk **)_rhs;
    return (int)lhs->size - (int)rhs->size;
}
int chunkSizeCompare2(Chunk *lhs, Chunk *rhs)
{
    return (int)lhs->size - (int)rhs->size;
}
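
// Illustrative note (not in the original source): the disk write-ahead below
// keeps two views of the free space in the spill file. 'freeChunks' is ordered
// by offset (chunkOffsetCompare) so adjacent free chunks can be coalesced, and
// 'freeChunksSized' is ordered by size (chunkSizeCompare/chunkSizeCompare2) so
// getOutOffset can search for the smallest chunk that satisfies a request. The
// two size-comparator signatures exist because bAdd compares element pointers
// (Chunk **) while findNext compares the objects directly (Chunk *).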
class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBuffer
{
    size32_t totalOutChunkSize;
    bool writeAtEof;
    rowcount_t rowsWritten;
    IArrayOf<IRowStream> outputs;
    unsigned readersWaiting;

    virtual void init()
    {
        stopped = false;
        writeAtEof = false;
        rowsWritten = 0;
        readersWaiting = 0;
        totalChunksOut = lowestChunk = 0;
        lowestOutput = 0;
#ifdef TRACE_WRITEAHEAD
        totalOutChunkSize = sizeof(unsigned);
#else
        totalOutChunkSize = 0;
#endif
    }
    inline bool isEof(rowcount_t rowsRead)
    {
        return stopped || (writeAtEof && rowsWritten == rowsRead);
    }
    inline void signalReaders()
    {
        if (readersWaiting)
        {
#ifdef TRACE_WRITEAHEAD
            ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "signalReaders: %d", readersWaiting);
#endif
            readersWaiting = 0;
            ForEachItemIn(o, outputs)
                queryCOutput(o).wakeRead(); // if necessary
        }
    }
    inline const rowcount_t &readerWait(const rowcount_t &rowsRead)
    {
        ++readersWaiting;
        return rowsWritten;
    }
    CRowSet *loadMore(unsigned output)
    {
        // needs to be called in crit
        Linked<CRowSet> rowSet;
        unsigned currentChunkNum = queryCOutput(output).currentChunkNum;
        if (currentChunkNum == totalChunksOut)
        {
#ifdef TRACE_WRITEAHEAD
            PROGLOG("output=%d, chunk=%d INMEM", output, currentChunkNum);
#endif
            rowSet.set(inMemRows);
        }
        else
        {
            unsigned whichChunk = queryCOutput(output).currentChunkNum - lowestChunk;
#ifdef TRACE_WRITEAHEAD
            PROGLOG("output=%d, chunk=%d (whichChunk=%d)", output, currentChunkNum, whichChunk);
#endif
            rowSet.setown(readRows(output, whichChunk));
            assertex(rowSet);
        }
        VALIDATEEQ(currentChunkNum, rowSet->queryChunk());
        advanceNextReadChunk(output);
        return rowSet.getClear();
    }
protected:
    class COutput : public CSimpleInterface, implements IRowStream
    {
        CSharedWriteAheadBase &parent;
        unsigned output;
        rowcount_t rowsRead;
        bool eof, readerWaiting;
        Semaphore readWaitSem;
        Owned<CRowSet> outputOwnedRows;
        CRowSet *rowSet;
        unsigned row, rowsInRowSet;

        void init()
        {
            rowsRead = 0;
            currentChunkNum = 0;
            rowsInRowSet = row = 0;
            readerWaiting = eof = false;
            rowSet = NULL;
        }
        inline void doStop()
        {
            if (eof) return;
            eof = true;
            parent.stopOutput(output);
        }
        inline const rowcount_t readerWait(const rowcount_t &rowsRead)
        {
            if (rowsRead == parent.rowsWritten)
            {
                if (parent.stopped || parent.writeAtEof)
                    return 0;
                readerWaiting = true;
#ifdef TRACE_WRITEAHEAD
                ActPrintLogEx(&parent.activity->queryContainer(), thorlog_all, MCdebugProgress, "readerWait(%d)", output);
#endif
                const rowcount_t &rowsWritten = parent.readerWait(rowsRead);
                {
                    CriticalUnblock b(parent.crit);
                    readWaitSem.wait();
                }
                if (parent.isEof(rowsRead))
                    return 0;
            }
            return parent.rowsWritten;
        }
        inline void wakeRead()
        {
            if (readerWaiting)
            {
                readerWaiting = false;
                readWaitSem.signal();
            }
        }
    public:
        unsigned currentChunkNum;
    public:
        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

        COutput(CSharedWriteAheadBase &_parent, unsigned _output) : parent(_parent), output(_output)
        {
            init();
        }
        void reset()
        {
            init();
            outputOwnedRows.clear();
        }
        inline CRowSet *queryRowSet() { return rowSet; }
        const void *nextRow()
        {
            if (eof)
                return NULL;
            if (row == rowsInRowSet)
            {
                CriticalBlock b(parent.crit);
                if (!rowSet || (row == (rowsInRowSet = rowSet->getRowCount())))
                {
                    rowcount_t totalRows = readerWait(rowsRead);
                    if (0 == totalRows)
                    {
                        doStop();
                        return NULL;
                    }
                    if (!rowSet || (row == (rowsInRowSet = rowSet->getRowCount()))) // maybe have caught up in same rowSet
                    {
                        Owned<CRowSet> newRows = parent.loadMore(output);
                        if (rowSet)
                            VALIDATEEQ(newRows->queryChunk(), rowSet->queryChunk()+1);
                        outputOwnedRows.setown(newRows.getClear());
                        rowSet = outputOwnedRows.get();
                        rowsInRowSet = rowSet->getRowCount();
                        row = 0;
                    }
                }
            }
            rowsRead++;
            SpinBlock b(parent.spin);
            const void *retrow = rowSet->getRow(row++);
            return retrow;
        }
        virtual void stop()
        {
            CriticalBlock b(parent.crit);
            doStop();
        }
        friend class CSharedWriteAheadBase;
    };
    CActivityBase *activity;
    size32_t minChunkSize;
    unsigned lowestChunk, lowestOutput, outputCount, totalChunksOut;
    bool stopped;
    Owned<CRowSet> inMemRows;
    CriticalSection crit;
    SpinLock spin;
    Linked<IOutputMetaData> meta;

    inline COutput &queryCOutput(unsigned i) { return (COutput &) outputs.item(i); }
    inline unsigned getLowest()
    {
        unsigned o=0;
        offset_t lsf = (unsigned)-1;
        unsigned lsfOutput = (unsigned)-1;
        loop
        {
            if (queryCOutput(o).currentChunkNum < lsf)
            {
                lsf = queryCOutput(o).currentChunkNum;
                lsfOutput = o;
            }
            if (++o == outputCount)
                break;
        }
        return lsfOutput;
    }
    void advanceNextReadChunk(unsigned output)
    {
        unsigned &currentChunkNum = queryCOutput(output).currentChunkNum;
        unsigned prevChunkNum = currentChunkNum;
        currentChunkNum++;
        if (output == lowestOutput) // might be joint lowest
        {
            lowestOutput = getLowest();
            if (currentChunkNum == queryCOutput(lowestOutput).currentChunkNum) // has new page moved up to join new lowest, in which case it was last.
            {
                if (prevChunkNum != totalChunksOut) // if equal, then prevOffset was mem page and not in pool/disk to free
                    freeOffsetChunk(prevChunkNum);
                else
                {
                    VALIDATEEQ(prevChunkNum, inMemRows->queryChunk()); // validate is current in mem page
                }
                ++lowestChunk;
#ifdef TRACE_WRITEAHEAD
                ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "aNRP: %d, lowestChunk=%d, totalChunksOut=%d", prevChunkNum, lowestChunk, totalChunksOut);
#endif
            }
        }
    }
    void stopOutput(unsigned output)
    {
        // mark stopped output with very high page (as if read all), will not be picked again
        unsigned lowestA = getLowest();
        if (output == lowestA)
        {
            unsigned chunkOrig = queryCOutput(output).currentChunkNum;
            queryCOutput(output).currentChunkNum = (unsigned)-1;
            unsigned lO = getLowest(); // get next lowest
            queryCOutput(output).currentChunkNum = chunkOrig;
            if (((unsigned)-1) == lO) // if last to stop
            {
                markStop();
                return;
            }
#ifdef TRACE_WRITEAHEAD
            ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Input %d stopped, forcing catchup to free pages", output);
#endif
            while (queryCOutput(output).currentChunkNum < queryCOutput(lO).currentChunkNum)
            {
                Owned<CRowSet> tmpChunk = loadMore(output); // discard
            }
#ifdef TRACE_WRITEAHEAD
            ActPrintLog(activity, "Input %d stopped. Caught up", output);
#endif
            queryCOutput(output).currentChunkNum = (unsigned)-1;
            lowestOutput = getLowest();
        }
        else
            queryCOutput(output).currentChunkNum = (unsigned)-1;
    }
    void stopAll()
    {
        CriticalBlock b(crit);
        markStop();
    }
    virtual void markStop()
    {
        if (stopped) return;
        stopped = true;
    }
    virtual void freeOffsetChunk(unsigned chunk) = 0;
    virtual CRowSet *readRows(unsigned output, unsigned chunk) = 0;
    virtual void flushRows() = 0;
    virtual size32_t rowSize(const void *row) = 0;
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

    CSharedWriteAheadBase(CActivityBase *_activity, unsigned _outputCount, IRowInterfaces *rowIf) : activity(_activity), outputCount(_outputCount), meta(rowIf->queryRowMetaData())
    {
        init();
        minChunkSize = 0x2000;
        size32_t minSize = meta->getMinRecordSize();
        if (minChunkSize<minSize*10) // if rec minSize bigish, ensure reasonable minChunkSize
        {
            minChunkSize = minSize*10;
            if (minChunkSize > 0x10000)
                minChunkSize += 2*(minSize+1);
        }
        outputCount = _outputCount;
        unsigned c=0;
        for (; c<outputCount; c++)
        {
            outputs.append(* new COutput(*this, c));
        }
        inMemRows.setown(new CRowSet(*activity, 0));
    }
    ~CSharedWriteAheadBase()
    {
        ForEachItemIn(o, outputs)
        {
            COutput &output = queryCOutput(o);
            output.outputOwnedRows.clear();
        }
        inMemRows.clear();
    }
    unsigned anyReaderBehind()
    {
        unsigned reader=0;
        loop
        {
            if (reader>=outputCount) // so all waiting, don't need in mem page.
                break;
            if (!queryCOutput(reader).eof && (queryCOutput(reader).rowSet != inMemRows.get())) // if output using this in mem page, then it has linked already, need to find one that is not (i.e. behind)
                return reader;
            ++reader;
        }
        return NotFound;
    }

// ISharedSmartBuffer
    virtual void putRow(const void *row)
    {
        if (stopped)
        {
            ReleaseThorRow(row);
            return;
        }
        unsigned len=rowSize(row);
        CriticalBlock b(crit);
        if (totalOutChunkSize >= minChunkSize) // chunks required to be at least minChunkSize
        {
            unsigned reader=anyReaderBehind();
            if (NotFound != reader)
                flushRows();
            inMemRows.setown(new CRowSet(*activity, ++totalChunksOut));
#ifdef TRACE_WRITEAHEAD
            totalOutChunkSize = sizeof(unsigned);
#else
            totalOutChunkSize = 0;
#endif
        }
        {
            SpinBlock b(spin);
            inMemRows->addRow(row);
        }
        totalOutChunkSize += len;
        rowsWritten++; // including NULLs(eogs)
        signalReaders();
    }
    virtual void flush()
    {
        CriticalBlock b(crit);
        writeAtEof = true;
        signalReaders();
    }
    virtual offset_t getPosition()
    {
        throwUnexpected();
        return 0;
    }
    virtual IRowStream *queryOutput(unsigned output)
    {
        return &outputs.item(output);
    }
    virtual void cancel()
    {
        CriticalBlock b(crit);
        stopAll();
        signalReaders();
    }
    virtual void reset()
    {
        init();
        unsigned c=0;
        for (; c<outputCount; c++)
            queryCOutput(c).reset();
        inMemRows->reset(0);
    }
    friend class COutput;
};
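
// Illustrative note (not in the original source): chunks are numbered
// sequentially as they are written; 'totalChunksOut' identifies the chunk
// currently being filled (inMemRows) and 'lowestChunk' is the oldest chunk any
// reader still needs. A reader whose currentChunkNum equals totalChunksOut
// reads the in-memory row set directly; anything older is fetched via the
// derived class's readRows(), and a chunk is released via freeOffsetChunk()
// only once the slowest reader has advanced past it.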
class CSharedWriteAheadDisk : public CSharedWriteAheadBase
{
    IDiskUsage *iDiskUsage;
    Owned<IFile> spillFile;
    Owned<IFileIO> spillFileIO;
    CIArrayOf<Chunk> freeChunks;
    PointerArrayOf<Chunk> freeChunksSized;
    QueueOf<Chunk, false> savedChunks;
    offset_t highOffset;
    Linked<IEngineRowAllocator> allocator;
    Linked<IOutputRowSerializer> serializer;
    Linked<IOutputRowDeserializer> deserializer;
    IOutputMetaData *serializeMeta;

    struct AddRemoveFreeChunk
    {
        PointerArrayOf<Chunk> &chunkArray;
        Chunk *chunk;
        AddRemoveFreeChunk(PointerArrayOf<Chunk> &_chunkArray, Chunk *_chunk) : chunkArray(_chunkArray), chunk(_chunk)
        {
            chunkArray.zap(chunk);
        }
        ~AddRemoveFreeChunk()
        {
            bool isNew;
            aindex_t sizePos = chunkArray.bAdd(chunk, chunkSizeCompare, isNew);
            if (!isNew) // might match size of another block
                chunkArray.add(chunk, sizePos);
        }
    };
    unsigned findNext(Chunk **base, unsigned arrCount, Chunk *searchChunk, ChunkCompareFunc compareFunc, bool &matched)
    {
        unsigned middle;
        unsigned left = 0;
        int result;
        Chunk *CurEntry;
        unsigned right = arrCount-1;

        /*
         * Loop until we've narrowed down the search range to one or two
         * items. This ensures that middle != 0, preventing 'right' from wrapping.
         */
        while (right - left > 1)
        {
            /* Calculate the middle entry in the array - NOT the middle offset */
            middle = (right + left) >> 1;

            CurEntry = base[middle];
            result = compareFunc(searchChunk, CurEntry);
            if (result < 0)
                right = middle - 1;
            else if (result > 0)
                left = middle + 1;
            else
            {
                matched = true;
                return middle;
            }
        }
        middle = left;

        /*
         * The range has been narrowed down to 1 or 2 items.
         * Perform an optimal check on these last two elements.
         */
        result = compareFunc(searchChunk, base[middle]);
        if (0 == result)
            matched = true;
        else if (result > 0)
        {
            ++middle;
            if (right == left + 1)
            {
                result = compareFunc(searchChunk, base[middle]);
                if (0 == result)
                    matched = true;
                else
                {
                    matched = false;
                    if (result > 0)
                        ++middle;
                }
            }
            else
                matched = false;
        }
        else
            matched = false;
        if (middle < arrCount)
            return middle;
        return NotFound;
    }
    inline Chunk *getOutOffset(size32_t required)
    {
        unsigned numFreeChunks = freeChunks.ordinality();
        if (numFreeChunks)
        {
            Chunk searchChunk(0, required);
            bool matched;
            unsigned nextPos = findNext(freeChunksSized.getArray(), freeChunksSized.ordinality(), &searchChunk, chunkSizeCompare2, matched);
            if (NotFound != nextPos)
            {
                Linked<Chunk> nextChunk = freeChunksSized.item(nextPos);
                if (nextChunk->size-required >= minChunkSize)
                {
                    Owned<Chunk> chunk = new Chunk(nextChunk->offset, required);
                    decFreeChunk(nextChunk, required);
#ifdef TRACE_WRITEAHEAD
                    ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "getOutOffset: got [free] offset = %"I64F"d, size=%d, but took required=%d", nextChunk->offset, nextChunk->size+required, required);
#endif
                    return chunk.getClear();
                }
#ifdef TRACE_WRITEAHEAD
                // NB: previously referenced an undeclared 'diskChunk'; trace the chunk actually taken
                ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "getOutOffset: got [free] offset = %"I64F"d, size=%d", nextChunk->offset, nextChunk->size);
#endif
                freeChunksSized.remove(nextPos);
                freeChunks.zap(*nextChunk);
                return nextChunk.getClear(); // if smaller than minChunkSize, return all, don't leave useless fragment
            }
        }
#ifdef TRACE_WRITEAHEAD
        ActPrintLog(activity, "getOutOffset: got new upper offset # = %"I64F"d", highOffset);
#endif
        Chunk *chunk = new Chunk(highOffset, required);
        if (iDiskUsage)
        {
            iDiskUsage->decrease(highOffset);
            highOffset += required; // NB next
            iDiskUsage->increase(highOffset);
        }
        else
            highOffset += required; // NB next
        return chunk;
    }
    inline void mergeFreeChunk(Chunk *dst, const Chunk *src)
    {
        AddRemoveFreeChunk ar(freeChunksSized, dst);
        dst->offset = src->offset;
        dst->size += src->size;
    }
    inline void decFreeChunk(Chunk *chunk, size32_t sz)
    {
        AddRemoveFreeChunk ar(freeChunksSized, chunk);
        chunk->size -= sz;
        chunk->offset += sz;
    }
    inline void incFreeChunk(Chunk *chunk, size32_t sz)
    {
        AddRemoveFreeChunk ar(freeChunksSized, chunk);
        chunk->size += sz;
    }
    inline void addFreeChunk(Chunk *freeChunk, unsigned pos=NotFound)
    {
        bool isNew;
        aindex_t sizePos = freeChunksSized.bAdd(freeChunk, chunkSizeCompare, isNew);
        if (!isNew)
            freeChunksSized.add(freeChunk, sizePos);
        if (NotFound == pos)
            freeChunks.append(*LINK(freeChunk));
        else
            freeChunks.add(*LINK(freeChunk), pos);
    }
    virtual void freeOffsetChunk(unsigned chunk)
    {
        // chunk(unused here) is nominal sequential page #, savedChunks is page # in diskfile
        assertex(savedChunks.ordinality());
        Owned<Chunk> freeChunk = savedChunks.dequeue();
        unsigned nmemb = freeChunks.ordinality();
        if (0 == nmemb)
            addFreeChunk(freeChunk);
        else
        {
            bool matched;
            unsigned nextPos = findNext(freeChunks.getArray(), freeChunks.ordinality(), freeChunk, chunkOffsetCompare, matched);
            assertex(!matched); // should never happen, have found a chunk with same offset as one being freed.
            if (NotFound != nextPos)
            {
                Chunk *nextChunk = &freeChunks.item(nextPos);
                if (freeChunk->endOffset() == nextChunk->offset)
                    mergeFreeChunk(nextChunk, freeChunk);
                else if (nextPos > 0)
                {
                    Chunk *prevChunk = &freeChunks.item(nextPos-1);
                    if (prevChunk->endOffset() == freeChunk->offset)
                        incFreeChunk(prevChunk, freeChunk->size);
                    else
                        addFreeChunk(freeChunk, nextPos);
                }
                else
                    addFreeChunk(freeChunk, nextPos);
            }
            else
                addFreeChunk(freeChunk);
        }
#ifdef TRACE_WRITEAHEAD
        ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Added chunk, offset %"I64F"d size=%d to freeChunks", freeChunk->offset, freeChunk->size);
#endif
    }
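
    // Illustrative note (not in the original source): freeOffsetChunk keeps
    // freeChunks sorted by offset and coalesces where possible. If the freed
    // chunk ends where the next free chunk starts, the two merge forward; if
    // the previous free chunk ends where the freed one starts, it merges
    // backward; otherwise the chunk is inserted on its own. E.g. freeing
    // [100..200) while [200..300) is already free yields a single [100..300)
    // entry, keeping the spill file from fragmenting.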
    virtual CRowSet *readRows(unsigned output, unsigned whichChunk)
    {
        assertex(savedChunks.ordinality() > whichChunk);
        unsigned currentChunkNum = queryCOutput(output).currentChunkNum;
        unsigned o=0;
        for (; o<outputCount; o++)
        {
            if (o == output) continue;
            COutput &coutput = queryCOutput(o);
            if (coutput.queryRowSet() && (coutput.queryRowSet()->queryChunk() == currentChunkNum))
            {
#ifdef TRACE_WRITEAHEAD
                ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Output: %d, readRows found other output %d with matching offset: %d", output, o, currentChunkNum);
#endif
                return LINK(coutput.queryRowSet());
            }
        }
        Chunk &chunk = *savedChunks.item(whichChunk);
        Owned<ISerialStream> stream = createFileSerialStream(spillFileIO, chunk.offset);
#ifdef TRACE_WRITEAHEAD
        unsigned diskChunkNum;
        stream->get(sizeof(diskChunkNum), &diskChunkNum);
        VALIDATEEQ(diskChunkNum, currentChunkNum);
#endif
        CThorStreamDeserializerSource ds(stream);
        Owned<CRowSet> rowSet = new CRowSet(*activity, currentChunkNum);
        loop
        {
            byte b;
            ds.read(sizeof(b),&b);
            if (!b)
                break;
            if (1==b)
            {
                RtlDynamicRowBuilder rowBuilder(allocator);
                size32_t sz = deserializer->deserialize(rowBuilder, ds);
                rowSet->addRow(rowBuilder.finalizeRowClear(sz));
            }
            else if (2==b)
                rowSet->addRow(NULL);
        }
        return rowSet.getClear();
    }
    virtual void flushRows()
    {
        // NB: called in crit
        MemoryBuffer mb;
        mb.ensureCapacity(minChunkSize); // starting size/could be more if variable and bigger
#ifdef TRACE_WRITEAHEAD
        mb.append(inMemRows->queryChunk()); // for debug purposes only
#endif
        CMemoryRowSerializer mbs(mb);
        unsigned r=0;
        for (;r<inMemRows->getRowCount();r++)
        {
            OwnedConstThorRow row = inMemRows->getRow(r);
            if (row)
            {
                mb.append((byte)1);
                serializer->serialize(mbs,(const byte *)row.get());
            }
            else
                mb.append((byte)2); // eog
        }
        mb.append((byte)0);
        size32_t len = mb.length();
        Owned<Chunk> freeChunk = getOutOffset(len); // will find space for 'len', might be bigger if from free list
        spillFileIO->write(freeChunk->offset, len, mb.toByteArray());
#ifdef TRACE_WRITEAHEAD
        // NB: trace before enqueue; freeChunk is cleared by getClear() below
        ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d (savedChunks pos=%d), writeOffset = %"I64F"d, writeSize = %d", inMemRows->queryChunk(), savedChunks.ordinality(), freeChunk->offset, len);
#endif
        savedChunks.enqueue(freeChunk.getClear());
    }
    virtual size32_t rowSize(const void *row)
    {
        if (!row)
            return 1; // eog;
        else if (meta == serializeMeta)
            return meta->getRecordSize(row)+1; // space on disk, +1 = eog marker
        CSizingSerializer ssz;
        serializer->serialize(ssz,(const byte *)row);
        return ssz.size()+1; // space on disk, +1 = eog marker
    }
public:
    CSharedWriteAheadDisk(CActivityBase *activity, const char *spillName, unsigned outputCount, IRowInterfaces *rowIf, IDiskUsage *_iDiskUsage) : CSharedWriteAheadBase(activity, outputCount, rowIf),
        allocator(rowIf->queryRowAllocator()), serializer(rowIf->queryRowSerializer()), deserializer(rowIf->queryRowDeserializer()), serializeMeta(meta->querySerializedDiskMeta()), iDiskUsage(_iDiskUsage)
    {
        assertex(spillName);
        spillFile.setown(createIFile(spillName));
        spillFile->setShareMode(IFSHnone);
        spillFileIO.setown(spillFile->open(IFOcreaterw));
        highOffset = 0;
    }
    ~CSharedWriteAheadDisk()
    {
        spillFileIO.clear();
        if (spillFile)
            spillFile->remove();
        loop
        {
            Owned<Chunk> chunk = savedChunks.dequeue();
            if (!chunk) break;
        }
        PROGLOG("CSharedWriteAheadDisk: highOffset=%"I64F"d", highOffset);
    }
    virtual void reset()
    {
        CSharedWriteAheadBase::reset();
        loop
        {
            Owned<Chunk> chunk = savedChunks.dequeue();
            if (!chunk) break;
        }
        freeChunks.kill();
        freeChunksSized.kill();
        highOffset = 0;
        spillFileIO->setSize(0);
    }
};

ISharedSmartBuffer *createSharedSmartDiskBuffer(CActivityBase *activity, const char *spillname, unsigned outputs, IRowInterfaces *rowIf, IDiskUsage *iDiskUsage)
{
    return new CSharedWriteAheadDisk(activity, spillname, outputs, rowIf, iDiskUsage);
}
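
// Illustrative sketch (not in the original source): one writer fanning out to
// several readers through an ISharedSmartBuffer, assuming 'rowIf' and a spill
// file name obtained elsewhere (iDiskUsage may be NULL, as the code above checks):
//
//     Owned<ISharedSmartBuffer> buf = createSharedSmartDiskBuffer(activity, spillname, 2, rowIf, NULL);
//     IRowStream *out0 = buf->queryOutput(0);
//     IRowStream *out1 = buf->queryOutput(1);
//     // writer thread:      buf->putRow(row); ... buf->flush();
//     // each reader thread: loop { OwnedConstThorRow row = out0->nextRow(); if (!row) break; ... }
//
// Chunks only spill to disk when some reader lags behind the writer; readers
// that keep pace share the current in-memory row set.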
#define MIN_POOL_CHUNKS 10
class CSharedWriteAheadMem : public CSharedWriteAheadBase
{
    QueueOf<CRowSet, false> chunkPool;
    Semaphore poolSem;
    bool writerBlocked;
    unsigned maxPoolChunks;

    virtual void markStop()
    {
        CSharedWriteAheadBase::markStop();
        if (writerBlocked)
        {
            writerBlocked = false;
            poolSem.signal();
        }
    }
    virtual void freeOffsetChunk(unsigned chunk)
    {
        Owned<CRowSet> topRowSet = chunkPool.dequeue();
        VALIDATEEQ(chunk, topRowSet->queryChunk());
#ifdef TRACE_WRITEAHEAD
        ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "freeOffsetChunk: Dequeue chunkPool chunks: %d, chunkPool.ordinality() = %d", topRowSet->queryChunk(), chunkPool.ordinality());
#endif
        topRowSet.clear();
        if (writerBlocked)
        {
            writerBlocked = false;
            poolSem.signal();
        }
    }
    virtual CRowSet *readRows(unsigned output, unsigned whichChunk)
    {
        Linked<CRowSet> rowSet = chunkPool.item(whichChunk);
        VALIDATEEQ(queryCOutput(output).currentChunkNum, rowSet->queryChunk());
        return rowSet.getClear();
    }
    virtual void flushRows()
    {
        // NB: called in crit
        if (chunkPool.ordinality() >= maxPoolChunks)
        {
            writerBlocked = true;
            {
                CriticalUnblock b(crit);
                poolSem.wait();
                if (stopped) return;
            }
            unsigned reader=anyReaderBehind();
            if (NotFound == reader)
            {
#ifdef TRACE_WRITEAHEAD
                ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "flushRows: caught up whilst blocked to: %d", inMemRows->queryChunk());
#endif
                return; // caught up whilst blocked
            }
            VALIDATELT(chunkPool.ordinality(), maxPoolChunks);
        }
#ifdef TRACE_WRITEAHEAD
        ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d, chunkPool chunks = %d", inMemRows->queryChunk(), 1+chunkPool.ordinality());
#endif
        chunkPool.enqueue(inMemRows.getClear());
    }
    virtual size32_t rowSize(const void *row)
    {
        return meta->getRecordSize(row); // space in mem.
    }
public:
    CSharedWriteAheadMem(CActivityBase *activity, unsigned outputCount, IRowInterfaces *rowif, unsigned buffSize) : CSharedWriteAheadBase(activity, outputCount, rowif)
    {
        if (((unsigned)-1) == buffSize)
            maxPoolChunks = (unsigned)-1; // no limit
        else
        {
            maxPoolChunks = buffSize / minChunkSize;
            if (maxPoolChunks < MIN_POOL_CHUNKS)
                maxPoolChunks = MIN_POOL_CHUNKS;
        }
        writerBlocked = false;
    }
    ~CSharedWriteAheadMem()
    {
        loop
        {
            Owned<CRowSet> rowSet = chunkPool.dequeue();
            if (!rowSet)
                break;
        }
    }
    virtual void reset()
    {
        CSharedWriteAheadBase::reset();
        loop
        {
            Owned<CRowSet> rowSet = chunkPool.dequeue();
            if (!rowSet)
                break;
        }
        writerBlocked = false;
    }
};

ISharedSmartBuffer *createSharedSmartMemBuffer(CActivityBase *activity, unsigned outputs, IRowInterfaces *rowIf, unsigned buffSize)
{
    return new CSharedWriteAheadMem(activity, outputs, rowIf, buffSize);
}
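
// Illustrative note (not in the original source): the in-memory variant bounds
// its pool at buffSize/minChunkSize chunks, floored at MIN_POOL_CHUNKS. With
// the default minChunkSize of 0x2000 (8KB), buffSize = 0x100000 (1MB) gives a
// pool of 128 chunks; passing (unsigned)-1 removes the cap entirely. When the
// pool is full the writer blocks on poolSem until the slowest reader frees a
// chunk via freeOffsetChunk.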
class CRowMultiWriterReader : public CSimpleInterface, implements IRowMultiWriterReader
{
    rowidx_t readGranularity, writeGranularity, rowPos, limit, rowsToRead;
    CThorSpillableRowArray rows;
    const void **readRows;
    CActivityBase &activity;
    IRowInterfaces *rowIf;
    bool readerBlocked, eos, eow;
    Semaphore emptySem, fullSem;
    unsigned numWriters, writersComplete, writersBlocked;

    class CAWriter : public CSimpleInterface, implements IRowWriter
    {
        CRowMultiWriterReader &owner;
        CThorExpandingRowArray rows;
    public:
        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

        CAWriter(CRowMultiWriterReader &_owner) : owner(_owner), rows(_owner.activity, _owner.rowIf)
        {
        }
        ~CAWriter()
        {
            flush();
            owner.writerStopped();
        }
    // IRowWriter impl.
        virtual void putRow(const void *row)
        {
            if (rows.ordinality() >= owner.writeGranularity)
                owner.addRows(rows);
            rows.append(row);
        }
        virtual void flush()
        {
            if (rows.ordinality())
                owner.addRows(rows);
        }
    };
    void addRows(CThorExpandingRowArray &inRows)
    {
        loop
        {
            {
                CThorArrayLockBlock block(rows);
                if (eos)
                {
                    inRows.kill();
                    return;
                }
                if (rows.numCommitted() < limit)
                {
                    // NB: allowed to go over limit, by as much as inRows.ordinality()-1
                    rows.appendRows(inRows, true);
                    if (readerBlocked && (rows.numCommitted() >= readGranularity))
                    {
                        emptySem.signal();
                        readerBlocked = false;
                    }
                    return;
                }
                writersBlocked++;
            }
            fullSem.wait();
        }
    }
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

    CRowMultiWriterReader(CActivityBase &_activity, IRowInterfaces *_rowIf, unsigned _limit, unsigned _readGranularity, unsigned _writerGranularity)
        : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf), limit(_limit), readGranularity(_readGranularity), writeGranularity(_writerGranularity)
    {
        if (readGranularity > limit)
            readGranularity = limit; // readGranularity must be <= limit;
        numWriters = 0;
        roxiemem::IRowManager *rowManager = activity.queryJob().queryRowManager();
        readRows = static_cast<const void * *>(rowManager->allocate(readGranularity * sizeof(void*), activity.queryContainer().queryId()));
        eos = eow = readerBlocked = false;
        rowPos = rowsToRead = 0;
        writersComplete = writersBlocked = 0;
        rows.setup(rowIf, false, stableSort_none, true); // turning on throwOnOom;
    }
    ~CRowMultiWriterReader()
    {
        while (rowPos < rowsToRead)
        {
            ReleaseThorRow(readRows[rowPos++]);
        }
        ReleaseThorRow(readRows);
    }
    void writerStopped()
    {
        CThorArrayLockBlock block(rows);
        writersComplete++;
        if (writersComplete == numWriters)
        {
            rows.flush();
            eow = true;
            if (readerBlocked)
            {
                emptySem.signal();
                readerBlocked = false;
            }
        }
    }

// ISharedWriteBuffer impl.
    virtual IRowWriter *getWriter()
    {
        ++numWriters;
        return new CAWriter(*this);
    }
    virtual void abort()
    {
        CThorArrayLockBlock block(rows);
        eos = true;
        if (writersBlocked)
        {
            fullSem.signal(writersBlocked);
            writersBlocked = 0;
        }
        if (readerBlocked)
        {
            emptySem.signal();
            readerBlocked = false;
        }
    }

// IRowStream impl.
    virtual const void *nextRow()
    {
        if (eos)
            return NULL;
        if (rowPos == rowsToRead)
        {
            loop
            {
                {
                    CThorArrayLockBlock block(rows);
                    if (rows.numCommitted() >= readGranularity || eow)
                    {
                        rowsToRead = (eow && rows.numCommitted() < readGranularity) ? rows.numCommitted() : readGranularity;
                        if (0 == rowsToRead)
                        {
                            eos = true;
                            return NULL;
                        }
                        rows.readBlock(readRows, rowsToRead);
                        rowPos = 0;
                        if (writersBlocked)
                        {
                            fullSem.signal(writersBlocked);
                            writersBlocked = 0;
                        }
                        break; // fall through to return a row
                    }
                    readerBlocked = true;
                }
                emptySem.wait();
                if (eos)
                    return NULL;
            }
        }
        const void *row = readRows[rowPos];
        readRows[rowPos] = NULL;
        ++rowPos;
        return row;
    }
    virtual void stop()
    {
        eos = true;
    }
};

IRowMultiWriterReader *createSharedWriteBuffer(CActivityBase *activity, IRowInterfaces *rowif, unsigned limit, unsigned readGranularity, unsigned writeGranularity)
{
    return new CRowMultiWriterReader(*activity, rowif, limit, readGranularity, writeGranularity);
}
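
// Illustrative sketch (not in the original source): many writers funnelling
// into one reader through the shared write buffer; the limit/granularity
// figures here are assumed, not prescribed:
//
//     Owned<IRowMultiWriterReader> buf = createSharedWriteBuffer(activity, rowif, 10000, 500, 500);
//     // each writer thread:
//     //     Owned<IRowWriter> w = buf->getWriter();
//     //     w->putRow(row); ... // w's destructor flushes and marks this writer complete
//     // reader thread:
//     //     loop { OwnedConstThorRow row = buf->nextRow(); if (!row) break; ... }
//
// NB: all getWriter() calls should happen before reading begins, since the
// reader only sees end-of-stream once every registered writer has completed.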
class CRCFileStream: public CSimpleInterface, implements IFileIOStream
{
    Linked<IFileIOStream> streamin;
public:
    CRC32 &crc;
    bool valid;

    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

    CRCFileStream(IFileIOStream *_stream,CRC32 &_crc)
        : streamin(_stream), crc(_crc)
    {
        valid = true;
    }
    size32_t read(size32_t len, void * data)
    {
        size32_t rd = streamin->read(len,data);
        crc.tally(rd,data);
        return rd;
    }
    size32_t write(size32_t len, const void * data)
    {
        throw MakeStringException(-1,"CRCFileStream does not support write");
    }
    void seek(offset_t pos, IFSmode origin)
    {
        offset_t t = streamin->tell();
        streamin->seek(pos,origin);
        if (t!=streamin->tell()) { // crc invalid
            if (valid) {
                WARNLOG("CRCFileStream::seek called - CRC will be invalid");
                valid = false;
            }
        }
    }
    offset_t size()
    {
        return streamin->size();
    }
    virtual offset_t tell()
    {
        return streamin->tell();
    }
    void flush()
    {
        // noop as only read
    }
};
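
// Illustrative sketch (not in the original source): wrapping a stream so a CRC
// accumulates as data is read, assuming an IFileIO opened elsewhere:
//
//     CRC32 crc;
//     Owned<IFileIOStream> raw = createIOStream(fileIO);
//     Owned<IFileIOStream> checked = new CRCFileStream(raw, crc);
//     byte buf[0x10000];
//     loop {
//         size32_t rd = checked->read(sizeof(buf), buf);
//         if (!rd) break; // crc now covers every byte read so far
//     }
//
// Seeking invalidates the running CRC (flagged via 'valid'), since bytes would
// be skipped or counted twice.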