/*##############################################################################
Copyright (C) 2011 HPCC Systems.
All rights reserved. This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see .
############################################################################## */
#include "platform.h"
#include "jliball.hpp"
#include "eclrtl.hpp"
#include "eclhelper.hpp"
#include "rtlds_imp.hpp"
#include "rtlread_imp.hpp"
#define FIRST_CHUNK_SIZE 0x100
#define DOUBLE_LIMIT 0x10000 // must be a power of 2
unsigned getNextSize(unsigned max, unsigned required)
{
if (required > DOUBLE_LIMIT)
{
max = (required + DOUBLE_LIMIT) & ~(DOUBLE_LIMIT-1);
if (required >= max)
throw MakeStringException(-1, "getNextSize: Request for %d bytes oldMax = %d", required, max);
}
else
{
if (max == 0)
max = FIRST_CHUNK_SIZE;
while (required >= max)
max += max;
}
return max;
}
//---------------------------------------------------------------------------
RtlDatasetBuilder::RtlDatasetBuilder()
{
maxSize = 0;
buffer = NULL;
totalSize = 0;
}
RtlDatasetBuilder::~RtlDatasetBuilder()
{
free(buffer);
}
void RtlDatasetBuilder::ensure(size32_t required)
{
if (required > maxSize)
{
maxSize = getNextSize(maxSize, required);
buffer = (byte *)realloc(buffer, maxSize);
if (!buffer)
throw MakeStringException(-1, "Failed to allocate temporary dataset (requesting %d bytes)", maxSize);
}
}
byte * RtlDatasetBuilder::ensureCapacity(size32_t required, const char * fieldName)
{
ensure(totalSize + required);
return buffer + totalSize;
}
void RtlDatasetBuilder::flushDataset()
{
}
void RtlDatasetBuilder::getData(size32_t & len, void * & data)
{
flushDataset();
len = totalSize;
data = malloc(totalSize);
memcpy(data, buffer, totalSize);
}
size32_t RtlDatasetBuilder::getSize()
{
flushDataset();
return totalSize;
}
byte * RtlDatasetBuilder::queryData()
{
flushDataset();
return buffer;
}
void RtlDatasetBuilder::reportMissingRow() const
{
throw MakeStringException(MSGAUD_user, 1000, "RtlDatasetBuilder::row() is NULL");
}
//---------------------------------------------------------------------------
RtlFixedDatasetBuilder::RtlFixedDatasetBuilder(unsigned _recordSize, unsigned maxRows)
{
recordSize = _recordSize;
if ((int)maxRows > 0)
ensure(recordSize * maxRows);
}
byte * RtlFixedDatasetBuilder::createSelf()
{
self = ensureCapacity(recordSize, NULL);
return self;
}
//---------------------------------------------------------------------------
RtlLimitedFixedDatasetBuilder::RtlLimitedFixedDatasetBuilder(unsigned _recordSize, unsigned _maxRows, DefaultRowCreator _rowCreator, IResourceContext *_ctx) : RtlFixedDatasetBuilder(_recordSize, _maxRows)
{
maxRows = _maxRows;
if ((int)maxRows < 0) maxRows = 0;
rowCreator = _rowCreator;
ctx = _ctx;
}
byte * RtlLimitedFixedDatasetBuilder::createRow()
{
if (totalSize >= maxRows * recordSize)
return NULL;
return RtlFixedDatasetBuilder::createRow();
}
void RtlLimitedFixedDatasetBuilder::flushDataset()
{
if (rowCreator)
{
while (totalSize < maxRows * recordSize)
{
createRow();
size32_t size = rowCreator(rowBuilder(), ctx);
finalizeRow(size);
}
}
RtlFixedDatasetBuilder::flushDataset();
}
//---------------------------------------------------------------------------
RtlVariableDatasetBuilder::RtlVariableDatasetBuilder(IRecordSize & _recordSize)
{
recordSize = &_recordSize;
maxRowSize = recordSize->getRecordSize(NULL); // initial size
}
byte * RtlVariableDatasetBuilder::createSelf()
{
self = ensureCapacity(maxRowSize, NULL);
return self;
}
void RtlVariableDatasetBuilder::deserializeRow(IOutputRowDeserializer & deserializer, IRowDeserializerSource & in)
{
createRow();
size32_t rowSize = deserializer.deserialize(rowBuilder(), in);
finalizeRow(rowSize);
}
//---------------------------------------------------------------------------
RtlLimitedVariableDatasetBuilder::RtlLimitedVariableDatasetBuilder(IRecordSize & _recordSize, unsigned _maxRows, DefaultRowCreator _rowCreator, IResourceContext *_ctx) : RtlVariableDatasetBuilder(_recordSize)
{
numRows = 0;
maxRows = _maxRows;
rowCreator = _rowCreator;
ctx = _ctx;
}
byte * RtlLimitedVariableDatasetBuilder::createRow()
{
if (numRows >= maxRows)
return NULL;
numRows++;
return RtlVariableDatasetBuilder::createRow();
}
void RtlLimitedVariableDatasetBuilder::flushDataset()
{
if (rowCreator)
{
while (numRows < maxRows)
{
createRow();
size32_t thisSize = rowCreator(rowBuilder(), ctx);
finalizeRow(thisSize);
}
}
RtlVariableDatasetBuilder::flushDataset();
}
//---------------------------------------------------------------------------
byte * * rtlRowsAttr::linkrows() const
{
if (rows)
rtlLinkRowset(rows);
return rows;
}
void rtlRowsAttr::set(size32_t _count, byte * * _rows)
{
setown(_count, rtlLinkRowset(_rows));
}
void rtlRowsAttr::setRow(IEngineRowAllocator * rowAllocator, const byte * _row)
{
setown(1, rowAllocator->appendRowOwn(NULL, 1, rowAllocator->linkRow(_row)));
}
void rtlRowsAttr::setown(size32_t _count, byte * * _rows)
{
dispose();
count = _count;
rows = _rows;
}
void rtlRowsAttr::dispose()
{
if (rows)
rtlReleaseRowset(count, rows);
}
//---------------------------------------------------------------------------
void rtlReportFieldOverflow(unsigned size, unsigned max, const RtlFieldInfo * field)
{
if (field)
rtlReportFieldOverflow(size, max, field->name->str());
else
rtlReportRowOverflow(size, max);
}
void RtlRowBuilderBase::reportMissingRow() const
{
throw MakeStringException(MSGAUD_user, 1000, "RtlRowBuilderBase::row() is NULL");
}
byte * RtlDynamicRowBuilder::ensureCapacity(size32_t required, const char * fieldName)
{
if (required > maxLength)
{
if (!self)
create();
if (required > maxLength)
{
void * next = rowAllocator->resizeRow(required, self, maxLength);
if (!next)
{
rtlReportFieldOverflow(required, maxLength, fieldName);
return NULL;
}
self = static_cast(next);
}
}
return self;
}
void RtlDynamicRowBuilder::swapWith(RtlDynamicRowBuilder & other)
{
size32_t savedMaxLength = maxLength;
void * savedSelf = getUnfinalizedClear();
setown(other.getMaxLength(), other.getUnfinalizedClear());
other.setown(savedMaxLength, savedSelf);
}
//---------------------------------------------------------------------------
byte * RtlStaticRowBuilder::ensureCapacity(size32_t required, const char * fieldName)
{
if (required <= maxLength)
return static_cast(self);
rtlReportFieldOverflow(required, maxLength, fieldName);
return NULL;
}
byte * RtlStaticRowBuilder::createSelf()
{
throwUnexpected();
}
//---------------------------------------------------------------------------
RtlLinkedDatasetBuilder::RtlLinkedDatasetBuilder(IEngineRowAllocator * _rowAllocator, int _choosenLimit) : builder(_rowAllocator, false)
{
rowAllocator = LINK(_rowAllocator);
rowset = NULL;
count = 0;
max = 0;
choosenLimit = (unsigned)_choosenLimit;
}
RtlLinkedDatasetBuilder::~RtlLinkedDatasetBuilder()
{
builder.clear();
if (rowset)
rowAllocator->releaseRowset(count, rowset);
::Release(rowAllocator);
}
void RtlLinkedDatasetBuilder::append(const void * source)
{
if (count < choosenLimit)
{
ensure(count+1);
rowset[count] = (byte *)rowAllocator->linkRow(source);
count++;
}
}
void RtlLinkedDatasetBuilder::appendRows(size32_t num, byte * * rows)
{
if (num && (count < choosenLimit))
{
unsigned numToAdd = (count + num < choosenLimit) ? num : choosenLimit - count;
ensure(count+numToAdd);
for (unsigned i=0; i < numToAdd; i++)
rowset[count+i] = (byte *)rowAllocator->linkRow(rows[i]);
count += numToAdd;
}
}
void RtlLinkedDatasetBuilder::appendOwn(const void * row)
{
assertex(!builder.exists());
if (count < choosenLimit)
{
ensure(count+1);
rowset[count] = (byte *)row;
count++;
}
else
rowAllocator->releaseRow(row);
}
byte * RtlLinkedDatasetBuilder::createRow()
{
if (count >= choosenLimit)
return NULL;
return builder.getSelf();
}
//cloned from thorcommon.cpp
class RtlChildRowLinkerWalker : implements IIndirectMemberVisitor
{
public:
virtual void visitRowset(size32_t count, byte * * rows)
{
rtlLinkRowset(rows);
}
virtual void visitRow(const byte * row)
{
rtlLinkRow(row);
}
};
void RtlLinkedDatasetBuilder::cloneRow(size32_t len, const void * row)
{
if (count >= choosenLimit)
return;
byte * self = builder.ensureCapacity(len, NULL);
memcpy(self, row, len);
IOutputMetaData * meta = rowAllocator->queryOutputMeta();
if (meta->getMetaFlags() & MDFneedserialize)
{
RtlChildRowLinkerWalker walker;
meta->walkIndirectMembers(self, walker);
}
finalizeRow(len);
}
void RtlLinkedDatasetBuilder::deserializeRow(IOutputRowDeserializer & deserializer, IRowDeserializerSource & in)
{
builder.ensureRow();
size32_t rowSize = deserializer.deserialize(builder, in);
finalizeRow(rowSize);
}
inline void doDeserializeRowset(RtlLinkedDatasetBuilder & builder, IOutputRowDeserializer & deserializer, IRowDeserializerSource & in, offset_t marker, bool isGrouped)
{
byte eogPending = false;
while (!in.finishedNested(marker))
{
if (isGrouped && eogPending)
builder.appendEOG();
builder.deserializeRow(deserializer, in);
if (isGrouped)
in.read(1, &eogPending);
}
}
inline void doSerializeRowset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows, bool isGrouped)
{
for (unsigned i=0; i < count; i++)
{
byte *row = rows[i];
if (row) // When serializing a dictionary, there may be nulls in the rowset. These can be skipped (we rehash on deserialize)
{
serializer->serialize(out, rows[i]);
if (isGrouped)
{
byte eogPending = (i+1 < count) && (rows[i+1] == NULL);
out.put(1, &eogPending);
}
}
}
}
void RtlLinkedDatasetBuilder::deserialize(IOutputRowDeserializer & deserializer, IRowDeserializerSource & in, bool isGrouped)
{
offset_t marker = in.beginNested();
doDeserializeRowset(*this, deserializer, in, marker, isGrouped);
}
void RtlLinkedDatasetBuilder::finalizeRows()
{
if (count != max)
resize(count);
}
void RtlLinkedDatasetBuilder::finalizeRow(size32_t rowSize)
{
assertex(builder.exists());
const void * next = builder.finalizeRowClear(rowSize);
appendOwn(next);
}
byte * * RtlLinkedDatasetBuilder::linkrows()
{
finalizeRows();
return rtlLinkRowset(rowset);
}
void RtlLinkedDatasetBuilder::expand(size32_t required)
{
assertex(required <= choosenLimit);
//MORE: Next factoring change this so it passes this logic over to the row allocator
size32_t newMax = max ? max : 4;
while (newMax < required)
{
newMax += newMax;
assertex(newMax);
}
if (newMax > choosenLimit)
newMax = choosenLimit;
resize(newMax);
}
void RtlLinkedDatasetBuilder::resize(size32_t required)
{
rowset = rowAllocator->reallocRows(rowset, max, required);
max = required;
}
void appendRowsToRowset(size32_t & targetCount, byte * * & targetRowset, IEngineRowAllocator * rowAllocator, size32_t extraCount, byte * * extraRows)
{
if (extraCount)
{
size32_t prevCount = targetCount;
byte * * expandedRowset = rowAllocator->reallocRows(targetRowset, prevCount, prevCount+extraCount);
for (unsigned i=0; i < extraCount; i++)
expandedRowset[prevCount+i] = (byte *)rowAllocator->linkRow(extraRows[i]);
targetCount = prevCount + extraCount;
targetRowset = expandedRowset;
}
}
inline void doDeserializeRowset(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, IRowDeserializerSource & in, bool isGrouped)
{
RtlLinkedDatasetBuilder builder(_rowAllocator);
builder.deserialize(*deserializer, in, isGrouped);
count = builder.getcount();
rowset = builder.linkrows();
}
extern ECLRTL_API void rtlDeserializeRowset(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, IRowDeserializerSource & in)
{
doDeserializeRowset(count, rowset, _rowAllocator, deserializer, in, false);
}
extern ECLRTL_API void rtlDeserializeGroupedRowset(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, IRowDeserializerSource & in)
{
doDeserializeRowset(count, rowset, _rowAllocator, deserializer, in, true);
}
void rtlSerializeRowset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
{
size32_t marker = out.beginNested();
doSerializeRowset(out, serializer, count, rows, false);
out.endNested(marker);
}
void rtlSerializeGroupedRowset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
{
size32_t marker = out.beginNested();
doSerializeRowset(out, serializer, count, rows, true);
out.endNested(marker);
}
//---------------------------------------------------------------------------
RtlLinkedDictionaryBuilder::RtlLinkedDictionaryBuilder(IEngineRowAllocator * _rowAllocator, IHThorHashLookupInfo *_hashInfo, unsigned _initialSize)
: builder(_rowAllocator, false)
{
init(_rowAllocator, _hashInfo, _initialSize);
}
RtlLinkedDictionaryBuilder::RtlLinkedDictionaryBuilder(IEngineRowAllocator * _rowAllocator, IHThorHashLookupInfo *_hashInfo)
: builder(_rowAllocator, false)
{
init(_rowAllocator, _hashInfo, 8);
}
void RtlLinkedDictionaryBuilder::init(IEngineRowAllocator * _rowAllocator, IHThorHashLookupInfo *_hashInfo, unsigned _initialSize)
{
hash = _hashInfo->queryHash();
compare = _hashInfo->queryCompare();
initialSize = _initialSize;
rowAllocator = LINK(_rowAllocator);
table = NULL;
usedCount = 0;
usedLimit = 0;
tableSize = 0;
}
RtlLinkedDictionaryBuilder::~RtlLinkedDictionaryBuilder()
{
// builder.clear();
if (table)
rowAllocator->releaseRowset(tableSize, table);
::Release(rowAllocator);
}
void RtlLinkedDictionaryBuilder::append(const void * source)
{
if (source)
{
appendOwn(rowAllocator->linkRow(source));
}
}
void RtlLinkedDictionaryBuilder::appendOwn(const void * source)
{
if (source)
{
checkSpace();
unsigned rowidx = hash->hash(source) % tableSize;
loop
{
const void *entry = table[rowidx];
if (entry && compare->docompare(source, entry)==0)
{
rowAllocator->releaseRow(entry);
usedCount--;
entry = NULL;
}
if (!entry)
{
table[rowidx] = (byte *) source;
usedCount++;
break;
}
rowidx++;
if (rowidx==tableSize)
rowidx = 0;
}
}
}
void RtlLinkedDictionaryBuilder::checkSpace()
{
if (!table)
{
table = rowAllocator->createRowset(initialSize);
tableSize = initialSize;
memset(table, 0, tableSize*sizeof(byte *));
usedLimit = (tableSize * 3) / 4;
usedCount = 0;
}
else if (usedCount > usedLimit)
{
// Rehash
byte * * oldTable = table;
unsigned oldSize = tableSize;
table = rowAllocator->createRowset(tableSize*2);
tableSize = tableSize*2; // Don't update until we have successfully allocated, so that we remain consistent if createRowset throws an exception.
memset(table, 0, tableSize * sizeof(byte *));
usedLimit = (tableSize * 3) / 4;
usedCount = 0;
unsigned i;
for (i = 0; i < oldSize; i++)
{
append(oldTable[i]); // we link the rows here...
}
rowAllocator->releaseRowset(oldSize, oldTable); // ... because this will release them
}
}
void RtlLinkedDictionaryBuilder::appendRows(size32_t num, byte * * rows)
{
// MORE - if we know that the source is already a hashtable, we can optimize the add to an empty table...
for (unsigned i=0; i < num; i++)
append(rows[i]);
}
void RtlLinkedDictionaryBuilder::finalizeRow(size32_t rowSize)
{
assertex(builder.exists());
const void * next = builder.finalizeRowClear(rowSize);
appendOwn(next);
}
extern ECLRTL_API byte *rtlDictionaryLookup(IHThorHashLookupInfo &hashInfo, size32_t tableSize, byte **table, const byte *source, byte *defaultRow)
{
IHash *hash = hashInfo.queryHash();
ICompare *compare = hashInfo.queryCompare();
unsigned rowidx = hash->hash(source) % tableSize;
loop
{
const void *entry = table[rowidx];
if (!entry)
return (byte *) rtlLinkRow(defaultRow);
if (compare->docompare(source, entry)==0)
return (byte *) rtlLinkRow(entry);
rowidx++;
if (rowidx==tableSize)
rowidx = 0;
}
}
//---------------------------------------------------------------------------
//These definitions should be shared with thorcommon, but to do that
//they would need to be moved to an rtlds.ipp header, which thorcommon then included.
class ECLRTL_API CThorRtlRowSerializer : implements IRowSerializerTarget
{
public:
CThorRtlRowSerializer(MemoryBuffer & _buffer) : buffer(_buffer)
{
}
virtual void put(size32_t len, const void * ptr)
{
buffer.append(len, ptr);
}
virtual size32_t beginNested()
{
unsigned pos = buffer.length();
buffer.append((size32_t)0);
return pos;
}
virtual void endNested(size32_t sizePos)
{
unsigned pos = buffer.length();
buffer.rewrite(sizePos);
buffer.append((size32_t)(pos - (sizePos + sizeof(size32_t))));
buffer.rewrite(pos);
}
protected:
MemoryBuffer & buffer;
};
inline void doDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src, bool isGrouped)
{
RtlLinkedDatasetBuilder builder(rowAllocator);
Owned stream = createMemorySerialStream(src, lenSrc);
CThorStreamDeserializerSource source(stream);
doDeserializeRowset(builder, *deserializer, source, lenSrc, isGrouped);
count = builder.getcount();
rowset = builder.linkrows();
}
inline void doRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows, bool isGrouped)
{
MemoryBuffer buffer;
CThorRtlRowSerializer out(buffer);
doSerializeRowset(out, serializer, count, rows, isGrouped);
rtlFree(tgt);
tlen = buffer.length();
tgt = buffer.detach(); // not strictly speaking correct - it should have been allocated with rtlMalloc();
}
extern ECLRTL_API void rtlDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src, bool isGrouped)
{
doDataset2RowsetX(count, rowset, rowAllocator, deserializer, lenSrc, src, isGrouped);
}
extern ECLRTL_API void rtlDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src)
{
doDataset2RowsetX(count, rowset, rowAllocator, deserializer, lenSrc, src, false);
}
extern ECLRTL_API void rtlGroupedDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src)
{
doDataset2RowsetX(count, rowset, rowAllocator, deserializer, lenSrc, src, true);
}
extern ECLRTL_API void rtlRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows, bool isGrouped)
{
doRowset2DatasetX(tlen, tgt, serializer, count, rows, isGrouped);
}
extern ECLRTL_API void rtlRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
{
doRowset2DatasetX(tlen, tgt, serializer, count, rows, false);
}
extern ECLRTL_API void rtlGroupedRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
{
doRowset2DatasetX(tlen, tgt, serializer, count, rows, true);
}
void deserializeRowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, MemoryBuffer &in)
{
Owned stream = createMemoryBufferSerialStream(in);
CThorStreamDeserializerSource rowSource(stream);
doDeserializeRowset(count, rowset, _rowAllocator, deserializer, rowSource, false);
}
void deserializeGroupedRowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, MemoryBuffer &in)
{
Owned stream = createMemoryBufferSerialStream(in);
CThorStreamDeserializerSource rowSource(stream);
doDeserializeRowset(count, rowset, _rowAllocator, deserializer, rowSource, true);
}
void serializeRowsetX(size32_t count, byte * * rows, IOutputRowSerializer * serializer, MemoryBuffer & buffer)
{
CThorRtlRowSerializer out(buffer);
rtlSerializeRowset(out, serializer, count, rows);
}
void serializeGroupedRowsetX(size32_t count, byte * * rows, IOutputRowSerializer * serializer, MemoryBuffer & buffer)
{
CThorRtlRowSerializer out(buffer);
rtlSerializeGroupedRowset(out, serializer, count, rows);
}
void serializeRow(const void * row, IOutputRowSerializer * serializer, MemoryBuffer & buffer)
{
CThorRtlRowSerializer out(buffer);
serializer->serialize(out, static_cast(row));
}
extern ECLRTL_API byte * rtlDeserializeBufferRow(IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, MemoryBuffer & buffer)
{
Owned stream = createMemoryBufferSerialStream(buffer);
CThorStreamDeserializerSource source(stream);
RtlDynamicRowBuilder rowBuilder(rowAllocator);
size32_t rowSize = deserializer->deserialize(rowBuilder, source);
return static_cast(const_cast(rowBuilder.finalizeRowClear(rowSize)));
}
extern ECLRTL_API byte * rtlDeserializeRow(IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, const void * src)
{
Owned stream = createMemorySerialStream(src, 0x7fffffff);
CThorStreamDeserializerSource source(stream);
RtlDynamicRowBuilder rowBuilder(rowAllocator);
size32_t rowSize = deserializer->deserialize(rowBuilder, source);
return static_cast(const_cast(rowBuilder.finalizeRowClear(rowSize)));
}
extern ECLRTL_API size32_t rtlSerializeRow(size32_t lenOut, void * out, IOutputRowSerializer * serializer, const void * src)
{
MemoryBuffer buffer;
CThorRtlRowSerializer result(buffer);
buffer.setBuffer(lenOut, out, false);
buffer.setWritePos(0);
serializer->serialize(result, (const byte *)src);
return buffer.length();
}
class ECLRTL_API CThorRtlBuilderSerializer : implements IRowSerializerTarget
{
public:
CThorRtlBuilderSerializer(ARowBuilder & _builder) : builder(_builder)
{
offset = 0;
}
virtual void put(size32_t len, const void * ptr)
{
byte * data = builder.ensureCapacity(offset + len, "");
memcpy(data+offset, ptr, len);
offset += len;
}
virtual size32_t beginNested()
{
unsigned pos = offset;
offset += sizeof(size32_t);
return pos;
}
virtual void endNested(size32_t sizePos)
{
byte * self = builder.getSelf();
*(size32_t *)(self + sizePos) = offset - (sizePos + sizeof(size32_t));
}
inline size32_t length() const { return offset; }
protected:
ARowBuilder & builder;
size32_t offset;
};
extern ECLRTL_API size32_t rtlDeserializeToBuilder(ARowBuilder & builder, IOutputRowDeserializer * deserializer, const void * src)
{
Owned stream = createMemorySerialStream(src, 0x7fffffff);
CThorStreamDeserializerSource source(stream);
return deserializer->deserialize(builder, source);
}
extern ECLRTL_API size32_t rtlSerializeToBuilder(ARowBuilder & builder, IOutputRowSerializer * serializer, const void * src)
{
CThorRtlBuilderSerializer target(builder);
serializer->serialize(target, (const byte *)src);
return target.length();
}
//---------------------------------------------------------------------------
RtlDatasetCursor::RtlDatasetCursor(size32_t _len, const void * _data)
{
setDataset(_len, _data);
}
bool RtlDatasetCursor::exists()
{
return (end != buffer);
}
const byte * RtlDatasetCursor::first()
{
if (buffer != end)
cur = buffer;
return cur;
}
const byte * RtlDatasetCursor::get()
{
return cur;
}
void RtlDatasetCursor::setDataset(size32_t _len, const void * _data)
{
buffer = (const byte *)_data;
end = buffer + _len;
cur = NULL;
}
bool RtlDatasetCursor::isValid()
{
return (cur != NULL);
}
/*
const byte * RtlDatasetCursor::next()
{
if (cur)
{
cur += getRowSize();
if (cur >= end)
cur = NULL;
}
return cur;
}
*/
//---------------------------------------------------------------------------
RtlFixedDatasetCursor::RtlFixedDatasetCursor(size32_t _len, const void * _data, unsigned _recordSize) : RtlDatasetCursor(_len, _data)
{
recordSize = _recordSize;
}
RtlFixedDatasetCursor::RtlFixedDatasetCursor() : RtlDatasetCursor(0, NULL)
{
recordSize = 1;
}
size32_t RtlFixedDatasetCursor::count()
{
return (size32_t)((end - buffer) / recordSize);
}
size32_t RtlFixedDatasetCursor::getSize()
{
return recordSize;
}
void RtlFixedDatasetCursor::init(size32_t _len, const void * _data, unsigned _recordSize)
{
recordSize = _recordSize;
setDataset(_len, _data);
}
const byte * RtlFixedDatasetCursor::next()
{
if (cur)
{
cur += recordSize;
if (cur >= end)
cur = NULL;
}
return cur;
}
const byte * RtlFixedDatasetCursor::select(unsigned idx)
{
cur = buffer + idx * recordSize;
if (cur >= end)
cur = NULL;
return cur;
}
//---------------------------------------------------------------------------
RtlVariableDatasetCursor::RtlVariableDatasetCursor(size32_t _len, const void * _data, IRecordSize & _recordSize) : RtlDatasetCursor(_len, _data)
{
recordSize = &_recordSize;
}
RtlVariableDatasetCursor::RtlVariableDatasetCursor() : RtlDatasetCursor(0, NULL)
{
recordSize = NULL;
}
void RtlVariableDatasetCursor::init(size32_t _len, const void * _data, IRecordSize & _recordSize)
{
recordSize = &_recordSize;
setDataset(_len, _data);
}
size32_t RtlVariableDatasetCursor::count()
{
const byte * finger = buffer;
unsigned c = 0;
while (finger < end)
{
finger += recordSize->getRecordSize(finger);
c++;
}
assertex(finger == end);
return c;
}
size32_t RtlVariableDatasetCursor::getSize()
{
return recordSize->getRecordSize(cur);
}
const byte * RtlVariableDatasetCursor::next()
{
if (cur)
{
cur += recordSize->getRecordSize(cur);
if (cur >= end)
cur = NULL;
}
return cur;
}
const byte * RtlVariableDatasetCursor::select(unsigned idx)
{
const byte * finger = buffer;
unsigned c = 0;
while (finger < end)
{
if (c == idx)
{
cur = finger;
return cur;
}
finger += recordSize->getRecordSize(finger);
c++;
}
assertex(finger == end);
cur = NULL;
return NULL;
}
//---------------------------------------------------------------------------
RtlLinkedDatasetCursor::RtlLinkedDatasetCursor(unsigned _numRows, byte * * _rows) : numRows(_numRows), rows(_rows)
{
cur = (unsigned)-1;
}
RtlLinkedDatasetCursor::RtlLinkedDatasetCursor()
{
numRows = 0;
rows = NULL;
cur = (unsigned)-1;
}
void RtlLinkedDatasetCursor::init(unsigned _numRows, byte * * _rows)
{
numRows = _numRows;
rows = _rows;
cur = (unsigned)-1;
}
const byte * RtlLinkedDatasetCursor::first()
{
cur = 0;
return cur < numRows ? rows[cur] : NULL;
}
const byte * RtlLinkedDatasetCursor::get()
{
return cur < numRows ? rows[cur] : NULL;
}
bool RtlLinkedDatasetCursor::isValid()
{
return (cur < numRows);
}
const byte * RtlLinkedDatasetCursor::next()
{
if (cur < numRows)
cur++;
return cur < numRows ? rows[cur] : NULL;
}
const byte * RtlLinkedDatasetCursor::select(unsigned idx)
{
cur = idx;
return cur < numRows ? rows[cur] : NULL;
}
//---------------------------------------------------------------------------
bool rtlCheckInList(const void * lhs, IRtlDatasetCursor * cursor, ICompare * compare)
{
const byte * cur;
for (cur = cursor->first(); cur; cur = cursor->next())
{
if (compare->docompare(lhs, cur) == 0)
return true;
}
return false;
}
void rtlSetToSetX(bool & outIsAll, size32_t & outLen, void * & outData, bool inIsAll, size32_t inLen, void * inData)
{
outIsAll = inIsAll;
outLen = inLen;
outData = malloc(inLen);
memcpy(outData, inData, inLen);
}
void rtlAppendSetX(bool & outIsAll, size32_t & outLen, void * & outData, bool leftIsAll, size32_t leftLen, void * leftData, bool rightIsAll, size32_t rightLen, void * rightData)
{
outIsAll = leftIsAll | rightIsAll;
if (outIsAll)
{
outLen = 0;
outData = NULL;
}
else
{
outLen = leftLen+rightLen;
outData = malloc(outLen);
memcpy(outData, leftData, leftLen);
memcpy((byte*)outData+leftLen, rightData, rightLen);
}
}
//------------------------------------------------------------------------------
RtlCompoundIterator::RtlCompoundIterator()
{
ok = false;
numLevels = 0;
iters = NULL;
cursors = NULL;
}
RtlCompoundIterator::~RtlCompoundIterator()
{
delete [] iters;
delete [] cursors;
}
void RtlCompoundIterator::addIter(unsigned idx, IRtlDatasetSimpleCursor * iter, byte * * cursor)
{
assertex(idx < numLevels);
iters[idx] = iter;
cursors[idx] = cursor;
}
void RtlCompoundIterator::init(unsigned _numLevels)
{
numLevels = _numLevels;
iters = new IRtlDatasetSimpleCursor * [numLevels];
cursors = new byte * * [numLevels];
}
//Could either duplicate this function, N times, or have it as a helper function that accesses pre-defined virtuals.
bool RtlCompoundIterator::first(unsigned level)
{
IRtlDatasetSimpleCursor * curIter = iters[level];
if (level == 0)
{
const byte * cur = curIter->first();
setCursor(level, cur);
return (cur != NULL);
}
if (!first(level-1))
return false;
loop
{
const byte * cur = curIter->first();
if (cur)
{
setCursor(level, cur);
return true;
}
if (!next(level-1))
return false;
}
}
bool RtlCompoundIterator::next(unsigned level)
{
IRtlDatasetSimpleCursor * curIter = iters[level];
const byte * cur = curIter->next();
if (cur)
{
setCursor(level, cur);
return true;
}
if (level == 0)
return false;
loop
{
if (!next(level-1))
return false;
const byte * cur = curIter->first();
if (cur)
{
setCursor(level, cur);
return true;
}
}
}
//------------------------------------------------------------------------------
void RtlSimpleIterator::addIter(unsigned idx, IRtlDatasetSimpleCursor * _iter, byte * * _cursor)
{
assertex(idx == 0);
iter = _iter;
cursor = _cursor;
*cursor = NULL;
}
bool RtlSimpleIterator::first()
{
const byte * cur = iter->first();
*cursor = (byte *)cur;
return (cur != NULL);
}
bool RtlSimpleIterator::next()
{
const byte * cur = iter->next();
*cursor = (byte *)cur;
return (cur != NULL);
}