thbuf.cpp 54 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include <limits.h>
  15. #include <stddef.h>
  16. #include "jlib.hpp"
  17. #include "jmisc.hpp"
  18. #include "jio.hpp"
  19. #include "jlzw.hpp"
  20. #include "jsem.hpp"
  21. #include "jthread.hpp"
  22. #include "jarray.hpp"
  23. #include "jiface.hpp"
  24. #include "jfile.hpp"
  25. #include "jset.hpp"
  26. #include "jqueue.tpp"
  27. #include "thmem.hpp"
  28. #include "thalloc.hpp"
  29. #include "thbuf.hpp"
  30. #include "eclrtl.hpp"
  31. #include "thgraph.hpp"
  32. #ifdef _DEBUG
  33. //#define _FULL_TRACE
  34. //#define TRACE_CREATE
  35. struct SmartBufReenterCheck
  36. {
  37. bool &check;
  38. SmartBufReenterCheck(bool &_check)
  39. : check(_check)
  40. {
  41. assertex(!check);
  42. check = true;
  43. }
  44. ~SmartBufReenterCheck()
  45. {
  46. assertex(check);
  47. check = false;
  48. }
  49. };
  50. #define REENTRANCY_CHECK(n) SmartBufReenterCheck recheck(n);
  51. #else
  52. #define REENTRANCY_CHECK(n)
  53. #endif
