|
@@ -431,18 +431,13 @@ using roxiemem::OwnedConstRoxieRow;
|
|
|
class InputReaderBase : public CInterfaceOf<IGroupedInput>
|
|
|
{
|
|
|
protected:
|
|
|
- IInputBase *input;
|
|
|
+ IEngineRowStream *input;
|
|
|
public:
|
|
|
- InputReaderBase(IInputBase *_input)
|
|
|
+ InputReaderBase(IEngineRowStream *_input)
|
|
|
: input(_input)
|
|
|
{
|
|
|
}
|
|
|
|
|
|
- virtual IOutputMetaData * queryOutputMeta() const
|
|
|
- {
|
|
|
- return input->queryOutputMeta();
|
|
|
- }
|
|
|
-
|
|
|
virtual void stop()
|
|
|
{
|
|
|
input->stop();
|
|
@@ -458,7 +453,7 @@ protected:
|
|
|
OwnedConstRoxieRow next;
|
|
|
const ICompare *compare;
|
|
|
public:
|
|
|
- GroupedInputReader(IInputBase *_input, const ICompare *_compare)
|
|
|
+ GroupedInputReader(IEngineRowStream *_input, const ICompare *_compare)
|
|
|
: InputReaderBase(_input), compare(_compare)
|
|
|
{
|
|
|
firstRead = false;
|
|
@@ -498,7 +493,7 @@ public:
|
|
|
class DegroupedInputReader : public InputReaderBase
|
|
|
{
|
|
|
public:
|
|
|
- DegroupedInputReader(IInputBase *_input) : InputReaderBase(_input)
|
|
|
+ DegroupedInputReader(IEngineRowStream *_input) : InputReaderBase(_input)
|
|
|
{
|
|
|
}
|
|
|
virtual const void *nextRow()
|
|
@@ -514,7 +509,7 @@ protected:
|
|
|
Owned<ISortAlgorithm> sorter;
|
|
|
bool firstRead;
|
|
|
public:
|
|
|
- SortedInputReader(IInputBase *_input, ISortAlgorithm *_sorter)
|
|
|
+ SortedInputReader(IEngineRowStream *_input, ISortAlgorithm *_sorter)
|
|
|
: InputReaderBase(_input), degroupedInput(_input), sorter(_sorter), firstRead(false)
|
|
|
{
|
|
|
sorter->reset();
|
|
@@ -539,7 +534,7 @@ protected:
|
|
|
OwnedConstRoxieRow next;
|
|
|
const ICompare *compare;
|
|
|
public:
|
|
|
- SortedGroupedInputReader(IInputBase *_input, const ICompare *_compare, ISortAlgorithm *_sorter)
|
|
|
+ SortedGroupedInputReader(IEngineRowStream *_input, const ICompare *_compare, ISortAlgorithm *_sorter)
|
|
|
: SortedInputReader(_input, _sorter), compare(_compare), eof(false), endGroupPending(false)
|
|
|
{
|
|
|
}
|
|
@@ -574,25 +569,25 @@ public:
|
|
|
}
|
|
|
};
|
|
|
|
|
|
-extern IGroupedInput *createGroupedInputReader(IInputBase *_input, const ICompare *_groupCompare)
|
|
|
+extern IGroupedInput *createGroupedInputReader(IEngineRowStream *_input, const ICompare *_groupCompare)
|
|
|
{
|
|
|
dbgassertex(_input && _groupCompare);
|
|
|
return new GroupedInputReader(_input, _groupCompare);
|
|
|
}
|
|
|
|
|
|
-extern IGroupedInput *createDegroupedInputReader(IInputBase *_input)
|
|
|
+extern IGroupedInput *createDegroupedInputReader(IEngineRowStream *_input)
|
|
|
{
|
|
|
dbgassertex(_input);
|
|
|
return new DegroupedInputReader(_input);
|
|
|
}
|
|
|
|
|
|
-extern IGroupedInput *createSortedInputReader(IInputBase *_input, ISortAlgorithm *_sorter)
|
|
|
+extern IGroupedInput *createSortedInputReader(IEngineRowStream *_input, ISortAlgorithm *_sorter)
|
|
|
{
|
|
|
dbgassertex(_input && _sorter);
|
|
|
return new SortedInputReader(_input, _sorter);
|
|
|
}
|
|
|
|
|
|
-extern IGroupedInput *createSortedGroupedInputReader(IInputBase *_input, const ICompare *_groupCompare, ISortAlgorithm *_sorter)
|
|
|
+extern IGroupedInput *createSortedGroupedInputReader(IEngineRowStream *_input, const ICompare *_groupCompare, ISortAlgorithm *_sorter)
|
|
|
{
|
|
|
dbgassertex(_input && _groupCompare && _sorter);
|
|
|
return new SortedGroupedInputReader(_input, _groupCompare, _sorter);
|
|
@@ -666,7 +661,7 @@ class CQuickSortAlgorithm : public CInplaceSortAlgorithm
|
|
|
public:
|
|
|
CQuickSortAlgorithm(ICompare *_compare) : CInplaceSortAlgorithm(_compare) {}
|
|
|
|
|
|
- virtual void prepare(IInputBase *input)
|
|
|
+ virtual void prepare(IEngineRowStream *input)
|
|
|
{
|
|
|
curIndex = 0;
|
|
|
if (input->nextGroup(sorted))
|
|
@@ -683,7 +678,7 @@ class CParallelQuickSortAlgorithm : public CInplaceSortAlgorithm
|
|
|
public:
|
|
|
CParallelQuickSortAlgorithm(ICompare *_compare) : CInplaceSortAlgorithm(_compare) {}
|
|
|
|
|
|
- virtual void prepare(IInputBase *input)
|
|
|
+ virtual void prepare(IEngineRowStream *input)
|
|
|
{
|
|
|
curIndex = 0;
|
|
|
if (input->nextGroup(sorted))
|
|
@@ -700,7 +695,7 @@ class CTbbQuickSortAlgorithm : public CInplaceSortAlgorithm
|
|
|
public:
|
|
|
CTbbQuickSortAlgorithm(ICompare *_compare) : CInplaceSortAlgorithm(_compare) {}
|
|
|
|
|
|
- virtual void prepare(IInputBase *input)
|
|
|
+ virtual void prepare(IEngineRowStream *input)
|
|
|
{
|
|
|
curIndex = 0;
|
|
|
if (input->nextGroup(sorted))
|
|
@@ -719,7 +714,7 @@ public:
|
|
|
|
|
|
virtual void sortRows(void * * rows, size_t numRows, void * * temp) = 0;
|
|
|
|
|
|
- virtual void prepare(IInputBase *input)
|
|
|
+ virtual void prepare(IEngineRowStream *input)
|
|
|
{
|
|
|
curIndex = 0;
|
|
|
if (input->nextGroup(sorted))
|
|
@@ -968,7 +963,7 @@ public:
|
|
|
blockNo = 0;
|
|
|
}
|
|
|
|
|
|
- virtual void prepare(IInputBase *input)
|
|
|
+ virtual void prepare(IEngineRowStream *input)
|
|
|
{
|
|
|
blockNo = 0;
|
|
|
curBlock = new SortedBlock(blockNo++, rowManager, activityId);
|
|
@@ -1174,7 +1169,7 @@ public:
|
|
|
sequences.kill();
|
|
|
}
|
|
|
|
|
|
- virtual void prepare(IInputBase *input)
|
|
|
+ virtual void prepare(IEngineRowStream *input)
|
|
|
{
|
|
|
inputAlreadySorted = true;
|
|
|
curIndex = 0;
|
|
@@ -1247,7 +1242,7 @@ public:
|
|
|
|
|
|
virtual void sortRows(void * * rows, size_t numRows, ICompare & compare, void * * stableTemp) = 0;
|
|
|
|
|
|
- virtual void prepare(IInputBase *input)
|
|
|
+ virtual void prepare(IEngineRowStream *input)
|
|
|
{
|
|
|
loop
|
|
|
{
|