|
@@ -670,14 +670,23 @@ IRowWriterMultiReader *createOverflowableBuffer(CActivityBase &activity, IRowInt
|
|
|
#define VALIDATELT(LHS, RHS) if ((LHS)>=(RHS)) { StringBuffer s("FAIL(LT) - LHS="); s.append(LHS).append(", RHS=").append(RHS); PROGLOG("%s", s.str()); throwUnexpected(); }
|
|
|
|
|
|
//#define TRACE_WRITEAHEAD
|
|
|
-class CRowSet : public CSimpleInterface
|
|
|
+class CSharedWriteAheadBase;
|
|
|
+class CRowSet : public CSimpleInterface, implements IInterface
|
|
|
{
|
|
|
unsigned chunk;
|
|
|
CThorExpandingRowArray rows;
|
|
|
+ CSharedWriteAheadBase &sharedWriteAhead;
|
|
|
+ mutable SpinLock lock;
|
|
|
+ mutable CriticalSection crit;
|
|
|
public:
|
|
|
- CRowSet(CActivityBase &activity, unsigned _chunk) : rows(activity, &activity, true), chunk(_chunk)
|
|
|
+ CRowSet(CSharedWriteAheadBase &_sharedWriteAhead, unsigned _chunk, unsigned maxRows);
|
|
|
+ virtual void Link() const
|
|
|
{
|
|
|
+ CSimpleInterface::Link();
|
|
|
}
|
|
|
+ virtual bool Release() const;
|
|
|
+ void clear() { rows.clearRows(); }
|
|
|
+ void setChunk(unsigned _chunk) { chunk = _chunk; }
|
|
|
void reset(unsigned _chunk)
|
|
|
{
|
|
|
chunk = _chunk;
|
|
@@ -697,6 +706,8 @@ class Chunk : public CInterface
|
|
|
public:
|
|
|
offset_t offset;
|
|
|
size32_t size;
|
|
|
+ Linked<CRowSet> rowSet;
|
|
|
+ Chunk(CRowSet *_rowSet) : rowSet(_rowSet), offset(0), size(0) { }
|
|
|
Chunk(offset_t _offset, size_t _size) : offset(_offset), size(_size) { }
|
|
|
Chunk(const Chunk &other) : offset(other.offset), size(other.size) { }
|
|
|
bool operator==(Chunk const &other) const { return size==other.size && offset==other.offset; }
|
|
@@ -724,6 +735,7 @@ int chunkSizeCompare2(Chunk *lhs, Chunk *rhs)
|
|
|
return (int)lhs->size - (int)rhs->size;
|
|
|
}
|
|
|
|
|
|
+#define MIN_POOL_CHUNKS 10
|
|
|
class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBuffer
|
|
|
{
|
|
|
size32_t totalOutChunkSize;
|
|
@@ -731,7 +743,16 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu
|
|
|
rowcount_t rowsWritten;
|
|
|
IArrayOf<IRowStream> outputs;
|
|
|
unsigned readersWaiting;
|
|
|
+ mutable IArrayOf<CRowSet> cachedRowSets;
|
|
|
+ CriticalSection rowSetCacheCrit;
|
|
|
|
|
|
+ void reuse(CRowSet *rowset)
|
|
|
+ {
|
|
|
+ rowset->clear();
|
|
|
+ CriticalBlock b(rowSetCacheCrit);
|
|
|
+ if (cachedRowSets.ordinality() < (outputs.ordinality()*2))
|
|
|
+ cachedRowSets.append(*LINK(rowset));
|
|
|
+ }
|
|
|
virtual void init()
|
|
|
{
|
|
|
stopped = false;
|
|
@@ -763,11 +784,6 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu
|
|
|
queryCOutput(o).wakeRead(); // if necessary
|
|
|
}
|
|
|
}
|
|
|
- inline const rowcount_t &readerWait(const rowcount_t &rowsRead)
|
|
|
- {
|
|
|
- ++readersWaiting;
|
|
|
- return rowsWritten;
|
|
|
- }
|
|
|
CRowSet *loadMore(unsigned output)
|
|
|
{
|
|
|
// needs to be called in crit
|
|
@@ -819,32 +835,6 @@ protected:
|
|
|
eof = true;
|
|
|
parent.stopOutput(output);
|
|
|
}
|
|
|
- inline const rowcount_t readerWait(const rowcount_t &rowsRead)
|
|
|
- {
|
|
|
- if (rowsRead == parent.rowsWritten)
|
|
|
- {
|
|
|
- if (parent.stopped || parent.writeAtEof)
|
|
|
- return 0;
|
|
|
- readerWaiting = true;
|
|
|
-#ifdef TRACE_WRITEAHEAD
|
|
|
- ActPrintLogEx(&parent.activity->queryContainer(), thorlog_all, MCdebugProgress, "readerWait(%d)", output);
|
|
|
-#endif
|
|
|
- const rowcount_t &rowsWritten = parent.readerWait(rowsRead);
|
|
|
- {
|
|
|
- CriticalUnblock b(parent.crit);
|
|
|
- unsigned mins=0;
|
|
|
- loop
|
|
|
- {
|
|
|
- if (readWaitSem.wait(60000))
|
|
|
- break; // NB: will also be signal if aborting
|
|
|
- ActPrintLog(parent.activity, "output %d @ row # %"RCPF"d, has been blocked for %d minute(s)", output, rowsRead, ++mins);
|
|
|
- }
|
|
|
- }
|
|
|
- if (parent.isEof(rowsRead))
|
|
|
- return 0;
|
|
|
- }
|
|
|
- return parent.rowsWritten;
|
|
|
- }
|
|
|
inline void wakeRead()
|
|
|
{
|
|
|
if (readerWaiting)
|
|
@@ -868,6 +858,7 @@ protected:
|
|
|
init();
|
|
|
outputOwnedRows.clear();
|
|
|
}
|
|
|
+ inline unsigned queryOutput() const { return output; }
|
|
|
inline CRowSet *queryRowSet() { return rowSet; }
|
|
|
const void *nextRow()
|
|
|
{
|
|
@@ -878,13 +869,13 @@ protected:
|
|
|
CriticalBlock b(parent.crit);
|
|
|
if (!rowSet || (row == (rowsInRowSet = rowSet->getRowCount())))
|
|
|
{
|
|
|
- rowcount_t totalRows = readerWait(rowsRead);
|
|
|
+ rowcount_t totalRows = parent.readerWait(*this, rowsRead);
|
|
|
if (0 == totalRows)
|
|
|
{
|
|
|
doStop();
|
|
|
return NULL;
|
|
|
}
|
|
|
- if (!rowSet || (row == (rowsInRowSet = rowSet->getRowCount()))) // maybe have caught up in same rowSet
|
|
|
+ if (!rowSet || (row == (rowsInRowSet = rowSet->getRowCount()))) // may have caught up in same rowSet
|
|
|
{
|
|
|
Owned<CRowSet> newRows = parent.loadMore(output);
|
|
|
if (rowSet)
|
|
@@ -897,7 +888,6 @@ protected:
|
|
|
}
|
|
|
}
|
|
|
rowsRead++;
|
|
|
- SpinBlock b(parent.spin);
|
|
|
const void *retrow = rowSet->getRow(row++);
|
|
|
return retrow;
|
|
|
}
|
|
@@ -911,12 +901,53 @@ protected:
|
|
|
CActivityBase *activity;
|
|
|
size32_t minChunkSize;
|
|
|
unsigned lowestChunk, lowestOutput, outputCount, totalChunksOut;
|
|
|
+ rowidx_t maxRows;
|
|
|
bool stopped;
|
|
|
Owned<CRowSet> inMemRows;
|
|
|
CriticalSection crit;
|
|
|
- SpinLock spin;
|
|
|
Linked<IOutputMetaData> meta;
|
|
|
+ QueueOf<CRowSet, false> chunkPool;
|
|
|
+ unsigned maxPoolChunks;
|
|
|
|
|
|
+ inline const rowcount_t readerWait(COutput &output, const rowcount_t rowsRead)
|
|
|
+ {
|
|
|
+ if (rowsRead == rowsWritten)
|
|
|
+ {
|
|
|
+ if (stopped || writeAtEof)
|
|
|
+ return 0;
|
|
|
+ output.readerWaiting = true;
|
|
|
+#ifdef TRACE_WRITEAHEAD
|
|
|
+ ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "readerWait(%d)", output.queryOutput());
|
|
|
+#endif
|
|
|
+ ++readersWaiting;
|
|
|
+ {
|
|
|
+ CriticalUnblock b(crit);
|
|
|
+ unsigned mins=0;
|
|
|
+ loop
|
|
|
+ {
|
|
|
+ if (output.readWaitSem.wait(60000))
|
|
|
+ break; // NB: will also be signalled if aborting
|
|
|
+ ActPrintLog(activity, "output %d @ row # %"RCPF"d, has been blocked for %d minute(s)", output.queryOutput(), rowsRead, ++mins);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (isEof(rowsRead))
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ return rowsWritten;
|
|
|
+ }
|
|
|
+ CRowSet *newRowSet(unsigned chunk)
|
|
|
+ {
|
|
|
+ {
|
|
|
+ CriticalBlock b(rowSetCacheCrit);
|
|
|
+ if (cachedRowSets.ordinality())
|
|
|
+ {
|
|
|
+ CRowSet *rowSet = &cachedRowSets.popGet();
|
|
|
+ rowSet->setChunk(chunk);
|
|
|
+ return rowSet;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return new CRowSet(*this, chunk, maxRows);
|
|
|
+ }
|
|
|
inline COutput &queryCOutput(unsigned i) { return (COutput &) outputs.item(i); }
|
|
|
inline unsigned getLowest()
|
|
|
{
|
|
@@ -1001,7 +1032,7 @@ protected:
|
|
|
}
|
|
|
virtual void freeOffsetChunk(unsigned chunk) = 0;
|
|
|
virtual CRowSet *readRows(unsigned output, unsigned chunk) = 0;
|
|
|
- virtual void flushRows() = 0;
|
|
|
+ virtual void flushRows(ISharedSmartBufferCallback *callback) = 0;
|
|
|
virtual size32_t rowSize(const void *row) = 0;
|
|
|
public:
|
|
|
|
|
@@ -1018,13 +1049,15 @@ public:
|
|
|
if (minChunkSize > 0x10000)
|
|
|
minChunkSize += 2*(minSize+1);
|
|
|
}
|
|
|
+ maxRows = (minChunkSize / minSize) + 1;
|
|
|
outputCount = _outputCount;
|
|
|
unsigned c=0;
|
|
|
for (; c<outputCount; c++)
|
|
|
{
|
|
|
outputs.append(* new COutput(*this, c));
|
|
|
}
|
|
|
- inMemRows.setown(new CRowSet(*activity, 0));
|
|
|
+ inMemRows.setown(newRowSet(0));
|
|
|
+ maxPoolChunks = MIN_POOL_CHUNKS;
|
|
|
}
|
|
|
~CSharedWriteAheadBase()
|
|
|
{
|
|
@@ -1052,7 +1085,7 @@ public:
|
|
|
}
|
|
|
|
|
|
// ISharedSmartBuffer
|
|
|
- virtual void putRow(const void *row)
|
|
|
+ virtual void putRow(const void *row, ISharedSmartBufferCallback *callback)
|
|
|
{
|
|
|
if (stopped)
|
|
|
{
|
|
@@ -1061,27 +1094,38 @@ public:
|
|
|
}
|
|
|
unsigned len=rowSize(row);
|
|
|
CriticalBlock b(crit);
|
|
|
+ bool paged = false;
|
|
|
if (totalOutChunkSize >= minChunkSize) // chunks required to be at least minChunkSize
|
|
|
{
|
|
|
unsigned reader=anyReaderBehind();
|
|
|
if (NotFound != reader)
|
|
|
- flushRows();
|
|
|
- inMemRows.setown(new CRowSet(*activity, ++totalChunksOut));
|
|
|
+ flushRows(callback);
|
|
|
+ inMemRows.setown(newRowSet(++totalChunksOut));
|
|
|
#ifdef TRACE_WRITEAHEAD
|
|
|
totalOutChunkSize = sizeof(unsigned);
|
|
|
#else
|
|
|
totalOutChunkSize = 0;
|
|
|
#endif
|
|
|
+ /* If callback used to signal paged out, only signal readers on page event,
|
|
|
+ * This is to minimize time spent by fast readers constantly catching up and waiting and getting woken up per record
|
|
|
+ */
|
|
|
+ if (callback)
|
|
|
+ {
|
|
|
+ paged = true;
|
|
|
+ callback->paged();
|
|
|
+ }
|
|
|
}
|
|
|
- {
|
|
|
- SpinBlock b(spin);
|
|
|
- inMemRows->addRow(row);
|
|
|
- }
|
|
|
+ inMemRows->addRow(row);
|
|
|
|
|
|
totalOutChunkSize += len;
|
|
|
rowsWritten++; // including NULLs(eogs)
|
|
|
|
|
|
- signalReaders();
|
|
|
+ if (!callback || paged)
|
|
|
+ signalReaders();
|
|
|
+ }
|
|
|
+ virtual void putRow(const void *row)
|
|
|
+ {
|
|
|
+ return putRow(row, NULL);
|
|
|
}
|
|
|
virtual void flush()
|
|
|
{
|
|
@@ -1113,8 +1157,25 @@ public:
|
|
|
inMemRows->reset(0);
|
|
|
}
|
|
|
friend class COutput;
|
|
|
+friend class CRowSet;
|
|
|
};
|
|
|
|
|
|
+CRowSet::CRowSet(CSharedWriteAheadBase &_sharedWriteAhead, unsigned _chunk, unsigned maxRows)
|
|
|
+ : sharedWriteAhead(_sharedWriteAhead), rows(*_sharedWriteAhead.activity, _sharedWriteAhead.activity, true, stableSort_none, true, maxRows), chunk(_chunk)
|
|
|
+{
|
|
|
+}
|
|
|
+
|
|
|
+bool CRowSet::Release() const
|
|
|
+{
|
|
|
+ {
|
|
|
+ // NB: Occasionally, >1 thread may be releasing a CRowSet concurrently and miss an opportunity to reuse, but that's ok.
|
|
|
+ SpinBlock b(lock);
|
|
|
+ if (!IsShared())
|
|
|
+ sharedWriteAhead.reuse((CRowSet *)this);
|
|
|
+ }
|
|
|
+ return CSimpleInterface::Release();
|
|
|
+}
|
|
|
+
|
|
|
class CSharedWriteAheadDisk : public CSharedWriteAheadBase
|
|
|
{
|
|
|
IDiskUsage *iDiskUsage;
|
|
@@ -1226,7 +1287,7 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
|
|
|
return chunk.getClear();
|
|
|
}
|
|
|
#ifdef TRACE_WRITEAHEAD
|
|
|
- ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "getOutOffset: got [free] offset = %"I64F"d, size=%d", diskChunk.offset, diskChunk.size);
|
|
|
+ ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "getOutOffset: got [free] offset = %"I64F"d, size=%d", nextChunk->offset, nextChunk->size);
|
|
|
#endif
|
|
|
freeChunksSized.remove(nextPos);
|
|
|
freeChunks.zap(*nextChunk);
|
|
@@ -1280,6 +1341,15 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
|
|
|
// chunk(unused here) is nominal sequential page #, savedChunks is page # in diskfile
|
|
|
assertex(savedChunks.ordinality());
|
|
|
Owned<Chunk> freeChunk = savedChunks.dequeue();
|
|
|
+ if (freeChunk->rowSet)
|
|
|
+ {
|
|
|
+ Owned<CRowSet> rowSet = chunkPool.dequeue(freeChunk->rowSet);
|
|
|
+#ifdef TRACE_WRITEAHEAD
|
|
|
+ ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "freeOffsetChunk (chunk=%d) chunkPool, savedChunks size=%d, chunkPool size=%d", rowSet->queryChunk(), savedChunks.ordinality(), chunkPool.ordinality());
|
|
|
+#endif
|
|
|
+ VALIDATEEQ(chunk, rowSet->queryChunk());
|
|
|
+ return;
|
|
|
+ }
|
|
|
unsigned nmemb = freeChunks.ordinality();
|
|
|
if (0 == nmemb)
|
|
|
addFreeChunk(freeChunk);
|
|
@@ -1329,63 +1399,88 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
|
|
|
}
|
|
|
}
|
|
|
Chunk &chunk = *savedChunks.item(whichChunk);
|
|
|
- Owned<ISerialStream> stream = createFileSerialStream(spillFileIO, chunk.offset);
|
|
|
+ Owned<CRowSet> rowSet;
|
|
|
+ if (chunk.rowSet)
|
|
|
+ {
|
|
|
+ rowSet.set(chunk.rowSet);
|
|
|
#ifdef TRACE_WRITEAHEAD
|
|
|
- unsigned diskChunkNum;
|
|
|
- stream->get(sizeof(diskChunkNum), &diskChunkNum);
|
|
|
- VALIDATEEQ(diskChunkNum, currentChunkNum);
|
|
|
+ ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "readRows (chunk=%d) output: %d, savedChunks size=%d, chunkPool size=%d, currentChunkNum=%d, whichChunk=%d", rowSet->queryChunk(), output, savedChunks.ordinality(), chunkPool.ordinality(), currentChunkNum, whichChunk);
|
|
|
#endif
|
|
|
- CThorStreamDeserializerSource ds(stream);
|
|
|
- Owned<CRowSet> rowSet = new CRowSet(*activity, currentChunkNum);
|
|
|
- loop
|
|
|
- {
|
|
|
- byte b;
|
|
|
- ds.read(sizeof(b),&b);
|
|
|
- if (!b)
|
|
|
- break;
|
|
|
- if (1==b)
|
|
|
+ VALIDATEEQ(rowSet->queryChunk(), currentChunkNum);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ Owned<ISerialStream> stream = createFileSerialStream(spillFileIO, chunk.offset);
|
|
|
+#ifdef TRACE_WRITEAHEAD
|
|
|
+ unsigned diskChunkNum;
|
|
|
+ stream->get(sizeof(diskChunkNum), &diskChunkNum);
|
|
|
+ VALIDATEEQ(diskChunkNum, currentChunkNum);
|
|
|
+#endif
|
|
|
+ CThorStreamDeserializerSource ds(stream);
|
|
|
+ rowSet.setown(newRowSet(currentChunkNum));
|
|
|
+ loop
|
|
|
{
|
|
|
- RtlDynamicRowBuilder rowBuilder(allocator);
|
|
|
- size32_t sz = deserializer->deserialize(rowBuilder, ds);
|
|
|
- rowSet->addRow(rowBuilder.finalizeRowClear(sz));
|
|
|
+ byte b;
|
|
|
+ ds.read(sizeof(b),&b);
|
|
|
+ if (!b)
|
|
|
+ break;
|
|
|
+ if (1==b)
|
|
|
+ {
|
|
|
+ RtlDynamicRowBuilder rowBuilder(allocator);
|
|
|
+ size32_t sz = deserializer->deserialize(rowBuilder, ds);
|
|
|
+ rowSet->addRow(rowBuilder.finalizeRowClear(sz));
|
|
|
+ }
|
|
|
+ else if (2==b)
|
|
|
+ rowSet->addRow(NULL);
|
|
|
}
|
|
|
- else if (2==b)
|
|
|
- rowSet->addRow(NULL);
|
|
|
}
|
|
|
return rowSet.getClear();
|
|
|
}
|
|
|
- virtual void flushRows()
|
|
|
+ virtual void flushRows(ISharedSmartBufferCallback *callback)
|
|
|
{
|
|
|
// NB: called in crit
|
|
|
- MemoryBuffer mb;
|
|
|
- mb.ensureCapacity(minChunkSize); // starting size/could be more if variable and bigger
|
|
|
+ Owned<Chunk> chunk;
|
|
|
+ if (chunkPool.ordinality() < maxPoolChunks)
|
|
|
+ {
|
|
|
+ chunk.setown(new Chunk(inMemRows));
|
|
|
+ chunkPool.enqueue(inMemRows.getLink());
|
|
|
#ifdef TRACE_WRITEAHEAD
|
|
|
- mb.append(inMemRows->queryChunk()); // for debug purposes only
|
|
|
+ ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "flushRows (chunk=%d) savedChunks size=%d, chunkPool size=%d", inMemRows->queryChunk(), savedChunks.ordinality()+1, chunkPool.ordinality());
|
|
|
#endif
|
|
|
- CMemoryRowSerializer mbs(mb);
|
|
|
- unsigned r=0;
|
|
|
- for (;r<inMemRows->getRowCount();r++)
|
|
|
+ }
|
|
|
+ else
|
|
|
{
|
|
|
- OwnedConstThorRow row = inMemRows->getRow(r);
|
|
|
- if (row)
|
|
|
+ /* It might be worth adding a heuristic here, to estimate time for readers to catch up, vs time spent writing and reading.
|
|
|
+ * Could block for readers to catch up if cost of writing/reads outweighs avg cost of catch up...
|
|
|
+ */
|
|
|
+ MemoryBuffer mb;
|
|
|
+ mb.ensureCapacity(minChunkSize); // starting size/could be more if variable and bigger
|
|
|
+#ifdef TRACE_WRITEAHEAD
|
|
|
+ mb.append(inMemRows->queryChunk()); // for debug purposes only
|
|
|
+#endif
|
|
|
+ CMemoryRowSerializer mbs(mb);
|
|
|
+ unsigned r=0;
|
|
|
+ for (;r<inMemRows->getRowCount();r++)
|
|
|
{
|
|
|
- mb.append((byte)1);
|
|
|
- serializer->serialize(mbs,(const byte *)row.get());
|
|
|
+ OwnedConstThorRow row = inMemRows->getRow(r);
|
|
|
+ if (row)
|
|
|
+ {
|
|
|
+ mb.append((byte)1);
|
|
|
+ serializer->serialize(mbs,(const byte *)row.get());
|
|
|
+ }
|
|
|
+ else
|
|
|
+ mb.append((byte)2); // eog
|
|
|
}
|
|
|
- else
|
|
|
- mb.append((byte)2); // eog
|
|
|
- }
|
|
|
- mb.append((byte)0);
|
|
|
-
|
|
|
- size32_t len = mb.length();
|
|
|
- Owned<Chunk> freeChunk = getOutOffset(len); // will find space for 'len', might be bigger if from free list
|
|
|
-
|
|
|
- spillFileIO->write(freeChunk->offset, len, mb.toByteArray());
|
|
|
-
|
|
|
- savedChunks.enqueue(freeChunk.getClear());
|
|
|
+ mb.append((byte)0);
|
|
|
+ size32_t len = mb.length();
|
|
|
+ chunk.setown(getOutOffset(len)); // will find space for 'len', might be bigger if from free list
|
|
|
+ spillFileIO->write(chunk->offset, len, mb.toByteArray());
|
|
|
#ifdef TRACE_WRITEAHEAD
|
|
|
- ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d (savedChunks pos=%d), writeOffset = %"I64F"d, writeSize = %d", inMemRows->queryChunk(), savedChunks.ordinality()-1, freeChunk->offset, len);
|
|
|
+ ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d (savedChunks pos=%d), writeOffset = %"I64F"d, writeSize = %d", inMemRows->queryChunk(), savedChunks.ordinality(), chunk->offset, len);
|
|
|
#endif
|
|
|
+ }
|
|
|
+
|
|
|
+ savedChunks.enqueue(chunk.getClear());
|
|
|
}
|
|
|
virtual size32_t rowSize(const void *row)
|
|
|
{
|
|
@@ -1440,13 +1535,10 @@ ISharedSmartBuffer *createSharedSmartDiskBuffer(CActivityBase *activity, const c
|
|
|
return new CSharedWriteAheadDisk(activity, spillname, outputs, rowIf, iDiskUsage);
|
|
|
}
|
|
|
|
|
|
-#define MIN_POOL_CHUNKS 10
|
|
|
class CSharedWriteAheadMem : public CSharedWriteAheadBase
|
|
|
{
|
|
|
- QueueOf<CRowSet, false> chunkPool;
|
|
|
Semaphore poolSem;
|
|
|
bool writerBlocked;
|
|
|
- unsigned maxPoolChunks;
|
|
|
|
|
|
virtual void markStop()
|
|
|
{
|
|
@@ -1477,14 +1569,19 @@ class CSharedWriteAheadMem : public CSharedWriteAheadBase
|
|
|
VALIDATEEQ(queryCOutput(output).currentChunkNum, rowSet->queryChunk());
|
|
|
return rowSet.getClear();
|
|
|
}
|
|
|
- virtual void flushRows()
|
|
|
+ virtual void flushRows(ISharedSmartBufferCallback *callback)
|
|
|
{
|
|
|
// NB: called in crit
|
|
|
if (chunkPool.ordinality() >= maxPoolChunks)
|
|
|
{
|
|
|
writerBlocked = true;
|
|
|
- { CriticalUnblock b(crit);
|
|
|
+ {
|
|
|
+ CriticalUnblock b(crit);
|
|
|
+ if (callback)
|
|
|
+ callback->blocked();
|
|
|
poolSem.wait();
|
|
|
+ if (callback)
|
|
|
+ callback->unblocked();
|
|
|
if (stopped) return;
|
|
|
}
|
|
|
unsigned reader=anyReaderBehind();
|