// Single-producer / single-consumer row buffer with disk spill.
// putRow() appends to the 'in' queue; when the queued rows' estimated
// footprint would exceed 'blocksize' they are serialized to a temp file
// (diskflush). nextRow() drains 'out', reloading spilled chunks (load) as
// needed, or consumes directly from 'in' when nothing is spilled. All shared
// state is guarded by 'lock'; producer and consumer hand off via
// waitsem/waitflushsem.
class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, implements IRowWriter
{
    CActivityBase *activity;
    ThorRowQueue *in;           // producer-side queue; NULL transiently during diskflush (pauses consumer)
    size32_t insz;              // estimated memory footprint of rows currently in 'in'
    ThorRowQueue *out;          // consumer-side queue; NULL transiently during load (pauses producer)
    Linked<IFile> file;         // backing spill file (opened lazily on first spill)
    Owned<IFileIO> fileio;
    SpinLock lock;              // guards all mutable state below
    bool waiting;               // consumer (or stop()) is blocked on waitsem
    Semaphore waitsem;
    bool waitflush;             // flush() is blocked on waitflushsem
    Semaphore waitflushsem;
    size32_t blocksize;         // spill allocation unit (bufsize/2 rounded up to a 1MB multiple)
    unsigned numblocks;         // current size of the spill file, in blocks
    Owned<IBitSet> diskfree;    // free/busy map of spill-file blocks (for reuse)
    UnsignedArray diskin;       // FIFO: starting block number of each spilled chunk
    UnsignedArray diskinlen;    // parallel FIFO: length (in blocks) of each spilled chunk
    Linked<IOutputRowSerializer> serializer;
    Linked<IOutputRowDeserializer> deserializer;
    Linked<IEngineRowAllocator> allocator;
    bool eoi;                   // end-of-input: flush() or stop() has been called
#ifdef _DEBUG
    bool putrecheck;            // re-entrancy guard flags (see REENTRANCY_CHECK)
    bool getrecheck;
#endif
    // First-fit search for 'nb' contiguous free blocks in the spill file,
    // extending the file when no suitable run exists. Returns start block.
    unsigned allocblk(unsigned nb)
    {
        if (nb<=numblocks) {
            for (unsigned i=0;i<=numblocks-nb;i++) {
                unsigned j;
                for (j=i;j<i+nb;j++)
                    if (!diskfree->test(j))
                        break;
                if (j==i+nb) {
                    // found a free run; mark the whole run in-use
                    while (j>i)
                        diskfree->set(--j,false);
                    return i;
                }
            }
        }
        // no free run big enough - grow the file
        unsigned ret = numblocks;
        numblocks += nb;
        return ret;
    }
    // Return a run of 'nb' blocks starting at 'b' to the free map.
    void freeblk(unsigned b,unsigned nb)
    {
        for (unsigned i=0;i<nb;i++)
            diskfree->set(i+b,true);
    }
    // Spill the current 'in' queue to disk, or hand it straight to the
    // consumer if it has fully caught up. Must be called with 'lock' held;
    // the lock is dropped around serialization and file I/O.
    void diskflush()
    {
        // RAII helper: publishes in==NULL so the consumer pauses rather than
        // reading a queue that is being serialized; restores it on scope exit
        // unless the queues were swapped meanwhile.
        struct cSaveIn {
            ThorRowQueue *rq;
            ThorRowQueue *&in;
            cSaveIn(ThorRowQueue *&_in)
                : in(_in)
            {
                rq = in;
                in = NULL;
            }
            ~cSaveIn()
            {
                if (!in)
                    in = rq;
            }
        } csavein(in); // mark as not there so consumer will pause
        if (out && (out->ordinality()==0) && (diskin.ordinality()==0)) {
            // consumer drained everything - just swap queues, no disk needed
            in = out;
            out = csavein.rq;
            insz = 0;
            return;
        }
        if (!fileio) {
            // first spill: create the backing file (lock dropped for the open)
            SpinUnblock unblock(lock);
            fileio.setown(file->open(IFOcreaterw));
            if (!fileio)
            {
                throw MakeStringException(-1,"CSmartRowBuffer::flush cannot write file %s",file->queryFilename());
            }
        }
        MemoryBuffer mb;
        CMemoryRowSerializer mbs(mb);
        unsigned nb = 0;
        unsigned blk = 0;
        if (!waiting) {
            mb.ensureCapacity(blocksize);
            {
                SpinUnblock unblock(lock);
                unsigned p = 0;
                byte b;
                // chunk format: per entry a 1-byte tag - 1=serialized row
                // follows, 2=end-of-group marker, 0=end of chunk
                for (unsigned i=0;i<csavein.rq->ordinality();i++) {
                    if (waiting)
                        break;  // consumer went idle - abandon the spill attempt
                    const void *row = csavein.rq->item(i);
                    if (row) {
                        b = 1;
                        mb.append(b);
                        serializer->serialize(mbs,(const byte *)row);
                    }
                    else {
                        b = 2; // eog
                        mb.append(b);
                    }
                }
                b = 0;
                mb.append(b);
            }
        }
        if (!waiting) {
            nb = (mb.length()+blocksize-1)/blocksize;
            blk = allocblk(nb);
            SpinUnblock unblock(lock);
            if (mb.length()<nb*blocksize) { // bit overkill!
                // zero-pad to a whole number of blocks before writing
                size32_t left = nb*blocksize-mb.length();
                memset(mb.reserve(left),0,left);
            }
            fileio->write(blk*(offset_t)blocksize,mb.length(),mb.bufferBase());
            mb.clear();
        }
        if (waiting) {
            // if eoi, stopped while serializing/writing, putRow may put final row to existing 'in'
            if (!eoi)
            {
                // consumer caught up
                freeblk(blk,nb);
                assertex(out->ordinality()==0);
                assertex(diskin.ordinality()==0);
                in = out;
                out = csavein.rq;
            }
        }
        else {
            // record the spilled chunk and release the in-memory copies
            diskin.append(blk);
            diskinlen.append(nb);
            while (csavein.rq->ordinality())
                ReleaseThorRow(csavein.rq->dequeue());
        }
        insz = 0;
    }
    // Read the oldest spilled chunk back from disk, deserialize its rows into
    // 'out', and free its file blocks. Must be called with 'lock' held; the
    // lock is dropped around the file read and deserialization.
    void load()
    {
        ThorRowQueue *rq = out;
        out = NULL; // mark as not there so producer won't fill
        unsigned blk = diskin.item(0);
        unsigned nb = diskinlen.item(0);
        diskin.remove(0); // better as q but given reading from disk...
        diskinlen.remove(0);
        {
            SpinUnblock unblock(lock);
            MemoryAttr ma;
            size32_t readBlockSize = nb*blocksize;
            byte *buf = (byte *)ma.allocate(readBlockSize);
            CThorStreamDeserializerSource ds(readBlockSize,buf);
            assertex(fileio.get());
            size32_t rd = fileio->read(blk*(offset_t)blocksize,readBlockSize,buf);
            assertex(rd==readBlockSize);
            unsigned p = 0;
            loop {
                // tag bytes as written by diskflush: 0=end, 1=row, 2=eog
                byte b;
                ds.read(sizeof(b),&b);
                if (!b)
                    break;
                if (b==1) {
                    RtlDynamicRowBuilder rowBuilder(allocator);
                    size32_t sz = deserializer->deserialize(rowBuilder,ds);
                    rq->enqueue(rowBuilder.finalizeRowClear(sz));
                }
                else if (b==2)
                    rq->enqueue(NULL);
            }
        }
        freeblk(blk,nb);
        out = rq;
    }
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
    // bufsize is halved and rounded up to a 1MB multiple to give 'blocksize'.
    CSmartRowBuffer(CActivityBase *_activity, IFile *_file,size32_t bufsize,IRowInterfaces *rowif)
        : activity(_activity), file(_file), allocator(rowif->queryRowAllocator()), serializer(rowif->queryRowSerializer()), deserializer(rowif->queryRowDeserializer())
    {
#ifdef _DEBUG
        putrecheck = false;
        getrecheck = false;
#endif
        in = new ThorRowQueue;
        out = new ThorRowQueue;
        waiting = false;
        waitflush = false;
        blocksize = ((bufsize/2+0xfffff)/0x100000)*0x100000;
        numblocks = 0;
        insz = 0;
        eoi = false;
        diskfree.setown(createBitSet());
#ifdef _FULL_TRACE
        ActPrintLog(activity, "SmartBuffer create %x",(unsigned)(memsize_t)this);
#endif
    }
    ~CSmartRowBuffer()
    {
#ifdef _FULL_TRACE
        ActPrintLog(activity, "SmartBuffer destroy %x",(unsigned)(memsize_t)this);
#endif
        assertex(!waiting);
        assertex(!waitflush);
        // clear in/out contents
        while (in->ordinality())
            ReleaseThorRow(in->dequeue());
        delete in;
        while (out->ordinality())
            ReleaseThorRow(out->dequeue());
        delete out;
    }
    // Producer entry: enqueue a row (NULL = end-of-group marker), spilling
    // first when the row would push the footprint estimate over blocksize.
    // Takes ownership of 'row'; discards it silently once eoi is set.
    void putRow(const void *row)
    {
        REENTRANCY_CHECK(putrecheck)
        size32_t sz = thorRowMemoryFootprint(serializer, row);
        SpinBlock block(lock);
        if (eoi) {
            ReleaseThorRow(row);
            return;
        }
        assertex(in); // reentry check
        if (sz+insz+(in->ordinality()+2)*sizeof(byte)>blocksize) // byte extra per row + end byte
            diskflush();
        in->enqueue(row);
        insz += sz;
        if (waiting) {
            // wake consumer blocked waiting for data
            waitsem.signal();
            waiting = false;
        }
    }
    // Abort: discard all buffered and spilled rows, release blocked threads.
    void stop()
    {
#ifdef _FULL_TRACE
        ActPrintLog(activity, "SmartBuffer stop %x",(unsigned)(memsize_t)this);
#endif
        SpinBlock block(lock);
#ifdef _DEBUG
        if (waiting)
        {
            ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "CSmartRowBuffer::stop while nextRow waiting");
            PrintStackReport();
        }
#endif
        eoi = true;
        while (out&&out->ordinality())
            ReleaseThorRow(out->dequeue());
        // if a diskflush is mid-write ('in' is NULL) wait for it to complete
        while (NULL == in)
        {
            waiting = true;
            SpinUnblock unblock(lock);
            waitsem.wait();
        }
        while (out&&out->ordinality())
            ReleaseThorRow(out->dequeue());
        while (in&&in->ordinality())
            ReleaseThorRow(in->dequeue());
        diskin.kill();
        if (waiting) {
            waitsem.signal();
            waiting = false;
        }
        if (waitflush) {
            waitflushsem.signal();
            waitflush = false;
        }
    }
    // Consumer entry: next row (NULL = end-of-group), or NULL once eoi is set
    // and everything is drained. Blocks while no data is available.
    const void *nextRow()
    {
        REENTRANCY_CHECK(getrecheck)
        const void * ret;
        assertex(out);
        assertex(!waiting); // reentrancy checks
        loop {
            {
                SpinBlock block(lock);
                if (out->ordinality()) {
                    ret = out->dequeue();
                    break;
                }
                if (diskin.ordinality()) {
                    // pull the oldest spilled chunk back into 'out'
                    load();
                    ret = out->dequeue();
                    break;
                }
                if (in) {
                    if (in->ordinality()) {
                        // nothing spilled - consume straight from producer queue
                        ret = in->dequeue();
                        if (ret) {
                            size32_t sz = thorRowMemoryFootprint(serializer, ret);
                            assertex(insz>=sz);
                            insz -= sz;
                        }
                        break;
                    }
                    else {
                        if (waitflush) {
                            // everything has been read - release flush()
                            waitflushsem.signal();
                            waitflush = false;
                        }
                        if (eoi)
                            return NULL;
                    }
                }
                assertex(!waiting); // reentrancy check
                waiting = true;
            }
            // wait for the producer outside the lock
            waitsem.wait();
        }
        return ret;
    }
    // Producer end-of-input: spill/publish remaining rows, then block until
    // the consumer has read everything (or stop() releases us), logging a
    // warning each minute while stalled.
    void flush()
    {
        // I think flush should wait til all rows read or stopped
#ifdef _FULL_TRACE
        ActPrintLog(activity, "SmartBuffer flush %x",(unsigned)(memsize_t)this);
#endif
        SpinBlock block(lock);
        if (eoi) return;
        loop {
            assertex(in); // reentry check
            diskflush();
            eoi = true;
            if (waiting) {
                waitsem.signal();
                waiting = false;
            }
            if (out&&!out->ordinality()&&!diskin.ordinality()&&!in->ordinality())
                break;
            waitflush = true;
            SpinUnblock unblock(lock);
            while (!waitflushsem.wait(1000*60))
                ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "CSmartRowBuffer::flush stalled");
        }
    }
    // This object is its own writer interface.
    IRowWriter *queryWriter()
    {
        return this;
    }
};
// Purely in-memory single-producer/single-consumer row buffer. Unlike
// CSmartRowBuffer it never spills: putRow() blocks while the estimated
// footprint of queued rows exceeds 'blocksize'. All state guarded by 'lock'.
class CSmartRowInMemoryBuffer: public CSimpleInterface, implements ISmartRowBuffer, implements IRowWriter
{
    // NB must *not* call LinkThorRow or ReleaseThorRow (or Owned*ThorRow) if deallocator set
    CActivityBase *activity;
    IRowInterfaces *rowIf;
    ThorRowQueue *in;       // the single shared queue (producer appends, consumer dequeues)
    size32_t insz;          // estimated footprint of rows currently queued
    SpinLock lock;          // guards all mutable state below
    bool waitingin;         // producer blocked (buffer full, or flush waiting for drain)
    Semaphore waitinsem;
    bool waitingout;        // consumer blocked (buffer empty)
    Semaphore waitoutsem;
    size32_t blocksize;     // capacity threshold (bufsize/2 rounded up to a 1MB multiple)
    bool eoi;               // end-of-input: stop() or flush() called
#ifdef _DEBUG
    bool putrecheck;        // re-entrancy guard flags (see REENTRANCY_CHECK)
    bool getrecheck;
#endif
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
    CSmartRowInMemoryBuffer(CActivityBase *_activity, IRowInterfaces *_rowIf, size32_t bufsize)
        : activity(_activity), rowIf(_rowIf)
    {
#ifdef _DEBUG
        putrecheck = false;
        getrecheck = false;
#endif
        in = new ThorRowQueue;
        waitingin = false;
        waitingout = false;
        blocksize = ((bufsize/2+0xfffff)/0x100000)*0x100000;
        insz = 0;
        eoi = false;
    }
    ~CSmartRowInMemoryBuffer()
    {
        // clear in contents
        while (in->ordinality())
            ReleaseThorRow(in->dequeue());
        delete in;
    }
    // Producer: enqueue a row (NULL = end-of-group). Blocks while over
    // capacity; takes ownership of 'row' and discards it if stopped meanwhile.
    void putRow(const void *row)
    {
        REENTRANCY_CHECK(putrecheck)
        size32_t sz = 0;
        if (row) {
            sz = thorRowMemoryFootprint(rowIf->queryRowSerializer(), row);
#ifdef _DEBUG
            assertex(sz<0x1000000);
#endif
            assertex((int)sz>=0); // trap invalid sizes a bit earlier
#ifdef _TRACE_SMART_PUTGET
            ActPrintLog(activity, "***putRow(%x) %d {%x}",(unsigned)row,sz,*(const unsigned *)row);
#endif
        }
        {
            SpinBlock block(lock);
            if (!eoi) {
                assertex(in); // reentry check
                // wait for room; insz==0 always admits the row (even oversized)
                while ((sz+insz>blocksize)&&insz) {
                    waitingin = true;
                    {
                        SpinUnblock unblock(lock);
                        waitinsem.wait();
                    }
                    if (eoi)
                        break;
                }
            }
            if (!eoi) {
                in->enqueue(row);
                insz += sz;
#ifdef _TRACE_SMART_PUTGET
                ActPrintLog(activity, "***putRow2(%x) insize = %d ",insz);
#endif
                if (waitingout) {
                    // wake consumer blocked waiting for data
                    waitoutsem.signal();
                    waitingout = false;
                }
                return;
            }
        }
        // cancelled
        ReleaseThorRow(row);
    }
    // Consumer: next row (NULL = end-of-group), or NULL once eoi is set and
    // the queue is drained. Blocks while empty; wakes a blocked producer.
    const void *nextRow()
    {
        REENTRANCY_CHECK(getrecheck)
        const void * ret;
        SpinBlock block(lock);
        assertex(!waitingout); // reentrancy checks
        loop {
            if (in->ordinality()) {
                ret = in->dequeue();
                if (ret) {
                    size32_t sz = thorRowMemoryFootprint(rowIf->queryRowSerializer(), ret);
#ifdef _TRACE_SMART_PUTGET
                    ActPrintLog(activity, "***dequeueRow(%x) %d insize=%d {%x}",(unsigned)ret,sz,insz,*(const unsigned *)ret);
#endif
                    if (insz<sz) {
                        // footprint accounting about to go negative - log context before asserting
                        ActPrintLog(activity, "CSmartRowInMemoryBuffer::nextRow(%x) %d insize=%d {%x} ord = %d",(unsigned)(memsize_t)ret,sz,insz,*(const unsigned *)ret,in->ordinality());
                        assertex(insz>=sz);
                    }
                    insz -= sz;
                }
                break;
            }
            else if (eoi) {
                ret = NULL;
                break;
            }
            assertex(!waitingout); // reentrancy check
            waitingout = true;
            SpinUnblock unblock(lock);
            waitoutsem.wait();
        }
        if (waitingin) {
            // room was freed (or eoi seen) - wake the producer
            waitinsem.signal();
            waitingin = false;
        }
        return ret;
    }
    // Abort: discard all queued rows and release a blocked producer.
    void stop()
    {
#ifdef _FULL_TRACE
        ActPrintLog(activity, "CSmartRowInMemoryBuffer stop %x",(unsigned)(memsize_t)this);
#endif
        SpinBlock block(lock);
#ifdef _DEBUG
        if (waitingout) {
            ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "CSmartRowInMemoryBuffer::stop while nextRow waiting");
            PrintStackReport();
        }
#endif
        eoi = true;
        while (in->ordinality())
            ReleaseThorRow(in->dequeue());
        in->clear();
        if (waitingin) {
            waitinsem.signal();
            waitingin = false;
        }
    }
