thbuf.cpp 60 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include <limits.h>
  15. #include <stddef.h>
  16. #include "jlib.hpp"
  17. #include "jmisc.hpp"
  18. #include "jio.hpp"
  19. #include "jlzw.hpp"
  20. #include "jsem.hpp"
  21. #include "jthread.hpp"
  22. #include "jarray.hpp"
  23. #include "jiface.hpp"
  24. #include "jfile.hpp"
  25. #include "jset.hpp"
  26. #include "jqueue.tpp"
  27. #include "thmem.hpp"
  28. #include "thalloc.hpp"
  29. #include "thbuf.hpp"
  30. #include "eclrtl.hpp"
  31. #include "thgraph.hpp"
#ifdef _DEBUG
//#define _FULL_TRACE
//#define TRACE_CREATE
// Debug-only RAII guard that traps re-entrant calls: the referenced flag
// must be false on entry (it is set true for the guarded scope) and true on
// exit (reset to false). Either violation fires assertex.
struct SmartBufReenterCheck
{
    bool &check;
    SmartBufReenterCheck(bool &_check)
        : check(_check)
    {
        assertex(!check);   // already inside the guarded section => re-entry bug
        check = true;
    }
    ~SmartBufReenterCheck()
    {
        assertex(check);
        check = false;
    }
};
#define REENTRANCY_CHECK(n) SmartBufReenterCheck recheck(n);
#else
// Compiles away entirely in release builds.
#define REENTRANCY_CHECK(n)
#endif
// Single-producer / single-consumer row buffer that spills to disk.
// The writer (putRow) fills the 'in' queue; the reader (nextRow) drains the
// 'out' queue. When the in-memory estimate exceeds 'blocksize' the 'in'
// queue is serialized to a temporary file in fixed-size blocks (diskflush)
// and chunks are reloaded on demand (load). All mutable state is guarded by
// 'lock'; 'waitsem'/'waitflushsem' hand control between a blocked reader or
// flusher and the writer. The exact order of lock release/reacquire and the
// re-checks of 'waiting' are load-bearing - do not reorder.
class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, implements IRowWriter
{
    CActivityBase *activity;
    ThorRowQueue *in;           // producer-side queue; NULL while a diskflush is in progress
    size32_t insz;              // estimated memory footprint of rows currently in 'in'
    ThorRowQueue *out;          // consumer-side queue; NULL while a load is in progress
    Linked<IFile> file;         // backing spill file (opened lazily on first flush)
    Owned<IFileIO> fileio;
    SpinLock lock;              // guards all mutable state below
    bool waiting;               // reader (or stop/flush) is blocked on waitsem
    Semaphore waitsem;
    bool waitflush;             // flush() is blocked on waitflushsem
    Semaphore waitflushsem;
    size32_t blocksize;         // spill block size; also the in-memory threshold
    unsigned numblocks;         // total blocks ever allocated in the spill file
    Owned<IBitSet> diskfree;    // free/busy map of spill-file blocks, enabling reuse
    UnsignedArray diskin;       // start block of each spilled chunk, oldest first
    UnsignedArray diskinlen;    // length (in blocks) of each spilled chunk, parallel to diskin
    Linked<IOutputRowSerializer> serializer;
    Linked<IOutputRowDeserializer> deserializer;
    Linked<IEngineRowAllocator> allocator;
    bool eoi;
#ifdef _DEBUG
    bool putrecheck;
    bool getrecheck;
#endif

    // First-fit search for 'nb' contiguous free blocks in the spill file;
    // extends the file (numblocks) if no run is found. Marks the run busy
    // and returns its first block index. Caller holds 'lock'.
    unsigned allocblk(unsigned nb)
    {
        if (nb<=numblocks) {
            for (unsigned i=0;i<=numblocks-nb;i++) {
                unsigned j;
                for (j=i;j<i+nb;j++)
                    if (!diskfree->test(j))
                        break;
                if (j==i+nb) {
                    while (j>i)
                        diskfree->set(--j,false);
                    return i;
                }
            }
        }
        unsigned ret = numblocks;
        numblocks += nb;
        return ret;
    }
    // Return blocks [b, b+nb) to the free map. Caller holds 'lock'.
    void freeblk(unsigned b,unsigned nb)
    {
        for (unsigned i=0;i<nb;i++)
            diskfree->set(i+b,true);
    }
    // Serialize the current 'in' queue to the spill file (or hand it straight
    // to the reader if it has fully caught up). Caller holds 'lock'; the lock
    // is dropped around file I/O, which is why 'waiting' is re-checked after
    // each unlocked section. Rows are framed as: 1=row, 2=eog, 0=end-of-chunk.
    void diskflush()
    {
        // Scope guard: publishes in==NULL for the duration (so the consumer
        // pauses rather than racing the flush) and restores it on exit.
        struct cSaveIn {
            ThorRowQueue *rq;
            ThorRowQueue *&in;
            cSaveIn(ThorRowQueue *&_in)
                : in(_in)
            {
                rq = in;
                in = NULL;
            }
            ~cSaveIn()
            {
                if (!in)
                    in = rq;
            }
        } csavein(in); // mark as not there so consumer will pause
        // Fast path: nothing buffered for the reader and nothing on disk -
        // just swap the queues instead of spilling.
        if (out && (out->ordinality()==0) && (diskin.ordinality()==0)) {
            in = out;
            out = csavein.rq;
            insz = 0;
            return;
        }
        if (!fileio) {
            SpinUnblock unblock(lock);  // file open may block; don't hold the spin lock
            fileio.setown(file->open(IFOcreaterw));
            if (!fileio)
            {
                throw MakeStringException(-1,"CSmartRowBuffer::flush cannot write file %s",file->queryFilename());
            }
        }
        MemoryBuffer mb;
        CMemoryRowSerializer mbs(mb);
        unsigned nb = 0;
        unsigned blk = 0;
        if (!waiting) {
            mb.ensureCapacity(blocksize);
            {
                SpinUnblock unblock(lock);  // serialization can be slow; abandon early if reader starts waiting
                byte b;
                for (unsigned i=0;i<csavein.rq->ordinality();i++) {
                    if (waiting)
                        break;
                    const void *row = csavein.rq->item(i);
                    if (row) {
                        b = 1;
                        mb.append(b);
                        serializer->serialize(mbs,(const byte *)row);
                    }
                    else {
                        b = 2; // eog
                        mb.append(b);
                    }
                }
                b = 0;
                mb.append(b);
            }
        }
        if (!waiting) {
            nb = (mb.length()+blocksize-1)/blocksize;   // round up to whole blocks
            blk = allocblk(nb);
            SpinUnblock unblock(lock);
            if (mb.length()<nb*blocksize) { // bit overkill!
                size32_t left = nb*blocksize-mb.length();
                memset(mb.reserve(left),0,left);        // zero-pad the final block
            }
            fileio->write(blk*(offset_t)blocksize,mb.length(),mb.bufferBase());
            mb.clear();
        }
        if (waiting) {
            // if eoi, stopped while serializing/writing, putRow may put final row to existing 'in'
            if (!eoi)
            {
                // consumer caught up
                freeblk(blk,nb);    // discard the (possibly) just-written chunk; reader gets the rows directly
                assertex(out->ordinality()==0);
                assertex(diskin.ordinality()==0);
                in = out;
                out = csavein.rq;
            }
        }
        else {
            // Chunk committed to disk; record it and release the in-memory rows.
            diskin.append(blk);
            diskinlen.append(nb);
            while (csavein.rq->ordinality())
                ReleaseThorRow(csavein.rq->dequeue());
        }
        insz = 0;
    }
    // Read the oldest spilled chunk back into the 'out' queue, deserializing
    // each framed row. Caller holds 'lock'; it is dropped around the file
    // read, with out==NULL published so the producer won't fill it meanwhile.
    void load()
    {
        ThorRowQueue *rq = out;
        out = NULL; // mark as not there so producer won't fill
        unsigned blk = diskin.item(0);
        unsigned nb = diskinlen.item(0);
        diskin.remove(0); // better as q but given reading from disk...
        diskinlen.remove(0);
        {
            SpinUnblock unblock(lock);
            MemoryAttr ma;
            size32_t readBlockSize = nb*blocksize;
            byte *buf = (byte *)ma.allocate(readBlockSize);
            CThorStreamDeserializerSource ds(readBlockSize,buf);
            assertex(fileio.get());
            size32_t rd = fileio->read(blk*(offset_t)blocksize,readBlockSize,buf);
            assertex(rd==readBlockSize);
            for (;;) {
                byte b;
                ds.read(sizeof(b),&b);
                if (!b)
                    break;          // 0 => end of chunk
                if (b==1) {
                    RtlDynamicRowBuilder rowBuilder(allocator);
                    size32_t sz = deserializer->deserialize(rowBuilder,ds);
                    rq->enqueue(rowBuilder.finalizeRowClear(sz));
                }
                else if (b==2)
                    rq->enqueue(NULL);  // end-of-group marker
            }
        }
        freeblk(blk,nb);
        out = rq;
    }
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

