123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236 |
- /*##############################################################################
- Copyright (C) 2011 HPCC Systems.
- All rights reserved. This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- ############################################################################## */
- #include "platform.h"
- #include "jliball.hpp"
- #include "eclrtl.hpp"
- #include "eclhelper.hpp"
- #include "rtlds_imp.hpp"
- #include "rtlread_imp.hpp"
- #define FIRST_CHUNK_SIZE 0x100
- #define DOUBLE_LIMIT 0x10000 // must be a power of 2
- unsigned getNextSize(unsigned max, unsigned required)
- {
- if (required > DOUBLE_LIMIT)
- {
- max = (required + DOUBLE_LIMIT) & ~(DOUBLE_LIMIT-1);
- if (required >= max)
- throw MakeStringException(-1, "getNextSize: Request for %d bytes oldMax = %d", required, max);
- }
- else
- {
- if (max == 0)
- max = FIRST_CHUNK_SIZE;
- while (required >= max)
- max += max;
- }
- return max;
- }
- //---------------------------------------------------------------------------
- RtlDatasetBuilder::RtlDatasetBuilder()
- {
- maxSize = 0;
- buffer = NULL;
- totalSize = 0;
- }
- RtlDatasetBuilder::~RtlDatasetBuilder()
- {
- free(buffer);
- }
- void RtlDatasetBuilder::ensure(size32_t required)
- {
- if (required > maxSize)
- {
- maxSize = getNextSize(maxSize, required);
- buffer = (byte *)realloc(buffer, maxSize);
- if (!buffer)
- throw MakeStringException(-1, "Failed to allocate temporary dataset (requesting %d bytes)", maxSize);
- }
- }
- byte * RtlDatasetBuilder::ensureCapacity(size32_t required, const char * fieldName)
- {
- ensure(totalSize + required);
- return buffer + totalSize;
- }
- void RtlDatasetBuilder::flushDataset()
- {
- }
- void RtlDatasetBuilder::getData(size32_t & len, void * & data)
- {
- flushDataset();
- len = totalSize;
- data = malloc(totalSize);
- memcpy(data, buffer, totalSize);
- }
- size32_t RtlDatasetBuilder::getSize()
- {
- flushDataset();
- return totalSize;
- }
- byte * RtlDatasetBuilder::queryData()
- {
- flushDataset();
- return buffer;
- }
- void RtlDatasetBuilder::reportMissingRow() const
- {
- throw MakeStringException(MSGAUD_user, 1000, "RtlDatasetBuilder::row() is NULL");
- }
- //---------------------------------------------------------------------------
- RtlFixedDatasetBuilder::RtlFixedDatasetBuilder(unsigned _recordSize, unsigned maxRows)
- {
- recordSize = _recordSize;
- if ((int)maxRows > 0)
- ensure(recordSize * maxRows);
- }
- byte * RtlFixedDatasetBuilder::createSelf()
- {
- self = ensureCapacity(recordSize, NULL);
- return self;
- }
- //---------------------------------------------------------------------------
- RtlLimitedFixedDatasetBuilder::RtlLimitedFixedDatasetBuilder(unsigned _recordSize, unsigned _maxRows, DefaultRowCreator _rowCreator, IResourceContext *_ctx) : RtlFixedDatasetBuilder(_recordSize, _maxRows)
- {
- maxRows = _maxRows;
- if ((int)maxRows < 0) maxRows = 0;
- rowCreator = _rowCreator;
- ctx = _ctx;
- }
- byte * RtlLimitedFixedDatasetBuilder::createRow()
- {
- if (totalSize >= maxRows * recordSize)
- return NULL;
- return RtlFixedDatasetBuilder::createRow();
- }
- void RtlLimitedFixedDatasetBuilder::flushDataset()
- {
- if (rowCreator)
- {
- while (totalSize < maxRows * recordSize)
- {
- createRow();
- size32_t size = rowCreator(rowBuilder(), ctx);
- finalizeRow(size);
- }
- }
- RtlFixedDatasetBuilder::flushDataset();
- }
- //---------------------------------------------------------------------------
- RtlVariableDatasetBuilder::RtlVariableDatasetBuilder(IRecordSize & _recordSize)
- {
- recordSize = &_recordSize;
- maxRowSize = recordSize->getRecordSize(NULL); // initial size
- }
- byte * RtlVariableDatasetBuilder::createSelf()
- {
- self = ensureCapacity(maxRowSize, NULL);
- return self;
- }
- void RtlVariableDatasetBuilder::deserializeRow(IOutputRowDeserializer & deserializer, IRowDeserializerSource & in)
- {
- createRow();
- size32_t rowSize = deserializer.deserialize(rowBuilder(), in);
- finalizeRow(rowSize);
- }
- //---------------------------------------------------------------------------
- RtlLimitedVariableDatasetBuilder::RtlLimitedVariableDatasetBuilder(IRecordSize & _recordSize, unsigned _maxRows, DefaultRowCreator _rowCreator, IResourceContext *_ctx) : RtlVariableDatasetBuilder(_recordSize)
- {
- numRows = 0;
- maxRows = _maxRows;
- rowCreator = _rowCreator;
- ctx = _ctx;
- }
- byte * RtlLimitedVariableDatasetBuilder::createRow()
- {
- if (numRows >= maxRows)
- return NULL;
- numRows++;
- return RtlVariableDatasetBuilder::createRow();
- }
- void RtlLimitedVariableDatasetBuilder::flushDataset()
- {
- if (rowCreator)
- {
- while (numRows < maxRows)
- {
- createRow();
- size32_t thisSize = rowCreator(rowBuilder(), ctx);
- finalizeRow(thisSize);
- }
- }
- RtlVariableDatasetBuilder::flushDataset();
- }
- //---------------------------------------------------------------------------
- byte * * rtlRowsAttr::linkrows() const
- {
- if (rows)
- rtlLinkRowset(rows);
- return rows;
- }
- void rtlRowsAttr::set(size32_t _count, byte * * _rows)
- {
- setown(_count, rtlLinkRowset(_rows));
- }
- void rtlRowsAttr::setRow(IEngineRowAllocator * rowAllocator, const byte * _row)
- {
- setown(1, rowAllocator->appendRowOwn(NULL, 1, rowAllocator->linkRow(_row)));
- }
- void rtlRowsAttr::setown(size32_t _count, byte * * _rows)
- {
- dispose();
- count = _count;
- rows = _rows;
- }
- void rtlRowsAttr::dispose()
- {
- if (rows)
- rtlReleaseRowset(count, rows);
- }
- //---------------------------------------------------------------------------
- void rtlReportFieldOverflow(unsigned size, unsigned max, const RtlFieldInfo * field)
- {
- if (field)
- rtlReportFieldOverflow(size, max, field->name->str());
- else
- rtlReportRowOverflow(size, max);
- }
- void RtlRowBuilderBase::reportMissingRow() const
- {
- throw MakeStringException(MSGAUD_user, 1000, "RtlRowBuilderBase::row() is NULL");
- }
- byte * RtlDynamicRowBuilder::ensureCapacity(size32_t required, const char * fieldName)
- {
- if (required > maxLength)
- {
- if (!self)
- create();
- if (required > maxLength)
- {
- void * next = rowAllocator->resizeRow(required, self, maxLength);
- if (!next)
- {
- rtlReportFieldOverflow(required, maxLength, fieldName);
- return NULL;
- }
- self = static_cast<byte *>(next);
- }
- }
- return self;
- }
- void RtlDynamicRowBuilder::swapWith(RtlDynamicRowBuilder & other)
- {
- size32_t savedMaxLength = maxLength;
- void * savedSelf = getUnfinalizedClear();
- setown(other.getMaxLength(), other.getUnfinalizedClear());
- other.setown(savedMaxLength, savedSelf);
- }
- //---------------------------------------------------------------------------
- byte * RtlStaticRowBuilder::ensureCapacity(size32_t required, const char * fieldName)
- {
- if (required <= maxLength)
- return static_cast<byte *>(self);
- rtlReportFieldOverflow(required, maxLength, fieldName);
- return NULL;
- }
- byte * RtlStaticRowBuilder::createSelf()
- {
- throwUnexpected();
- }
- //---------------------------------------------------------------------------
- RtlLinkedDatasetBuilder::RtlLinkedDatasetBuilder(IEngineRowAllocator * _rowAllocator, int _choosenLimit) : builder(_rowAllocator, false)
- {
- rowAllocator = LINK(_rowAllocator);
- rowset = NULL;
- count = 0;
- max = 0;
- choosenLimit = (unsigned)_choosenLimit;
- }
- RtlLinkedDatasetBuilder::~RtlLinkedDatasetBuilder()
- {
- builder.clear();
- if (rowset)
- rowAllocator->releaseRowset(count, rowset);
- ::Release(rowAllocator);
- }
- void RtlLinkedDatasetBuilder::append(const void * source)
- {
- if (count < choosenLimit)
- {
- ensure(count+1);
- rowset[count] = (byte *)rowAllocator->linkRow(source);
- count++;
- }
- }
- void RtlLinkedDatasetBuilder::appendRows(size32_t num, byte * * rows)
- {
- if (num && (count < choosenLimit))
- {
- unsigned numToAdd = (count + num < choosenLimit) ? num : choosenLimit - count;
- ensure(count+numToAdd);
- for (unsigned i=0; i < numToAdd; i++)
- rowset[count+i] = (byte *)rowAllocator->linkRow(rows[i]);
- count += numToAdd;
- }
- }
- void RtlLinkedDatasetBuilder::appendOwn(const void * row)
- {
- assertex(!builder.exists());
- if (count < choosenLimit)
- {
- ensure(count+1);
- rowset[count] = (byte *)row;
- count++;
- }
- else
- rowAllocator->releaseRow(row);
- }
- byte * RtlLinkedDatasetBuilder::createRow()
- {
- if (count >= choosenLimit)
- return NULL;
- return builder.getSelf();
- }
- //cloned from thorcommon.cpp
- class RtlChildRowLinkerWalker : implements IIndirectMemberVisitor
- {
- public:
- virtual void visitRowset(size32_t count, byte * * rows)
- {
- rtlLinkRowset(rows);
- }
- virtual void visitRow(const byte * row)
- {
- rtlLinkRow(row);
- }
- };
- void RtlLinkedDatasetBuilder::cloneRow(size32_t len, const void * row)
- {
- if (count >= choosenLimit)
- return;
- byte * self = builder.ensureCapacity(len, NULL);
- memcpy(self, row, len);
-
- IOutputMetaData * meta = rowAllocator->queryOutputMeta();
- if (meta->getMetaFlags() & MDFneedserialize)
- {
- RtlChildRowLinkerWalker walker;
- meta->walkIndirectMembers(self, walker);
- }
- finalizeRow(len);
- }
- void RtlLinkedDatasetBuilder::deserializeRow(IOutputRowDeserializer & deserializer, IRowDeserializerSource & in)
- {
- builder.ensureRow();
- size32_t rowSize = deserializer.deserialize(builder, in);
- finalizeRow(rowSize);
- }
- inline void doDeserializeRowset(RtlLinkedDatasetBuilder & builder, IOutputRowDeserializer & deserializer, IRowDeserializerSource & in, offset_t marker, bool isGrouped)
- {
- byte eogPending = false;
- while (!in.finishedNested(marker))
- {
- if (isGrouped && eogPending)
- builder.appendEOG();
- builder.deserializeRow(deserializer, in);
- if (isGrouped)
- in.read(1, &eogPending);
- }
- }
- inline void doSerializeRowset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows, bool isGrouped)
- {
- for (unsigned i=0; i < count; i++)
- {
- byte *row = rows[i];
- if (row) // When serializing a dictionary, there may be nulls in the rowset. These can be skipped (we rehash on deserialize)
- {
- serializer->serialize(out, rows[i]);
- if (isGrouped)
- {
- byte eogPending = (i+1 < count) && (rows[i+1] == NULL);
- out.put(1, &eogPending);
- }
- }
- }
- }
- void RtlLinkedDatasetBuilder::deserialize(IOutputRowDeserializer & deserializer, IRowDeserializerSource & in, bool isGrouped)
- {
- offset_t marker = in.beginNested();
- doDeserializeRowset(*this, deserializer, in, marker, isGrouped);
- }
- void RtlLinkedDatasetBuilder::finalizeRows()
- {
- if (count != max)
- resize(count);
- }
- void RtlLinkedDatasetBuilder::finalizeRow(size32_t rowSize)
- {
- assertex(builder.exists());
- const void * next = builder.finalizeRowClear(rowSize);
- appendOwn(next);
- }
- byte * * RtlLinkedDatasetBuilder::linkrows()
- {
- finalizeRows();
- return rtlLinkRowset(rowset);
- }
- void RtlLinkedDatasetBuilder::expand(size32_t required)
- {
- assertex(required <= choosenLimit);
- //MORE: Next factoring change this so it passes this logic over to the row allocator
- size32_t newMax = max ? max : 4;
- while (newMax < required)
- {
- newMax += newMax;
- assertex(newMax);
- }
- if (newMax > choosenLimit)
- newMax = choosenLimit;
- resize(newMax);
- }
- void RtlLinkedDatasetBuilder::resize(size32_t required)
- {
- rowset = rowAllocator->reallocRows(rowset, max, required);
- max = required;
- }
- void appendRowsToRowset(size32_t & targetCount, byte * * & targetRowset, IEngineRowAllocator * rowAllocator, size32_t extraCount, byte * * extraRows)
- {
- if (extraCount)
- {
- size32_t prevCount = targetCount;
- byte * * expandedRowset = rowAllocator->reallocRows(targetRowset, prevCount, prevCount+extraCount);
- for (unsigned i=0; i < extraCount; i++)
- expandedRowset[prevCount+i] = (byte *)rowAllocator->linkRow(extraRows[i]);
- targetCount = prevCount + extraCount;
- targetRowset = expandedRowset;
- }
- }
- inline void doDeserializeRowset(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, IRowDeserializerSource & in, bool isGrouped)
- {
- RtlLinkedDatasetBuilder builder(_rowAllocator);
- builder.deserialize(*deserializer, in, isGrouped);
- count = builder.getcount();
- rowset = builder.linkrows();
- }
- extern ECLRTL_API void rtlDeserializeRowset(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, IRowDeserializerSource & in)
- {
- doDeserializeRowset(count, rowset, _rowAllocator, deserializer, in, false);
- }
- extern ECLRTL_API void rtlDeserializeGroupedRowset(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, IRowDeserializerSource & in)
- {
- doDeserializeRowset(count, rowset, _rowAllocator, deserializer, in, true);
- }
- void rtlSerializeRowset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
- {
- size32_t marker = out.beginNested();
- doSerializeRowset(out, serializer, count, rows, false);
- out.endNested(marker);
- }
- void rtlSerializeGroupedRowset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
- {
- size32_t marker = out.beginNested();
- doSerializeRowset(out, serializer, count, rows, true);
- out.endNested(marker);
- }
- //---------------------------------------------------------------------------
- RtlLinkedDictionaryBuilder::RtlLinkedDictionaryBuilder(IEngineRowAllocator * _rowAllocator, IHThorHashLookupInfo *_hashInfo, unsigned _initialSize)
- : builder(_rowAllocator, false)
- {
- init(_rowAllocator, _hashInfo, _initialSize);
- }
- RtlLinkedDictionaryBuilder::RtlLinkedDictionaryBuilder(IEngineRowAllocator * _rowAllocator, IHThorHashLookupInfo *_hashInfo)
- : builder(_rowAllocator, false)
- {
- init(_rowAllocator, _hashInfo, 8);
- }
- void RtlLinkedDictionaryBuilder::init(IEngineRowAllocator * _rowAllocator, IHThorHashLookupInfo *_hashInfo, unsigned _initialSize)
- {
- hash = _hashInfo->queryHash();
- compare = _hashInfo->queryCompare();
- initialSize = _initialSize;
- rowAllocator = LINK(_rowAllocator);
- table = NULL;
- usedCount = 0;
- usedLimit = 0;
- tableSize = 0;
- }
- RtlLinkedDictionaryBuilder::~RtlLinkedDictionaryBuilder()
- {
- // builder.clear();
- if (table)
- rowAllocator->releaseRowset(tableSize, table);
- ::Release(rowAllocator);
- }
- void RtlLinkedDictionaryBuilder::append(const void * source)
- {
- if (source)
- {
- appendOwn(rowAllocator->linkRow(source));
- }
- }
- void RtlLinkedDictionaryBuilder::appendOwn(const void * source)
- {
- if (source)
- {
- checkSpace();
- unsigned rowidx = hash->hash(source) % tableSize;
- loop
- {
- const void *entry = table[rowidx];
- if (entry && compare->docompare(source, entry)==0)
- {
- rowAllocator->releaseRow(entry);
- usedCount--;
- entry = NULL;
- }
- if (!entry)
- {
- table[rowidx] = (byte *) source;
- usedCount++;
- break;
- }
- rowidx++;
- if (rowidx==tableSize)
- rowidx = 0;
- }
- }
- }
- void RtlLinkedDictionaryBuilder::checkSpace()
- {
- if (!table)
- {
- table = rowAllocator->createRowset(initialSize);
- tableSize = initialSize;
- memset(table, 0, tableSize*sizeof(byte *));
- usedLimit = (tableSize * 3) / 4;
- usedCount = 0;
- }
- else if (usedCount > usedLimit)
- {
- // Rehash
- byte * * oldTable = table;
- unsigned oldSize = tableSize;
- tableSize = tableSize*2;
- table = rowAllocator->createRowset(tableSize);
- memset(table, 0, tableSize * sizeof(byte *));
- usedLimit = (tableSize * 3) / 4;
- usedCount = 0;
- unsigned i;
- for (i = 0; i < oldSize; i++)
- {
- append(oldTable[i]); // we link the rows here...
- }
- rowAllocator->releaseRowset(oldSize, oldTable); // ... because this will release them
- }
- }
- void RtlLinkedDictionaryBuilder::appendRows(size32_t num, byte * * rows)
- {
- // MORE - if we know that the source is already a hashtable, we can optimize the add to an empty table...
- for (unsigned i=0; i < num; i++)
- append(rows[i]);
- }
- void RtlLinkedDictionaryBuilder::finalizeRow(size32_t rowSize)
- {
- assertex(builder.exists());
- const void * next = builder.finalizeRowClear(rowSize);
- appendOwn(next);
- }
- //---------------------------------------------------------------------------
- //These definitions should be shared with thorcommon, but to do that
- //they would need to be moved to an rtlds.ipp header, which thorcommon then included.
- class ECLRTL_API CThorRtlRowSerializer : implements IRowSerializerTarget
- {
- public:
- CThorRtlRowSerializer(MemoryBuffer & _buffer) : buffer(_buffer)
- {
- }
- virtual void put(size32_t len, const void * ptr)
- {
- buffer.append(len, ptr);
- }
- virtual size32_t beginNested()
- {
- unsigned pos = buffer.length();
- buffer.append((size32_t)0);
- return pos;
- }
- virtual void endNested(size32_t sizePos)
- {
- unsigned pos = buffer.length();
- buffer.rewrite(sizePos);
- buffer.append((size32_t)(pos - (sizePos + sizeof(size32_t))));
- buffer.rewrite(pos);
- }
- protected:
- MemoryBuffer & buffer;
- };
- inline void doDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src, bool isGrouped)
- {
- RtlLinkedDatasetBuilder builder(rowAllocator);
- Owned<ISerialStream> stream = createMemorySerialStream(src, lenSrc);
- CThorStreamDeserializerSource source(stream);
- doDeserializeRowset(builder, *deserializer, source, lenSrc, isGrouped);
- count = builder.getcount();
- rowset = builder.linkrows();
- }
- inline void doRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows, bool isGrouped)
- {
- MemoryBuffer buffer;
- CThorRtlRowSerializer out(buffer);
- doSerializeRowset(out, serializer, count, rows, isGrouped);
- rtlFree(tgt);
- tlen = buffer.length();
- tgt = buffer.detach(); // not strictly speaking correct - it should have been allocated with rtlMalloc();
- }
- extern ECLRTL_API void rtlDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src, bool isGrouped)
- {
- doDataset2RowsetX(count, rowset, rowAllocator, deserializer, lenSrc, src, isGrouped);
- }
- extern ECLRTL_API void rtlDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src)
- {
- doDataset2RowsetX(count, rowset, rowAllocator, deserializer, lenSrc, src, false);
- }
- extern ECLRTL_API void rtlGroupedDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src)
- {
- doDataset2RowsetX(count, rowset, rowAllocator, deserializer, lenSrc, src, true);
- }
- extern ECLRTL_API void rtlRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows, bool isGrouped)
- {
- doRowset2DatasetX(tlen, tgt, serializer, count, rows, isGrouped);
- }
- extern ECLRTL_API void rtlRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
- {
- doRowset2DatasetX(tlen, tgt, serializer, count, rows, false);
- }
- extern ECLRTL_API void rtlGroupedRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
- {
- doRowset2DatasetX(tlen, tgt, serializer, count, rows, true);
- }
- void deserializeRowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, MemoryBuffer &in)
- {
- Owned<ISerialStream> stream = createMemoryBufferSerialStream(in);
- CThorStreamDeserializerSource rowSource(stream);
- doDeserializeRowset(count, rowset, _rowAllocator, deserializer, rowSource, false);
- }
- void deserializeGroupedRowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, MemoryBuffer &in)
- {
- Owned<ISerialStream> stream = createMemoryBufferSerialStream(in);
- CThorStreamDeserializerSource rowSource(stream);
- doDeserializeRowset(count, rowset, _rowAllocator, deserializer, rowSource, true);
- }
- void serializeRowsetX(size32_t count, byte * * rows, IOutputRowSerializer * serializer, MemoryBuffer & buffer)
- {
- CThorRtlRowSerializer out(buffer);
- rtlSerializeRowset(out, serializer, count, rows);
- }
- void serializeGroupedRowsetX(size32_t count, byte * * rows, IOutputRowSerializer * serializer, MemoryBuffer & buffer)
- {
- CThorRtlRowSerializer out(buffer);
- rtlSerializeGroupedRowset(out, serializer, count, rows);
- }
- void serializeRow(const void * row, IOutputRowSerializer * serializer, MemoryBuffer & buffer)
- {
- CThorRtlRowSerializer out(buffer);
- serializer->serialize(out, static_cast<const byte *>(row));
- }
- extern ECLRTL_API byte * rtlDeserializeBufferRow(IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, MemoryBuffer & buffer)
- {
- Owned<ISerialStream> stream = createMemoryBufferSerialStream(buffer);
- CThorStreamDeserializerSource source(stream);
- RtlDynamicRowBuilder rowBuilder(rowAllocator);
- size32_t rowSize = deserializer->deserialize(rowBuilder, source);
- return static_cast<byte *>(const_cast<void *>(rowBuilder.finalizeRowClear(rowSize)));
- }
- extern ECLRTL_API byte * rtlDeserializeRow(IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, const void * src)
- {
- Owned<ISerialStream> stream = createMemorySerialStream(src, 0x7fffffff);
- CThorStreamDeserializerSource source(stream);
- RtlDynamicRowBuilder rowBuilder(rowAllocator);
- size32_t rowSize = deserializer->deserialize(rowBuilder, source);
- return static_cast<byte *>(const_cast<void *>(rowBuilder.finalizeRowClear(rowSize)));
- }
- extern ECLRTL_API size32_t rtlSerializeRow(size32_t lenOut, void * out, IOutputRowSerializer * serializer, const void * src)
- {
- MemoryBuffer buffer;
- CThorRtlRowSerializer result(buffer);
- buffer.setBuffer(lenOut, out, false);
- buffer.setWritePos(0);
- serializer->serialize(result, (const byte *)src);
- return buffer.length();
- }
- class ECLRTL_API CThorRtlBuilderSerializer : implements IRowSerializerTarget
- {
- public:
- CThorRtlBuilderSerializer(ARowBuilder & _builder) : builder(_builder)
- {
- offset = 0;
- }
- virtual void put(size32_t len, const void * ptr)
- {
- byte * data = builder.ensureCapacity(offset + len, "");
- memcpy(data+offset, ptr, len);
- offset += len;
- }
- virtual size32_t beginNested()
- {
- unsigned pos = offset;
- offset += sizeof(size32_t);
- return pos;
- }
- virtual void endNested(size32_t sizePos)
- {
- byte * self = builder.getSelf();
- *(size32_t *)(self + sizePos) = offset - (sizePos + sizeof(size32_t));
- }
- inline size32_t length() const { return offset; }
- protected:
- ARowBuilder & builder;
- size32_t offset;
- };
- extern ECLRTL_API size32_t rtlDeserializeToBuilder(ARowBuilder & builder, IOutputRowDeserializer * deserializer, const void * src)
- {
- Owned<ISerialStream> stream = createMemorySerialStream(src, 0x7fffffff);
- CThorStreamDeserializerSource source(stream);
- return deserializer->deserialize(builder, source);
- }
- extern ECLRTL_API size32_t rtlSerializeToBuilder(ARowBuilder & builder, IOutputRowSerializer * serializer, const void * src)
- {
- CThorRtlBuilderSerializer target(builder);
- serializer->serialize(target, (const byte *)src);
- return target.length();
- }
- //---------------------------------------------------------------------------
- RtlDatasetCursor::RtlDatasetCursor(size32_t _len, const void * _data)
- {
- setDataset(_len, _data);
- }
- bool RtlDatasetCursor::exists()
- {
- return (end != buffer);
- }
- const byte * RtlDatasetCursor::first()
- {
- if (buffer != end)
- cur = buffer;
- return cur;
- }
- const byte * RtlDatasetCursor::get()
- {
- return cur;
- }
- void RtlDatasetCursor::setDataset(size32_t _len, const void * _data)
- {
- buffer = (const byte *)_data;
- end = buffer + _len;
- cur = NULL;
- }
- bool RtlDatasetCursor::isValid()
- {
- return (cur != NULL);
- }
- /*
- const byte * RtlDatasetCursor::next()
- {
- if (cur)
- {
- cur += getRowSize();
- if (cur >= end)
- cur = NULL;
- }
- return cur;
- }
- */
- //---------------------------------------------------------------------------
- RtlFixedDatasetCursor::RtlFixedDatasetCursor(size32_t _len, const void * _data, unsigned _recordSize) : RtlDatasetCursor(_len, _data)
- {
- recordSize = _recordSize;
- }
- RtlFixedDatasetCursor::RtlFixedDatasetCursor() : RtlDatasetCursor(0, NULL)
- {
- recordSize = 1;
- }
- size32_t RtlFixedDatasetCursor::count()
- {
- return (size32_t)((end - buffer) / recordSize);
- }
- size32_t RtlFixedDatasetCursor::getSize()
- {
- return recordSize;
- }
- void RtlFixedDatasetCursor::init(size32_t _len, const void * _data, unsigned _recordSize)
- {
- recordSize = _recordSize;
- setDataset(_len, _data);
- }
- const byte * RtlFixedDatasetCursor::next()
- {
- if (cur)
- {
- cur += recordSize;
- if (cur >= end)
- cur = NULL;
- }
- return cur;
- }
- const byte * RtlFixedDatasetCursor::select(unsigned idx)
- {
- cur = buffer + idx * recordSize;
- if (cur >= end)
- cur = NULL;
- return cur;
- }
- //---------------------------------------------------------------------------
- RtlVariableDatasetCursor::RtlVariableDatasetCursor(size32_t _len, const void * _data, IRecordSize & _recordSize) : RtlDatasetCursor(_len, _data)
- {
- recordSize = &_recordSize;
- }
- RtlVariableDatasetCursor::RtlVariableDatasetCursor() : RtlDatasetCursor(0, NULL)
- {
- recordSize = NULL;
- }
- void RtlVariableDatasetCursor::init(size32_t _len, const void * _data, IRecordSize & _recordSize)
- {
- recordSize = &_recordSize;
- setDataset(_len, _data);
- }
- size32_t RtlVariableDatasetCursor::count()
- {
- const byte * finger = buffer;
- unsigned c = 0;
- while (finger < end)
- {
- finger += recordSize->getRecordSize(finger);
- c++;
- }
- assertex(finger == end);
- return c;
- }
- size32_t RtlVariableDatasetCursor::getSize()
- {
- return recordSize->getRecordSize(cur);
- }
- const byte * RtlVariableDatasetCursor::next()
- {
- if (cur)
- {
- cur += recordSize->getRecordSize(cur);
- if (cur >= end)
- cur = NULL;
- }
- return cur;
- }
- const byte * RtlVariableDatasetCursor::select(unsigned idx)
- {
- const byte * finger = buffer;
- unsigned c = 0;
- while (finger < end)
- {
- if (c == idx)
- {
- cur = finger;
- return cur;
- }
- finger += recordSize->getRecordSize(finger);
- c++;
- }
- assertex(finger == end);
- cur = NULL;
- return NULL;
- }
- //---------------------------------------------------------------------------
- RtlLinkedDatasetCursor::RtlLinkedDatasetCursor(unsigned _numRows, byte * * _rows) : numRows(_numRows), rows(_rows)
- {
- cur = (unsigned)-1;
- }
- RtlLinkedDatasetCursor::RtlLinkedDatasetCursor()
- {
- numRows = 0;
- rows = NULL;
- cur = (unsigned)-1;
- }
- void RtlLinkedDatasetCursor::init(unsigned _numRows, byte * * _rows)
- {
- numRows = _numRows;
- rows = _rows;
- cur = (unsigned)-1;
- }
- const byte * RtlLinkedDatasetCursor::first()
- {
- cur = 0;
- return cur < numRows ? rows[cur] : NULL;
- }
- const byte * RtlLinkedDatasetCursor::get()
- {
- return cur < numRows ? rows[cur] : NULL;
- }
- bool RtlLinkedDatasetCursor::isValid()
- {
- return (cur < numRows);
- }
- const byte * RtlLinkedDatasetCursor::next()
- {
- if (cur < numRows)
- cur++;
- return cur < numRows ? rows[cur] : NULL;
- }
- const byte * RtlLinkedDatasetCursor::select(unsigned idx)
- {
- cur = idx;
- return cur < numRows ? rows[cur] : NULL;
- }
- //---------------------------------------------------------------------------
- bool rtlCheckInList(const void * lhs, IRtlDatasetCursor * cursor, ICompare * compare)
- {
- const byte * cur;
- for (cur = cursor->first(); cur; cur = cursor->next())
- {
- if (compare->docompare(lhs, cur) == 0)
- return true;
- }
- return false;
- }
- void rtlSetToSetX(bool & outIsAll, size32_t & outLen, void * & outData, bool inIsAll, size32_t inLen, void * inData)
- {
- outIsAll = inIsAll;
- outLen = inLen;
- outData = malloc(inLen);
- memcpy(outData, inData, inLen);
- }
- void rtlAppendSetX(bool & outIsAll, size32_t & outLen, void * & outData, bool leftIsAll, size32_t leftLen, void * leftData, bool rightIsAll, size32_t rightLen, void * rightData)
- {
- outIsAll = leftIsAll | rightIsAll;
- if (outIsAll)
- {
- outLen = 0;
- outData = NULL;
- }
- else
- {
- outLen = leftLen+rightLen;
- outData = malloc(outLen);
- memcpy(outData, leftData, leftLen);
- memcpy((byte*)outData+leftLen, rightData, rightLen);
- }
- }
- //------------------------------------------------------------------------------
- RtlCompoundIterator::RtlCompoundIterator()
- {
- ok = false;
- numLevels = 0;
- iters = NULL;
- cursors = NULL;
- }
- RtlCompoundIterator::~RtlCompoundIterator()
- {
- delete [] iters;
- delete [] cursors;
- }
- void RtlCompoundIterator::addIter(unsigned idx, IRtlDatasetSimpleCursor * iter, byte * * cursor)
- {
- assertex(idx < numLevels);
- iters[idx] = iter;
- cursors[idx] = cursor;
- }
- void RtlCompoundIterator::init(unsigned _numLevels)
- {
- numLevels = _numLevels;
- iters = new IRtlDatasetSimpleCursor * [numLevels];
- cursors = new byte * * [numLevels];
- }
- //Could either duplicate this function, N times, or have it as a helper function that accesses pre-defined virtuals.
- bool RtlCompoundIterator::first(unsigned level)
- {
- IRtlDatasetSimpleCursor * curIter = iters[level];
- if (level == 0)
- {
- const byte * cur = curIter->first();
- setCursor(level, cur);
- return (cur != NULL);
- }
- if (!first(level-1))
- return false;
- loop
- {
- const byte * cur = curIter->first();
- if (cur)
- {
- setCursor(level, cur);
- return true;
- }
- if (!next(level-1))
- return false;
- }
- }
- bool RtlCompoundIterator::next(unsigned level)
- {
- IRtlDatasetSimpleCursor * curIter = iters[level];
- const byte * cur = curIter->next();
- if (cur)
- {
- setCursor(level, cur);
- return true;
- }
- if (level == 0)
- return false;
- loop
- {
- if (!next(level-1))
- return false;
- const byte * cur = curIter->first();
- if (cur)
- {
- setCursor(level, cur);
- return true;
- }
- }
- }
- //------------------------------------------------------------------------------
- void RtlSimpleIterator::addIter(unsigned idx, IRtlDatasetSimpleCursor * _iter, byte * * _cursor)
- {
- assertex(idx == 0);
- iter = _iter;
- cursor = _cursor;
- *cursor = NULL;
- }
- bool RtlSimpleIterator::first()
- {
- const byte * cur = iter->first();
- *cursor = (byte *)cur;
- return (cur != NULL);
- }
- bool RtlSimpleIterator::next()
- {
- const byte * cur = iter->next();
- *cursor = (byte *)cur;
- return (cur != NULL);
- }
|