|
@@ -909,6 +909,7 @@ protected:
|
|
|
Owned<CRowSet> inMemRows;
|
|
|
CriticalSection crit;
|
|
|
Linked<IOutputMetaData> meta;
|
|
|
+ Linked<IOutputRowSerializer> serializer;
|
|
|
QueueOf<CRowSet, false> chunkPool;
|
|
|
unsigned maxPoolChunks;
|
|
|
bool reuseRowSets;
|
|
@@ -1042,7 +1043,8 @@ public:
|
|
|
|
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
|
|
|
|
- CSharedWriteAheadBase(CActivityBase *_activity, unsigned _outputCount, IThorRowInterfaces *rowIf) : activity(_activity), outputCount(_outputCount), meta(rowIf->queryRowMetaData())
|
|
|
+ CSharedWriteAheadBase(CActivityBase *_activity, unsigned _outputCount, IThorRowInterfaces *rowIf)
|
|
|
+ : activity(_activity), outputCount(_outputCount), meta(rowIf->queryRowMetaData()), serializer(rowIf->queryRowSerializer())
|
|
|
{
|
|
|
init();
|
|
|
minChunkSize = 0x2000;
|
|
@@ -1193,7 +1195,6 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
|
|
|
QueueOf<Chunk, false> savedChunks;
|
|
|
offset_t highOffset;
|
|
|
Linked<IEngineRowAllocator> allocator;
|
|
|
- Linked<IOutputRowSerializer> serializer;
|
|
|
Linked<IOutputRowDeserializer> deserializer;
|
|
|
IOutputMetaData *serializeMeta;
|
|
|
|
|
@@ -1501,7 +1502,7 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
|
|
|
}
|
|
|
public:
|
|
|
CSharedWriteAheadDisk(CActivityBase *activity, const char *spillName, unsigned outputCount, IThorRowInterfaces *rowIf, IDiskUsage *_iDiskUsage) : CSharedWriteAheadBase(activity, outputCount, rowIf),
|
|
|
- allocator(rowIf->queryRowAllocator()), serializer(rowIf->queryRowSerializer()), deserializer(rowIf->queryRowDeserializer()), serializeMeta(meta->querySerializedDiskMeta()), iDiskUsage(_iDiskUsage)
|
|
|
+ allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), serializeMeta(meta->querySerializedDiskMeta()), iDiskUsage(_iDiskUsage)
|
|
|
{
|
|
|
assertex(spillName);
|
|
|
spillFile.setown(createIFile(spillName));
|
|
@@ -1608,7 +1609,11 @@ class CSharedWriteAheadMem : public CSharedWriteAheadBase
|
|
|
}
|
|
|
virtual size32_t rowSize(const void *row)
|
|
|
{
|
|
|
- return meta->getRecordSize(row); // space in mem.
|
|
|
+ if (!row)
|
|
|
+ return 1; // eog;
|
|
|
+ CSizingSerializer ssz;
|
|
|
+ serializer->serialize(ssz, (const byte *)row);
|
|
|
+ return ssz.size();
|
|
|
}
|
|
|
public:
|
|
|
CSharedWriteAheadMem(CActivityBase *activity, unsigned outputCount, IThorRowInterfaces *rowif, unsigned buffSize) : CSharedWriteAheadBase(activity, outputCount, rowif)
|