    // bufsize is split in half and rounded up to a 1MB multiple to give the
    // spill block size (which doubles as the in-memory threshold).
    CSmartRowBuffer(CActivityBase *_activity, IFile *_file,size32_t bufsize,IThorRowInterfaces *rowif)
        : activity(_activity), file(_file), allocator(rowif->queryRowAllocator()), serializer(rowif->queryRowSerializer()), deserializer(rowif->queryRowDeserializer())
    {
#ifdef _DEBUG
        putrecheck = false;
        getrecheck = false;
#endif
        in = new ThorRowQueue;
        out = new ThorRowQueue;
        waiting = false;
        waitflush = false;
        blocksize = ((bufsize/2+0xfffff)/0x100000)*0x100000;
        numblocks = 0;
        insz = 0;
        eoi = false;
        diskfree.setown(createThreadSafeBitSet());

#ifdef _FULL_TRACE
        ActPrintLog(activity, "SmartBuffer create %x",(unsigned)(memsize_t)this);
#endif
    }

    // Releases any remaining rows and deletes the spill file (if created).
    // Asserts no thread is still blocked on this buffer.
    ~CSmartRowBuffer()
    {
#ifdef _FULL_TRACE
        ActPrintLog(activity, "SmartBuffer destroy %x",(unsigned)(memsize_t)this);
#endif
        assertex(!waiting);
        assertex(!waitflush);
        // clear in/out contents
        while (in->ordinality())
            ReleaseThorRow(in->dequeue());
        delete in;
        while (out->ordinality())
            ReleaseThorRow(out->dequeue());
        delete out;
        if (fileio)
        {
            fileio.clear();
            file->remove();
        }
    }
    // IRowWriter: append a row (NULL = end-of-group). Spills first if this
    // row would push the in-memory estimate past blocksize. Discards the row
    // if already stopped. Wakes a waiting reader.
    void putRow(const void *row)
    {
        REENTRANCY_CHECK(putrecheck)
        size32_t sz = thorRowMemoryFootprint(serializer, row);
        SpinBlock block(lock);
        if (eoi) {
            ReleaseThorRow(row);
            return;
        }
        assertex(in); // reentry check
        if (sz+insz+(in->ordinality()+2)*sizeof(byte)>blocksize) // byte extra per row + end byte
            diskflush();
        in->enqueue(row);
        insz += sz;
        if (waiting) {
            waitsem.signal();
            waiting = false;
        }
    }
    // Abandon the buffer: mark eoi, wait (via waitsem) for any in-progress
    // diskflush to restore 'in', release all buffered rows and wake any
    // blocked waiters.
    void stop()
    {
#ifdef _FULL_TRACE
        ActPrintLog(activity, "SmartBuffer stop %x",(unsigned)(memsize_t)this);
#endif
        SpinBlock block(lock);
#ifdef _DEBUG
        if (waiting)
        {
            ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "CSmartRowBuffer::stop while nextRow waiting");
            PrintStackReport();
        }
#endif
        eoi = true;
        while (out&&out->ordinality())
            ReleaseThorRow(out->dequeue());
        while (NULL == in)
        {
            // in==NULL means a diskflush is mid-flight; block until it completes
            waiting = true;
            SpinUnblock unblock(lock);
            waitsem.wait();
        }
        while (out&&out->ordinality())
            ReleaseThorRow(out->dequeue());
        while (in&&in->ordinality())
            ReleaseThorRow(in->dequeue());
        diskin.kill();
        if (waiting) {
            waitsem.signal();
            waiting = false;
        }
        if (waitflush) {
            waitflushsem.signal();
            waitflush = false;
        }
    }
    // IRowStream: next row in order - drains 'out' first, then reloads
    // spilled chunks, then reads directly from 'in'; blocks on waitsem when
    // nothing is available and the writer hasn't finished. Returns NULL at
    // end of stream.
    const void *nextRow()
    {
        REENTRANCY_CHECK(getrecheck)
        const void * ret;
        assertex(out);
        assertex(!waiting); // reentrancy checks
        for (;;) {
            {
                SpinBlock block(lock);
                if (out->ordinality()) {
                    ret = out->dequeue();
                    break;
                }
                if (diskin.ordinality()) {
                    load();
                    ret = out->dequeue();
                    break;
                }
                if (in) {
                    if (in->ordinality()) {
                        // reader has caught up with the writer's queue
                        ret = in->dequeue();
                        if (ret) {
                            size32_t sz = thorRowMemoryFootprint(serializer, ret);
                            assertex(insz>=sz);
                            insz -= sz;
                        }
                        break;
                    }
                    else {
                        if (waitflush) {
                            waitflushsem.signal();
                            waitflush = false;
                        }
                        if (eoi)
                            return NULL;
                    }
                }
                assertex(!waiting); // reentrancy check
                waiting = true;     // set under the lock; wait after releasing it
            }
            waitsem.wait();
        }
        return ret;
    }
    // Writer-side: flush remaining rows, mark eoi, then block (on
    // waitflushsem) until the reader has consumed everything or stop() is
    // called; logs a warning every minute while stalled.
    void flush()
    {
        // I think flush should wait til all rows read or stopped
#ifdef _FULL_TRACE
        ActPrintLog(activity, "SmartBuffer flush %x",(unsigned)(memsize_t)this);
#endif
        SpinBlock block(lock);
        if (eoi) return;
        for (;;) {
            assertex(in); // reentry check
            diskflush();
            eoi = true;
            if (waiting) {
                waitsem.signal();
                waiting = false;
            }
            if (out&&!out->ordinality()&&!diskin.ordinality()&&!in->ordinality())
                break;
            waitflush = true;
            SpinUnblock unblock(lock);
            while (!waitflushsem.wait(1000*60))
                ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "CSmartRowBuffer::flush stalled");
        }
    }
    // The buffer is its own writer endpoint.
    IRowWriter *queryWriter()
    {
        return this;
    }
};
// Purely in-memory variant of the smart row buffer: a bounded
// single-producer / single-consumer queue with no disk spill. The producer
// blocks (waitinsem) when the footprint estimate would exceed 'blocksize';
// the consumer blocks (waitoutsem) when the queue is empty. All state is
// guarded by 'lock'.
class CSmartRowInMemoryBuffer: public CSimpleInterface, implements ISmartRowBuffer, implements IRowWriter
{
    // NB must *not* call LinkThorRow or ReleaseThorRow (or Owned*ThorRow) if deallocator set
    CActivityBase *activity;
    IThorRowInterfaces *rowIf;
    ThorRowQueue *in;           // the single queue shared by producer and consumer
    size32_t insz;              // estimated footprint of queued rows
    SpinLock lock; // MORE: This lock is held for quite long periods. I suspect it could be significantly optimized.
    bool waitingin;             // producer blocked on waitinsem
    Semaphore waitinsem;
    bool waitingout;            // consumer blocked on waitoutsem
    Semaphore waitoutsem;
    size32_t blocksize;         // in-memory capacity threshold
    bool eoi;
#ifdef _DEBUG
    bool putrecheck;
    bool getrecheck;
#endif
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