//  offset_t getPosition()
//  {
//      SpinBlock block(lock);
//      return pos;
//  }
    // Producer end-of-input: set eoi, then block until the consumer drains
    // the queue, warning once a minute while stalled.
    void flush()
    {
        // I think flush should wait til all rows read
        SpinBlock block(lock);
        eoi = true;
        loop {
            assertex(in); // reentry check
            if (waitingout) {
                waitoutsem.signal();
                waitingout = false;
            }
            if (!in->ordinality())
                break;
            waitingin = true;
            SpinUnblock unblock(lock);
            while (!waitinsem.wait(1000*60))
                ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "CSmartRowInMemoryBuffer::flush stalled");
        }
    }
    // This object is its own writer interface.
    IRowWriter *queryWriter()
    {
        return this;
    }
};
  566. ISmartRowBuffer * createSmartBuffer(CActivityBase *activity, const char * tempname, size32_t buffsize, IRowInterfaces *rowif)
  567. {
  568. Owned<IFile> file = createIFile(tempname);
  569. return new CSmartRowBuffer(activity,file,buffsize,rowif);
  570. }
  571. ISmartRowBuffer * createSmartInMemoryBuffer(CActivityBase *activity, IRowInterfaces *rowIf, size32_t buffsize)
  572. {
  573. return new CSmartRowInMemoryBuffer(activity, rowIf, buffsize);
  574. }
  575. class COverflowableBuffer : public CSimpleInterface, implements IRowWriterMultiReader
  576. {
  577. CActivityBase &activity;
  578. IRowInterfaces *rowIf;
  579. Owned<IThorRowCollector> collector;
  580. Owned<IRowWriter> writer;
  581. bool eoi, shared;
  582. public:
  583. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  584. COverflowableBuffer(CActivityBase &_activity, IRowInterfaces *_rowIf, bool grouped, bool _shared, unsigned spillPriority)
  585. : activity(_activity), rowIf(_rowIf), shared(_shared)
  586. {
  587. collector.setown(createThorRowCollector(activity, rowIf, NULL, stableSort_none, rc_mixed, spillPriority, grouped));
  588. writer.setown(collector->getWriter());
  589. eoi = false;
  590. }
  591. ~COverflowableBuffer()
  592. {
  593. writer.clear();
  594. collector.clear();
  595. }
  596. // IRowWriterMultiReader
  597. virtual IRowStream *getReader()
  598. {
  599. flush();
  600. return collector->getStream(shared);
  601. }
  602. // IRowWriter
  603. virtual void putRow(const void *row)
  604. {
  605. assertex(!eoi);
  606. writer->putRow(row);
  607. }
  608. virtual void flush()
  609. {
  610. eoi = true;
  611. }
  612. };
  613. IRowWriterMultiReader *createOverflowableBuffer(CActivityBase &activity, IRowInterfaces *rowIf, bool grouped, bool shared, unsigned spillPriority)
  614. {
  615. return new COverflowableBuffer(activity, rowIf, grouped, shared, spillPriority);
  616. }
  617. #define VALIDATEEQ(LHS, RHS) if ((LHS)!=(RHS)) { StringBuffer s("FAIL(EQ) - LHS="); s.append(LHS).append(", RHS=").append(RHS); PROGLOG("%s", s.str()); throwUnexpected(); }
  618. #define VALIDATELT(LHS, RHS) if ((LHS)>=(RHS)) { StringBuffer s("FAIL(LT) - LHS="); s.append(LHS).append(", RHS=").append(RHS); PROGLOG("%s", s.str()); throwUnexpected(); }
  619. //#define TRACE_WRITEAHEAD
  620. class CRowSet : public CSimpleInterface
  621. {
  622. unsigned chunk;
  623. CThorExpandingRowArray rows;
  624. public:
  625. CRowSet(CActivityBase &activity, unsigned _chunk) : rows(activity, &activity, true), chunk(_chunk)
  626. {
  627. }
  628. void reset(unsigned _chunk)
  629. {
  630. chunk = _chunk;
  631. rows.kill();
  632. }
  633. inline unsigned queryChunk() const { return chunk; }
  634. inline unsigned getRowCount() const { return rows.ordinality(); }
  635. inline void addRow(const void *row) { rows.append(row); }
  636. inline const void *getRow(unsigned r)
  637. {
  638. return rows.get(r);
  639. }
  640. };
  641. class Chunk : public CInterface
  642. {
  643. public:
  644. offset_t offset;
  645. size32_t size;
  646. Chunk(offset_t _offset, size_t _size) : offset(_offset), size(_size) { }
  647. Chunk(const Chunk &other) : offset(other.offset), size(other.size) { }
  648. bool operator==(Chunk const &other) const { return size==other.size && offset==other.offset; }
  649. Chunk &operator=(Chunk const &other) { offset = other.offset; size = other.size; return *this; }
  650. offset_t endOffset() { return offset+size; }
  651. };
  652. typedef int (*ChunkCompareFunc)(Chunk *, Chunk *);
  653. int chunkOffsetCompare(Chunk *lhs, Chunk *rhs)
  654. {
  655. if (lhs->offset>rhs->offset)
  656. return 1;
  657. else if (lhs->offset<rhs->offset)
  658. return -1;
  659. return 0;
  660. }
  661. int chunkSizeCompare(Chunk **_lhs, Chunk **_rhs)
  662. {
  663. Chunk *lhs = *(Chunk **)_lhs;
  664. Chunk *rhs = *(Chunk **)_rhs;
  665. return (int)lhs->size - (int)rhs->size;
  666. }
  667. int chunkSizeCompare2(Chunk *lhs, Chunk *rhs)
  668. {
  669. return (int)lhs->size - (int)rhs->size;
  670. }
  671. class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBuffer
  672. {
  673. size32_t totalOutChunkSize;
  674. bool writeAtEof;
  675. rowcount_t rowsWritten;
  676. IArrayOf<IRowStream> outputs;
  677. unsigned readersWaiting;
  678. virtual void init()
  679. {
  680. stopped = false;
  681. writeAtEof = false;
  682. rowsWritten = 0;
  683. readersWaiting = 0;
  684. totalChunksOut = lowestChunk = 0;
  685. lowestOutput = 0;
  686. #ifdef TRACE_WRITEAHEAD
  687. totalOutChunkSize = sizeof(unsigned);
  688. #else
  689. totalOutChunkSize = 0;
  690. #endif
  691. }
  692. inline bool isEof(rowcount_t rowsRead)
  693. {
  694. return stopped || (writeAtEof && rowsWritten == rowsRead);
  695. }
  696. inline void signalReaders()
  697. {
  698. if (readersWaiting)
  699. {
  700. #ifdef TRACE_WRITEAHEAD
  701. ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "signalReaders: %d", readersWaiting);
  702. #endif
  703. readersWaiting = 0;
  704. ForEachItemIn(o, outputs)
  705. queryCOutput(o).wakeRead(); // if necessary
  706. }
  707. }
  708. inline const rowcount_t &readerWait(const rowcount_t &rowsRead)
  709. {
  710. ++readersWaiting;
  711. return rowsWritten;
  712. }
  713. CRowSet *loadMore(unsigned output)
  714. {
  715. // needs to be called in crit
  716. Linked<CRowSet> rowSet;
  717. unsigned currentChunkNum = queryCOutput(output).currentChunkNum;
  718. if (currentChunkNum == totalChunksOut)
  719. {
  720. #ifdef TRACE_WRITEAHEAD
  721. PROGLOG("output=%d, chunk=%d INMEM", output, currentChunkNum);
  722. #endif
  723. rowSet.set(inMemRows);
  724. }
  725. else
  726. {
  727. unsigned whichChunk = queryCOutput(output).currentChunkNum - lowestChunk;
  728. #ifdef TRACE_WRITEAHEAD
  729. PROGLOG("output=%d, chunk=%d (whichChunk=%d)", output, currentChunkNum, whichChunk);
  730. #endif
  731. rowSet.setown(readRows(output, whichChunk));
  732. assertex(rowSet);
  733. }
  734. VALIDATEEQ(currentChunkNum, rowSet->queryChunk());
  735. advanceNextReadChunk(output);
  736. return rowSet.getClear();
  737. }
  738. protected:
  739. class COutput : public CSimpleInterface, implements IRowStream
  740. {
  741. CSharedWriteAheadBase &parent;
  742. unsigned output;
  743. rowcount_t rowsRead;
  744. bool eof, readerWaiting;
  745. Semaphore readWaitSem;
  746. Owned<CRowSet> outputOwnedRows;
  747. CRowSet *rowSet;
  748. unsigned row, rowsInRowSet;
  749. void init()
  750. {
  751. rowsRead = 0;
  752. currentChunkNum = 0;
  753. rowsInRowSet = row = 0;
  754. readerWaiting = eof = false;
  755. rowSet = NULL;
  756. }
  757. inline void doStop()
  758. {
  759. if (eof) return;
  760. eof = true;
  761. parent.stopOutput(output);
  762. }
  763. inline const rowcount_t readerWait(const rowcount_t &rowsRead)
  764. {
  765. if (rowsRead == parent.rowsWritten)
  766. {
  767. if (parent.stopped || parent.writeAtEof)
  768. return 0;
  769. readerWaiting = true;
  770. #ifdef TRACE_WRITEAHEAD
  771. ActPrintLogEx(&parent.activity->queryContainer(), thorlog_all, MCdebugProgress, "readerWait(%d)", output);
  772. #endif
  773. const rowcount_t &rowsWritten = parent.readerWait(rowsRead);
  774. {
  775. CriticalUnblock b(parent.crit);
  776. unsigned mins=0;
  777. loop
  778. {
  779. if (readWaitSem.wait(60000))
  780. break; // NB: will also be signal if aborting
  781. ActPrintLog(parent.activity, "output %d @ row # %"RCPF"d, has been blocked for %d minute(s)", output, rowsRead, ++mins);
  782. }
  783. }
  784. if (parent.isEof(rowsRead))
  785. return 0;
  786. }
  787. return parent.rowsWritten;
  788. }
  789. inline void wakeRead()
  790. {
  791. if (readerWaiting)
  792. {
  793. readerWaiting = false;
  794. readWaitSem.signal();
  795. }
  796. }
  797. public:
  798. unsigned currentChunkNum;
  799. public:
  800. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  801. COutput(CSharedWriteAheadBase &_parent, unsigned _output) : parent(_parent), output(_output)
  802. {
  803. init();
  804. }
  805. void reset()
  806. {
  807. init();
  808. outputOwnedRows.clear();
  809. }
  810. inline CRowSet *queryRowSet() { return rowSet; }
  811. const void *nextRow()
  812. {
  813. if (eof)
  814. return NULL;
  815. if (row == rowsInRowSet)
  816. {
  817. CriticalBlock b(parent.crit);
  818. if (!rowSet || (row == (rowsInRowSet = rowSet->getRowCount())))
  819. {
  820. rowcount_t totalRows = readerWait(rowsRead);
  821. if (0 == totalRows)
  822. {
  823. doStop();
  824. return NULL;
  825. }
  826. if (!rowSet || (row == (rowsInRowSet = rowSet->getRowCount()))) // maybe have caught up in same rowSet
  827. {
  828. Owned<CRowSet> newRows = parent.loadMore(output);
  829. if (rowSet)
  830. VALIDATEEQ(newRows->queryChunk(), rowSet->queryChunk()+1);
  831. outputOwnedRows.setown(newRows.getClear());
  832. rowSet = outputOwnedRows.get();
  833. rowsInRowSet = rowSet->getRowCount();
  834. row = 0;
  835. }
  836. }
  837. }
  838. rowsRead++;
  839. SpinBlock b(parent.spin);
  840. const void *retrow = rowSet->getRow(row++);
  841. return retrow;
  842. }
  843. virtual void stop()
  844. {
  845. CriticalBlock b(parent.crit);
  846. doStop();
  847. }
  848. friend class CSharedWriteAheadBase;
  849. };
  850. CActivityBase *activity;
  851. size32_t minChunkSize;
  852. unsigned lowestChunk, lowestOutput, outputCount, totalChunksOut;
  853. bool stopped;
  854. Owned<CRowSet> inMemRows;
  855. CriticalSection crit;
  856. SpinLock spin;
  857. Linked<IOutputMetaData> meta;
  858. inline COutput &queryCOutput(unsigned i) { return (COutput &) outputs.item(i); }
  859. inline unsigned getLowest()
  860. {
  861. unsigned o=0;
  862. offset_t lsf = (unsigned)-1;
  863. unsigned lsfOutput = (unsigned)-1;
  864. loop
  865. {
  866. if (queryCOutput(o).currentChunkNum < lsf)
  867. {
  868. lsf = queryCOutput(o).currentChunkNum;
  869. lsfOutput = o;
  870. }
  871. if (++o == outputCount)
  872. break;
  873. }
  874. return lsfOutput;
  875. }
