/*##############################################################################
Copyright (C) 2011 HPCC Systems.
All rights reserved. This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
############################################################################## */
#include "jexcept.hpp"
#include "jmisc.hpp"
#include "jthread.hpp"
#include "jsocket.hpp"
#include "jprop.hpp"
#include "jdebug.hpp"
#include "jlzw.hpp"
#include "junicode.hpp"
#include "eclhelper.hpp"
#include "thorcommon.ipp"
#include "eclrtl.hpp"
#include "rtlread_imp.hpp"
#include "thorstep.hpp"
#define ROWAGG_PERROWOVERHEAD (sizeof(AggregateRowBuilder))
RowAggregator::RowAggregator(IHThorHashAggregateExtra &_extra, IHThorRowAggregator & _helper) : helper(_helper)
{
comparer = _extra.queryCompareRowElement();
hasher = _extra.queryHash();
elementHasher = _extra.queryHashElement();
elementComparer = _extra.queryCompareElements();
cursor = NULL;
eof = false;
totalSize = overhead = 0;
}
RowAggregator::~RowAggregator()
{
reset();
}
void RowAggregator::start(IEngineRowAllocator *_rowAllocator)
{
rowAllocator.set(_rowAllocator);
}
void RowAggregator::reset()
{
while (!eof)
{
AggregateRowBuilder *n = nextResult();
if (n)
n->Release();
}
SuperHashTable::releaseAll();
eof = false;
cursor = NULL;
rowAllocator.clear();
totalSize = overhead = 0;
}
AggregateRowBuilder &RowAggregator::addRow(const void * row)
{
AggregateRowBuilder *result;
unsigned hash = hasher->hash(row);
void * match = find(hash, row);
if (match)
{
result = static_cast<AggregateRowBuilder *>(match);
totalSize -= result->querySize();
size32_t sz = helper.processNext(*result, row);
result->setSize(sz);
totalSize += sz;
}
else
{
Owned<AggregateRowBuilder> rowBuilder = new AggregateRowBuilder(rowAllocator, hash);
helper.clearAggregate(*rowBuilder);
size32_t sz = helper.processFirst(*rowBuilder, row);
rowBuilder->setSize(sz);
result = rowBuilder.getClear();
addNew(result, hash);
totalSize += sz;
overhead += ROWAGG_PERROWOVERHEAD;
}
return *result;
}
void RowAggregator::mergeElement(const void * otherElement)
{
unsigned hash = elementHasher->hash(otherElement);
void * match = findElement(hash, otherElement);
if (match)
{
AggregateRowBuilder *rowBuilder = static_cast<AggregateRowBuilder *>(match);
totalSize -= rowBuilder->querySize();
size32_t sz = helper.mergeAggregate(*rowBuilder, otherElement);
rowBuilder->setSize(sz);
totalSize += sz;
}
else
{
Owned<AggregateRowBuilder> rowBuilder = new AggregateRowBuilder(rowAllocator, hash);
rowBuilder->setSize(cloneRow(*rowBuilder, otherElement, rowAllocator->queryOutputMeta()));
addNew(rowBuilder.getClear(), hash);
}
}
const void * RowAggregator::getFindParam(const void *et) const
{
// Slightly odd name for this function... it actually gets the comparable element
const AggregateRowBuilder *rb = static_cast<const AggregateRowBuilder *>(et);
return rb->row();
}
bool RowAggregator::matchesFindParam(const void *et, const void *key, unsigned fphash) const
{
if (fphash != hashFromElement(et))
return false;
// et = element in the table (an AggregateRowBuilder) key = new row (in input row layout).
return comparer->docompare(key, getFindParam(et)) == 0;
}
bool RowAggregator::matchesElement(const void *et, const void * searchET) const
{
return elementComparer->docompare(getFindParam(et), searchET) == 0;
}
AggregateRowBuilder *RowAggregator::nextResult()
{
void *ret = next(cursor);
if (!ret)
{
eof = true;
return NULL;
}
cursor = ret;
return static_cast<AggregateRowBuilder *>(ret);
}
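// Illustrative usage sketch (not taken from this file): a hash-aggregate activity would
// typically drive RowAggregator roughly as below. `extraHelper`, `aggHelper`, `input`,
// `output` and `rowAllocator` are assumed to be supplied by the caller, and
// AggregateRowBuilder::finalizeRowClear() is assumed from thorcommon.ipp.
//
//   RowAggregator aggregated(extraHelper, aggHelper);
//   aggregated.start(rowAllocator);
//   while (const void * row = input->nextRow())
//       aggregated.addRow(row);
//   for (;;)
//   {
//       Owned<AggregateRowBuilder> next = aggregated.nextResult();
//       if (!next)
//           break;
//       output->putRow(next->finalizeRowClear());
//   }
//   aggregated.reset();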
//=====================================================================================================
void CStreamMerger::fillheap(const void * seek, unsigned numFields, const SmartStepExtra * stepExtra)
{
assertex(activeInputs == 0);
for(unsigned i = 0; i < numInputs; i++)
if(pullInput(i, seek, numFields, stepExtra))
mergeheap[activeInputs++] = i;
}
void CStreamMerger::permute(const void * seek, unsigned numFields, const SmartStepExtra * stepExtra)
{
// the tree structure: element p has children p*2+1 and p*2+2, or element c has parent (unsigned)(c-1)/2
// the heap property: no element should be smaller than its parent
// the dedup variant: if(dedup), the top of the heap should also not be equal to either child
// the method: establish this by starting with the parent of the bottom element and working up to the top element, sifting each down to its correct place
if (activeInputs >= 2)
for(unsigned p = (activeInputs-2)/2; p > 0; --p)
siftDown(p);
if(dedup)
siftDownDedupTop(seek, numFields, stepExtra);
else
siftDown(0);
}
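// Worked example of the indexing scheme above: with activeInputs == 5, mergeheap[0] has
// children mergeheap[1] and mergeheap[2], and mergeheap[1] has children mergeheap[3] and
// mergeheap[4]. The loop starts at p = (5-2)/2 = 1 (the parent of the last element) and
// works up towards the root, which is finally sifted via siftDown(0) or siftDownDedupTop().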
const void * CStreamMerger::consumeTop()
{
unsigned top = mergeheap[0];
if (!pullConsumes)
consumeInput(top);
const void *next = pending[top];
pending[top] = NULL;
return next;
}
bool CStreamMerger::ensureNext(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra * stepExtra)
{
//wasCompleteMatch must be initialised from the actual row returned. (See bug #30388)
if (first)
{
fillheap(seek, numFields, stepExtra);
permute(seek, numFields, stepExtra);
first = false;
if (activeInputs == 0)
return false;
unsigned top = mergeheap[0];
wasCompleteMatch = pendingMatches[top];
return true;
}
while (activeInputs)
{
unsigned top = mergeheap[0];
const void *next = pending[top];
if (next)
{
if (seek)
{
int c = rangeCompare->docompare(next, seek, numFields);
if (c >= 0)
{
if (stepExtra->returnMismatches() && (c > 0))
{
wasCompleteMatch = pendingMatches[top];
return true;
}
else
{
if (pendingMatches[top])
return true;
}
}
}
else
{
if (pendingMatches[top])
return true;
}
skipInput(top);
}
if(!pullInput(top, seek, numFields, stepExtra))
if(!promote(0))
return false;
// we have changed the element at the top of the heap, so need to sift it down to maintain the heap property
if(dedup)
siftDownDedupTop(seek, numFields, stepExtra);
else
siftDown(0);
}
return false;
}
bool CStreamMerger::ensureNext()
{
bool isCompleteMatch = true;
return ensureNext(NULL, 0, isCompleteMatch, NULL);
}
void CStreamMerger::permute()
{
permute(NULL, 0, NULL);
}
bool CStreamMerger::promote(unsigned p)
{
activeInputs--;
if(activeInputs == p)
return false;
mergeheap[p] = mergeheap[activeInputs];
return true;
}
void CStreamMerger::siftDownDedupTop(const void * seek, unsigned numFields, const SmartStepExtra * stepExtra)
{
// same as siftDown(0), except that it also ensures that the top of the heap is not equal to either of its children
if(activeInputs < 2)
return;
unsigned c = 1;
int childcmp = 1;
if(activeInputs >= 3)
{
childcmp = compare->docompare(pending[mergeheap[2]], pending[mergeheap[1]]);
if(childcmp < 0)
c = 2;
}
int cmp = compare->docompare(pending[mergeheap[c]], pending[mergeheap[0]]);
if(cmp > 0)
return;
// the following loop ensures the correct property holds on the smaller branch, and that childcmp==0 iff the top matches the other branch
while(cmp <= 0)
{
if(cmp == 0)
{
if(mergeheap[c] < mergeheap[0])
{
unsigned r = mergeheap[c];
mergeheap[c] = mergeheap[0];
mergeheap[0] = r;
}
unsigned top = mergeheap[c];
skipInput(top);
if(!pullInput(top, seek, numFields, stepExtra))
if(!promote(c))
break;
siftDown(c);
}
else
{
unsigned r = mergeheap[c];
mergeheap[c] = mergeheap[0];
mergeheap[0] = r;
if(siftDown(c))
break;
}
cmp = compare->docompare(pending[mergeheap[c]], pending[mergeheap[0]]);
}
// the following loop ensures the uniqueness property holds on the other branch too
c = 3-c;
if(activeInputs <= c)
return;
while(childcmp == 0)
{
if(mergeheap[c] < mergeheap[0])
{
unsigned r = mergeheap[c];
mergeheap[c] = mergeheap[0];
mergeheap[0] = r;
}
unsigned top = mergeheap[c];
skipInput(top);
if(!pullInput(top, seek, numFields, stepExtra))
if(!promote(c))
break;
siftDown(c);
childcmp = compare->docompare(pending[mergeheap[c]], pending[mergeheap[0]]);
}
}
void CStreamMerger::cleanup()
{
clearPending();
delete [] pending;
pending = NULL;
delete [] pendingMatches;
pendingMatches = NULL;
delete [] mergeheap;
mergeheap = NULL;
}
void CStreamMerger::clearPending()
{
if (pending && activeInputs)
{
for(unsigned i = 0; i < numInputs; i++)
{
if (pullConsumes)
releaseRow(pending[i]);
pending[i] = NULL;
}
activeInputs = 0;
}
first = true;
}
CStreamMerger::CStreamMerger(bool _pullConsumes)
{
pending = NULL;
pendingMatches = NULL;
mergeheap = NULL;
compare = NULL;
rangeCompare = NULL;
dedup = false;
activeInputs = 0;
pullConsumes = _pullConsumes;
numInputs = 0;
first = true;
}
CStreamMerger::~CStreamMerger()
{
//can't call cleanup() because virtual releaseRow() won't be defined.
// NOTE: use assert rather than assertex as exceptions from within destructors are not handled well.
assert(!pending && !mergeheap);
}
void CStreamMerger::init(ICompare * _compare, bool _dedup, IRangeCompare * _rangeCompare)
{
compare = _compare;
dedup = _dedup;
rangeCompare = _rangeCompare;
}
void CStreamMerger::initInputs(unsigned _numInputs)
{
assertex(!pending); // cleanup should have been called before reinitializing
numInputs = _numInputs;
mergeheap = new unsigned[numInputs];
pending = new const void *[numInputs];
pendingMatches = new bool [numInputs];
for (unsigned i = 0; i < numInputs; i++)
pending[i] = NULL;
activeInputs = 0;
first = true;
}
void CStreamMerger::consumeInput(unsigned i)
{
//should be overridden if pullConsumes is false
throwUnexpected();
}
void CStreamMerger::skipInput(unsigned i)
{
if (!pullConsumes)
consumeInput(i);
releaseRow(pending[i]);
pending[i] = NULL;
}
void CStreamMerger::primeRows(const void * * rows)
{
assertex(first && (activeInputs == 0));
first = false;
for(unsigned i = 0; i < numInputs; i++)
{
if ((pending[i] = rows[i]) != NULL)
{
mergeheap[activeInputs++] = i;
pendingMatches[i] = true;
}
}
permute();
}
const void * CStreamMerger::nextRow()
{
if (ensureNext())
return consumeTop();
return NULL;
}
const void * CStreamMerger::queryNextRow()
{
if (ensureNext())
return pending[mergeheap[0]];
return NULL;
}
unsigned CStreamMerger::queryNextInput()
{
if (ensureNext())
return mergeheap[0];
return NotFound;
}
const void * CStreamMerger::nextRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
{
if (ensureNext(seek, numFields, wasCompleteMatch, &stepExtra))
return consumeTop();
return NULL;
}
void CStreamMerger::skipRow()
{
assertex(!first);
skipInput(mergeheap[0]);
}
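// Illustrative usage sketch (not taken from this file): a concrete merger derives from
// CStreamMerger and supplies pullInput()/releaseRow(); assuming `compare`, `numInputs` and
// `output` come from the caller, the driving code looks roughly like:
//
//   merger.init(compare, true/*dedup*/, NULL);
//   merger.initInputs(numInputs);
//   while (const void * row = merger.nextRow())
//       output->putRow(row);
//   merger.cleanup();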
//=====================================================================================================
CThorDemoRowSerializer::CThorDemoRowSerializer(MemoryBuffer & _buffer) : buffer(_buffer)
{
nesting = 0;
}
void CThorDemoRowSerializer::put(size32_t len, const void * ptr)
{
buffer.append(len, ptr);
//ok to flush if nesting == 0;
}
size32_t CThorDemoRowSerializer::beginNested()
{
nesting++;
unsigned pos = buffer.length();
buffer.append((size32_t)0);
return pos;
}
void CThorDemoRowSerializer::endNested(size32_t sizePos)
{
unsigned pos = buffer.length();
buffer.rewrite(sizePos);
buffer.append((size32_t)(pos - (sizePos + sizeof(size32_t))));
buffer.rewrite(pos);
nesting--;
}
IOutputRowSerializer * CachedOutputMetaData::createRowSerializer(ICodeContext * ctx, unsigned activityId) const
{
if (metaFlags & (MDFhasserialize|MDFneedserialize))
return meta->createRowSerializer(ctx, activityId);
if (isFixedSize())
return new CSimpleFixedRowSerializer(getFixedSize());
return new CSimpleVariableRowSerializer(this);
}
IOutputRowDeserializer * CachedOutputMetaData::createRowDeserializer(ICodeContext * ctx, unsigned activityId) const
{
if (metaFlags & (MDFhasserialize|MDFneedserialize))
return meta->createRowDeserializer(ctx, activityId);
if (isFixedSize())
return new CSimpleFixedRowDeserializer(getFixedSize());
assertex(!"createRowDeserializer variable meta has no serializer");
//return new CSimpleVariableRowDeserializer(this);
return NULL;
}
void CSizingSerializer::put(size32_t len, const void * ptr)
{
totalsize += len;
}
size32_t CSizingSerializer::beginNested()
{
totalsize += sizeof(size32_t);
return totalsize;
}
void CSizingSerializer::endNested(size32_t position)
{
}
void CMemoryRowSerializer::put(size32_t len, const void * ptr)
{
buffer.append(len, ptr);
}
size32_t CMemoryRowSerializer::beginNested()
{
nesting++;
unsigned pos = buffer.length();
buffer.append((size32_t)0);
return pos;
}
void CMemoryRowSerializer::endNested(size32_t sizePos)
{
size32_t sz = buffer.length()-(sizePos + sizeof(size32_t));
buffer.writeDirect(sizePos,sizeof(sz),&sz);
nesting--;
}
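// Layout note for the nested serializers above: beginNested() appends a 4-byte size32_t
// placeholder and returns its offset; endNested() backpatches that placeholder with the
// number of bytes written after it. For example, a nested field serialized at offset 20
// with 10 bytes of content occupies offsets 20..23 (initially zero) and 24..33, and
// endNested(20) rewrites offsets 20..23 to hold the value 10.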
static void ensureClassesAreNotAbstract()
{
MemoryBuffer temp;
CThorStreamDeserializerSource x1(NULL);
CThorContiguousRowBuffer x2(NULL);
CSizingSerializer x3;
CMemoryRowSerializer x4(temp);
}
//=====================================================================================================
class ChildRowLinkerWalker : implements IIndirectMemberVisitor
{
public:
virtual void visitRowset(size32_t count, byte * * rows)
{
rtlLinkRowset(rows);
}
virtual void visitRow(const byte * row)
{
rtlLinkRow(row);
}
};
//Deprecated - should use the second definition below
void * cloneRow(IEngineRowAllocator * allocator, const void * row, size32_t &sizeout)
{
IOutputMetaData * meta = allocator->queryOutputMeta();
void * ret = allocator->createRow();
sizeout = meta->getRecordSize(row); // TBD could be better?
//GH this may no longer be big enough
memcpy(ret, row, sizeout);
if (meta->getMetaFlags() & MDFneedserialize)
{
ChildRowLinkerWalker walker;
meta->walkIndirectMembers(static_cast<const byte *>(ret), walker);
}
//NB: Does not call finalizeRow()...
return ret;
}
//the visitor callback is used to ensure link counts for children are updated.
size32_t cloneRow(ARowBuilder & rowBuilder, const void * row, IOutputMetaData * meta)
{
size32_t rowSize = meta->getRecordSize(row); // TBD could be better?
byte * self = rowBuilder.ensureCapacity(rowSize, NULL);
memcpy(self, row, rowSize);
if (meta->getMetaFlags() & MDFneedserialize)
{
ChildRowLinkerWalker walker;
meta->walkIndirectMembers(self, walker);
}
return rowSize;
}
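// Illustrative usage sketch (not taken from this file): cloning an input row that must
// outlive its producer, assuming `rowAllocator` and `inputRow` come from the caller:
//
//   RtlDynamicRowBuilder builder(rowAllocator);
//   size32_t size = cloneRow(builder, inputRow, rowAllocator->queryOutputMeta());
//   const void * copy = builder.finalizeRowClear(size);   // caller is responsible for releasing the row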
//---------------------------------------------------------------------------------------------------
extern const char * getActivityText(ThorActivityKind kind)
{
switch (kind)
{
case TAKnone: return "None";
case TAKdiskwrite: return "Disk Write";
case TAKsort: return "Sort";
case TAKdedup: return "Dedup";
case TAKfilter: return "Filter";
case TAKsplit: return "Split";
case TAKproject: return "Project";
case TAKrollup: return "Rollup";
case TAKiterate: return "Iterate";
case TAKaggregate: return "Aggregate";
case TAKhashaggregate: return "Hash Aggregate";
case TAKfirstn: return "Firstn";
case TAKsample: return "Sample";
case TAKdegroup: return "Degroup";
case TAKjoin: return "Join";
case TAKhashjoin: return "Hash Join";
case TAKlookupjoin: return "Lookup Join";
case TAKselfjoin: return "Self Join";
case TAKkeyedjoin: return "Keyed Join";
case TAKgroup: return "Group";
case TAKworkunitwrite: return "Output";
case TAKfunnel: return "Funnel";
case TAKapply: return "Apply";
case TAKtemptable: return "Inline Dataset";
case TAKtemprow: return "Inline Row";
case TAKhashdistribute: return "Hash Distribute";
case TAKhashdedup: return "Hash Dedup";
case TAKnormalize: return "Normalize";
case TAKremoteresult: return "Remote Result";
case TAKpull: return "Pull";
case TAKdenormalize: return "Denormalize";
case TAKnormalizechild: return "Normalize Child";
case TAKchilddataset: return "Child Dataset";
case TAKselectn: return "Select Nth";
case TAKenth: return "Enth";
case TAKif: return "If";
case TAKnull: return "Null";
case TAKdistribution: return "Distribution";
case TAKcountproject: return "Count Project";
case TAKchoosesets: return "Choose Sets";
case TAKpiperead: return "Pipe Read";
case TAKpipewrite: return "Pipe Write";
case TAKcsvwrite: return "Csv Write";
case TAKpipethrough: return "Pipe Through";
case TAKindexwrite: return "Index Write";
case TAKchoosesetsenth: return "Choose Sets Enth";
case TAKchoosesetslast: return "Choose Sets Last";
case TAKfetch: return "Fetch";
case TAKhashdenormalize: return "Hash Denormalize";
case TAKworkunitread: return "Read";
case TAKthroughaggregate: return "Through Aggregate";
case TAKspill: return "Spill";
case TAKcase: return "Case";
case TAKlimit: return "Limit";
case TAKcsvfetch: return "Csv Fetch";
case TAKxmlwrite: return "Xml Write";
case TAKparse: return "Parse";
case TAKcountdisk: return "Count Disk";
case TAKsideeffect: return "Simple Action";
case TAKtopn: return "Top N";
case TAKmerge: return "Merge";
case TAKxmlfetch: return "Xml Fetch";
case TAKxmlparse: return "Parse Xml";
case TAKkeyeddistribute: return "Keyed Distribute";
case TAKjoinlight: return "Lightweight Join";
case TAKalljoin: return "All Join";
case TAKsoap_rowdataset: return "SOAP dataset";
case TAKsoap_rowaction: return "SOAP action";
case TAKsoap_datasetdataset: return "SOAP dataset";
case TAKsoap_datasetaction: return "SOAP action";
case TAKkeydiff: return "Key Difference";
case TAKkeypatch: return "Key Patch";
case TAKkeyeddenormalize: return "Keyed Denormalize";
case TAKsequential: return "Sequential";
case TAKparallel: return "Parallel";
case TAKchilditerator: return "Child Dataset";
case TAKdatasetresult: return "Dataset Result";
case TAKrowresult: return "Row Result";
case TAKchildif: return "If";
case TAKpartition: return "Partition Distribute";
case TAKsubgraph: return "Sub Graph";
case TAKlocalgraph: return "Local Graph";
case TAKifaction: return "If Action";
case TAKemptyaction: return "Empty Action";
case TAKskiplimit: return "Skip Limit";
case TAKdiskread: return "Disk Read";
case TAKdisknormalize: return "Disk Normalize";
case TAKdiskaggregate: return "Disk Aggregate";
case TAKdiskcount: return "Disk Count";
case TAKdiskgroupaggregate: return "Disk Grouped Aggregate";
case TAKindexread: return "Index Read";
case TAKindexnormalize: return "Index Normalize";
case TAKindexaggregate: return "Index Aggregate";
case TAKindexcount: return "Index Count";
case TAKindexgroupaggregate: return "Index Grouped Aggregate";
case TAKchildnormalize: return "Child Normalize";
case TAKchildaggregate: return "Child Aggregate";
case TAKchildgroupaggregate: return "Child Grouped Aggregate";
case TAKchildthroughnormalize: return "Normalize";
case TAKcsvread: return "Csv Read";
case TAKxmlread: return "Xml Read";
case TAKlocalresultread: return "Read Local Result";
case TAKlocalresultwrite: return "Local Result";
case TAKcombine: return "Combine";
case TAKregroup: return "Regroup";
case TAKrollupgroup: return "Rollup Group";
case TAKcombinegroup: return "Combine Group";
case TAKlookupdenormalize: return "Lookup Denormalize";
case TAKalldenormalize: return "All Denormalize";
case TAKdenormalizegroup: return "Denormalize Group";
case TAKhashdenormalizegroup: return "Hash Denormalize Group";
case TAKlookupdenormalizegroup: return "Lookup Denormalize Group";
case TAKkeyeddenormalizegroup: return "Keyed Denormalize Group";
case TAKalldenormalizegroup: return "All Denormalize Group";
case TAKlocalresultspill: return "Spill Local Result";
case TAKsimpleaction: return "Action";
case TAKloopcount: return "Loop";
case TAKlooprow: return "Loop";
case TAKloopdataset: return "Loop";
case TAKchildcase: return "Case";
case TAKremotegraph: return "Remote";
case TAKlibrarycall: return "Library Call";
case TAKrawiterator: return "Child Dataset";
case TAKlocalstreamread: return "Read Input";
case TAKprocess: return "Process";
case TAKgraphloop: return "Graph";
case TAKparallelgraphloop: return "Graph";
case TAKgraphloopresultread: return "Graph Input";
case TAKgraphloopresultwrite: return "Graph Result";
case TAKgrouped: return "Grouped";
case TAKsorted: return "Sorted";
case TAKdistributed: return "Distributed";
case TAKnwayjoin: return "Join";
case TAKnwaymerge: return "Merge";
case TAKnwaymergejoin: return "Merge Join";
case TAKnwayinput: return "Nway Input";
case TAKnwaygraphloopresultread: return "Nway Graph Input";
case TAKnwayselect: return "Select Nway Input";
case TAKnonempty: return "Non Empty";
case TAKcreaterowlimit: return "OnFail Limit";
case TAKexistsaggregate: return "Exists";
case TAKcountaggregate: return "Count";
case TAKprefetchproject: return "Prefetch Project";
case TAKprefetchcountproject: return "Prefetch Count Project";
case TAKfiltergroup: return "Filter Group";
case TAKmemoryspillread: return "Read Spill";
case TAKmemoryspillwrite: return "Write Spill";
case TAKmemoryspillsplit: return "Spill";
case TAKsection: return "Section";
case TAKlinkedrawiterator: return "Child Dataset";
case TAKnormalizelinkedchild: return "Normalize";
case TAKfilterproject: return "Filtered Project";
case TAKcatch: return "Catch";
case TAKskipcatch: return "Skip Catch";
case TAKcreaterowcatch: return "OnFail Catch";
case TAKsectioninput: return "Section Input";
case TAKindexgroupcount: return "Index Grouped Count";
case TAKindexgroupexists: return "Index Grouped Exists";
case TAKhashdistributemerge: return "Distribute Merge";
case TAKselfjoinlight: return "Lightweight Self Join";
case TAKwhen_dataset: return "When";
case TAKhttp_rowdataset: return "HTTP dataset";
case TAKstreamediterator: return "Streamed Dataset";
case TAKexternalsource: return "User Source";
case TAKexternalsink: return "User Output";
case TAKexternalprocess: return "User Proceess";
case TAKwhen_action: return "When";
}
throwUnexpected();
}
extern bool isActivitySource(ThorActivityKind kind)
{
switch (kind)
{
case TAKpiperead:
case TAKtemptable:
case TAKtemprow:
case TAKworkunitread:
case TAKnull:
case TAKsideeffect:
case TAKsoap_rowdataset:
case TAKsoap_rowaction:
case TAKkeydiff:
case TAKkeypatch:
case TAKchilditerator:
case TAKlocalgraph:
case TAKemptyaction:
case TAKdiskread:
case TAKdisknormalize:
case TAKdiskaggregate:
case TAKdiskcount:
case TAKdiskgroupaggregate:
case TAKindexread:
case TAKindexnormalize:
case TAKindexaggregate:
case TAKindexcount:
case TAKindexgroupaggregate:
case TAKchildnormalize:
case TAKchildaggregate:
case TAKchildgroupaggregate:
case TAKcsvread:
case TAKxmlread:
case TAKlocalresultread:
case TAKsimpleaction:
case TAKrawiterator:
case TAKlocalstreamread:
case TAKgraphloopresultread:
case TAKnwaygraphloopresultread:
case TAKlinkedrawiterator:
case TAKindexgroupexists:
case TAKindexgroupcount:
case TAKstreamediterator:
case TAKexternalsource:
return true;
}
return false;
}
extern bool isActivitySink(ThorActivityKind kind)
{
switch (kind)
{
case TAKdiskwrite:
case TAKworkunitwrite:
case TAKapply:
case TAKremoteresult:
case TAKdistribution:
case TAKpipewrite:
case TAKcsvwrite:
case TAKindexwrite:
case TAKxmlwrite:
case TAKsoap_rowaction:
case TAKsoap_datasetaction:
case TAKkeydiff:
case TAKkeypatch:
case TAKdatasetresult:
case TAKrowresult:
case TAKemptyaction:
case TAKlocalresultwrite:
case TAKgraphloopresultwrite:
case TAKsimpleaction:
case TAKexternalsink:
case TAKifaction:
case TAKparallel:
case TAKsequential:
case TAKwhen_action:
return true;
}
return false;
}
//------------------------------------------------------------------------------------------------
byte * CStaticRowBuilder::ensureCapacity(size32_t required, const char * fieldName)
{
if (required <= maxLength)
return static_cast<byte *>(self);
rtlReportFieldOverflow(required, maxLength, fieldName);
return NULL;
}
//=====================================================================================================
CThorContiguousRowBuffer::CThorContiguousRowBuffer(ISerialStream * _in) : in(_in)
{
buffer = NULL;
maxOffset = 0;
readOffset = 0;
}
void CThorContiguousRowBuffer::doRead(size32_t len, void * ptr)
{
ensureAccessible(readOffset + len);
memcpy(ptr, buffer+readOffset, len);
readOffset += len;
}
size32_t CThorContiguousRowBuffer::read(size32_t len, void * ptr)
{
doRead(len, ptr);
return len;
}
size32_t CThorContiguousRowBuffer::readSize()
{
size32_t value;
doRead(sizeof(value), &value);
return value;
}
size32_t CThorContiguousRowBuffer::readPackedInt(void * ptr)
{
size32_t size = sizePackedInt();
doRead(size, ptr);
return size;
}
size32_t CThorContiguousRowBuffer::readUtf8(ARowBuilder & target, size32_t offset, size32_t fixedSize, size32_t len)
{
if (len == 0)
return 0;
size32_t size = sizeUtf8(len);
byte * self = target.ensureCapacity(fixedSize + size, NULL);
doRead(size, self+offset);
return size;
}
size32_t CThorContiguousRowBuffer::readVStr(ARowBuilder & target, size32_t offset, size32_t fixedSize)
{
size32_t size = sizeVStr();
byte * self = target.ensureCapacity(fixedSize + size, NULL);
doRead(size, self+offset);
return size;
}
size32_t CThorContiguousRowBuffer::readVUni(ARowBuilder & target, size32_t offset, size32_t fixedSize)
{
size32_t size = sizeVUni();
byte * self = target.ensureCapacity(fixedSize + size, NULL);
doRead(size, self+offset);
return size;
}
size32_t CThorContiguousRowBuffer::sizePackedInt()
{
ensureAccessible(readOffset+1);
return rtlGetPackedSizeFromFirst(buffer[readOffset]);
}
size32_t CThorContiguousRowBuffer::sizeUtf8(size32_t len)
{
if (len == 0)
return 0;
//The len is the number of UTF-8 characters; the size in bytes depends on which characters are present.
size32_t nextOffset = readOffset;
while (len)
{
ensureAccessible(nextOffset+1);
for (;nextOffset < maxOffset;)
{
nextOffset += readUtf8Size(buffer+nextOffset); // This function only accesses the first byte
if (--len == 0)
break;
}
}
return nextOffset - readOffset;
}
size32_t CThorContiguousRowBuffer::sizeVStr()
{
size32_t nextOffset = readOffset;
loop
{
ensureAccessible(nextOffset+1);
for (; nextOffset < maxOffset; nextOffset++)
{
if (buffer[nextOffset] == 0)
return (nextOffset + 1) - readOffset;
}
}
}
size32_t CThorContiguousRowBuffer::sizeVUni()
{
size32_t nextOffset = readOffset;
const size32_t sizeOfUChar = 2;
loop
{
ensureAccessible(nextOffset+sizeOfUChar);
for (; nextOffset+1 < maxOffset; nextOffset += sizeOfUChar)
{
if (buffer[nextOffset] == 0 && buffer[nextOffset+1] == 0)
return (nextOffset + sizeOfUChar) - readOffset;
}
}
}
void CThorContiguousRowBuffer::reportReadFail()
{
throwUnexpected();
}
const byte * CThorContiguousRowBuffer::peek(size32_t maxSize)
{
if (maxSize+readOffset > maxOffset)
doPeek(maxSize+readOffset);
return buffer + readOffset;
}
offset_t CThorContiguousRowBuffer::beginNested()
{
size32_t len = readSize();
return len+readOffset;
}
bool CThorContiguousRowBuffer::finishedNested(offset_t endPos)
{
return readOffset >= endPos;
}
void CThorContiguousRowBuffer::skip(size32_t size)
{
ensureAccessible(readOffset+size);
readOffset += size;
}
void CThorContiguousRowBuffer::skipPackedInt()
{
size32_t size = sizePackedInt();
ensureAccessible(readOffset+size);
readOffset += size;
}
void CThorContiguousRowBuffer::skipUtf8(size32_t len)
{
size32_t size = sizeUtf8(len);
ensureAccessible(readOffset+size);
readOffset += size;
}
void CThorContiguousRowBuffer::skipVStr()
{
size32_t size = sizeVStr();
ensureAccessible(readOffset+size);
readOffset += size;
}
void CThorContiguousRowBuffer::skipVUni()
{
size32_t size = sizeVUni();
ensureAccessible(readOffset+size);
readOffset += size;
}
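// Illustrative sketch (not taken from this file): a row prefetcher walks a serialized row
// through this buffer by skipping each field in turn and then committing the row. The field
// order here is hypothetical:
//
//   prefetchBuffer.skipPackedInt();   // variable-width integer field
//   prefetchBuffer.skipVStr();        // null-terminated string field
//   prefetchBuffer.skipUtf8(len);     // utf-8 field of `len` characters
//   prefetchBuffer.finishedRow();     // commit the bytes consumed for this row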
// ===========================================
IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, ICodeContext *context)
{
class cRowInterfaces: public CSimpleInterface, implements IRowInterfaces
{
Linked<IOutputMetaData> meta;
ICodeContext* context;
unsigned actid;
Linked<IEngineRowAllocator> allocator;
Linked<IOutputRowSerializer> serializer;
Linked<IOutputRowDeserializer> deserializer;
CSingletonLock allocatorlock;
CSingletonLock serializerlock;
CSingletonLock deserializerlock;
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
cRowInterfaces(IOutputMetaData *_meta,unsigned _actid, ICodeContext *_context)
: meta(_meta)
{
context = _context;
actid = _actid;
}
IEngineRowAllocator * queryRowAllocator()
{
if (allocatorlock.lock()) {
if (!allocator&&meta)
allocator.setown(context->getRowAllocator(meta, actid));
allocatorlock.unlock();
}
return allocator;
}
IOutputRowSerializer * queryRowSerializer()
{
if (serializerlock.lock()) {
if (!serializer&&meta)
serializer.setown(meta->createRowSerializer(context,actid));
serializerlock.unlock();
}
return serializer;
}
IOutputRowDeserializer * queryRowDeserializer()
{
if (deserializerlock.lock()) {
if (!deserializer&&meta)
deserializer.setown(meta->createRowDeserializer(context,actid));
deserializerlock.unlock();
}
return deserializer;
}
IOutputMetaData *queryRowMetaData()
{
return meta;
}
unsigned queryActivityId()
{
return actid;
}
ICodeContext *queryCodeContext()
{
return context;
}
};
return new cRowInterfaces(meta,actid,context);
};
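// Illustrative usage sketch (not taken from this file): bundling the per-activity row
// services into a single interface, assuming `meta`, `activityId` and `ctx` come from the
// caller:
//
//   Owned<IRowInterfaces> rowif = createRowInterfaces(meta, activityId, ctx);
//   IOutputRowSerializer *serializer = rowif->queryRowSerializer();       // created lazily, once
//   IOutputRowDeserializer *deserializer = rowif->queryRowDeserializer();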
class CRowStreamReader : public CSimpleInterface, implements IExtRowStream
{
Linked<IFileIO> fileio;
Linked<IMemoryMappedFile> mmfile;
Linked<IOutputRowDeserializer> deserializer;
Linked<IEngineRowAllocator> allocator;
Owned<ISerialStream> strm;
CThorStreamDeserializerSource source;
Owned<ISourceRowPrefetcher> prefetcher;
CThorContiguousRowBuffer prefetchBuffer; // used if prefetcher set
bool grouped;
unsigned __int64 maxrows;
unsigned __int64 rownum;
bool eoi;
bool eos;
bool eog;
offset_t bufofs;
#ifdef TRACE_CREATE
static unsigned rdnum;
#endif
class : implements IFileSerialStreamCallback
{
public:
CRC32 crc;
void process(offset_t ofs, size32_t sz, const void *buf)
{
crc.tally(sz,buf);
}
} crccb;
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
CRowStreamReader(IFileIO *_fileio,IMemoryMappedFile *_mmfile,offset_t _ofs, offset_t _len, IRowInterfaces *rowif,unsigned __int64 _maxrows,bool _tallycrc, bool _grouped)
: fileio(_fileio), mmfile(_mmfile), allocator(rowif->queryRowAllocator()), prefetchBuffer(NULL)
{
#ifdef TRACE_CREATE
PROGLOG("CRowStreamReader %d = %p",++rdnum,this);
#endif
maxrows = _maxrows;
grouped = _grouped;
eoi = false;
eos = maxrows==0;
eog = false;
bufofs = 0;
rownum = 0;
if (fileio)
strm.setown(createFileSerialStream(fileio,_ofs,_len,(size32_t)-1, _tallycrc?&crccb:NULL));
else
strm.setown(createFileSerialStream(mmfile,_ofs,_len,_tallycrc?&crccb:NULL));
prefetcher.setown(rowif->queryRowMetaData()->createRowPrefetcher(rowif->queryCodeContext(), rowif->queryActivityId()));
if (prefetcher)
prefetchBuffer.setStream(strm);
source.setStream(strm);
deserializer.set(rowif->queryRowDeserializer());
}
~CRowStreamReader()
{
#ifdef TRACE_CREATE
PROGLOG("~CRowStreamReader %d = %p",rdnum--,this);
#endif
}
void reinit(offset_t _ofs,offset_t _len,unsigned __int64 _maxrows)
{
maxrows = _maxrows;
eoi = false;
eos = (maxrows==0)||(_len==0);
eog = false;
bufofs = 0;
rownum = 0;
strm->reset(_ofs,_len);
}
const void *nextRow()
{
if (eog) {
eog = false;
return NULL;
}
if (eos)
return NULL;
if (source.eos()) {
eos = true;
return NULL;
}
RtlDynamicRowBuilder rowBuilder(allocator);
size_t size = deserializer->deserialize(rowBuilder,source);
if (grouped && !eos) {
byte b;
source.read(sizeof(b),&b);
eog = (b==1);
}
if (++rownum==maxrows)
eos = true;
return rowBuilder.finalizeRowClear(size);
}
const void *prefetchRow(size32_t *sz)
{
if (eog)
eog = false;
else if (!eos) {
if (source.eos())
eos = true;
else {
assertex(prefetcher);
prefetcher->readAhead(prefetchBuffer);
const byte * ret = prefetchBuffer.queryRow();
if (sz)
*sz = prefetchBuffer.queryRowSize();
return ret;
}
}
if (sz)
*sz = 0;
return NULL;
}
void prefetchDone()
{
prefetchBuffer.finishedRow();
if (grouped) {
byte b;
strm->get(sizeof(b),&b);
eog = (b==1);
}
}
virtual void stop()
{
stop(NULL);
}
void clear()
{
strm.clear();
source.clearStream();
fileio.clear();
}
void stop(CRC32 *crcout)
{
if (!eos) {
eos = true;
clear();
}
// NB CRC will only be right if stopped at eos
if (crcout)
*crcout = crccb.crc;
}
offset_t getOffset()
{
return source.tell();
}
};
#ifdef TRACE_CREATE
unsigned CRowStreamReader::rdnum;
#endif
bool UseMemoryMappedRead = false;
IExtRowStream *createRowStream(IFile *file,IRowInterfaces *rowif,offset_t offset,offset_t len,unsigned __int64 maxrows,bool tallycrc,bool grouped)
{
IExtRowStream *ret;
if (UseMemoryMappedRead) {
PROGLOG("Memory Mapped read of %s",file->queryFilename());
Owned<IMemoryMappedFile> mmfile = file->openMemoryMapped();
if (!mmfile)
return NULL;
ret = new CRowStreamReader(NULL,mmfile,offset,len,rowif,maxrows,tallycrc,grouped);
}
else {
Owned<IFileIO> fileio = file->open(IFOread);
if (!fileio)
return NULL;
ret = new CRowStreamReader(fileio,NULL,offset,len,rowif,maxrows,tallycrc,grouped);
}
return ret;
}
IExtRowStream *createCompressedRowStream(IFile *file,IRowInterfaces *rowif,offset_t offset,offset_t len,unsigned __int64 maxrows,bool tallycrc,bool grouped,IExpander *eexp)
{
Owned<IFileIO> fileio = createCompressedFileReader(file, eexp, UseMemoryMappedRead);
if (!fileio)
return NULL;
IExtRowStream *ret = new CRowStreamReader(fileio,NULL,offset,len,rowif,maxrows,tallycrc,grouped);
return ret;
}
void useMemoryMappedRead(bool on)
{
#if defined(_DEBUG) || defined(__64BIT__)
UseMemoryMappedRead = on;
#endif
}
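// Illustrative usage sketch (not taken from this file): streaming every row of a file using
// the interfaces created above. The sentinel values (offset_t)-1 and (unsigned __int64)-1
// are assumed here to mean "read to end of file" and "no row limit"; processRow() is
// hypothetical and the caller releases each returned row:
//
//   Owned<IExtRowStream> in = createRowStream(file, rowif, 0, (offset_t)-1, (unsigned __int64)-1, false, false);
//   if (in)
//       while (const void * row = in->nextRow())
//           processRow(row);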
#define ROW_WRITER_BUFFERSIZE (0x100000)
class CRowStreamWriter : public CSimpleInterface, private IRowSerializerTarget, implements IExtRowWriter
{
Linked<IFileIOStream> stream;
Linked<IOutputRowSerializer> serializer;
Linked<IEngineRowAllocator> allocator;
CRC32 crc;
bool grouped;
bool tallycrc;
unsigned nested;
MemoryAttr ma;
MemoryBuffer extbuf; // may need to spill to disk at some point
byte *buf;
size32_t bufpos;
bool autoflush;
#ifdef TRACE_CREATE
static unsigned wrnum;
#endif
void flushBuffer(bool final)
{
if (bufpos) {
stream->write(bufpos,buf);
if (tallycrc)
crc.tally(bufpos,buf);
bufpos = 0;
}
size32_t extpos = extbuf.length();
if (!extpos)
return;
if (!final)
extpos = (extpos/ROW_WRITER_BUFFERSIZE)*ROW_WRITER_BUFFERSIZE;
if (extpos) {
stream->write(extpos,extbuf.toByteArray());
if (tallycrc)
crc.tally(extpos,extbuf.toByteArray());
}
if (extpos<extbuf.length()) {
bufpos = extbuf.length()-extpos;
memcpy(buf,extbuf.toByteArray()+extpos,bufpos);
}
extbuf.clear();
}
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
CRowStreamWriter(IFileIOStream *_stream,IOutputRowSerializer *_serializer,IEngineRowAllocator *_allocator,bool _grouped,bool _tallycrc,bool _autoflush)
: stream(_stream), serializer(_serializer), allocator(_allocator)
{
#ifdef TRACE_CREATE
PROGLOG("CRowStreamWriter %d = %p",++wrnum,this);
#endif
grouped = _grouped;
tallycrc = _tallycrc;
nested = 0;
buf = (byte *)ma.allocate(ROW_WRITER_BUFFERSIZE);
bufpos = 0;
autoflush = _autoflush;
}
~CRowStreamWriter()
{
#ifdef TRACE_CREATE
PROGLOG("~CRowStreamWriter %d = %p",wrnum--,this);
#endif
if (autoflush)
flush();
}
void putRow(const void *row)
{
if (row) {
serializer->serialize(*this,(const byte *)row);
if (grouped) {
byte b = 0;
if (bufpos<ROW_WRITER_BUFFERSIZE)
buf[bufpos++] = b;
else
extbuf.append(b);
}
allocator->releaseRow(row);
}
else if (grouped) { // backpatch
byte b = 1;
if (extbuf.length())
extbuf.writeDirect(extbuf.length()-1,sizeof(b),&b);
else {
assertex(bufpos);
buf[bufpos-1] = b;
}
}
}
void flush()
{
flushBuffer(true);
stream->flush();
}
void flush(CRC32 *crcout)
{
flushBuffer(true);
stream->flush();
if (crcout)
*crcout = crc;
}
offset_t getPosition()
{
return stream->tell()+bufpos+extbuf.length();
}
void put(size32_t len, const void * ptr)
{
// first fill buf
loop {
if (bufpos<ROW_WRITER_BUFFERSIZE) {
size32_t wr = ROW_WRITER_BUFFERSIZE-bufpos;
if (wr>len)
wr = len;
memcpy(buf+bufpos,ptr,wr);
bufpos += wr;
len -= wr;
if (len==0)
break; // quick exit
ptr = (const byte *)ptr + wr;
}
if (nested) {
// have to append to ext buffer (will need to spill to disk here if gets *too* big)
extbuf.append(len,ptr);
break;
}
else
flushBuffer(false);
}
}
size32_t beginNested()
{
if (nested++==0)
if (bufpos==ROW_WRITER_BUFFERSIZE)
flushBuffer(false);
size32_t ret = bufpos+extbuf.length();
size32_t sz = 0;
put(sizeof(sz),&sz);
return ret;
}
void endNested(size32_t pos)
{
size32_t sz = bufpos+extbuf.length()-(pos + sizeof(size32_t));
size32_t wr = sizeof(size32_t);
byte *out = (byte *)&sz;
if (pos<ROW_WRITER_BUFFERSIZE) {
size32_t space = ROW_WRITER_BUFFERSIZE-pos;
if (space>wr)
space = wr;
memcpy(buf+pos,out,space);
wr -= space;
if (wr==0) {
--nested;
return; // quick exit
}
out += space;
pos += space;
}
extbuf.writeDirect(pos-ROW_WRITER_BUFFERSIZE,wr,out);
--nested;
}
};
#ifdef TRACE_CREATE
unsigned CRowStreamWriter::wrnum=0;
#endif
IExtRowWriter *createRowWriter(IFile *file,IOutputRowSerializer *serializer,IEngineRowAllocator *allocator,bool grouped, bool tallycrc, bool extend)
{
Owned<IFileIO> fileio = file->open(extend?IFOwrite:IFOcreate);
if (!fileio)
return NULL;
Owned<IFileIOStream> stream = createIOStream(fileio);
if (extend)
stream->seek(0,IFSend);
return createRowWriter(stream,serializer,allocator,grouped,tallycrc,true);
}
IExtRowWriter *createRowWriter(IFileIOStream *strm,IOutputRowSerializer *serializer,IEngineRowAllocator *allocator,bool grouped, bool tallycrc, bool autoflush)
{
Owned<CRowStreamWriter> writer = new CRowStreamWriter(strm, serializer, allocator, grouped, tallycrc, autoflush);
return writer.getClear();
}
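// Illustrative usage sketch (not taken from this file): writing rows to a file and collecting
// the CRC at the end, assuming `file`, `serializer`, `allocator` and `rows` come from the
// caller and that IExtRowWriter exposes the flush(CRC32 *) overload implemented above:
//
//   Owned<IExtRowWriter> out = createRowWriter(file, serializer, allocator, false, true, false);
//   if (out) {
//       while (const void * row = rows->nextRow())
//           out->putRow(row);        // putRow takes ownership of the row
//       CRC32 crc;
//       out->flush(&crc);
//   }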
class CDiskMerger : public CInterface, implements IDiskMerger
{
IArrayOf<IFile> tempfiles;
IRowStream **strms;
Linked<IRecordSize> irecsize;
StringAttr tempnamebase;
Linked<IRowLinkCounter> linker;
Linked<IRowInterfaces> rowInterfaces;
public:
IMPLEMENT_IINTERFACE;
CDiskMerger(IRowInterfaces *_rowInterfaces, IRowLinkCounter *_linker, const char *_tempnamebase)
: rowInterfaces(_rowInterfaces), linker(_linker), tempnamebase(_tempnamebase)
{
strms = NULL;
}
~CDiskMerger()
{
for (unsigned i=0;i<tempfiles.ordinality();i++) {
if (strms&&strms[i])
strms[i]->Release();
tempfiles.item(i).remove();
}
free(strms);
}
IRowWriter *createWriteBlock()
{
StringBuffer tempname(tempnamebase);
tempname.append('.').append(tempfiles.ordinality()).append('_').append((__int64)GetCurrentThreadId()).append('_').append((unsigned)GetCurrentProcessId());
IFile *file = createIFile(tempname.str());
tempfiles.append(*file);
return createRowWriter(file,rowInterfaces->queryRowSerializer(),rowInterfaces->queryRowAllocator(),false,false,false); // flushed by close
}
void put(const void **rows,unsigned numrows)
{
Owned<IRowWriter> out = createWriteBlock();
for (unsigned i=0;i<numrows;i++)
out->putRow(rows[i]);
}
void putIndirect(const void ***rowptrs,unsigned numrows)
{
Owned<IRowWriter> out = createWriteBlock();
for (unsigned i=0;i<numrows;i++)
out->putRow(*(rowptrs[i]));
}
virtual void put(ISortedRowProvider *rows)
{
Owned<IRowWriter> out = createWriteBlock();
void * row;
while ((row = rows->getNextSorted()) != NULL)
out->putRow(row);
}
IRowStream *merge(ICompare *icompare, bool partdedup)
{
unsigned numstrms = tempfiles.ordinality();
strms = (IRowStream **)calloc(numstrms,sizeof(IRowStream *));
unsigned i;
for (i=0;i<numstrms;i++)
strms[i] = createRowStream(&tempfiles.item(i), rowInterfaces, 0, (offset_t)-1, (unsigned __int64)-1, false, false);
if (numstrms==1)
return LINK(strms[0]);
if (icompare)
return createRowStreamMerger(numstrms, strms, icompare, partdedup, linker);
return createConcatRowStream(numstrms, strms);
}
virtual count_t mergeTo(IRowWriter *dest, ICompare *icompare, bool partdedup)
{
count_t count = 0;
Owned<IRowStream> mergedStream = merge(icompare, partdedup);
loop
{
const void *row = mergedStream->nextRow();
if (!row)
return count;
dest->putRow(row); // takes ownership
++count;
}
return count;
}
};
IDiskMerger *createDiskMerger(IRowInterfaces *rowInterfaces, IRowLinkCounter *linker, const char *tempnamebase)
{
return new CDiskMerger(rowInterfaces, linker, tempnamebase);
}
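// Illustrative usage sketch (not taken from this file): spilling pre-sorted blocks of rows to
// disk and merging them back into one sorted stream; the temp name base, `block1`/`block2`
// and `compare` are hypothetical caller-supplied values:
//
//   Owned<IDiskMerger> merger = createDiskMerger(rowif, linker, "spill_tmp");
//   merger->put(block1, numRows1);               // one call per sorted block
//   merger->put(block2, numRows2);
//   Owned<IRowStream> sorted = merger->merge(compare, false);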