    // As with CSmartRowBuffer, half of bufsize rounded up to a 1MB multiple
    // becomes the capacity threshold.
    CSmartRowInMemoryBuffer(CActivityBase *_activity, IThorRowInterfaces *_rowIf, size32_t bufsize)
        : activity(_activity), rowIf(_rowIf)
    {
#ifdef _DEBUG
        putrecheck = false;
        getrecheck = false;
#endif
        in = new ThorRowQueue;
        waitingin = false;
        waitingout = false;
        blocksize = ((bufsize/2+0xfffff)/0x100000)*0x100000;
        insz = 0;
        eoi = false;
    }
    ~CSmartRowInMemoryBuffer()
    {
        // clear in contents
        while (in->ordinality())
            ReleaseThorRow(in->dequeue());
        delete in;
    }
    // IRowWriter: append a row (NULL = end-of-group), blocking while the
    // buffer is full; the row is released unqueued if stop() intervenes.
    void putRow(const void *row)
    {
        REENTRANCY_CHECK(putrecheck)
        size32_t sz = 0;
        if (row) {
            sz = thorRowMemoryFootprint(rowIf->queryRowSerializer(), row);
            assertex((int)sz>=0); // trap invalid sizes a bit earlier
#ifdef _TRACE_SMART_PUTGET
            ActPrintLog(activity, "***putRow(%x) %d {%x}",(unsigned)row,sz,*(const unsigned *)row);
#endif
        }
        {
            SpinBlock block(lock);
            if (!eoi) {
                assertex(in); // reentry check
                // Block while adding this row would overflow (but never when
                // the queue is empty, so oversized single rows still pass).
                while ((sz+insz>blocksize)&&insz) {
                    waitingin = true;
                    {
                        SpinUnblock unblock(lock);
                        waitinsem.wait();
                    }
                    if (eoi)
                        break;
                }
            }
            if (!eoi) {
                in->enqueue(row);
                insz += sz;
#ifdef _TRACE_SMART_PUTGET
                ActPrintLog(activity, "***putRow2(%x) insize = %d ",insz);
#endif
                if (waitingout) {
                    waitoutsem.signal();
                    waitingout = false;
                }
                return;
            }
        }
        // cancelled
        ReleaseThorRow(row);
    }
    // IRowStream: dequeue the next row, blocking while empty until the
    // producer adds one or marks eoi; returns NULL at end of stream. Wakes a
    // blocked producer after consuming.
    const void *nextRow()
    {
        REENTRANCY_CHECK(getrecheck)
        const void * ret;
        SpinBlock block(lock);
        assertex(!waitingout); // reentrancy checks
        for (;;) {
            if (in->ordinality()) {
                ret = in->dequeue();
                if (ret) {
                    size32_t sz = thorRowMemoryFootprint(rowIf->queryRowSerializer(), ret);
#ifdef _TRACE_SMART_PUTGET
                    ActPrintLog(activity, "***dequeueRow(%x) %d insize=%d {%x}",(unsigned)ret,sz,insz,*(const unsigned *)ret);
#endif
                    if (insz<sz) {
                        // accounting underflow - log details before asserting
                        ActPrintLog(activity, "CSmartRowInMemoryBuffer::nextRow(%x) %d insize=%d {%x} ord = %d",(unsigned)(memsize_t)ret,sz,insz,*(const unsigned *)ret,in->ordinality());
                        assertex(insz>=sz);
                    }
                    insz -= sz;
                }
                break;
            }
            else if (eoi) {
                ret = NULL;
                break;
            }
            assertex(!waitingout); // reentrancy check
            waitingout = true;
            SpinUnblock unblock(lock);
            waitoutsem.wait();
        }
        if (waitingin) {
            waitinsem.signal();
            waitingin = false;
        }
        return ret;
    }
    // Abandon the buffer: mark eoi, release all queued rows and wake a
    // blocked producer.
    void stop()
    {
#ifdef _FULL_TRACE
        ActPrintLog(activity, "CSmartRowInMemoryBuffer stop %x",(unsigned)(memsize_t)this);
#endif
        SpinBlock block(lock);
#ifdef _DEBUG
        if (waitingout) {
            ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "CSmartRowInMemoryBuffer::stop while nextRow waiting");
            PrintStackReport();
        }
#endif
        eoi = true;
        while (in->ordinality())
            ReleaseThorRow(in->dequeue());
        in->clear();
        if (waitingin) {
            waitinsem.signal();
            waitingin = false;
        }
    }