// Called when 'output' finishes reading a chunk: advance its chunk cursor and,
// if it was (one of) the furthest-behind readers, free the chunk it just left
// once no other reader still needs it.
void advanceNextReadChunk(unsigned output)
{
    unsigned &currentChunkNum = queryCOutput(output).currentChunkNum;
    unsigned prevChunkNum = currentChunkNum;
    currentChunkNum++;
    if (output == lowestOutput) // might be joint lowest
    {
        lowestOutput = getLowest();
        if (currentChunkNum == queryCOutput(lowestOutput).currentChunkNum) // has new page moved up to join new lowest, in which case it was last.
        {
            if (prevChunkNum != totalChunksOut) // if equal, then prevOffset was mem page and not in pool/disk to free
                freeOffsetChunk(prevChunkNum);
            else
            {
                VALIDATEEQ(prevChunkNum, inMemRows->queryChunk()); // validate is current in mem page
            }
            ++lowestChunk;
#ifdef TRACE_WRITEAHEAD
            ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "aNRP: %d, lowestChunk=%d, totalChunksOut=%d", prevChunkNum, lowestChunk, totalChunksOut);
#endif
        }
    }
}
// Mark an output as stopped. If it was the lowest (furthest-behind) reader,
// first read-and-discard chunks until it catches up with the next-lowest
// reader, so the pages it alone was holding back can be freed.
void stopOutput(unsigned output)
{
    // mark stopped output with very high page (as if read all), will not be picked again
    unsigned lowestA = getLowest();
    if (output == lowestA)
    {
        // Temporarily mark this output stopped to discover the next-lowest reader.
        unsigned chunkOrig = queryCOutput(output).currentChunkNum;
        queryCOutput(output).currentChunkNum = (unsigned)-1;
        unsigned lO = getLowest(); // get next lowest
        queryCOutput(output).currentChunkNum = chunkOrig;
        if (((unsigned)-1) == lO) // if last to stop
        {
            markStop();
            return;
        }
#ifdef TRACE_WRITEAHEAD
        ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Input %d stopped, forcing catchup to free pages", output);
#endif
        while (queryCOutput(output).currentChunkNum < queryCOutput(lO).currentChunkNum)
        {
            Owned<CRowSet> tmpChunk = loadMore(output); // discard
        }
#ifdef TRACE_WRITEAHEAD
        ActPrintLog(activity, "Input %d stopped. Caught up", output);
#endif
        queryCOutput(output).currentChunkNum = (unsigned)-1;
        lowestOutput = getLowest();
    }
    else
        queryCOutput(output).currentChunkNum = (unsigned)-1;
}
// Stop the whole buffer (e.g. on cancel); latches the stopped flag under crit.
void stopAll()
{
    CriticalBlock b(crit);
    markStop();
}
  935. virtual void markStop()
  936. {
  937. if (stopped) return;
  938. stopped = true;
  939. }
