1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "platform.h"
- #include <limits.h>
- #include <stddef.h>
- #include "jlib.hpp"
- #include "jmisc.hpp"
- #include "jio.hpp"
- #include "jlzw.hpp"
- #include "jsem.hpp"
- #include "jthread.hpp"
- #include "jarray.hpp"
- #include "jiface.hpp"
- #include "jfile.hpp"
- #include "jset.hpp"
- #include "jqueue.tpp"
- #include "thmem.hpp"
- #include "thalloc.hpp"
- #include "thbuf.hpp"
- #include "eclrtl.hpp"
- #include "thgraph.hpp"
- #ifdef _DEBUG
- //#define _FULL_TRACE
- //#define TRACE_CREATE
- struct SmartBufReenterCheck
- {
- bool ✓
- SmartBufReenterCheck(bool &_check)
- : check(_check)
- {
- assertex(!check);
- check = true;
- }
- ~SmartBufReenterCheck()
- {
- assertex(check);
- check = false;
- }
- };
- #define REENTRANCY_CHECK(n) SmartBufReenterCheck recheck(n);
- #else
- #define REENTRANCY_CHECK(n)
- #endif
- class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, implements IRowWriter
- {
- CActivityBase *activity;
- ThorRowQueue *in;
- size32_t insz;
- ThorRowQueue *out;
- Linked<IFile> file;
- Owned<IFileIO> fileio;
- SpinLock lock;
- bool waiting;
- Semaphore waitsem;
- bool waitflush;
- Semaphore waitflushsem;
- size32_t blocksize;
- unsigned numblocks;
- Owned<IBitSet> diskfree;
- UnsignedArray diskin;
- UnsignedArray diskinlen;
- Linked<IOutputRowSerializer> serializer;
- Linked<IOutputRowDeserializer> deserializer;
- Linked<IEngineRowAllocator> allocator;
- bool eoi;
- #ifdef _DEBUG
- bool putrecheck;
- bool getrecheck;
- #endif
- unsigned allocblk(unsigned nb)
- {
- if (nb<=numblocks) {
- for (unsigned i=0;i<=numblocks-nb;i++) {
- unsigned j;
- for (j=i;j<i+nb;j++)
- if (!diskfree->test(j))
- break;
- if (j==i+nb) {
- while (j>i)
- diskfree->set(--j,false);
- return i;
- }
- }
- }
- unsigned ret = numblocks;
- numblocks += nb;
- return ret;
- }
- void freeblk(unsigned b,unsigned nb)
- {
- for (unsigned i=0;i<nb;i++)
- diskfree->set(i+b,true);
- }
- void diskflush()
- {
- struct cSaveIn {
- ThorRowQueue *rq;
- ThorRowQueue *∈
- cSaveIn(ThorRowQueue *&_in)
- : in(_in)
- {
- rq = in;
- in = NULL;
- }
- ~cSaveIn()
- {
- if (!in)
- in = rq;
- }
- } csavein(in); // mark as not there so consumer will pause
- if (out && (out->ordinality()==0) && (diskin.ordinality()==0)) {
- in = out;
- out = csavein.rq;
- insz = 0;
- return;
- }
- if (!fileio) {
- SpinUnblock unblock(lock);
- fileio.setown(file->open(IFOcreaterw));
- if (!fileio)
- {
- throw MakeStringException(-1,"CSmartRowBuffer::flush cannot write file %s",file->queryFilename());
- }
- }
- MemoryBuffer mb;
- CMemoryRowSerializer mbs(mb);
- unsigned nb = 0;
- unsigned blk = 0;
- if (!waiting) {
- mb.ensureCapacity(blocksize);
- {
- SpinUnblock unblock(lock);
- unsigned p = 0;
- byte b;
- for (unsigned i=0;i<csavein.rq->ordinality();i++) {
- if (waiting)
- break;
- const void *row = csavein.rq->item(i);
- if (row) {
- b = 1;
- mb.append(b);
- serializer->serialize(mbs,(const byte *)row);
- }
- else {
- b = 2; // eog
- mb.append(b);
- }
- }
- b = 0;
- mb.append(b);
- }
- }
- if (!waiting) {
- nb = (mb.length()+blocksize-1)/blocksize;
- blk = allocblk(nb);
- SpinUnblock unblock(lock);
- if (mb.length()<nb*blocksize) { // bit overkill!
- size32_t left = nb*blocksize-mb.length();
- memset(mb.reserve(left),0,left);
- }
- fileio->write(blk*(offset_t)blocksize,mb.length(),mb.bufferBase());
- mb.clear();
- }
- if (waiting) {
- // if eoi, stopped while serializing/writing, putRow may put final row to existing 'in'
- if (!eoi)
- {
- // consumer caught up
- freeblk(blk,nb);
- assertex(out->ordinality()==0);
- assertex(diskin.ordinality()==0);
- in = out;
- out = csavein.rq;
- }
- }
- else {
- diskin.append(blk);
- diskinlen.append(nb);
- while (csavein.rq->ordinality())
- ReleaseThorRow(csavein.rq->dequeue());
- }
- insz = 0;
- }
- void load()
- {
- ThorRowQueue *rq = out;
- out = NULL; // mark as not there so producer won't fill
- unsigned blk = diskin.item(0);
- unsigned nb = diskinlen.item(0);
- diskin.remove(0); // better as q but given reading from disk...
- diskinlen.remove(0);
- {
- SpinUnblock unblock(lock);
- MemoryAttr ma;
- size32_t readBlockSize = nb*blocksize;
- byte *buf = (byte *)ma.allocate(readBlockSize);
- CThorStreamDeserializerSource ds(readBlockSize,buf);
- assertex(fileio.get());
- size32_t rd = fileio->read(blk*(offset_t)blocksize,readBlockSize,buf);
- assertex(rd==readBlockSize);
- unsigned p = 0;
- loop {
- byte b;
- ds.read(sizeof(b),&b);
- if (!b)
- break;
- if (b==1) {
- RtlDynamicRowBuilder rowBuilder(allocator);
- size32_t sz = deserializer->deserialize(rowBuilder,ds);
- rq->enqueue(rowBuilder.finalizeRowClear(sz));
- }
- else if (b==2)
- rq->enqueue(NULL);
- }
- }
- freeblk(blk,nb);
- out = rq;
- }
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CSmartRowBuffer(CActivityBase *_activity, IFile *_file,size32_t bufsize,IRowInterfaces *rowif)
- : activity(_activity), file(_file), allocator(rowif->queryRowAllocator()), serializer(rowif->queryRowSerializer()), deserializer(rowif->queryRowDeserializer())
- {
- #ifdef _DEBUG
- putrecheck = false;
- getrecheck = false;
- #endif
- in = new ThorRowQueue;
- out = new ThorRowQueue;
- waiting = false;
- waitflush = false;
- blocksize = ((bufsize/2+0xfffff)/0x100000)*0x100000;
- numblocks = 0;
- insz = 0;
- eoi = false;
- diskfree.setown(createBitSet());
- #ifdef _FULL_TRACE
- ActPrintLog(activity, "SmartBuffer create %x",(unsigned)(memsize_t)this);
- #endif
- }
- ~CSmartRowBuffer()
- {
- #ifdef _FULL_TRACE
- ActPrintLog(activity, "SmartBuffer destroy %x",(unsigned)(memsize_t)this);
- #endif
- assertex(!waiting);
- assertex(!waitflush);
- // clear in/out contents
- while (in->ordinality())
- ReleaseThorRow(in->dequeue());
- delete in;
- while (out->ordinality())
- ReleaseThorRow(out->dequeue());
- delete out;
- }
- void putRow(const void *row)
- {
- REENTRANCY_CHECK(putrecheck)
- size32_t sz = thorRowMemoryFootprint(serializer, row);
- SpinBlock block(lock);
- if (eoi) {
- ReleaseThorRow(row);
-
- return;
- }
- assertex(in); // reentry check
- if (sz+insz+(in->ordinality()+2)*sizeof(byte)>blocksize) // byte extra per row + end byte
- diskflush();
- in->enqueue(row);
- insz += sz;
- if (waiting) {
- waitsem.signal();
- waiting = false;
- }
- }
- void stop()
- {
- #ifdef _FULL_TRACE
- ActPrintLog(activity, "SmartBuffer stop %x",(unsigned)(memsize_t)this);
- #endif
- SpinBlock block(lock);
- #ifdef _DEBUG
- if (waiting)
- {
- ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "CSmartRowBuffer::stop while nextRow waiting");
- PrintStackReport();
- }
- #endif
- eoi = true;
- while (out&&out->ordinality())
- ReleaseThorRow(out->dequeue());
- while (NULL == in)
- {
- waiting = true;
- SpinUnblock unblock(lock);
- waitsem.wait();
- }
- while (out&&out->ordinality())
- ReleaseThorRow(out->dequeue());
- while (in&&in->ordinality())
- ReleaseThorRow(in->dequeue());
- diskin.kill();
- if (waiting) {
- waitsem.signal();
- waiting = false;
- }
- if (waitflush) {
- waitflushsem.signal();
- waitflush = false;
- }
- }
- const void *nextRow()
- {
- REENTRANCY_CHECK(getrecheck)
- const void * ret;
- assertex(out);
- assertex(!waiting); // reentrancy checks
- loop {
- {
- SpinBlock block(lock);
- if (out->ordinality()) {
- ret = out->dequeue();
- break;
- }
- if (diskin.ordinality()) {
- load();
- ret = out->dequeue();
- break;
- }
- if (in) {
- if (in->ordinality()) {
- ret = in->dequeue();
- if (ret) {
- size32_t sz = thorRowMemoryFootprint(serializer, ret);
- assertex(insz>=sz);
- insz -= sz;
- }
- break;
- }
- else {
- if (waitflush) {
- waitflushsem.signal();
- waitflush = false;
- }
- if (eoi)
- return NULL;
- }
- }
- assertex(!waiting); // reentrancy check
- waiting = true;
- }
- waitsem.wait();
- }
- return ret;
- }
- void flush()
- {
- // I think flush should wait til all rows read or stopped
- #ifdef _FULL_TRACE
- ActPrintLog(activity, "SmartBuffer flush %x",(unsigned)(memsize_t)this);
- #endif
- SpinBlock block(lock);
- if (eoi) return;
- loop {
- assertex(in); // reentry check
- diskflush();
- eoi = true;
- if (waiting) {
- waitsem.signal();
- waiting = false;
- }
- if (out&&!out->ordinality()&&!diskin.ordinality()&&!in->ordinality())
- break;
- waitflush = true;
- SpinUnblock unblock(lock);
- while (!waitflushsem.wait(1000*60))
- ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "CSmartRowBuffer::flush stalled");
- }
- }
- IRowWriter *queryWriter()
- {
- return this;
- }
- };
- class CSmartRowInMemoryBuffer: public CSimpleInterface, implements ISmartRowBuffer, implements IRowWriter
- {
- // NB must *not* call LinkThorRow or ReleaseThorRow (or Owned*ThorRow) if deallocator set
- CActivityBase *activity;
- IRowInterfaces *rowIf;
- ThorRowQueue *in;
- size32_t insz;
- SpinLock lock;
- bool waitingin;
- Semaphore waitinsem;
- bool waitingout;
- Semaphore waitoutsem;
- size32_t blocksize;
- bool eoi;
- #ifdef _DEBUG
- bool putrecheck;
- bool getrecheck;
- #endif
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CSmartRowInMemoryBuffer(CActivityBase *_activity, IRowInterfaces *_rowIf, size32_t bufsize)
- : activity(_activity), rowIf(_rowIf)
- {
- #ifdef _DEBUG
- putrecheck = false;
- getrecheck = false;
- #endif
- in = new ThorRowQueue;
- waitingin = false;
- waitingout = false;
- blocksize = ((bufsize/2+0xfffff)/0x100000)*0x100000;
- insz = 0;
- eoi = false;
- }
- ~CSmartRowInMemoryBuffer()
- {
- // clear in contents
- while (in->ordinality())
- ReleaseThorRow(in->dequeue());
- delete in;
- }
- void putRow(const void *row)
- {
- REENTRANCY_CHECK(putrecheck)
- size32_t sz = 0;
- if (row) {
- sz = thorRowMemoryFootprint(rowIf->queryRowSerializer(), row);
- #ifdef _DEBUG
- assertex(sz<0x1000000);
- #endif
- assertex((int)sz>=0); // trap invalid sizes a bit earlier
- #ifdef _TRACE_SMART_PUTGET
- ActPrintLog(activity, "***putRow(%x) %d {%x}",(unsigned)row,sz,*(const unsigned *)row);
- #endif
- }
- {
- SpinBlock block(lock);
- if (!eoi) {
- assertex(in); // reentry check
- while ((sz+insz>blocksize)&&insz) {
- waitingin = true;
- {
- SpinUnblock unblock(lock);
- waitinsem.wait();
- }
- if (eoi)
- break;
- }
- }
- if (!eoi) {
- in->enqueue(row);
- insz += sz;
- #ifdef _TRACE_SMART_PUTGET
- ActPrintLog(activity, "***putRow2(%x) insize = %d ",insz);
- #endif
- if (waitingout) {
- waitoutsem.signal();
- waitingout = false;
- }
- return;
- }
- }
- // cancelled
- ReleaseThorRow(row);
- }
- const void *nextRow()
- {
- REENTRANCY_CHECK(getrecheck)
- const void * ret;
- SpinBlock block(lock);
- assertex(!waitingout); // reentrancy checks
- loop {
- if (in->ordinality()) {
- ret = in->dequeue();
- if (ret) {
- size32_t sz = thorRowMemoryFootprint(rowIf->queryRowSerializer(), ret);
- #ifdef _TRACE_SMART_PUTGET
- ActPrintLog(activity, "***dequeueRow(%x) %d insize=%d {%x}",(unsigned)ret,sz,insz,*(const unsigned *)ret);
- #endif
- if (insz<sz) {
- ActPrintLog(activity, "CSmartRowInMemoryBuffer::nextRow(%x) %d insize=%d {%x} ord = %d",(unsigned)(memsize_t)ret,sz,insz,*(const unsigned *)ret,in->ordinality());
- assertex(insz>=sz);
- }
- insz -= sz;
- }
- break;
- }
- else if (eoi) {
- ret = NULL;
- break;
- }
- assertex(!waitingout); // reentrancy check
- waitingout = true;
- SpinUnblock unblock(lock);
- waitoutsem.wait();
- }
- if (waitingin) {
- waitinsem.signal();
- waitingin = false;
- }
- return ret;
- }
- void stop()
- {
- #ifdef _FULL_TRACE
- ActPrintLog(activity, "CSmartRowInMemoryBuffer stop %x",(unsigned)(memsize_t)this);
- #endif
- SpinBlock block(lock);
- #ifdef _DEBUG
- if (waitingout) {
- ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "CSmartRowInMemoryBuffer::stop while nextRow waiting");
- PrintStackReport();
- }
- #endif
- eoi = true;
- while (in->ordinality())
- ReleaseThorRow(in->dequeue());
- in->clear();
- if (waitingin) {
- waitinsem.signal();
- waitingin = false;
- }
- }
- // offset_t getPosition()
- // {
- // SpinBlock block(lock);
- // return pos;
- // }
- void flush()
- {
- // I think flush should wait til all rows read
- SpinBlock block(lock);
- eoi = true;
- loop {
- assertex(in); // reentry check
- if (waitingout) {
- waitoutsem.signal();
- waitingout = false;
- }
- if (!in->ordinality())
- break;
- waitingin = true;
- SpinUnblock unblock(lock);
- while (!waitinsem.wait(1000*60))
- ActPrintLogEx(&activity->queryContainer(), thorlog_null, MCwarning, "CSmartRowInMemoryBuffer::flush stalled");
- }
- }
- IRowWriter *queryWriter()
- {
- return this;
- }
- };
- ISmartRowBuffer * createSmartBuffer(CActivityBase *activity, const char * tempname, size32_t buffsize, IRowInterfaces *rowif)
- {
- Owned<IFile> file = createIFile(tempname);
- return new CSmartRowBuffer(activity,file,buffsize,rowif);
- }
- ISmartRowBuffer * createSmartInMemoryBuffer(CActivityBase *activity, IRowInterfaces *rowIf, size32_t buffsize)
- {
- return new CSmartRowInMemoryBuffer(activity, rowIf, buffsize);
- }
- class COverflowableBuffer : public CSimpleInterface, implements IRowWriterMultiReader
- {
- CActivityBase &activity;
- IRowInterfaces *rowIf;
- Owned<IThorRowCollector> collector;
- Owned<IRowWriter> writer;
- bool eoi, shared;
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- COverflowableBuffer(CActivityBase &_activity, IRowInterfaces *_rowIf, bool grouped, bool _shared, unsigned spillPriority)
- : activity(_activity), rowIf(_rowIf), shared(_shared)
- {
- collector.setown(createThorRowCollector(activity, rowIf, NULL, stableSort_none, rc_mixed, spillPriority, grouped));
- writer.setown(collector->getWriter());
- eoi = false;
- }
- ~COverflowableBuffer()
- {
- writer.clear();
- collector.clear();
- }
- // IRowWriterMultiReader
- virtual IRowStream *getReader()
- {
- flush();
- return collector->getStream(shared);
- }
- // IRowWriter
- virtual void putRow(const void *row)
- {
- assertex(!eoi);
- writer->putRow(row);
- }
- virtual void flush()
- {
- eoi = true;
- }
- };
- IRowWriterMultiReader *createOverflowableBuffer(CActivityBase &activity, IRowInterfaces *rowIf, bool grouped, bool shared, unsigned spillPriority)
- {
- return new COverflowableBuffer(activity, rowIf, grouped, shared, spillPriority);
- }
- #define VALIDATEEQ(LHS, RHS) if ((LHS)!=(RHS)) { StringBuffer s("FAIL(EQ) - LHS="); s.append(LHS).append(", RHS=").append(RHS); PROGLOG("%s", s.str()); throwUnexpected(); }
- #define VALIDATELT(LHS, RHS) if ((LHS)>=(RHS)) { StringBuffer s("FAIL(LT) - LHS="); s.append(LHS).append(", RHS=").append(RHS); PROGLOG("%s", s.str()); throwUnexpected(); }
- //#define TRACE_WRITEAHEAD
- class CRowSet : public CSimpleInterface
- {
- unsigned chunk;
- CThorExpandingRowArray rows;
- public:
- CRowSet(CActivityBase &activity, unsigned _chunk) : rows(activity, &activity, true), chunk(_chunk)
- {
- }
- void reset(unsigned _chunk)
- {
- chunk = _chunk;
- rows.kill();
- }
- inline unsigned queryChunk() const { return chunk; }
- inline unsigned getRowCount() const { return rows.ordinality(); }
- inline void addRow(const void *row) { rows.append(row); }
- inline const void *getRow(unsigned r)
- {
- return rows.get(r);
- }
- };
- class Chunk : public CInterface
- {
- public:
- offset_t offset;
- size32_t size;
- Chunk(offset_t _offset, size_t _size) : offset(_offset), size(_size) { }
- Chunk(const Chunk &other) : offset(other.offset), size(other.size) { }
- bool operator==(Chunk const &other) const { return size==other.size && offset==other.offset; }
- Chunk &operator=(Chunk const &other) { offset = other.offset; size = other.size; return *this; }
- offset_t endOffset() { return offset+size; }
- };
- typedef int (*ChunkCompareFunc)(Chunk *, Chunk *);
- int chunkOffsetCompare(Chunk *lhs, Chunk *rhs)
- {
- if (lhs->offset>rhs->offset)
- return 1;
- else if (lhs->offset<rhs->offset)
- return -1;
- return 0;
- }
- int chunkSizeCompare(Chunk **_lhs, Chunk **_rhs)
- {
- Chunk *lhs = *(Chunk **)_lhs;
- Chunk *rhs = *(Chunk **)_rhs;
- return (int)lhs->size - (int)rhs->size;
- }
- int chunkSizeCompare2(Chunk *lhs, Chunk *rhs)
- {
- return (int)lhs->size - (int)rhs->size;
- }
- class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBuffer
- {
- size32_t totalOutChunkSize;
- bool writeAtEof;
- rowcount_t rowsWritten;
- IArrayOf<IRowStream> outputs;
- unsigned readersWaiting;
- virtual void init()
- {
- stopped = false;
- writeAtEof = false;
- rowsWritten = 0;
- readersWaiting = 0;
- totalChunksOut = lowestChunk = 0;
- lowestOutput = 0;
- #ifdef TRACE_WRITEAHEAD
- totalOutChunkSize = sizeof(unsigned);
- #else
- totalOutChunkSize = 0;
- #endif
- }
- inline bool isEof(rowcount_t rowsRead)
- {
- return stopped || (writeAtEof && rowsWritten == rowsRead);
- }
- inline void signalReaders()
- {
- if (readersWaiting)
- {
- #ifdef TRACE_WRITEAHEAD
- ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "signalReaders: %d", readersWaiting);
- #endif
- readersWaiting = 0;
- ForEachItemIn(o, outputs)
- queryCOutput(o).wakeRead(); // if necessary
- }
- }
- inline const rowcount_t &readerWait(const rowcount_t &rowsRead)
- {
- ++readersWaiting;
- return rowsWritten;
- }
- CRowSet *loadMore(unsigned output)
- {
- // needs to be called in crit
- Linked<CRowSet> rowSet;
- unsigned currentChunkNum = queryCOutput(output).currentChunkNum;
- if (currentChunkNum == totalChunksOut)
- {
- #ifdef TRACE_WRITEAHEAD
- PROGLOG("output=%d, chunk=%d INMEM", output, currentChunkNum);
- #endif
- rowSet.set(inMemRows);
- }
- else
- {
- unsigned whichChunk = queryCOutput(output).currentChunkNum - lowestChunk;
- #ifdef TRACE_WRITEAHEAD
- PROGLOG("output=%d, chunk=%d (whichChunk=%d)", output, currentChunkNum, whichChunk);
- #endif
- rowSet.setown(readRows(output, whichChunk));
- assertex(rowSet);
- }
- VALIDATEEQ(currentChunkNum, rowSet->queryChunk());
- advanceNextReadChunk(output);
- return rowSet.getClear();
- }
- protected:
- class COutput : public CSimpleInterface, implements IRowStream
- {
- CSharedWriteAheadBase &parent;
- unsigned output;
- rowcount_t rowsRead;
- bool eof, readerWaiting;
- Semaphore readWaitSem;
- Owned<CRowSet> outputOwnedRows;
- CRowSet *rowSet;
- unsigned row, rowsInRowSet;
- void init()
- {
- rowsRead = 0;
- currentChunkNum = 0;
- rowsInRowSet = row = 0;
- readerWaiting = eof = false;
- rowSet = NULL;
- }
- inline void doStop()
- {
- if (eof) return;
- eof = true;
- parent.stopOutput(output);
- }
- inline const rowcount_t readerWait(const rowcount_t &rowsRead)
- {
- if (rowsRead == parent.rowsWritten)
- {
- if (parent.stopped || parent.writeAtEof)
- return 0;
- readerWaiting = true;
- #ifdef TRACE_WRITEAHEAD
- ActPrintLogEx(&parent.activity->queryContainer(), thorlog_all, MCdebugProgress, "readerWait(%d)", output);
- #endif
- const rowcount_t &rowsWritten = parent.readerWait(rowsRead);
- {
- CriticalUnblock b(parent.crit);
- unsigned mins=0;
- loop
- {
- if (readWaitSem.wait(60000))
- break; // NB: will also be signal if aborting
- ActPrintLog(parent.activity, "output %d @ row # %"RCPF"d, has been blocked for %d minute(s)", output, rowsRead, ++mins);
- }
- }
- if (parent.isEof(rowsRead))
- return 0;
- }
- return parent.rowsWritten;
- }
- inline void wakeRead()
- {
- if (readerWaiting)
- {
- readerWaiting = false;
- readWaitSem.signal();
- }
- }
- public:
- unsigned currentChunkNum;
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- COutput(CSharedWriteAheadBase &_parent, unsigned _output) : parent(_parent), output(_output)
- {
- init();
- }
- void reset()
- {
- init();
- outputOwnedRows.clear();
- }
- inline CRowSet *queryRowSet() { return rowSet; }
- const void *nextRow()
- {
- if (eof)
- return NULL;
- if (row == rowsInRowSet)
- {
- CriticalBlock b(parent.crit);
- if (!rowSet || (row == (rowsInRowSet = rowSet->getRowCount())))
- {
- rowcount_t totalRows = readerWait(rowsRead);
- if (0 == totalRows)
- {
- doStop();
- return NULL;
- }
- if (!rowSet || (row == (rowsInRowSet = rowSet->getRowCount()))) // maybe have caught up in same rowSet
- {
- Owned<CRowSet> newRows = parent.loadMore(output);
- if (rowSet)
- VALIDATEEQ(newRows->queryChunk(), rowSet->queryChunk()+1);
- outputOwnedRows.setown(newRows.getClear());
- rowSet = outputOwnedRows.get();
- rowsInRowSet = rowSet->getRowCount();
- row = 0;
- }
- }
- }
- rowsRead++;
- SpinBlock b(parent.spin);
- const void *retrow = rowSet->getRow(row++);
- return retrow;
- }
- virtual void stop()
- {
- CriticalBlock b(parent.crit);
- doStop();
- }
- friend class CSharedWriteAheadBase;
- };
- CActivityBase *activity;
- size32_t minChunkSize;
- unsigned lowestChunk, lowestOutput, outputCount, totalChunksOut;
- bool stopped;
- Owned<CRowSet> inMemRows;
- CriticalSection crit;
- SpinLock spin;
- Linked<IOutputMetaData> meta;
- inline COutput &queryCOutput(unsigned i) { return (COutput &) outputs.item(i); }
- inline unsigned getLowest()
- {
- unsigned o=0;
- offset_t lsf = (unsigned)-1;
- unsigned lsfOutput = (unsigned)-1;
- loop
- {
- if (queryCOutput(o).currentChunkNum < lsf)
- {
- lsf = queryCOutput(o).currentChunkNum;
- lsfOutput = o;
- }
- if (++o == outputCount)
- break;
- }
- return lsfOutput;
- }
- void advanceNextReadChunk(unsigned output)
- {
- unsigned ¤tChunkNum = queryCOutput(output).currentChunkNum;
- unsigned prevChunkNum = currentChunkNum;
- currentChunkNum++;
- if (output == lowestOutput) // might be joint lowest
- {
- lowestOutput = getLowest();
- if (currentChunkNum == queryCOutput(lowestOutput).currentChunkNum) // has new page moved up to join new lowest, in which case it was last.
- {
- if (prevChunkNum != totalChunksOut) // if equal, then prevOffset was mem page and not in pool/disk to free
- freeOffsetChunk(prevChunkNum);
- else
- {
- VALIDATEEQ(prevChunkNum, inMemRows->queryChunk()); // validate is current in mem page
- }
- ++lowestChunk;
- #ifdef TRACE_WRITEAHEAD
- ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "aNRP: %d, lowestChunk=%d, totalChunksOut=%d", prevChunkNum, lowestChunk, totalChunksOut);
- #endif
- }
- }
- }
- void stopOutput(unsigned output)
- {
- // mark stopped output with very high page (as if read all), will not be picked again
- unsigned lowestA = getLowest();
- if (output == lowestA)
- {
- unsigned chunkOrig = queryCOutput(output).currentChunkNum;
- queryCOutput(output).currentChunkNum = (unsigned)-1;
- unsigned lO = getLowest(); // get next lowest
- queryCOutput(output).currentChunkNum = chunkOrig;
- if (((unsigned)-1) == lO) // if last to stop
- {
- markStop();
- return;
- }
- #ifdef TRACE_WRITEAHEAD
- ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Input %d stopped, forcing catchup to free pages", output);
- #endif
- while (queryCOutput(output).currentChunkNum < queryCOutput(lO).currentChunkNum)
- {
- Owned<CRowSet> tmpChunk = loadMore(output); // discard
- }
- #ifdef TRACE_WRITEAHEAD
- ActPrintLog(activity, "Input %d stopped. Caught up", output);
- #endif
- queryCOutput(output).currentChunkNum = (unsigned)-1;
- lowestOutput = getLowest();
- }
- else
- queryCOutput(output).currentChunkNum = (unsigned)-1;
- }
- void stopAll()
- {
- CriticalBlock b(crit);
- markStop();
- }
- virtual void markStop()
- {
- if (stopped) return;
- stopped = true;
- }
- virtual void freeOffsetChunk(unsigned chunk) = 0;
- virtual CRowSet *readRows(unsigned output, unsigned chunk) = 0;
- virtual void flushRows() = 0;
- virtual size32_t rowSize(const void *row) = 0;
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CSharedWriteAheadBase(CActivityBase *_activity, unsigned _outputCount, IRowInterfaces *rowIf) : activity(_activity), outputCount(_outputCount), meta(rowIf->queryRowMetaData())
- {
- init();
- minChunkSize = 0x2000;
- size32_t minSize = meta->getMinRecordSize();
- if (minChunkSize<minSize*10) // if rec minSize bigish, ensure reasonable minChunkSize
- {
- minChunkSize = minSize*10;
- if (minChunkSize > 0x10000)
- minChunkSize += 2*(minSize+1);
- }
- outputCount = _outputCount;
- unsigned c=0;
- for (; c<outputCount; c++)
- {
- outputs.append(* new COutput(*this, c));
- }
- inMemRows.setown(new CRowSet(*activity, 0));
- }
- ~CSharedWriteAheadBase()
- {
- ForEachItemIn(o, outputs)
- {
- COutput &output = queryCOutput(o);
- output.outputOwnedRows.clear();
- }
- inMemRows.clear();
- }
- unsigned anyReaderBehind()
- {
- unsigned reader=0;
- loop
- {
- if (reader>=outputCount) // so all waiting, don't need in mem page.
- break;
- if (!queryCOutput(reader).eof && (queryCOutput(reader).rowSet != inMemRows.get())) // if output using this in mem page, then it has linked already, need to find one that is not (i.e. behind)
- return reader;
- ++reader;
- }
- return NotFound;
- }
- // ISharedSmartBuffer
- virtual void putRow(const void *row)
- {
- if (stopped)
- {
- ReleaseThorRow(row);
- return;
- }
- unsigned len=rowSize(row);
- CriticalBlock b(crit);
- if (totalOutChunkSize >= minChunkSize) // chunks required to be at least minChunkSize
- {
- unsigned reader=anyReaderBehind();
- if (NotFound != reader)
- flushRows();
- inMemRows.setown(new CRowSet(*activity, ++totalChunksOut));
- #ifdef TRACE_WRITEAHEAD
- totalOutChunkSize = sizeof(unsigned);
- #else
- totalOutChunkSize = 0;
- #endif
- }
- {
- SpinBlock b(spin);
- inMemRows->addRow(row);
- }
- totalOutChunkSize += len;
- rowsWritten++; // including NULLs(eogs)
- signalReaders();
- }
- virtual void flush()
- {
- CriticalBlock b(crit);
- writeAtEof = true;
- signalReaders();
- }
- virtual offset_t getPosition()
- {
- throwUnexpected();
- return 0;
- }
- virtual IRowStream *queryOutput(unsigned output)
- {
- return &outputs.item(output);
- }
- virtual void cancel()
- {
- CriticalBlock b(crit);
- stopAll();
- signalReaders();
- }
- virtual void reset()
- {
- init();
- unsigned c=0;
- for (; c<outputCount; c++)
- queryCOutput(c).reset();
- inMemRows->reset(0);
- }
- friend class COutput;
- };
- class CSharedWriteAheadDisk : public CSharedWriteAheadBase
- {
- IDiskUsage *iDiskUsage;
- Owned<IFile> spillFile;
- Owned<IFileIO> spillFileIO;
- CIArrayOf<Chunk> freeChunks;
- PointerArrayOf<Chunk> freeChunksSized;
- QueueOf<Chunk, false> savedChunks;
- offset_t highOffset;
- Linked<IEngineRowAllocator> allocator;
- Linked<IOutputRowSerializer> serializer;
- Linked<IOutputRowDeserializer> deserializer;
- IOutputMetaData *serializeMeta;
- struct AddRemoveFreeChunk
- {
- PointerArrayOf<Chunk> &chunkArray;
- Chunk *chunk;
- AddRemoveFreeChunk(PointerArrayOf<Chunk> &_chunkArray, Chunk *_chunk) : chunkArray(_chunkArray), chunk(_chunk)
- {
- chunkArray.zap(chunk);
- }
- ~AddRemoveFreeChunk()
- {
- bool isNew;
- aindex_t sizePos = chunkArray.bAdd(chunk, chunkSizeCompare, isNew);
- if (!isNew) // might match size of another block
- chunkArray.add(chunk, sizePos);
- }
- };
- unsigned findNext(Chunk **base, unsigned arrCount, Chunk *searchChunk, ChunkCompareFunc compareFunc, bool &matched)
- {
- unsigned middle;
- unsigned left = 0;
- int result;
- Chunk *CurEntry;
- unsigned right = arrCount-1;
- /*
- * Loop until we've narrowed down the search range to one or two
- * items. This ensures that middle != 0, preventing 'right' from wrapping.
- */
- while (right - left > 1)
- {
- /* Calculate the middle entry in the array - NOT the middle offset */
- middle = (right + left) >> 1;
- CurEntry = base[middle];
- result = compareFunc(searchChunk, CurEntry);
- if (result < 0)
- right = middle - 1;
- else if (result > 0)
- left = middle + 1;
- else
- {
- matched = true;
- return middle;
- }
- }
- middle = left;
- /*
- * The range has been narrowed down to 1 or 2 items.
- * Perform an optimal check on these last two elements.
- */
- result = compareFunc(searchChunk, base[middle]);
- if (0 == result)
- matched = true;
- else if (result > 0)
- {
- ++middle;
- if (right == left + 1)
- {
- result = compareFunc(searchChunk, base[middle]);
- if (0 == result)
- matched = true;
- else
- {
- matched = false;
- if (result > 0)
- ++middle;
- }
- }
- else
- matched = false;
- }
- else
- matched = false;
- if (middle < arrCount)
- return middle;
- return NotFound;
- }
- inline Chunk *getOutOffset(size32_t required)
- {
- unsigned numFreeChunks = freeChunks.ordinality();
- if (numFreeChunks)
- {
- Chunk searchChunk(0, required);
- bool matched;
- unsigned nextPos = findNext(freeChunksSized.getArray(), freeChunksSized.ordinality(), &searchChunk, chunkSizeCompare2, matched);
- if (NotFound != nextPos)
- {
- Linked<Chunk> nextChunk = freeChunksSized.item(nextPos);
- if (nextChunk->size-required >= minChunkSize)
- {
- Owned<Chunk> chunk = new Chunk(nextChunk->offset, required);
- decFreeChunk(nextChunk, required);
- #ifdef TRACE_WRITEAHEAD
- ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "getOutOffset: got [free] offset = %"I64F"d, size=%d, but took required=%d", nextChunk->offset, nextChunk->size+required, required);
- #endif
- return chunk.getClear();
- }
- #ifdef TRACE_WRITEAHEAD
- ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "getOutOffset: got [free] offset = %"I64F"d, size=%d", diskChunk.offset, diskChunk.size);
- #endif
- freeChunksSized.remove(nextPos);
- freeChunks.zap(*nextChunk);
- return nextChunk.getClear(); // if smaller than minChunkSize, return all, don't leave useless fragment
- }
- }
- #ifdef TRACE_WRITEAHEAD
- ActPrintLog(activity, "getOutOffset: got new upper offset # = %"I64F"d", highOffset);
- #endif
- Chunk *chunk = new Chunk(highOffset, required);
- if (iDiskUsage)
- {
- iDiskUsage->decrease(highOffset);
- highOffset += required; // NB next
- iDiskUsage->increase(highOffset);
- }
- else
- highOffset += required; // NB next
- return chunk;
- }
- inline void mergeFreeChunk(Chunk *dst, const Chunk *src)
- {
- AddRemoveFreeChunk ar(freeChunksSized, dst);
- dst->offset = src->offset;
- dst->size += src->size;
- }
- inline void decFreeChunk(Chunk *chunk, size32_t sz)
- {
- AddRemoveFreeChunk ar(freeChunksSized, chunk);
- chunk->size -= sz;
- chunk->offset += sz;
- }
- inline void incFreeChunk(Chunk *chunk, size32_t sz)
- {
- AddRemoveFreeChunk ar(freeChunksSized, chunk);
- chunk->size += sz;
- }
- inline void addFreeChunk(Chunk *freeChunk, unsigned pos=NotFound)
- {
- bool isNew;
- aindex_t sizePos = freeChunksSized.bAdd(freeChunk, chunkSizeCompare, isNew);
- if (!isNew)
- freeChunksSized.add(freeChunk, sizePos);
- if (NotFound == pos)
- freeChunks.append(*LINK(freeChunk));
- else
- freeChunks.add(*LINK(freeChunk), pos);
- }
- virtual void freeOffsetChunk(unsigned chunk)
- {
- // chunk(unused here) is nominal sequential page #, savedChunks is page # in diskfile
- assertex(savedChunks.ordinality());
- Owned<Chunk> freeChunk = savedChunks.dequeue();
- unsigned nmemb = freeChunks.ordinality();
- if (0 == nmemb)
- addFreeChunk(freeChunk);
- else
- {
- bool matched;
- unsigned nextPos = findNext(freeChunks.getArray(), freeChunks.ordinality(), freeChunk, chunkOffsetCompare, matched);
- assertex(!matched); // should never happen, have found a chunk with same offset as one being freed.
- if (NotFound != nextPos)
- {
- Chunk *nextChunk = &freeChunks.item(nextPos);
- if (freeChunk->endOffset() == nextChunk->offset)
- mergeFreeChunk(nextChunk, freeChunk);
- else if (nextPos > 0)
- {
- Chunk *prevChunk = &freeChunks.item(nextPos-1);
- if (prevChunk->endOffset() == freeChunk->offset)
- incFreeChunk(prevChunk, freeChunk->size);
- else
- addFreeChunk(freeChunk, nextPos);
- }
- else
- addFreeChunk(freeChunk, nextPos);
- }
- else
- addFreeChunk(freeChunk);
- }
- #ifdef TRACE_WRITEAHEAD
- ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Added chunk, offset %"I64F"d size=%d to freeChunks", freeChunk->offset, freeChunk->size);
- #endif
- }
- virtual CRowSet *readRows(unsigned output, unsigned whichChunk)
- {
- assertex(savedChunks.ordinality() > whichChunk);
- unsigned currentChunkNum = queryCOutput(output).currentChunkNum;
- unsigned o=0;
- for (; o<outputCount; o++)
- {
- if (o == output) continue;
- COutput &coutput = queryCOutput(o);
- if (coutput.queryRowSet() && (coutput.queryRowSet()->queryChunk() == currentChunkNum))
- {
- #ifdef TRACE_WRITEAHEAD
- ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Output: %d, readRows found other output %d with matching offset: %d", output, o, currentChunkNum);
- #endif
- return LINK(coutput.queryRowSet());
- }
- }
- Chunk &chunk = *savedChunks.item(whichChunk);
- Owned<ISerialStream> stream = createFileSerialStream(spillFileIO, chunk.offset);
- #ifdef TRACE_WRITEAHEAD
- unsigned diskChunkNum;
- stream->get(sizeof(diskChunkNum), &diskChunkNum);
- VALIDATEEQ(diskChunkNum, currentChunkNum);
- #endif
- CThorStreamDeserializerSource ds(stream);
- Owned<CRowSet> rowSet = new CRowSet(*activity, currentChunkNum);
- loop
- {
- byte b;
- ds.read(sizeof(b),&b);
- if (!b)
- break;
- if (1==b)
- {
- RtlDynamicRowBuilder rowBuilder(allocator);
- size32_t sz = deserializer->deserialize(rowBuilder, ds);
- rowSet->addRow(rowBuilder.finalizeRowClear(sz));
- }
- else if (2==b)
- rowSet->addRow(NULL);
- }
- return rowSet.getClear();
- }
- virtual void flushRows()
- {
- // NB: called in crit
- MemoryBuffer mb;
- mb.ensureCapacity(minChunkSize); // starting size/could be more if variable and bigger
- #ifdef TRACE_WRITEAHEAD
- mb.append(inMemRows->queryChunk()); // for debug purposes only
- #endif
- CMemoryRowSerializer mbs(mb);
- unsigned r=0;
- for (;r<inMemRows->getRowCount();r++)
- {
- OwnedConstThorRow row = inMemRows->getRow(r);
- if (row)
- {
- mb.append((byte)1);
- serializer->serialize(mbs,(const byte *)row.get());
- }
- else
- mb.append((byte)2); // eog
- }
- mb.append((byte)0);
- size32_t len = mb.length();
- Owned<Chunk> freeChunk = getOutOffset(len); // will find space for 'len', might be bigger if from free list
- spillFileIO->write(freeChunk->offset, len, mb.toByteArray());
- savedChunks.enqueue(freeChunk.getClear());
- #ifdef TRACE_WRITEAHEAD
- ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d (savedChunks pos=%d), writeOffset = %"I64F"d, writeSize = %d", inMemRows->queryChunk(), savedChunks.ordinality()-1, freeChunk->offset, len);
- #endif
- }
- virtual size32_t rowSize(const void *row)
- {
- if (!row)
- return 1; // eog;
- else if (meta == serializeMeta)
- return meta->getRecordSize(row)+1; // space on disk, +1 = eog marker
- CSizingSerializer ssz;
- serializer->serialize(ssz,(const byte *)row);
- return ssz.size()+1; // space on disk, +1 = eog marker
- }
- public:
- CSharedWriteAheadDisk(CActivityBase *activity, const char *spillName, unsigned outputCount, IRowInterfaces *rowIf, IDiskUsage *_iDiskUsage) : CSharedWriteAheadBase(activity, outputCount, rowIf),
- allocator(rowIf->queryRowAllocator()), serializer(rowIf->queryRowSerializer()), deserializer(rowIf->queryRowDeserializer()), serializeMeta(meta->querySerializedDiskMeta()), iDiskUsage(_iDiskUsage)
- {
- assertex(spillName);
- spillFile.setown(createIFile(spillName));
- spillFile->setShareMode(IFSHnone);
- spillFileIO.setown(spillFile->open(IFOcreaterw));
- highOffset = 0;
- }
- ~CSharedWriteAheadDisk()
- {
- spillFileIO.clear();
- if (spillFile)
- spillFile->remove();
- loop
- {
- Owned<Chunk> chunk = savedChunks.dequeue();
- if (!chunk) break;
- }
- PROGLOG("CSharedWriteAheadDisk: highOffset=%"I64F"d", highOffset);
- }
- virtual void reset()
- {
- CSharedWriteAheadBase::reset();
- loop
- {
- Owned<Chunk> chunk = savedChunks.dequeue();
- if (!chunk) break;
- }
- freeChunks.kill();
- freeChunksSized.kill();
- highOffset = 0;
- spillFileIO->setSize(0);
- }
- };
- ISharedSmartBuffer *createSharedSmartDiskBuffer(CActivityBase *activity, const char *spillname, unsigned outputs, IRowInterfaces *rowIf, IDiskUsage *iDiskUsage)
- {
- return new CSharedWriteAheadDisk(activity, spillname, outputs, rowIf, iDiskUsage);
- }
- #define MIN_POOL_CHUNKS 10
- class CSharedWriteAheadMem : public CSharedWriteAheadBase
- {
- QueueOf<CRowSet, false> chunkPool;
- Semaphore poolSem;
- bool writerBlocked;
- unsigned maxPoolChunks;
- virtual void markStop()
- {
- CSharedWriteAheadBase::markStop();
- if (writerBlocked)
- {
- writerBlocked = false;
- poolSem.signal();
- }
- }
- virtual void freeOffsetChunk(unsigned chunk)
- {
- Owned<CRowSet> topRowSet = chunkPool.dequeue();
- VALIDATEEQ(chunk, topRowSet->queryChunk());
- #ifdef TRACE_WRITEAHEAD
- ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "freeOffsetChunk: Dequeue chunkPool chunks: %d, chunkPool.ordinality() = %d", topRowSet->queryChunk(), chunkPool.ordinality());
- #endif
- topRowSet.clear();
- if (writerBlocked)
- {
- writerBlocked = false;
- poolSem.signal();
- }
- }
- virtual CRowSet *readRows(unsigned output, unsigned whichChunk)
- {
- Linked<CRowSet> rowSet = chunkPool.item(whichChunk);
- VALIDATEEQ(queryCOutput(output).currentChunkNum, rowSet->queryChunk());
- return rowSet.getClear();
- }
- virtual void flushRows()
- {
- // NB: called in crit
- if (chunkPool.ordinality() >= maxPoolChunks)
- {
- writerBlocked = true;
- { CriticalUnblock b(crit);
- poolSem.wait();
- if (stopped) return;
- }
- unsigned reader=anyReaderBehind();
- if (NotFound == reader)
- {
- #ifdef TRACE_WRITEAHEAD
- ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "flushRows: caught up whilst blocked to: %d", inMemRows->queryChunk());
- #endif
- return; // caught up whilst blocked
- }
- VALIDATELT(chunkPool.ordinality(), maxPoolChunks);
- }
- #ifdef TRACE_WRITEAHEAD
- ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d, chunkPool chunks = %d", inMemRows->queryChunk(), 1+chunkPool.ordinality());
- #endif
- chunkPool.enqueue(inMemRows.getClear());
- }
- virtual size32_t rowSize(const void *row)
- {
- return meta->getRecordSize(row); // space in mem.
- }
- public:
- CSharedWriteAheadMem(CActivityBase *activity, unsigned outputCount, IRowInterfaces *rowif, unsigned buffSize) : CSharedWriteAheadBase(activity, outputCount, rowif)
- {
- if (((unsigned)-1) == buffSize)
- maxPoolChunks = (unsigned)-1; // no limit
- else
- {
- maxPoolChunks = buffSize / minChunkSize;
- if (maxPoolChunks < MIN_POOL_CHUNKS)
- maxPoolChunks = MIN_POOL_CHUNKS;
- }
- writerBlocked = false;
- }
- ~CSharedWriteAheadMem()
- {
- loop
- {
- Owned<CRowSet> rowSet = chunkPool.dequeue();
- if (!rowSet)
- break;
- }
- }
- virtual void reset()
- {
- CSharedWriteAheadBase::reset();
- loop
- {
- Owned<CRowSet> rowSet = chunkPool.dequeue();
- if (!rowSet)
- break;
- }
- writerBlocked = false;
- }
- };
- ISharedSmartBuffer *createSharedSmartMemBuffer(CActivityBase *activity, unsigned outputs, IRowInterfaces *rowIf, unsigned buffSize)
- {
- return new CSharedWriteAheadMem(activity, outputs, rowIf, buffSize);
- }
- class CRowMultiWriterReader : public CSimpleInterface, implements IRowMultiWriterReader
- {
- rowidx_t readGranularity, writeGranularity, rowPos, limit, rowsToRead;
- CThorSpillableRowArray rows;
- const void **readRows;
- CActivityBase &activity;
- IRowInterfaces *rowIf;
- bool readerBlocked, eos, eow;
- Semaphore emptySem, fullSem;
- unsigned numWriters, writersComplete, writersBlocked;
- class CAWriter : public CSimpleInterface, implements IRowWriter
- {
- CRowMultiWriterReader &owner;
- CThorExpandingRowArray rows;
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CAWriter(CRowMultiWriterReader &_owner) : owner(_owner), rows(_owner.activity, _owner.rowIf)
- {
- }
- ~CAWriter()
- {
- flush();
- owner.writerStopped();
- }
- // IRowWriter impl.
- virtual void putRow(const void *row)
- {
- if (rows.ordinality() >= owner.writeGranularity)
- owner.addRows(rows);
- rows.append(row);
- }
- virtual void flush()
- {
- if (rows.ordinality())
- owner.addRows(rows);
- }
- };
- void addRows(CThorExpandingRowArray &inRows)
- {
- loop
- {
- {
- CThorArrayLockBlock block(rows);
- if (eos)
- {
- inRows.kill();
- return;
- }
- if (rows.numCommitted() < limit)
- {
- // NB: allowed to go over limit, by as much as inRows.ordinality()-1
- rows.appendRows(inRows, true);
- if (rows.numCommitted() >= readGranularity)
- checkReleaseReader();
- return;
- }
- writersBlocked++;
- }
- fullSem.wait();
- }
- }
- inline void checkReleaseReader()
- {
- if (readerBlocked)
- {
- emptySem.signal();
- readerBlocked = false;
- }
- }
- inline void checkReleaseWriters()
- {
- if (writersBlocked)
- {
- fullSem.signal(writersBlocked);
- writersBlocked = 0;
- }
- }
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CRowMultiWriterReader(CActivityBase &_activity, IRowInterfaces *_rowIf, unsigned _limit, unsigned _readGranularity, unsigned _writerGranularity)
- : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf), limit(_limit), readGranularity(_readGranularity), writeGranularity(_writerGranularity)
- {
- if (readGranularity > limit)
- readGranularity = limit; // readGranularity must be <= limit;
- numWriters = 0;
- roxiemem::IRowManager *rowManager = activity.queryJob().queryRowManager();
- readRows = static_cast<const void * *>(rowManager->allocate(readGranularity * sizeof(void*), activity.queryContainer().queryId()));
- eos = eow = readerBlocked = false;
- rowPos = rowsToRead = 0;
- writersComplete = writersBlocked = 0;
- rows.setup(rowIf, false, stableSort_none, true); // turning on throwOnOom;
- }
- ~CRowMultiWriterReader()
- {
- while (rowPos < rowsToRead)
- {
- ReleaseThorRow(readRows[rowPos++]);
- }
- ReleaseThorRow(readRows);
- }
- void writerStopped()
- {
- CThorArrayLockBlock block(rows);
- writersComplete++;
- if (writersComplete == numWriters)
- {
- rows.flush();
- eow = true;
- checkReleaseReader();
- }
- }
- // ISharedWriteBuffer impl.
- virtual IRowWriter *getWriter()
- {
- ++numWriters;
- return new CAWriter(*this);
- }
- virtual void abort()
- {
- CThorArrayLockBlock block(rows);
- eos = true;
- checkReleaseWriters();
- checkReleaseReader();
- }
- // IRowStream impl.
- virtual const void *nextRow()
- {
- if (eos)
- return NULL;
- if (rowPos == rowsToRead)
- {
- loop
- {
- {
- CThorArrayLockBlock block(rows);
- if (rows.numCommitted() >= readGranularity || eow)
- {
- rowsToRead = (eow && rows.numCommitted() < readGranularity) ? rows.numCommitted() : readGranularity;
- if (0 == rowsToRead)
- {
- eos = true;
- return NULL;
- }
- rows.readBlock(readRows, rowsToRead);
- rowPos = 0;
- checkReleaseWriters();
- break; // fall through to return a row
- }
- readerBlocked = true;
- }
- emptySem.wait();
- if (eos)
- return NULL;
- }
- }
- const void *row = readRows[rowPos];
- readRows[rowPos] = NULL;
- ++rowPos;
- return row;
- }
- virtual void stop()
- {
- eos = true;
- checkReleaseWriters();
- }
- };
- IRowMultiWriterReader *createSharedWriteBuffer(CActivityBase *activity, IRowInterfaces *rowif, unsigned limit, unsigned readGranularity, unsigned writeGranularity)
- {
- return new CRowMultiWriterReader(*activity, rowif, limit, readGranularity, writeGranularity);
- }
- class CRCFileStream: public CSimpleInterface, implements IFileIOStream
- {
- Linked<IFileIOStream> streamin;
- public:
- CRC32 &crc;
- bool valid;
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CRCFileStream(IFileIOStream *_stream,CRC32 &_crc)
- : streamin(_stream), crc(_crc)
- {
- valid = true;
- }
- size32_t read(size32_t len, void * data)
- {
- size32_t rd = streamin->read(len,data);
- crc.tally(rd,data);
- return rd;
- }
-
- size32_t write(size32_t len, const void * data)
- {
- throw MakeStringException(-1,"CRCFileStream does not support write");
- }
- void seek(offset_t pos, IFSmode origin)
- {
- offset_t t = streamin->tell();
- streamin->seek(pos,origin);
- if (t!=streamin->tell()) // crc invalid
- if (valid) {
- WARNLOG("CRCFileStream::seek called - CRC will be invalid");
- valid = false;
- }
- }
- offset_t size()
- {
- return streamin->size();
- }
- virtual offset_t tell()
- {
- return streamin->tell();
- }
- void flush()
- {
- // noop as only read
- }
- };
|