//  offset_t getPosition()
//  {
//      SpinBlock block(lock);
//      return pos;
//  }
    // Writer-side: mark eoi and block until the reader has drained the
    // queue; logs a warning every minute while stalled.
    void flush()
    {
        // I think flush should wait til all rows read
        SpinBlock block(lock);
        eoi = true;
        for (;;) {
            assertex(in); // reentry check
            if (waitingout) {
                waitoutsem.signal();
                waitingout = false;
            }
            if (!in->ordinality())
                break;
            waitingin = true;
            SpinUnblock unblock(lock);
            while (!waitinsem.wait(1000*60))
                ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "CSmartRowInMemoryBuffer::flush stalled");
        }
    }
    // The buffer is its own writer endpoint.
    IRowWriter *queryWriter()
    {
        return this;
    }
};
  566. ISmartRowBuffer * createSmartBuffer(CActivityBase *activity, const char * tempname, size32_t buffsize, IThorRowInterfaces *rowif)
  567. {
  568. Owned<IFile> file = createIFile(tempname);
  569. return new CSmartRowBuffer(activity,file,buffsize,rowif);
  570. }
  571. ISmartRowBuffer * createSmartInMemoryBuffer(CActivityBase *activity, IThorRowInterfaces *rowIf, size32_t buffsize)
  572. {
  573. return new CSmartRowInMemoryBuffer(activity, rowIf, buffsize);
  574. }
  575. class COverflowableBuffer : public CSimpleInterface, implements IRowWriterMultiReader
  576. {
  577. CActivityBase &activity;
  578. IThorRowInterfaces *rowIf;
  579. Owned<IThorRowCollector> collector;
  580. Owned<IRowWriter> writer;
  581. bool eoi, shared;
  582. public:
  583. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  584. COverflowableBuffer(CActivityBase &_activity, IThorRowInterfaces *_rowIf, EmptyRowSemantics emptyRowSemantics, bool _shared, unsigned spillPriority)
  585. : activity(_activity), rowIf(_rowIf), shared(_shared)
  586. {
  587. collector.setown(createThorRowCollector(activity, rowIf, NULL, stableSort_none, rc_mixed, spillPriority, emptyRowSemantics));
  588. writer.setown(collector->getWriter());
  589. eoi = false;
  590. }
  591. ~COverflowableBuffer()
  592. {
  593. writer.clear();
  594. collector.clear();
  595. }
  596. // IRowWriterMultiReader
  597. virtual IRowStream *getReader()
  598. {
  599. flush();
  600. return collector->getStream(shared);
  601. }
  602. // IRowWriter
  603. virtual void putRow(const void *row)
  604. {
  605. assertex(!eoi);
  606. writer->putRow(row);
  607. }
  608. virtual void flush()
  609. {
  610. eoi = true;
  611. }
  612. };
  613. IRowWriterMultiReader *createOverflowableBuffer(CActivityBase &activity, IThorRowInterfaces *rowIf, EmptyRowSemantics emptyRowSemantics, bool shared, unsigned spillPriority)
  614. {
  615. return new COverflowableBuffer(activity, rowIf, emptyRowSemantics, shared, spillPriority);
  616. }
  617. #define VALIDATEEQ(LHS, RHS) if ((LHS)!=(RHS)) { StringBuffer s("FAIL(EQ) - LHS="); s.append(LHS).append(", RHS=").append(RHS); PROGLOG("%s", s.str()); throwUnexpected(); }
  618. #define VALIDATELT(LHS, RHS) if ((LHS)>=(RHS)) { StringBuffer s("FAIL(LT) - LHS="); s.append(LHS).append(", RHS=").append(RHS); PROGLOG("%s", s.str()); throwUnexpected(); }
  619. //#define TRACE_WRITEAHEAD
  620. class CSharedWriteAheadBase;
  621. class CRowSet : public CSimpleInterface, implements IInterface
  622. {
  623. unsigned chunk;
  624. CThorExpandingRowArray rows;
  625. CSharedWriteAheadBase &sharedWriteAhead;
  626. mutable CriticalSection crit;
  627. public:
  628. CRowSet(CSharedWriteAheadBase &_sharedWriteAhead, unsigned _chunk, unsigned maxRows);
  629. virtual void Link() const
  630. {
  631. CSimpleInterface::Link();
  632. }
  633. virtual bool Release() const;
  634. void clear() { rows.clearRows(); }
  635. void setChunk(unsigned _chunk) { chunk = _chunk; }
  636. void reset(unsigned _chunk)
  637. {
  638. chunk = _chunk;
  639. rows.clearRows();
  640. }
  641. inline unsigned queryChunk() const { return chunk; }
  642. inline unsigned getRowCount() const { return rows.ordinality(); }
  643. inline void addRow(const void *row) { rows.append(row); }
  644. inline const void *getRow(unsigned r)
  645. {
  646. return rows.get(r);
  647. }
  648. };
  649. class Chunk : public CInterface
  650. {
  651. public:
  652. offset_t offset;
  653. size32_t size;
  654. Linked<CRowSet> rowSet;
  655. Chunk(CRowSet *_rowSet) : rowSet(_rowSet), offset(0), size(0) { }
  656. Chunk(offset_t _offset, size_t _size) : offset(_offset), size(_size) { }
  657. Chunk(const Chunk &other) : offset(other.offset), size(other.size) { }
  658. bool operator==(Chunk const &other) const { return size==other.size && offset==other.offset; }
  659. Chunk &operator=(Chunk const &other) { offset = other.offset; size = other.size; return *this; }
  660. offset_t endOffset() { return offset+size; }
  661. };
  662. typedef int (*ChunkCompareFunc)(Chunk *, Chunk *);
  663. int chunkOffsetCompare(Chunk *lhs, Chunk *rhs)
  664. {
  665. if (lhs->offset>rhs->offset)
  666. return 1;
  667. else if (lhs->offset<rhs->offset)
  668. return -1;
  669. return 0;
  670. }
  671. int chunkSizeCompare(Chunk **_lhs, Chunk **_rhs)
  672. {
  673. Chunk *lhs = *(Chunk **)_lhs;
  674. Chunk *rhs = *(Chunk **)_rhs;
  675. return (int)lhs->size - (int)rhs->size;
  676. }
  677. int chunkSizeCompare2(Chunk *lhs, Chunk *rhs)
  678. {
  679. return (int)lhs->size - (int)rhs->size;
  680. }
  681. #define MIN_POOL_CHUNKS 10
  682. class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBuffer
  683. {
  684. size32_t totalOutChunkSize;
  685. bool writeAtEof;
  686. rowcount_t rowsWritten;
  687. IArrayOf<IRowStream> outputs;
  688. unsigned readersWaiting;
  689. mutable IArrayOf<CRowSet> cachedRowSets;
  690. CriticalSection rowSetCacheCrit;
  691. void reuse(CRowSet *rowset)
  692. {
  693. if (!reuseRowSets)
  694. return;
  695. rowset->clear();
  696. CriticalBlock b(rowSetCacheCrit);
  697. if (cachedRowSets.ordinality() < (outputs.ordinality()*2))
  698. cachedRowSets.append(*LINK(rowset));
  699. }
  700. virtual void init()
  701. {
  702. stopped = false;
  703. writeAtEof = false;
  704. rowsWritten = 0;
  705. readersWaiting = 0;
  706. totalChunksOut = lowestChunk = 0;
  707. lowestOutput = 0;
  708. #ifdef TRACE_WRITEAHEAD
  709. totalOutChunkSize = sizeof(unsigned);
  710. #else
  711. totalOutChunkSize = 0;
  712. #endif
  713. }
  714. inline bool isEof(rowcount_t rowsRead)
  715. {
  716. return stopped || (writeAtEof && rowsWritten == rowsRead);
  717. }
  718. inline void signalReaders()
  719. {
  720. if (readersWaiting)
  721. {
  722. #ifdef TRACE_WRITEAHEAD
  723. ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "signalReaders: %d", readersWaiting);
  724. #endif
  725. readersWaiting = 0;
  726. ForEachItemIn(o, outputs)
  727. queryCOutput(o).wakeRead(); // if necessary
  728. }
  729. }
  730. CRowSet *loadMore(unsigned output)
  731. {
  732. // needs to be called in crit
  733. Linked<CRowSet> rowSet;
  734. unsigned currentChunkNum = queryCOutput(output).currentChunkNum;
  735. if (currentChunkNum == totalChunksOut)
  736. {
  737. #ifdef TRACE_WRITEAHEAD
  738. PROGLOG("output=%d, chunk=%d INMEM", output, currentChunkNum);
  739. #endif
  740. rowSet.set(inMemRows);
  741. }
  742. else
  743. {
  744. unsigned whichChunk = queryCOutput(output).currentChunkNum - lowestChunk;
  745. #ifdef TRACE_WRITEAHEAD
  746. LOG(MCthorDetailedDebugInfo, thorJob, "output=%d, chunk=%d (whichChunk=%d)", output, currentChunkNum, whichChunk);
  747. #endif
  748. rowSet.setown(readRows(output, whichChunk));
  749. assertex(rowSet);
  750. }
  751. VALIDATEEQ(currentChunkNum, rowSet->queryChunk());
  752. advanceNextReadChunk(output);
  753. return rowSet.getClear();
  754. }
  755. protected:
  756. class COutput : public CSimpleInterface, implements IRowStream
  757. {
  758. CSharedWriteAheadBase &parent;
  759. unsigned output;
  760. rowcount_t rowsRead;
  761. bool eof, readerWaiting;
  762. Semaphore readWaitSem;
  763. Owned<CRowSet> outputOwnedRows;
  764. CRowSet *rowSet;
  765. unsigned row, rowsInRowSet;
  766. void init()
  767. {
  768. rowsRead = 0;
  769. currentChunkNum = 0;
  770. rowsInRowSet = row = 0;
  771. readerWaiting = eof = false;
  772. rowSet = NULL;
  773. }
  774. inline void doStop()
  775. {
  776. if (eof) return;
  777. eof = true;
  778. parent.stopOutput(output);
  779. }
  780. inline void wakeRead()
  781. {
  782. if (readerWaiting)
  783. {
  784. readerWaiting = false;
  785. readWaitSem.signal();
  786. }
  787. }
  788. public:
  789. unsigned currentChunkNum;
  790. public:
  791. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  792. COutput(CSharedWriteAheadBase &_parent, unsigned _output) : parent(_parent), output(_output)
  793. {
  794. init();
  795. }
  796. void reset()
  797. {
  798. init();
  799. outputOwnedRows.clear();
  800. }
  801. inline unsigned queryOutput() const { return output; }
  802. inline CRowSet *queryRowSet() { return rowSet; }
  803. const void *nextRow()
  804. {
  805. if (eof)
  806. return NULL;
  807. if (row == rowsInRowSet)
  808. {
  809. CriticalBlock b(parent.crit);
  810. if (!rowSet || (row == (rowsInRowSet = rowSet->getRowCount())))
  811. {
  812. rowcount_t totalRows = parent.readerWait(*this, rowsRead);
  813. if (0 == totalRows)
  814. {
  815. doStop();
  816. return NULL;
  817. }
  818. if (!rowSet || (row == (rowsInRowSet = rowSet->getRowCount()))) // may have caught up in same rowSet
  819. {
  820. Owned<CRowSet> newRows = parent.loadMore(output);
  821. if (rowSet)
  822. VALIDATEEQ(newRows->queryChunk(), rowSet->queryChunk()+1);
  823. outputOwnedRows.setown(newRows.getClear());
  824. rowSet = outputOwnedRows.get();
  825. rowsInRowSet = rowSet->getRowCount();
  826. row = 0;
  827. }
  828. }
  829. }
  830. rowsRead++;
  831. const void *retrow = rowSet->getRow(row++);
  832. return retrow;
  833. }
  834. virtual void stop()
  835. {
  836. CriticalBlock b(parent.crit);
  837. doStop();
  838. }
  839. friend class CSharedWriteAheadBase;
  840. };
    CActivityBase *activity;
    size32_t minChunkSize;               // target serialized size per chunk (see ctor)
    unsigned lowestChunk, lowestOutput, outputCount, totalChunksOut;
    rowidx_t maxRows;                    // fixed CRowSet row-array capacity (never resized - see ctor comment)
    bool stopped;
    Owned<CRowSet> inMemRows;            // the chunk currently being written to
    CriticalSection crit;                // guards shared reader/writer state
    Linked<IOutputMetaData> meta;
    Linked<IOutputRowSerializer> serializer;
    QueueOf<CRowSet, false> chunkPool;   // recently flushed chunks retained in memory
    unsigned maxPoolChunks;
    bool reuseRowSets;                   // enables CRowSet recycling; cleared during destruction
    // Called (under crit) by an output that has consumed every row written so far.
    // Blocks until the writer advances, or returns 0 to signal end-of-stream
    // (stopped/eof); otherwise returns the current rowsWritten total.
    inline rowcount_t readerWait(COutput &output, const rowcount_t rowsRead)
    {
        if (rowsRead == rowsWritten)
        {
            if (stopped || writeAtEof)
                return 0;
            output.readerWaiting = true; // must be set before releasing crit, so writer's wakeRead sees it
#ifdef TRACE_WRITEAHEAD
            ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "readerWait(%d)", output.queryOutput());
#endif
            ++readersWaiting;
            {
                CriticalUnblock b(crit); // release lock whilst blocked on semaphore
                unsigned mins=0;
                for (;;)
                {
                    if (output.readWaitSem.wait(60000))
                        break; // NB: will also be signal if aborting
                    // periodic trace so a stuck reader is visible in the logs
                    ActPrintLog(activity, "output %d @ row # %" RCPF "d, has been blocked for %d minute(s)", output.queryOutput(), rowsRead, ++mins);
                }
            }
            if (isEof(rowsRead))
                return 0;
        }
        return rowsWritten;
    }
  879. CRowSet *newRowSet(unsigned chunk)
  880. {
  881. {
  882. CriticalBlock b(rowSetCacheCrit);
  883. if (cachedRowSets.ordinality())
  884. {
  885. CRowSet *rowSet = &cachedRowSets.popGet();
  886. rowSet->setChunk(chunk);
  887. return rowSet;
  888. }
  889. }
  890. return new CRowSet(*this, chunk, maxRows);
  891. }
  892. inline COutput &queryCOutput(unsigned i) { return (COutput &) outputs.item(i); }
  893. inline unsigned getLowest()
  894. {
  895. unsigned o=0;
  896. offset_t lsf = (unsigned)-1;
  897. unsigned lsfOutput = (unsigned)-1;
  898. for (;;)
  899. {
  900. if (queryCOutput(o).currentChunkNum < lsf)
  901. {
  902. lsf = queryCOutput(o).currentChunkNum;
  903. lsfOutput = o;
  904. }
  905. if (++o == outputCount)
  906. break;
  907. }
  908. return lsfOutput;
  909. }
    // Move 'output' on to its next chunk. If it was (one of) the hindmost
    // reader(s) and no other output still needs its previous chunk, that
    // chunk is released back to the pool/free space.
    void advanceNextReadChunk(unsigned output)
    {
        unsigned &currentChunkNum = queryCOutput(output).currentChunkNum;
        unsigned prevChunkNum = currentChunkNum;
        currentChunkNum++;
        if (output == lowestOutput) // might be joint lowest
        {
            lowestOutput = getLowest();
            if (currentChunkNum == queryCOutput(lowestOutput).currentChunkNum) // has new page moved up to join new lowest, in which case it was last.
            {
                if (prevChunkNum != totalChunksOut) // if equal, then prevOffset was mem page and not in pool/disk to free
                    freeOffsetChunk(prevChunkNum);
                else
                {
                    VALIDATEEQ(prevChunkNum, inMemRows->queryChunk()); // validate is current in mem page
                }
                ++lowestChunk;
#ifdef TRACE_WRITEAHEAD
                ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "aNRP: %d, lowestChunk=%d, totalChunksOut=%d", prevChunkNum, lowestChunk, totalChunksOut);
#endif
            }
        }
    }
    // Permanently retire 'output'. If it was the hindmost reader it must first
    // "catch up" (discarding chunks) so pooled/spilled pages can be freed;
    // if it was the last active output the whole buffer is stopped.
    void stopOutput(unsigned output)
    {
        // mark stopped output with very high page (as if read all), will not be picked again
        unsigned lowestA = getLowest();
        if (output == lowestA)
        {
            // Temporarily mark as stopped to discover the next-lowest reader
            unsigned chunkOrig = queryCOutput(output).currentChunkNum;
            queryCOutput(output).currentChunkNum = (unsigned)-1;
            unsigned lO = getLowest(); // get next lowest
            queryCOutput(output).currentChunkNum = chunkOrig;
            if (((unsigned)-1) == lO) // if last to stop
            {
                markStop();
                return;
            }
#ifdef TRACE_WRITEAHEAD
            ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Input %d stopped, forcing catchup to free pages", output);
#endif
            // Consume (and discard) chunks until level with the next-lowest reader,
            // so the freed chunks' storage can be reclaimed via advanceNextReadChunk.
            while (queryCOutput(output).currentChunkNum < queryCOutput(lO).currentChunkNum)
            {
                Owned<CRowSet> tmpChunk = loadMore(output); // discard
            }
#ifdef TRACE_WRITEAHEAD
            ActPrintLog(activity, "Input %d stopped. Caught up", output);
#endif
            queryCOutput(output).currentChunkNum = (unsigned)-1;
            lowestOutput = getLowest();
        }
        else
            queryCOutput(output).currentChunkNum = (unsigned)-1;
    }
    // Stop all outputs and the writer (thread-safe entry point).
    void stopAll()
    {
        CriticalBlock b(crit);
        markStop();
    }
    // Latch the stopped flag; idempotent. Derived classes extend this to
    // release a blocked writer (see CSharedWriteAheadMem).
    virtual void markStop()
    {
        if (stopped) return;
        stopped = true;
    }
    virtual void freeOffsetChunk(unsigned chunk) = 0;                       // release storage of a fully-consumed chunk
    virtual CRowSet *readRows(unsigned output, unsigned chunk) = 0;         // materialize a chunk for a reader
    virtual void flushRows(ISharedSmartBufferCallback *callback) = 0;       // persist/pool the current in-mem chunk
    virtual size32_t rowSize(const void *row) = 0;                          // serialized size contribution of one row
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
    // Sizes the per-chunk buffers from the row meta's min record size and
    // constructs one COutput per consumer.
    CSharedWriteAheadBase(CActivityBase *_activity, unsigned _outputCount, IThorRowInterfaces *rowIf)
        : activity(_activity), outputCount(_outputCount), meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer())
    {
        init();
        minChunkSize = 0x2000;
        size32_t minSize = meta->getMinRecordSize();
        if (minChunkSize<minSize*10) // if rec minSize bigish, ensure reasonable minChunkSize
        {
            minChunkSize = minSize*10;
            if (minChunkSize > 0x10000)
                minChunkSize += 2*(minSize+1); // headroom for a couple of extra records
        }
        else if (0 == minSize) // unknown
            minSize = 1;
        /* NB: HPCC-25392 (fix to avoid clearing inMemRows on child query reset)
         * maxRows is used to intialize the CRowSet::rows' initial allocated row array size.
         * These row arrays are never freed, and never resized.
         * On reset the rows are cleared, but the row array is untouched.
         * When adding rows there are never more added than maxRows added, because a new row set
         * is created when minChunkSize is exceeded, which is a multiple of min row size, (see putRow)
         *
         * This is vital, because both the writer and readers may be reading the same array
         * concurrently. Resizing the array in putRow, would invalidate the row array that a
         * reader was currently accessing.
         */
        maxRows = (minChunkSize / minSize) + 1;
        outputCount = _outputCount; // NOTE(review): redundant - already set in the initializer list
        unsigned c=0;
        for (; c<outputCount; c++)
        {
            outputs.append(* new COutput(*this, c));
        }
        inMemRows.setown(newRowSet(0));
        maxPoolChunks = MIN_POOL_CHUNKS;
        reuseRowSets = true;
    }
  1016. ~CSharedWriteAheadBase()
  1017. {
  1018. reuseRowSets = false;
  1019. ForEachItemIn(o, outputs)
  1020. {
  1021. COutput &output = queryCOutput(o);
  1022. output.outputOwnedRows.clear();
  1023. }
  1024. inMemRows.clear();
  1025. }
  1026. unsigned anyReaderBehind()
  1027. {
  1028. unsigned reader=0;
  1029. for (;;)
  1030. {
  1031. if (reader>=outputCount) // so all waiting, don't need in mem page.
  1032. break;
  1033. if (!queryCOutput(reader).eof && (queryCOutput(reader).rowSet != inMemRows.get())) // if output using this in mem page, then it has linked already, need to find one that is not (i.e. behind)
  1034. return reader;
  1035. ++reader;
  1036. }
  1037. return NotFound;
  1038. }
    // ISharedSmartBuffer
    // Append a row (NULL == eog). When the current chunk reaches minChunkSize it
    // is flushed (pooled or spilled) if any reader is still behind, and a fresh
    // in-mem chunk is started. Takes ownership of 'row' in all cases.
    virtual void putRow(const void *row, ISharedSmartBufferCallback *callback)
    {
        if (stopped)
        {
            ReleaseThorRow(row); // all readers gone; discard
            return;
        }
        unsigned len=rowSize(row);
        CriticalBlock b(crit);
        bool paged = false;
        if (totalOutChunkSize >= minChunkSize) // chunks required to be at least minChunkSize
        {
            unsigned reader=anyReaderBehind();
            if (NotFound != reader)
                flushRows(callback); // someone still needs the old chunk; persist it
            inMemRows.setown(newRowSet(++totalChunksOut));
#ifdef TRACE_WRITEAHEAD
            totalOutChunkSize = sizeof(unsigned); // account for the debug chunk-id header
#else
            totalOutChunkSize = 0;
#endif
            /* If callback used to signal paged out, only signal readers on page event,
             * This is to minimize time spent by fast readers constantly catching up and waiting and getting woken up per record
             */
            if (callback)
            {
                paged = true;
                callback->paged();
            }
        }
        inMemRows->addRow(row);
        totalOutChunkSize += len;
        rowsWritten++; // including NULLs(eogs)
        if (!callback || paged)
            signalReaders();
    }
  1076. virtual void putRow(const void *row)
  1077. {
  1078. return putRow(row, NULL);
  1079. }
    // Mark end of input and wake any blocked readers.
    virtual void flush()
    {
        CriticalBlock b(crit);
        writeAtEof = true;
        signalReaders();
    }
    // Not meaningful for a shared multi-reader buffer.
    virtual offset_t getPosition()
    {
        throwUnexpected();
        return 0; // unreachable
    }
    // Return the IRowStream for consumer 'output' (not linked; owned by this buffer).
    virtual IRowStream *queryOutput(unsigned output)
    {
        return &outputs.item(output);
    }
    // Abort: stop everything and release any blocked readers.
    virtual void cancel()
    {
        CriticalBlock b(crit);
        stopAll();
        signalReaders();
    }
    // Reset for reuse (e.g. child query re-execution); keeps allocated row arrays.
    virtual void reset()
    {
        init();
        unsigned c=0;
        for (; c<outputCount; c++)
            queryCOutput(c).reset();
        inMemRows->reset(0);
    }
  1109. friend class COutput;
  1110. friend class CRowSet;
  1111. };