// Implementation hooks: how chunks are stored (disk spill vs in-memory pool).
virtual void freeOffsetChunk(unsigned chunk) = 0;               // chunk no longer needed by any reader
virtual CRowSet *readRows(unsigned output, unsigned chunk) = 0; // reload a previously flushed chunk
virtual void flushRows() = 0;                                   // move inMemRows out to pool/disk
virtual size32_t rowSize(const void *row) = 0;                  // per-row cost used for chunk-size accounting
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  946. CSharedWriteAheadBase(CActivityBase *_activity, unsigned _outputCount, IRowInterfaces *rowIf) : activity(_activity), outputCount(_outputCount), meta(rowIf->queryRowMetaData())
  947. {
  948. init();
  949. minChunkSize = 0x2000;
  950. size32_t minSize = meta->getMinRecordSize();
  951. if (minChunkSize<minSize*10) // if rec minSize bigish, ensure reasonable minChunkSize
  952. {
  953. minChunkSize = minSize*10;
  954. if (minChunkSize > 0x10000)
  955. minChunkSize += 2*(minSize+1);
  956. }
  957. outputCount = _outputCount;
  958. unsigned c=0;
  959. for (; c<outputCount; c++)
  960. {
  961. outputs.append(* new COutput(*this, c));
  962. }
  963. inMemRows.setown(new CRowSet(*activity, 0));
  964. }
  965. ~CSharedWriteAheadBase()
  966. {
  967. ForEachItemIn(o, outputs)
  968. {
  969. COutput &output = queryCOutput(o);
  970. output.outputOwnedRows.clear();
  971. }
  972. inMemRows.clear();
  973. }
  974. unsigned anyReaderBehind()
  975. {
  976. unsigned reader=0;
  977. loop
  978. {
  979. if (reader>=outputCount) // so all waiting, don't need in mem page.
  980. break;
  981. if (!queryCOutput(reader).eof && (queryCOutput(reader).rowSet != inMemRows.get())) // if output using this in mem page, then it has linked already, need to find one that is not (i.e. behind)
  982. return reader;
  983. ++reader;
  984. }
  985. return NotFound;
  986. }
  987. // ISharedSmartBuffer