// Out-of-line CRowSet members (class declared earlier in this file).
CRowSet::CRowSet(CSharedWriteAheadBase &_sharedWriteAhead, unsigned _chunk, unsigned maxRows)
    : sharedWriteAhead(_sharedWriteAhead), rows(*_sharedWriteAhead.activity, _sharedWriteAhead.activity, ers_eogonly, stableSort_none, true, maxRows), chunk(_chunk)
{
}
// Custom Release: hand an about-to-die row set back to the owner for recycling.
bool CRowSet::Release() const
{
    {
        // NB: Occasionally, >1 thread may be releasing a CRowSet concurrently and miss an opportunity to reuse, but that's ok.
        // No need to protect with a lock, because if not shared then it cannot be called at the same time by another thread,
        if (!IsShared())
            sharedWriteAhead.reuse((CRowSet *)this);
    }
    return CSimpleInterface::Release();
}
/* Disk-backed shared write-ahead buffer.
 * Flushed chunks are either retained in the bounded in-memory chunkPool or
 * serialized to a spill file. Freed file extents are tracked in two parallel
 * sorted arrays: freeChunks (by offset, for merging adjacent extents) and
 * freeChunksSized (by size, for best-fit allocation).
 */
class CSharedWriteAheadDisk : public CSharedWriteAheadBase
{
    Owned<IFile> spillFile;                // backing spill file (removed on destruction)
    Owned<IFileIO> spillFileIO;
    CIArrayOf<Chunk> freeChunks;           // free extents, ordered by offset
    PointerArrayOf<Chunk> freeChunksSized; // same extents, ordered by size
    QueueOf<Chunk, false> savedChunks;     // FIFO of flushed chunks (pooled or on disk)
    offset_t highOffset;                   // next unused offset at end of spill file
    Linked<IEngineRowAllocator> allocator;
    Linked<IOutputRowDeserializer> deserializer;
    IOutputMetaData *serializeMeta;
    /* RAII helper: removes 'chunk' from the size-ordered array on construction
     * and re-inserts it at its (possibly new) sorted position on destruction,
     * so the array stays consistent across a size/offset mutation. */
    struct AddRemoveFreeChunk
    {
        PointerArrayOf<Chunk> &chunkArray;
        Chunk *chunk;
        AddRemoveFreeChunk(PointerArrayOf<Chunk> &_chunkArray, Chunk *_chunk) : chunkArray(_chunkArray), chunk(_chunk)
        {
            chunkArray.zap(chunk);
        }
        ~AddRemoveFreeChunk()
        {
            bool isNew;
            aindex_t sizePos = chunkArray.bAdd(chunk, chunkSizeCompare, isNew);
            if (!isNew) // might match size of another block
                chunkArray.add(chunk, sizePos);
        }
    };
    // Binary search over a sorted Chunk* array: returns the index of the first
    // element >= searchChunk (NotFound if none), setting 'matched' on equality.
    unsigned findNext(Chunk **base, unsigned arrCount, Chunk *searchChunk, ChunkCompareFunc compareFunc, bool &matched)
    {
        unsigned middle;
        unsigned left = 0;
        int result;
        Chunk *CurEntry;
        unsigned right = arrCount-1;
        /*
         * Loop until we've narrowed down the search range to one or two
         * items. This ensures that middle != 0, preventing 'right' from wrapping.
         */
        while (right - left > 1)
        {
            /* Calculate the middle entry in the array - NOT the middle offset */
            middle = (right + left) >> 1;
            CurEntry = base[middle];
            result = compareFunc(searchChunk, CurEntry);
            if (result < 0)
                right = middle - 1;
            else if (result > 0)
                left = middle + 1;
            else
            {
                matched = true;
                return middle;
            }
        }
        middle = left;
        /*
         * The range has been narrowed down to 1 or 2 items.
         * Perform an optimal check on these last two elements.
         */
        result = compareFunc(searchChunk, base[middle]);
        if (0 == result)
            matched = true;
        else if (result > 0)
        {
            ++middle;
            if (right == left + 1)
            {
                result = compareFunc(searchChunk, base[middle]);
                if (0 == result)
                    matched = true;
                else
                {
                    matched = false;
                    if (result > 0)
                        ++middle;
                }
            }
            else
                matched = false;
        }
        else
            matched = false;
        if (middle < arrCount)
            return middle;
        return NotFound;
    }
    // Allocate 'required' bytes of spill-file space: best-fit from the free
    // list (splitting large extents), else extend the file at highOffset.
    inline Chunk *getOutOffset(size32_t required)
    {
        unsigned numFreeChunks = freeChunks.ordinality();
        if (numFreeChunks)
        {
            Chunk searchChunk(0, required);
            bool matched;
            unsigned nextPos = findNext(freeChunksSized.getArray(), freeChunksSized.ordinality(), &searchChunk, chunkSizeCompare2, matched);
            if (NotFound != nextPos)
            {
                Linked<Chunk> nextChunk = freeChunksSized.item(nextPos);
                if (nextChunk->size-required >= minChunkSize)
                {
                    // Big enough to split: carve off the front, shrink the free extent
                    Owned<Chunk> chunk = new Chunk(nextChunk->offset, required);
                    decFreeChunk(nextChunk, required);
#ifdef TRACE_WRITEAHEAD
                    ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "getOutOffset: got [free] offset = %" I64F "d, size=%d, but took required=%d", nextChunk->offset, nextChunk->size+required, required);
#endif
                    return chunk.getClear();
                }
#ifdef TRACE_WRITEAHEAD
                ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "getOutOffset: got [free] offset = %" I64F "d, size=%d", nextChunk->offset, nextChunk->size);
#endif
                freeChunksSized.remove(nextPos);
                freeChunks.zap(*nextChunk);
                return nextChunk.getClear(); // if smaller than minChunkSize, return all, don't leave useless fragment
            }
        }