// Append a row (NULL == eog) to the current in-memory chunk. Once the chunk
// reaches minChunkSize it is flushed to pool/disk — but only if some reader is
// still behind — and a fresh chunk is started. Readers are signalled per row.
virtual void putRow(const void *row)
{
    if (stopped)
    {
        // all readers gone; just discard
        ReleaseThorRow(row);
        return;
    }
    unsigned len=rowSize(row);
    CriticalBlock b(crit);
    if (totalOutChunkSize >= minChunkSize) // chunks required to be at least minChunkSize
    {
        unsigned reader=anyReaderBehind();
        if (NotFound != reader)
            flushRows();
        inMemRows.setown(new CRowSet(*activity, ++totalChunksOut));
#ifdef TRACE_WRITEAHEAD
        totalOutChunkSize = sizeof(unsigned); // accounts for chunk # prepended in trace builds
#else
        totalOutChunkSize = 0;
#endif
    }
    {
        SpinBlock b(spin); // readers access the current rowset under the same spin lock
        inMemRows->addRow(row);
    }
    totalOutChunkSize += len;
    rowsWritten++; // including NULLs(eogs)
    signalReaders();
}
// Writer signals no more rows will be produced; wake any waiting readers.
virtual void flush()
{
    CriticalBlock b(crit);
    writeAtEof = true;
    signalReaders();
}
// Not meaningful for a shared write-ahead buffer; never expected to be called.
virtual offset_t getPosition()
{
    throwUnexpected();
    return 0; // unreachable; keeps the compiler satisfied
}
// Return the reader stream for the given output index (not linked).
virtual IRowStream *queryOutput(unsigned output)
{
    return &outputs.item(output);
}
// Abort: mark everything stopped and release any readers blocked in readerWait.
virtual void cancel()
{
    CriticalBlock b(crit);
    stopAll();
    signalReaders();
}
  1038. virtual void reset()
  1039. {
  1040. init();
  1041. unsigned c=0;
  1042. for (; c<outputCount; c++)
  1043. queryCOutput(c).reset();
  1044. inMemRows->reset(0);
  1045. }
  1046. friend class COutput;
  1047. };
  1048. class CSharedWriteAheadDisk : public CSharedWriteAheadBase
  1049. {
  1050. IDiskUsage *iDiskUsage;
  1051. Owned<IFile> spillFile;
  1052. Owned<IFileIO> spillFileIO;
  1053. CIArrayOf<Chunk> freeChunks;
  1054. PointerArrayOf<Chunk> freeChunksSized;
  1055. QueueOf<Chunk, false> savedChunks;
  1056. offset_t highOffset;
  1057. Linked<IEngineRowAllocator> allocator;
  1058. Linked<IOutputRowSerializer> serializer;
  1059. Linked<IOutputRowDeserializer> deserializer;
  1060. IOutputMetaData *serializeMeta;
  1061. struct AddRemoveFreeChunk
  1062. {
  1063. PointerArrayOf<Chunk> &chunkArray;
  1064. Chunk *chunk;
  1065. AddRemoveFreeChunk(PointerArrayOf<Chunk> &_chunkArray, Chunk *_chunk) : chunkArray(_chunkArray), chunk(_chunk)
  1066. {
  1067. chunkArray.zap(chunk);
  1068. }
  1069. ~AddRemoveFreeChunk()
  1070. {
  1071. bool isNew;
  1072. aindex_t sizePos = chunkArray.bAdd(chunk, chunkSizeCompare, isNew);
  1073. if (!isNew) // might match size of another block
  1074. chunkArray.add(chunk, sizePos);
  1075. }
  1076. };
  1077. unsigned findNext(Chunk **base, unsigned arrCount, Chunk *searchChunk, ChunkCompareFunc compareFunc, bool &matched)
  1078. {
  1079. unsigned middle;
  1080. unsigned left = 0;
  1081. int result;
  1082. Chunk *CurEntry;
  1083. unsigned right = arrCount-1;
  1084. /*
  1085. * Loop until we've narrowed down the search range to one or two
  1086. * items. This ensures that middle != 0, preventing 'right' from wrapping.
  1087. */
  1088. while (right - left > 1)
  1089. {
  1090. /* Calculate the middle entry in the array - NOT the middle offset */
  1091. middle = (right + left) >> 1;
  1092. CurEntry = base[middle];
  1093. result = compareFunc(searchChunk, CurEntry);
  1094. if (result < 0)
  1095. right = middle - 1;
  1096. else if (result > 0)
  1097. left = middle + 1;
  1098. else
  1099. {
  1100. matched = true;
  1101. return middle;
  1102. }
  1103. }
  1104. middle = left;
  1105. /*
  1106. * The range has been narrowed down to 1 or 2 items.
  1107. * Perform an optimal check on these last two elements.
  1108. */
  1109. result = compareFunc(searchChunk, base[middle]);
  1110. if (0 == result)
  1111. matched = true;
  1112. else if (result > 0)
  1113. {
  1114. ++middle;
  1115. if (right == left + 1)
  1116. {
  1117. result = compareFunc(searchChunk, base[middle]);
  1118. if (0 == result)
  1119. matched = true;
  1120. else
  1121. {
  1122. matched = false;
  1123. if (result > 0)
  1124. ++middle;
  1125. }
  1126. }
  1127. else
  1128. matched = false;
  1129. }
  1130. else
  1131. matched = false;
  1132. if (middle < arrCount)
  1133. return middle;
  1134. return NotFound;
  1135. }
  1136. inline Chunk *getOutOffset(size32_t required)
  1137. {
  1138. unsigned numFreeChunks = freeChunks.ordinality();
  1139. if (numFreeChunks)
  1140. {
  1141. Chunk searchChunk(0, required);
  1142. bool matched;
  1143. unsigned nextPos = findNext(freeChunksSized.getArray(), freeChunksSized.ordinality(), &searchChunk, chunkSizeCompare2, matched);
  1144. if (NotFound != nextPos)
  1145. {
  1146. Linked<Chunk> nextChunk = freeChunksSized.item(nextPos);
  1147. if (nextChunk->size-required >= minChunkSize)
  1148. {
  1149. Owned<Chunk> chunk = new Chunk(nextChunk->offset, required);
  1150. decFreeChunk(nextChunk, required);
  1151. #ifdef TRACE_WRITEAHEAD
  1152. ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "getOutOffset: got [free] offset = %"I64F"d, size=%d, but took required=%d", nextChunk->offset, nextChunk->size+required, required);
  1153. #endif
  1154. return chunk.getClear();
  1155. }
  1156. #ifdef TRACE_WRITEAHEAD
  1157. ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "getOutOffset: got [free] offset = %"I64F"d, size=%d", diskChunk.offset, diskChunk.size);
  1158. #endif
  1159. freeChunksSized.remove(nextPos);
  1160. freeChunks.zap(*nextChunk);
  1161. return nextChunk.getClear(); // if smaller than minChunkSize, return all, don't leave useless fragment
  1162. }
  1163. }
  1164. #ifdef TRACE_WRITEAHEAD
  1165. ActPrintLog(activity, "getOutOffset: got new upper offset # = %"I64F"d", highOffset);
  1166. #endif
  1167. Chunk *chunk = new Chunk(highOffset, required);
  1168. if (iDiskUsage)
  1169. {
  1170. iDiskUsage->decrease(highOffset);
  1171. highOffset += required; // NB next
  1172. iDiskUsage->increase(highOffset);
  1173. }
  1174. else
  1175. highOffset += required; // NB next
  1176. return chunk;
  1177. }
  1178. inline void mergeFreeChunk(Chunk *dst, const Chunk *src)
  1179. {
  1180. AddRemoveFreeChunk ar(freeChunksSized, dst);
  1181. dst->offset = src->offset;
  1182. dst->size += src->size;
  1183. }
  1184. inline void decFreeChunk(Chunk *chunk, size32_t sz)
  1185. {
  1186. AddRemoveFreeChunk ar(freeChunksSized, chunk);
  1187. chunk->size -= sz;
  1188. chunk->offset += sz;
  1189. }
  1190. inline void incFreeChunk(Chunk *chunk, size32_t sz)
  1191. {
  1192. AddRemoveFreeChunk ar(freeChunksSized, chunk);
  1193. chunk->size += sz;
  1194. }
  1195. inline void addFreeChunk(Chunk *freeChunk, unsigned pos=NotFound)
  1196. {
  1197. bool isNew;
  1198. aindex_t sizePos = freeChunksSized.bAdd(freeChunk, chunkSizeCompare, isNew);
  1199. if (!isNew)
  1200. freeChunksSized.add(freeChunk, sizePos);
  1201. if (NotFound == pos)
  1202. freeChunks.append(*LINK(freeChunk));
  1203. else
  1204. freeChunks.add(*LINK(freeChunk), pos);
  1205. }
  1206. virtual void freeOffsetChunk(unsigned chunk)
  1207. {
  1208. // chunk(unused here) is nominal sequential page #, savedChunks is page # in diskfile
  1209. assertex(savedChunks.ordinality());
  1210. Owned<Chunk> freeChunk = savedChunks.dequeue();
  1211. unsigned nmemb = freeChunks.ordinality();
  1212. if (0 == nmemb)
  1213. addFreeChunk(freeChunk);
  1214. else
  1215. {
  1216. bool matched;
  1217. unsigned nextPos = findNext(freeChunks.getArray(), freeChunks.ordinality(), freeChunk, chunkOffsetCompare, matched);
  1218. assertex(!matched); // should never happen, have found a chunk with same offset as one being freed.
  1219. if (NotFound != nextPos)
  1220. {
  1221. Chunk *nextChunk = &freeChunks.item(nextPos);
  1222. if (freeChunk->endOffset() == nextChunk->offset)
  1223. mergeFreeChunk(nextChunk, freeChunk);
  1224. else if (nextPos > 0)
  1225. {
  1226. Chunk *prevChunk = &freeChunks.item(nextPos-1);
  1227. if (prevChunk->endOffset() == freeChunk->offset)
  1228. incFreeChunk(prevChunk, freeChunk->size);
  1229. else
  1230. addFreeChunk(freeChunk, nextPos);
  1231. }
  1232. else
  1233. addFreeChunk(freeChunk, nextPos);
  1234. }
  1235. else
  1236. addFreeChunk(freeChunk);
  1237. }
  1238. #ifdef TRACE_WRITEAHEAD
  1239. ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Added chunk, offset %"I64F"d size=%d to freeChunks", freeChunk->offset, freeChunk->size);
  1240. #endif
  1241. }
  1242. virtual CRowSet *readRows(unsigned output, unsigned whichChunk)
  1243. {
  1244. assertex(savedChunks.ordinality() > whichChunk);
  1245. unsigned currentChunkNum = queryCOutput(output).currentChunkNum;
  1246. unsigned o=0;
  1247. for (; o<outputCount; o++)
  1248. {
  1249. if (o == output) continue;
  1250. COutput &coutput = queryCOutput(o);
  1251. if (coutput.queryRowSet() && (coutput.queryRowSet()->queryChunk() == currentChunkNum))
  1252. {
  1253. #ifdef TRACE_WRITEAHEAD
  1254. ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Output: %d, readRows found other output %d with matching offset: %d", output, o, currentChunkNum);
  1255. #endif
  1256. return LINK(coutput.queryRowSet());
  1257. }
  1258. }
  1259. Chunk &chunk = *savedChunks.item(whichChunk);
  1260. Owned<ISerialStream> stream = createFileSerialStream(spillFileIO, chunk.offset);
  1261. #ifdef TRACE_WRITEAHEAD
  1262. unsigned diskChunkNum;
  1263. stream->get(sizeof(diskChunkNum), &diskChunkNum);
  1264. VALIDATEEQ(diskChunkNum, currentChunkNum);
  1265. #endif
  1266. CThorStreamDeserializerSource ds(stream);
  1267. Owned<CRowSet> rowSet = new CRowSet(*activity, currentChunkNum);
  1268. loop
  1269. {
  1270. byte b;
  1271. ds.read(sizeof(b),&b);
  1272. if (!b)
  1273. break;
  1274. if (1==b)
  1275. {
  1276. RtlDynamicRowBuilder rowBuilder(allocator);
  1277. size32_t sz = deserializer->deserialize(rowBuilder, ds);
  1278. rowSet->addRow(rowBuilder.finalizeRowClear(sz));
  1279. }
  1280. else if (2==b)
  1281. rowSet->addRow(NULL);
  1282. }
  1283. return rowSet.getClear();
  1284. }
  1285. virtual void flushRows()
  1286. {
  1287. // NB: called in crit
  1288. MemoryBuffer mb;
  1289. mb.ensureCapacity(minChunkSize); // starting size/could be more if variable and bigger
  1290. #ifdef TRACE_WRITEAHEAD
  1291. mb.append(inMemRows->queryChunk()); // for debug purposes only
  1292. #endif
  1293. CMemoryRowSerializer mbs(mb);
  1294. unsigned r=0;
  1295. for (;r<inMemRows->getRowCount();r++)
  1296. {
  1297. OwnedConstThorRow row = inMemRows->getRow(r);
  1298. if (row)
  1299. {
  1300. mb.append((byte)1);
  1301. serializer->serialize(mbs,(const byte *)row.get());
  1302. }
  1303. else
  1304. mb.append((byte)2); // eog
  1305. }
  1306. mb.append((byte)0);
  1307. size32_t len = mb.length();
  1308. Owned<Chunk> freeChunk = getOutOffset(len); // will find space for 'len', might be bigger if from free list
  1309. spillFileIO->write(freeChunk->offset, len, mb.toByteArray());
  1310. savedChunks.enqueue(freeChunk.getClear());
  1311. #ifdef TRACE_WRITEAHEAD
  1312. ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d (savedChunks pos=%d), writeOffset = %"I64F"d, writeSize = %d", inMemRows->queryChunk(), savedChunks.ordinality()-1, freeChunk->offset, len);
  1313. #endif
  1314. }
  1315. virtual size32_t rowSize(const void *row)
  1316. {
  1317. if (!row)
  1318. return 1; // eog;
  1319. else if (meta == serializeMeta)
  1320. return meta->getRecordSize(row)+1; // space on disk, +1 = eog marker
  1321. CSizingSerializer ssz;
  1322. serializer->serialize(ssz,(const byte *)row);
  1323. return ssz.size()+1; // space on disk, +1 = eog marker
  1324. }
  1325. public:
  1326. CSharedWriteAheadDisk(CActivityBase *activity, const char *spillName, unsigned outputCount, IRowInterfaces *rowIf, IDiskUsage *_iDiskUsage) : CSharedWriteAheadBase(activity, outputCount, rowIf),
  1327. allocator(rowIf->queryRowAllocator()), serializer(rowIf->queryRowSerializer()), deserializer(rowIf->queryRowDeserializer()), serializeMeta(meta->querySerializedDiskMeta()), iDiskUsage(_iDiskUsage)
  1328. {
  1329. assertex(spillName);
  1330. spillFile.setown(createIFile(spillName));
  1331. spillFile->setShareMode(IFSHnone);
  1332. spillFileIO.setown(spillFile->open(IFOcreaterw));
  1333. highOffset = 0;
  1334. }
  1335. ~CSharedWriteAheadDisk()
  1336. {
  1337. spillFileIO.clear();
  1338. if (spillFile)
  1339. spillFile->remove();
  1340. loop
  1341. {
  1342. Owned<Chunk> chunk = savedChunks.dequeue();
  1343. if (!chunk) break;
  1344. }
  1345. PROGLOG("CSharedWriteAheadDisk: highOffset=%"I64F"d", highOffset);
  1346. }
  1347. virtual void reset()
  1348. {
  1349. CSharedWriteAheadBase::reset();
  1350. loop
  1351. {
  1352. Owned<Chunk> chunk = savedChunks.dequeue();
  1353. if (!chunk) break;
  1354. }
  1355. freeChunks.kill();
  1356. freeChunksSized.kill();
  1357. highOffset = 0;
  1358. spillFileIO->setSize(0);
  1359. }
  1360. };
  1361. ISharedSmartBuffer *createSharedSmartDiskBuffer(CActivityBase *activity, const char *spillname, unsigned outputs, IRowInterfaces *rowIf, IDiskUsage *iDiskUsage)
  1362. {
  1363. return new CSharedWriteAheadDisk(activity, spillname, outputs, rowIf, iDiskUsage);
  1364. }
  1365. #define MIN_POOL_CHUNKS 10