#ifdef TRACE_WRITEAHEAD
        ActPrintLog(activity, "getOutOffset: got new upper offset # = %" I64F "d", highOffset);
#endif
        Chunk *chunk = new Chunk(highOffset, required);
        highOffset += required; // NB next
        return chunk;
    }
    // Absorb 'src' into 'dst' (src immediately precedes dst on disk).
    inline void mergeFreeChunk(Chunk *dst, const Chunk *src)
    {
        AddRemoveFreeChunk ar(freeChunksSized, dst);
        dst->offset = src->offset;
        dst->size += src->size;
    }
    // Shrink a free extent from the front by 'sz' bytes.
    inline void decFreeChunk(Chunk *chunk, size32_t sz)
    {
        AddRemoveFreeChunk ar(freeChunksSized, chunk);
        chunk->size -= sz;
        chunk->offset += sz;
    }
    // Grow a free extent at the end by 'sz' bytes.
    inline void incFreeChunk(Chunk *chunk, size32_t sz)
    {
        AddRemoveFreeChunk ar(freeChunksSized, chunk);
        chunk->size += sz;
    }
    // Insert a new free extent into both sorted arrays ('pos' is its
    // offset-ordered position if already known).
    inline void addFreeChunk(Chunk *freeChunk, unsigned pos=NotFound)
    {
        bool isNew;
        aindex_t sizePos = freeChunksSized.bAdd(freeChunk, chunkSizeCompare, isNew);
        if (!isNew)
            freeChunksSized.add(freeChunk, sizePos);
        if (NotFound == pos)
            freeChunks.append(*LINK(freeChunk));
        else
            freeChunks.add(*LINK(freeChunk), pos);
    }
    // Release the oldest flushed chunk: drop it from chunkPool if pooled,
    // otherwise return its file extent to the free lists (merging neighbours).
    virtual void freeOffsetChunk(unsigned chunk)
    {
        // chunk(unused here) is nominal sequential page #, savedChunks is page # in diskfile
        assertex(savedChunks.ordinality());
        Owned<Chunk> freeChunk = savedChunks.dequeue();
        if (freeChunk->rowSet)
        {
            Owned<CRowSet> rowSet = chunkPool.dequeue(freeChunk->rowSet);
#ifdef TRACE_WRITEAHEAD
            ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "freeOffsetChunk (chunk=%d) chunkPool, savedChunks size=%d, chunkPool size=%d", rowSet->queryChunk(), savedChunks.ordinality(), chunkPool.ordinality());
#endif
            VALIDATEEQ(chunk, rowSet->queryChunk());
            return;
        }
        unsigned nmemb = freeChunks.ordinality();
        if (0 == nmemb)
            addFreeChunk(freeChunk);
        else
        {
            bool matched;
            unsigned nextPos = findNext(freeChunks.getArray(), freeChunks.ordinality(), freeChunk, chunkOffsetCompare, matched);
            assertex(!matched); // should never happen, have found a chunk with same offset as one being freed.
            if (NotFound != nextPos)
            {
                Chunk *nextChunk = &freeChunks.item(nextPos);
                if (freeChunk->endOffset() == nextChunk->offset)
                    mergeFreeChunk(nextChunk, freeChunk); // coalesce with following extent
                else if (nextPos > 0)
                {
                    Chunk *prevChunk = &freeChunks.item(nextPos-1);
                    if (prevChunk->endOffset() == freeChunk->offset)
                        incFreeChunk(prevChunk, freeChunk->size); // coalesce with preceding extent
                    else
                        addFreeChunk(freeChunk, nextPos);
                }
                else
                    addFreeChunk(freeChunk, nextPos);
            }
            else
                addFreeChunk(freeChunk);
        }
#ifdef TRACE_WRITEAHEAD
        ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Added chunk, offset %" I64F "d size=%d to freeChunks", freeChunk->offset, freeChunk->size);
#endif
    }
    // Materialize a chunk for a reader: share another output's row set if it is
    // already on the same chunk, reuse a pooled set, else deserialize from disk.
    virtual CRowSet *readRows(unsigned output, unsigned whichChunk)
    {
        assertex(savedChunks.ordinality() > whichChunk);
        unsigned currentChunkNum = queryCOutput(output).currentChunkNum;
        unsigned o=0;
        for (; o<outputCount; o++)
        {
            if (o == output) continue;
            COutput &coutput = queryCOutput(o);
            if (coutput.queryRowSet() && (coutput.queryRowSet()->queryChunk() == currentChunkNum))
            {
#ifdef TRACE_WRITEAHEAD
                ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Output: %d, readRows found other output %d with matching offset: %d", output, o, currentChunkNum);
#endif
                return LINK(coutput.queryRowSet());
            }
        }
        Chunk &chunk = *savedChunks.item(whichChunk);
        Owned<CRowSet> rowSet;
        if (chunk.rowSet)
        {
            rowSet.set(chunk.rowSet); // still pooled in memory
#ifdef TRACE_WRITEAHEAD
            ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "readRows (chunk=%d) output: %d, savedChunks size=%d, chunkPool size=%d, currentChunkNum=%d, whichChunk=%d", rowSet->queryChunk(), output, savedChunks.ordinality(), chunkPool.ordinality(), currentChunkNum, whichChunk);
#endif
            VALIDATEEQ(rowSet->queryChunk(), currentChunkNum);
        }
        else
        {
            // Deserialize the chunk from the spill file (markers: 1=row, 2=eog, 0=end)
            Owned<ISerialStream> stream = createFileSerialStream(spillFileIO, chunk.offset);
#ifdef TRACE_WRITEAHEAD
            unsigned diskChunkNum;
            stream->get(sizeof(diskChunkNum), &diskChunkNum);
            VALIDATEEQ(diskChunkNum, currentChunkNum);
#endif
            CThorStreamDeserializerSource ds(stream);
            rowSet.setown(newRowSet(currentChunkNum));
            for (;;)
            {
                byte b;
                ds.read(sizeof(b),&b);
                if (!b)
                    break;
                if (1==b)
                {
                    RtlDynamicRowBuilder rowBuilder(allocator);
                    size32_t sz = deserializer->deserialize(rowBuilder, ds);
                    rowSet->addRow(rowBuilder.finalizeRowClear(sz));
                }
                else if (2==b)
                    rowSet->addRow(NULL); // eog
            }
        }
        return rowSet.getClear();
    }
    // Persist the current in-mem chunk: keep it in chunkPool while there is
    // room, otherwise serialize it to the spill file.
    virtual void flushRows(ISharedSmartBufferCallback *callback)
    {
        // NB: called in crit
        Owned<Chunk> chunk;
        if (chunkPool.ordinality() < maxPoolChunks)
        {
            chunk.setown(new Chunk(inMemRows));
            chunkPool.enqueue(inMemRows.getLink());
#ifdef TRACE_WRITEAHEAD
            ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "flushRows (chunk=%d) savedChunks size=%d, chunkPool size=%d", inMemRows->queryChunk(), savedChunks.ordinality()+1, chunkPool.ordinality());
#endif
        }
        else
        {
            /* It might be worth adding a heuristic here, to estimate time for readers to catch up, vs time spend writing and reading.
             * Could block for readers to catch up if cost of writing/reads outweighs avg cost of catch up...
             */
            MemoryBuffer mb;
            mb.ensureCapacity(minChunkSize); // starting size/could be more if variable and bigger
#ifdef TRACE_WRITEAHEAD
            mb.append(inMemRows->queryChunk()); // for debug purposes only
#endif
            CMemoryRowSerializer mbs(mb);
            unsigned r=0;
            for (;r<inMemRows->getRowCount();r++)
            {
                OwnedConstThorRow row = inMemRows->getRow(r);
                if (row)
                {
                    mb.append((byte)1);
                    serializer->serialize(mbs,(const byte *)row.get());
                }
                else
                    mb.append((byte)2); // eog
            }
            mb.append((byte)0); // end-of-chunk marker
            size32_t len = mb.length();
            chunk.setown(getOutOffset(len)); // will find space for 'len', might be bigger if from free list
            spillFileIO->write(chunk->offset, len, mb.toByteArray());
#ifdef TRACE_WRITEAHEAD
            ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d (savedChunks pos=%d), writeOffset = %" I64F "d, writeSize = %d", inMemRows->queryChunk(), savedChunks.ordinality(), chunk->offset, len);
#endif
        }
        savedChunks.enqueue(chunk.getClear());
    }
    // Serialized (on-disk) size of a row, including its 1-byte marker.
    virtual size32_t rowSize(const void *row)
    {
        if (!row)
            return 1; // eog;
        else if (meta == serializeMeta)
            return meta->getRecordSize(row)+1; // space on disk, +1 = eog marker
        CSizingSerializer ssz;
        serializer->serialize(ssz,(const byte *)row);
        return ssz.size()+1; // space on disk, +1 = eog marker
    }
public:
    CSharedWriteAheadDisk(CActivityBase *activity, const char *spillName, unsigned outputCount, IThorRowInterfaces *rowIf) : CSharedWriteAheadBase(activity, outputCount, rowIf),
        allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), serializeMeta(meta->querySerializedDiskMeta())
    {
        assertex(spillName);
        spillFile.setown(createIFile(spillName));
        spillFile->setShareMode(IFSHnone);
        spillFileIO.setown(spillFile->open(IFOcreaterw));
        highOffset = 0;
    }
    ~CSharedWriteAheadDisk()
    {
        spillFileIO.clear();
        if (spillFile)
            spillFile->remove();
        // Drain queue, releasing remaining Chunk objects
        for (;;)
        {
            Owned<Chunk> chunk = savedChunks.dequeue();
            if (!chunk) break;
        }
        LOG(MCthorDetailedDebugInfo, thorJob, "CSharedWriteAheadDisk: highOffset=%" I64F "d", highOffset);
    }
    // Reset for reuse: discard all saved chunks, free lists and truncate the spill file.
    virtual void reset()
    {
        CSharedWriteAheadBase::reset();
        for (;;)
        {
            Owned<Chunk> chunk = savedChunks.dequeue();
            if (!chunk) break;
        }
        freeChunks.kill();
        freeChunksSized.kill();
        highOffset = 0;
        spillFileIO->setSize(0);
    }
};
  1466. ISharedSmartBuffer *createSharedSmartDiskBuffer(CActivityBase *activity, const char *spillname, unsigned outputs, IThorRowInterfaces *rowIf)
  1467. {
  1468. return new CSharedWriteAheadDisk(activity, spillname, outputs, rowIf);
  1469. }
/* Memory-only shared write-ahead buffer: flushed chunks live solely in
 * chunkPool. When the pool is full the writer blocks on poolSem until a
 * reader frees a chunk (or the buffer is stopped).
 */
class CSharedWriteAheadMem : public CSharedWriteAheadBase
{
    Semaphore poolSem;   // signalled when a pool slot frees, or on stop
    bool writerBlocked;  // writer is (about to be) blocked on poolSem; protected by crit
    virtual void markStop()
    {
        CSharedWriteAheadBase::markStop();
        if (writerBlocked)
        {
            // Release the writer so it can observe 'stopped'
            writerBlocked = false;
            poolSem.signal();
        }
    }
    // Oldest chunk fully consumed: drop it and unblock the writer if waiting.
    virtual void freeOffsetChunk(unsigned chunk)
    {
        Owned<CRowSet> topRowSet = chunkPool.dequeue();
        VALIDATEEQ(chunk, topRowSet->queryChunk());
#ifdef TRACE_WRITEAHEAD
        ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "freeOffsetChunk: Dequeue chunkPool chunks: %d, chunkPool.ordinality() = %d", topRowSet->queryChunk(), chunkPool.ordinality());
#endif
        topRowSet.clear();
        if (writerBlocked)
        {
            writerBlocked = false;
            poolSem.signal();
        }
    }
    // Chunks are always pooled here; 'whichChunk' is the pool-relative index.
    virtual CRowSet *readRows(unsigned output, unsigned whichChunk)
    {
        Linked<CRowSet> rowSet = chunkPool.item(whichChunk);
        VALIDATEEQ(queryCOutput(output).currentChunkNum, rowSet->queryChunk());
        return rowSet.getClear();
    }
    // Move the in-mem chunk into the pool, blocking first if the pool is full.
    virtual void flushRows(ISharedSmartBufferCallback *callback)
    {
        // NB: called in crit
        if (chunkPool.ordinality() >= maxPoolChunks)
        {
            writerBlocked = true;
            {
                CriticalUnblock b(crit); // release lock whilst blocked on poolSem
                if (callback)
                    callback->blocked();
                poolSem.wait();
                if (callback)
                    callback->unblocked();
                if (stopped) return;
            }
            unsigned reader=anyReaderBehind();
            if (NotFound == reader)
            {
#ifdef TRACE_WRITEAHEAD
                ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "flushRows: caught up whilst blocked to: %d", inMemRows->queryChunk());
#endif
                return; // caught up whilst blocked
            }
            VALIDATELT(chunkPool.ordinality(), maxPoolChunks);
        }
#ifdef TRACE_WRITEAHEAD
        ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d, chunkPool chunks = %d", inMemRows->queryChunk(), 1+chunkPool.ordinality());
#endif
        chunkPool.enqueue(inMemRows.getClear());
    }
    // In-memory size of a row (no per-row disk marker needed).
    virtual size32_t rowSize(const void *row)
    {
        if (!row)
            return 1; // eog;
        CSizingSerializer ssz;
        serializer->serialize(ssz, (const byte *)row);
        return ssz.size();
    }