// Memory-only write-ahead buffer: flushed chunks are held in a bounded pool;
// the writer blocks (on poolSem) when the pool is full, until the slowest
// reader frees the oldest chunk (or the buffer is stopped).
class CSharedWriteAheadMem : public CSharedWriteAheadBase
{
    QueueOf<CRowSet, false> chunkPool; // flushed chunks, oldest first
    Semaphore poolSem;                 // signalled to release a blocked writer
    bool writerBlocked;
    unsigned maxPoolChunks;            // pool bound; (unsigned)-1 == unlimited
    // As base, but additionally releases the writer if it is blocked on a full pool.
    virtual void markStop()
    {
        CSharedWriteAheadBase::markStop();
        if (writerBlocked)
        {
            writerBlocked = false;
            poolSem.signal();
        }
    }
    // All readers have consumed the oldest pooled chunk: discard it and
    // unblock the writer if it was waiting for pool space.
    virtual void freeOffsetChunk(unsigned chunk)
    {
        Owned<CRowSet> topRowSet = chunkPool.dequeue();
        VALIDATEEQ(chunk, topRowSet->queryChunk()); // pool is consumed strictly in chunk order
#ifdef TRACE_WRITEAHEAD
        ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "freeOffsetChunk: Dequeue chunkPool chunks: %d, chunkPool.ordinality() = %d", topRowSet->queryChunk(), chunkPool.ordinality());
#endif
        topRowSet.clear();
        if (writerBlocked)
        {
            writerBlocked = false;
            poolSem.signal();
        }
    }
    // Fetch a pooled chunk for 'output' (shared via link count).
    virtual CRowSet *readRows(unsigned output, unsigned whichChunk)
    {
        Linked<CRowSet> rowSet = chunkPool.item(whichChunk);
        VALIDATEEQ(queryCOutput(output).currentChunkNum, rowSet->queryChunk());
        return rowSet.getClear();
    }
    // Move inMemRows into the pool, blocking (with crit released) while full.
    virtual void flushRows()
    {
        // NB: called in crit
        if (chunkPool.ordinality() >= maxPoolChunks)
        {
            writerBlocked = true;
            { CriticalUnblock b(crit); // release crit so readers can progress and free chunks
                poolSem.wait();
                if (stopped) return; // released by markStop, not by a free
            }
            unsigned reader=anyReaderBehind();
            if (NotFound == reader)
            {
#ifdef TRACE_WRITEAHEAD
                ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "flushRows: caught up whilst blocked to: %d", inMemRows->queryChunk());
#endif
                return; // caught up whilst blocked
            }
            VALIDATELT(chunkPool.ordinality(), maxPoolChunks);
        }
#ifdef TRACE_WRITEAHEAD
        ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d, chunkPool chunks = %d", inMemRows->queryChunk(), 1+chunkPool.ordinality());
#endif
        chunkPool.enqueue(inMemRows.getClear());
    }
    // In-memory cost of a row (no serialization overhead).
    virtual size32_t rowSize(const void *row)
    {
        return meta->getRecordSize(row); // space in mem.
    }
public:
    // buffSize==(unsigned)-1 means an unbounded pool; otherwise the pool is
    // sized in chunks of minChunkSize, with a floor of MIN_POOL_CHUNKS.
    CSharedWriteAheadMem(CActivityBase *activity, unsigned outputCount, IRowInterfaces *rowif, unsigned buffSize) : CSharedWriteAheadBase(activity, outputCount, rowif)
    {
        if (((unsigned)-1) == buffSize)
            maxPoolChunks = (unsigned)-1; // no limit
        else
        {
            maxPoolChunks = buffSize / minChunkSize;
            if (maxPoolChunks < MIN_POOL_CHUNKS)
                maxPoolChunks = MIN_POOL_CHUNKS;
        }
        writerBlocked = false;
    }
    ~CSharedWriteAheadMem()
    {
        // drain the pool, releasing any remaining rowsets
        loop
        {
            Owned<CRowSet> rowSet = chunkPool.dequeue();
            if (!rowSet)
                break;
        }
    }
    virtual void reset()
    {
        CSharedWriteAheadBase::reset();
        loop
        {
            Owned<CRowSet> rowSet = chunkPool.dequeue();
            if (!rowSet)
                break;
        }
        writerBlocked = false;
    }
};
  1464. ISharedSmartBuffer *createSharedSmartMemBuffer(CActivityBase *activity, unsigned outputs, IRowInterfaces *rowIf, unsigned buffSize)
  1465. {
  1466. return new CSharedWriteAheadMem(activity, outputs, rowIf, buffSize);
  1467. }