public:
    // buffSize of (unsigned)-1 means an unbounded pool.
    CSharedWriteAheadMem(CActivityBase *activity, unsigned outputCount, IThorRowInterfaces *rowif, unsigned buffSize) : CSharedWriteAheadBase(activity, outputCount, rowif)
    {
        if (((unsigned)-1) == buffSize)
            maxPoolChunks = (unsigned)-1; // no limit
        else
        {
            maxPoolChunks = buffSize / minChunkSize;
            if (maxPoolChunks < MIN_POOL_CHUNKS)
                maxPoolChunks = MIN_POOL_CHUNKS;
        }
        writerBlocked = false;
    }
    ~CSharedWriteAheadMem()
    {
        // Drain pool, releasing remaining row sets
        for (;;)
        {
            Owned<CRowSet> rowSet = chunkPool.dequeue();
            if (!rowSet)
                break;
        }
    }
    virtual void reset()
    {
        CSharedWriteAheadBase::reset();
        for (;;)
        {
            Owned<CRowSet> rowSet = chunkPool.dequeue();
            if (!rowSet)
                break;
        }
        writerBlocked = false;
    }
};
  1575. ISharedSmartBuffer *createSharedSmartMemBuffer(CActivityBase *activity, unsigned outputs, IThorRowInterfaces *rowIf, unsigned buffSize)
  1576. {
  1577. return new CSharedWriteAheadMem(activity, outputs, rowIf, buffSize);
  1578. }
/* Many-writers/one-reader row buffer. Writers append in writeGranularity
 * batches into a shared CThorSpillableRowArray (bounded by 'limit'); the
 * single reader drains readGranularity rows at a time into readRows.
 * Writers block on fullSem when over limit; the reader blocks on emptySem
 * until enough rows (or end-of-writers) are available.
 */
class CRowMultiWriterReader : public CSimpleInterface, implements IRowMultiWriterReader
{
    rowidx_t readGranularity, writeGranularity, rowPos, limit, rowsToRead;
    CThorSpillableRowArray rows;      // shared staging area (provides its own lock)
    const void **readRows;            // reader-side block of up to readGranularity rows
    CActivityBase &activity;
    IThorRowInterfaces *rowIf;
    bool readerBlocked, eos, eow;     // eow = all writers complete
    Semaphore emptySem, fullSem;      // reader-wait / writer-wait semaphores
    unsigned numWriters, writersComplete, writersBlocked;
    // Per-writer adapter: batches rows locally, handing them to the owner in
    // writeGranularity lumps. Flushes and notifies the owner on destruction.
    class CAWriter : public CSimpleInterface, implements IRowWriter
    {
        CRowMultiWriterReader &owner;
        CThorExpandingRowArray rows; // local, unshared batch
    public:
        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
        CAWriter(CRowMultiWriterReader &_owner) : owner(_owner), rows(_owner.activity, _owner.rowIf)
        {
        }
        ~CAWriter()
        {
            flush();
            owner.writerStopped();
        }
        // IRowWriter impl.
        virtual void putRow(const void *row)
        {
            if (rows.ordinality() >= owner.writeGranularity)
                owner.addRows(rows); // hand off the full batch first
            rows.append(row);
        }
        virtual void flush()
        {
            if (rows.ordinality())
                owner.addRows(rows);
        }
    };
    // Transfer a writer's batch into the shared array, blocking while at limit.
    void addRows(CThorExpandingRowArray &inRows)
    {
        for (;;)
        {
            {
                CThorArrayLockBlock block(rows);
                if (eos)
                {
                    inRows.kill(); // aborted: discard batch
                    return;
                }
                if (rows.numCommitted() < limit)
                {
                    // NB: allowed to go over limit, by as much as inRows.ordinality()-1
                    rows.appendRows(inRows, true);
                    if (rows.numCommitted() >= readGranularity)
                        checkReleaseReader();
                    return;
                }
                writersBlocked++;
            }
            fullSem.wait(); // retry after reader drains
        }
    }
    // NB: both helpers called whilst the rows lock is held
    inline void checkReleaseReader()
    {
        if (readerBlocked)
        {
            emptySem.signal();
            readerBlocked = false;
        }
    }
    inline void checkReleaseWriters()
    {
        if (writersBlocked)
        {
            fullSem.signal(writersBlocked); // one signal per blocked writer
            writersBlocked = 0;
        }
    }
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
    CRowMultiWriterReader(CActivityBase &_activity, IThorRowInterfaces *_rowIf, unsigned _limit, unsigned _readGranularity, unsigned _writerGranularity)
        : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf), limit(_limit), readGranularity(_readGranularity), writeGranularity(_writerGranularity)
    {
        if (readGranularity > limit)
            readGranularity = limit; // readGranularity must be <= limit;
        numWriters = 0;
        readRows = static_cast<const void * *>(activity.queryRowManager()->allocate(readGranularity * sizeof(void*), activity.queryContainer().queryId()));
        eos = eow = readerBlocked = false;
        rowPos = rowsToRead = 0;
        writersComplete = writersBlocked = 0;
        rows.setup(rowIf, ers_forbidden, stableSort_none, true); // turning on throwOnOom;
    }
    ~CRowMultiWriterReader()
    {
        // Release any rows fetched but not yet returned, then the block itself
        roxiemem::ReleaseRoxieRowRange(readRows, rowPos, rowsToRead);
        ReleaseThorRow(readRows);
    }
    // Called by each CAWriter dtor; last one flags eow and wakes the reader.
    void writerStopped()
    {
        CThorArrayLockBlock block(rows);
        writersComplete++;
        if (writersComplete == numWriters)
        {
            rows.flush();
            eow = true;
            checkReleaseReader();
        }
    }
    // ISharedWriteBuffer impl.
    virtual IRowWriter *getWriter()
    {
        CThorArrayLockBlock block(rows);
        ++numWriters;
        return new CAWriter(*this);
    }
    virtual void abort()
    {
        CThorArrayLockBlock block(rows);
        eos = true;
        checkReleaseWriters();
        checkReleaseReader();
    }
    // IRowStream impl.
    virtual const void *nextRow()
    {
        if (eos)
            return NULL;
        if (rowPos == rowsToRead) // current read block exhausted; fetch another
        {
            for (;;)
            {
                {
                    CThorArrayLockBlock block(rows);
                    if (rows.numCommitted() >= readGranularity || eow)
                    {
                        // take a full block, or whatever remains once writers are done
                        rowsToRead = (eow && rows.numCommitted() < readGranularity) ? rows.numCommitted() : readGranularity;
                        if (0 == rowsToRead)
                        {
                            eos = true;
                            return NULL;
                        }
                        rows.readBlock(readRows, rowsToRead);
                        rowPos = 0;
                        checkReleaseWriters();
                        break; // fall through to return a row
                    }
                    readerBlocked = true;
                }
                emptySem.wait();
                if (eos)
                    return NULL;
            }
        }
        const void *row = readRows[rowPos];
        readRows[rowPos] = NULL; // ownership passes to caller
        ++rowPos;
        return row;
    }
    virtual void stop()
    {
        eos = true;
        checkReleaseWriters();
    }
};
  1742. IRowMultiWriterReader *createSharedWriteBuffer(CActivityBase *activity, IThorRowInterfaces *rowif, unsigned limit, unsigned readGranularity, unsigned writeGranularity)
  1743. {
  1744. return new CRowMultiWriterReader(*activity, rowif, limit, readGranularity, writeGranularity);
  1745. }
/* Pass-through read stream that folds everything read into a caller-owned
 * CRC32. write() is unsupported; seeking away from the current position
 * renders the CRC invalid (warned once, tracked via 'valid').
 */
class CRCFileStream: public CSimpleInterface, implements IFileIOStream
{
    Linked<IFileIOStream> streamin; // underlying stream
public:
    CRC32 &crc;   // caller-owned accumulator, updated on every read
    bool valid;   // true while the CRC still reflects a contiguous read
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
    CRCFileStream(IFileIOStream *_stream,CRC32 &_crc)
        : streamin(_stream), crc(_crc)
    {
        valid = true;
    }
    size32_t read(size32_t len, void * data)
    {
        size32_t rd = streamin->read(len,data);
        crc.tally(rd,data); // fold only what was actually read
        return rd;
    }
    size32_t write(size32_t len, const void * data)
    {
        // read-only wrapper
        throw MakeStringException(-1,"CRCFileStream does not support write");
    }
    void seek(offset_t pos, IFSmode origin)
    {
        offset_t t = streamin->tell();
        streamin->seek(pos,origin);
        if (t!=streamin->tell()) // crc invalid
            if (valid) {
                IWARNLOG("CRCFileStream::seek called - CRC will be invalid");
                valid = false;
            }
    }
    offset_t size()
    {
        return streamin->size();
    }
    virtual offset_t tell()
    {
        return streamin->tell();
    }
    void flush()
    {
        // noop as only read
    }
};