// Funnels multiple writers into a single reader through a shared
// CThorSpillableRowArray. Writers append in batches of writeGranularity and
// block once 'limit' rows are buffered; the reader consumes in batches of
// readGranularity, waking blocked writers as the buffer drains.
class CRowMultiWriterReader : public CSimpleInterface, implements IRowMultiWriterReader
{
    rowidx_t readGranularity, writeGranularity, rowPos, limit, rowsToRead;
    CThorSpillableRowArray rows;      // shared buffer between writers and reader
    const void **readRows;            // batch of rows currently handed to the reader
    CActivityBase &activity;
    IRowInterfaces *rowIf;
    bool readerBlocked, eos, eow;     // eos = finished/aborted; eow = all writers complete
    Semaphore emptySem, fullSem;      // reader waits on emptySem; writers wait on fullSem
    unsigned numWriters, writersComplete, writersBlocked;
    // Per-writer façade: batches rows locally, handing them to the owner every
    // writeGranularity rows; flushes the remainder and notifies on destruction.
    class CAWriter : public CSimpleInterface, implements IRowWriter
    {
        CRowMultiWriterReader &owner;
        CThorExpandingRowArray rows; // local batch, not yet visible to the reader
    public:
        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
        CAWriter(CRowMultiWriterReader &_owner) : owner(_owner), rows(_owner.activity, _owner.rowIf)
        {
        }
        ~CAWriter()
        {
            flush();
            owner.writerStopped();
        }
        // IRowWriter impl.
        virtual void putRow(const void *row)
        {
            if (rows.ordinality() >= owner.writeGranularity)
                owner.addRows(rows); // hand off the full batch before appending
            rows.append(row);
        }
        virtual void flush()
        {
            if (rows.ordinality())
                owner.addRows(rows);
        }
    };
    // Transfer a writer's batch into the shared array; blocks on fullSem while
    // the buffer is at 'limit'. Discards the batch if aborted (eos).
    void addRows(CThorExpandingRowArray &inRows)
    {
        loop
        {
            {
                CThorArrayLockBlock block(rows);
                if (eos)
                {
                    inRows.kill(); // aborted: discard the batch
                    return;
                }
                if (rows.numCommitted() < limit)
                {
                    // NB: allowed to go over limit, by as much as inRows.ordinality()-1
                    rows.appendRows(inRows, true);
                    if (rows.numCommitted() >= readGranularity)
                        checkReleaseReader();
                    return;
                }
                writersBlocked++;
            }
            fullSem.wait();
        }
    }
    // NB: all call sites hold the rows lock.
    inline void checkReleaseReader()
    {
        if (readerBlocked)
        {
            emptySem.signal();
            readerBlocked = false;
        }
    }
    inline void checkReleaseWriters()
    {
        if (writersBlocked)
        {
            fullSem.signal(writersBlocked); // one signal per blocked writer
            writersBlocked = 0;
        }
    }
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
    CRowMultiWriterReader(CActivityBase &_activity, IRowInterfaces *_rowIf, unsigned _limit, unsigned _readGranularity, unsigned _writerGranularity)
        : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf), limit(_limit), readGranularity(_readGranularity), writeGranularity(_writerGranularity)
    {
        if (readGranularity > limit)
            readGranularity = limit; // readGranularity must be <= limit;
        numWriters = 0;
        roxiemem::IRowManager *rowManager = activity.queryJob().queryRowManager();
        readRows = static_cast<const void * *>(rowManager->allocate(readGranularity * sizeof(void*), activity.queryContainer().queryId()));
        eos = eow = readerBlocked = false;
        rowPos = rowsToRead = 0;
        writersComplete = writersBlocked = 0;
        rows.setup(rowIf, false, stableSort_none, true); // turning on throwOnOom;
    }
    ~CRowMultiWriterReader()
    {
        // release any rows fetched into the read batch but never consumed
        while (rowPos < rowsToRead)
        {
            ReleaseThorRow(readRows[rowPos++]);
        }
        ReleaseThorRow(readRows);
    }
    // Called from each CAWriter dtor; when the last writer completes, flush
    // the shared array and wake the reader so it can drain the tail.
    void writerStopped()
    {
        CThorArrayLockBlock block(rows);
        writersComplete++;
        if (writersComplete == numWriters)
        {
            rows.flush();
            eow = true;
            checkReleaseReader();
        }
    }
    // ISharedWriteBuffer impl.
    virtual IRowWriter *getWriter()
    {
        // NOTE(review): numWriters increment is not locked — presumably all
        // writers are obtained before writing/reading begins; verify callers.
        ++numWriters;
        return new CAWriter(*this);
    }
    virtual void abort()
    {
        CThorArrayLockBlock block(rows);
        eos = true;
        checkReleaseWriters();
        checkReleaseReader();
    }
    // IRowStream impl.
    virtual const void *nextRow()
    {
        if (eos)
            return NULL;
        if (rowPos == rowsToRead) // current read batch exhausted; fetch another
        {
            loop
            {
                {
                    CThorArrayLockBlock block(rows);
                    if (rows.numCommitted() >= readGranularity || eow)
                    {
                        // take a full batch, or whatever remains once all writers are done
                        rowsToRead = (eow && rows.numCommitted() < readGranularity) ? rows.numCommitted() : readGranularity;
                        if (0 == rowsToRead)
                        {
                            eos = true;
                            return NULL;
                        }
                        rows.readBlock(readRows, rowsToRead);
                        rowPos = 0;
                        checkReleaseWriters();
                        break; // fall through to return a row
                    }
                    readerBlocked = true;
                }
                emptySem.wait();
                if (eos)
                    return NULL;
            }
        }
        const void *row = readRows[rowPos];
        readRows[rowPos] = NULL;
        ++rowPos;
        return row;
    }
    virtual void stop()
    {
        eos = true;
        checkReleaseWriters();
    }
};
  1634. IRowMultiWriterReader *createSharedWriteBuffer(CActivityBase *activity, IRowInterfaces *rowif, unsigned limit, unsigned readGranularity, unsigned writeGranularity)
  1635. {
  1636. return new CRowMultiWriterReader(*activity, rowif, limit, readGranularity, writeGranularity);
  1637. }
  1638. class CRCFileStream: public CSimpleInterface, implements IFileIOStream
  1639. {
  1640. Linked<IFileIOStream> streamin;
  1641. public:
  1642. CRC32 &crc;
  1643. bool valid;
  1644. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1645. CRCFileStream(IFileIOStream *_stream,CRC32 &_crc)
  1646. : streamin(_stream), crc(_crc)
  1647. {
  1648. valid = true;
  1649. }
  1650. size32_t read(size32_t len, void * data)
  1651. {
  1652. size32_t rd = streamin->read(len,data);
  1653. crc.tally(rd,data);
  1654. return rd;
  1655. }
  1656. size32_t write(size32_t len, const void * data)
  1657. {
  1658. throw MakeStringException(-1,"CRCFileStream does not support write");
  1659. }
  1660. void seek(offset_t pos, IFSmode origin)
  1661. {
  1662. offset_t t = streamin->tell();
  1663. streamin->seek(pos,origin);
  1664. if (t!=streamin->tell()) // crc invalid
  1665. if (valid) {
  1666. WARNLOG("CRCFileStream::seek called - CRC will be invalid");
  1667. valid = false;
  1668. }
  1669. }
  1670. offset_t size()
  1671. {
  1672. return streamin->size();
  1673. }
  1674. virtual offset_t tell()
  1675. {
  1676. return streamin->tell();
  1677. }
  1678. void flush()
  1679. {
  1680. // noop as only read
  1681. }
  1682. };