@@ -367,7 +367,7 @@ public:

 //====

-void CThorExpandingRowArray::init(rowidx_t initialSize, bool _stableSort)
+void CThorExpandingRowArray::init(rowidx_t initialSize, StableSortFlag _stableSort)
 {
     rowManager = activity.queryJob().queryRowManager();
     stableSort = _stableSort;
@@ -376,10 +376,12 @@ void CThorExpandingRowArray::init(rowidx_t initialSize, bool _stableSort)
     if (initialSize)
     {
         rows = static_cast<const void * *>(rowManager->allocate(initialSize * sizeof(void*), activity.queryContainer().queryId()));
-        maxRows = RoxieRowCapacity(rows) / sizeof(void *);
+        maxRows = getRowsCapacity();
         memset(rows, 0, maxRows * sizeof(void *));
-        if (stableSort)
-            stableSortTmp = static_cast<void **>(rowManager->allocate(initialSize * sizeof(void*), activity.queryContainer().queryId()));
+        if (stableSort_earlyAlloc == stableSort)
+            stableSortTmp = static_cast<void **>(rowManager->allocate(maxRows * sizeof(void*), activity.queryContainer().queryId()));
+        else
+            stableSortTmp = NULL;
     }
     else
     {
@@ -389,9 +391,28 @@ void CThorExpandingRowArray::init(rowidx_t initialSize, bool _stableSort)
     numRows = 0;
 }

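The bool stableSort arguments above become a three-valued StableSortFlag. The enum itself is declared in the class header rather than in this file, so it is not part of the patch; the sketch below is only a guess at its shape, built from the enumerator names that appear at the call sites in these hunks (stableSort_none, stableSort_earlyAlloc, stableSort_lateAlloc).

    // Hypothetical declaration, for orientation only; the real one lives in the header.
    enum StableSortFlag
    {
        stableSort_none,        // no stable sorting; no stableSortTmp table is kept
        stableSort_earlyAlloc,  // allocate stableSortTmp up front, alongside the row table
        stableSort_lateAlloc    // defer allocating stableSortTmp until doSort() needs it
    };
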
-const void *CThorExpandingRowArray::allocateNewRows(rowidx_t requiredRows, OwnedConstThorRow &newStableSortTmp)
+const void *CThorExpandingRowArray::allocateRowTable(rowidx_t num)
 {
-    unsigned newSize = maxRows;
+    try
+    {
+        return rowManager->allocate(num * sizeof(void*), activity.queryContainer().queryId());
+    }
+    catch (IException * e)
+    {
+        //Pathological cases - not enough memory to reallocate the target row buffer, or no contiguous pages available.
+        unsigned code = e->errorCode();
+        if ((code == ROXIEMM_MEMORY_LIMIT_EXCEEDED) || (code == ROXIEMM_MEMORY_POOL_EXHAUSTED))
+        {
+            e->Release();
+            return NULL;
+        }
+        throw;
+    }
+}
+
+const void *CThorExpandingRowArray::allocateNewRows(rowidx_t requiredRows)
+{
+    rowidx_t newSize = maxRows;
     //This condition must be <= at least 1/scaling factor below otherwise you'll get an infinite loop.
     if (newSize <= 4)
         newSize = requiredRows;
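
The new allocateNewRows() keeps the old growth policy: a table of 4 slots or fewer jumps straight to the requested size, otherwise the current capacity grows by roughly 25% per step until it covers the request (the else branch sits in the elided lines between this hunk and the next). A standalone sketch of that arithmetic, assuming rowidx_t is an unsigned typedef:

    #include <cstdio>

    typedef unsigned rowidx_t;   // assumption for this sketch only

    // Same shape as the visible logic: jump for tiny tables, otherwise +25% per step.
    static rowidx_t nextCapacity(rowidx_t current, rowidx_t required)
    {
        rowidx_t newSize = current;
        if (newSize <= 4)
            newSize = required;
        else
        {
            while (newSize < required)
                newSize += newSize/4;
        }
        return newSize;
    }

    int main()
    {
        printf("%u\n", nextCapacity(100, 130));   // 100 -> 125 -> 156
        printf("%u\n", nextCapacity(4, 1000));    // small tables jump straight to 1000
        return 0;
    }

The <= 4 guard matters: for values below 4 the integer division newSize/4 is zero and the loop would never advance, which is what the comment about the infinite loop is warning about.
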
@@ -405,40 +426,44 @@ const void *CThorExpandingRowArray::allocateNewRows(rowidx_t requiredRows, Owned
         while (newSize < requiredRows)
             newSize += newSize/4;
     }
-    OwnedConstThorRow newRows;
-    try
-    {
-        newRows.setown(rowManager->allocate(newSize * sizeof(void*), activity.queryContainer().queryId()));
-        if (!newRows)
-            return NULL;
-        if (stableSort)
-        {
-            newStableSortTmp.setown(rowManager->allocate(newSize * sizeof(void*), activity.queryContainer().queryId()));
-            if (!newStableSortTmp)
-                return NULL;
-        }
-    }
-    catch (IException * e)
+    return allocateRowTable(newSize);
+}
+
+void **CThorExpandingRowArray::allocateStableTable(bool error)
+{
+    dbgassertex(NULL != rows);
+    rowidx_t rowsCapacity = getRowsCapacity();
+    OwnedConstThorRow newStableSortTmp = allocateRowTable(rowsCapacity);
+    if (!newStableSortTmp)
     {
-        //Pahological cases - not enough memory to reallocate the target row buffer, or no contiguous pages available.
-        unsigned code = e->errorCode();
-        if ((code == ROXIEMM_MEMORY_LIMIT_EXCEEDED) || (code == ROXIEMM_MEMORY_POOL_EXHAUSTED))
-        {
-            e->Release();
-            return NULL;
-        }
-        throw;
+        if (error)
+            throw MakeActivityException(&activity, 0, "Out of memory, allocating stable row array, trying to allocate %"RIPF"d elements", rowsCapacity);
+        return NULL;
     }
-    return newRows.getClear();
+    return (void **)newStableSortTmp.getClear();
 }

-void CThorExpandingRowArray::doSort(unsigned n, void **const rows, ICompare &compare, unsigned maxCores)
+void CThorExpandingRowArray::doSort(rowidx_t n, void **const rows, ICompare &compare, unsigned maxCores)
 {
-    if (stableSort)
+    // NB: will only be called if numRows>1
+    if (stableSort_none != stableSort)
     {
+        OwnedConstThorRow newStableSortTmp;
+        void **stableTable;
+        if (stableSort_lateAlloc == stableSort)
+        {
+            dbgassertex(NULL == stableSortTmp);
+            newStableSortTmp.setown(allocateStableTable(true));
+            stableTable = (void **)newStableSortTmp.get();
+        }
+        else
+        {
+            dbgassertex(NULL != stableSortTmp);
+            stableTable = stableSortTmp;
+        }
         void **_rows = rows;
-        memcpy(stableSortTmp, _rows, n*sizeof(void **));
-        parqsortvecstable(stableSortTmp, n, compare, (void ***)_rows, maxCores);
+        memcpy(stableTable, _rows, n*sizeof(void **));
+        parqsortvecstable(stableTable, n, compare, (void ***)_rows, maxCores);
         while (n--)
         {
             *_rows = **((void ***)_rows);
@@ -449,7 +474,7 @@ void CThorExpandingRowArray::doSort(unsigned n, void **const rows, ICompare &com
         parqsortvec((void **const)rows, n, compare, maxCores);
 }

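For the stable path, doSort() copies the row pointers into the scratch table, lets parqsortvecstable() leave pointers into that scratch table in the original row array, and then collapses the extra level of indirection in place. A simplified, self-contained illustration of the same pointer-indirection idea, with std::stable_sort standing in for the parallel HPCC routine (it is not the actual implementation):

    #include <algorithm>
    #include <cstdio>

    int main()
    {
        // four "rows"; two share the key 'a' and two share the key 'b'
        const char *rows[4] = { "b2", "a1", "b1", "a2" };
        const unsigned n = 4;

        // the stableTable role: one pointer per row slot
        const char **stableTable[4];
        for (unsigned i=0; i<n; i++)
            stableTable[i] = &rows[i];

        // stable sort on the first character only, so equal keys keep their input order
        std::stable_sort(stableTable, stableTable+n,
            [](const char **l, const char **r) { return (*l)[0] < (*r)[0]; });

        // collapse the indirection back into a flat array of row pointers
        const char *sorted[4];
        for (unsigned i=0; i<n; i++)
            sorted[i] = *stableTable[i];

        for (unsigned i=0; i<n; i++)
            printf("%s ", sorted[i]);   // prints: a1 a2 b2 b1 (b2 keeps its place ahead of b1)
        printf("\n");
        return 0;
    }

The change above also decides where the scratch table comes from: stableSort_earlyAlloc uses the member stableSortTmp that ensure() keeps sized, while stableSort_lateAlloc allocates a temporary table for each sort via allocateStableTable(true).
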
-CThorExpandingRowArray::CThorExpandingRowArray(CActivityBase &_activity, IRowInterfaces *_rowIf, bool _allowNulls, bool _stableSort, bool _throwOnOom, rowidx_t initialSize) : activity(_activity)
+CThorExpandingRowArray::CThorExpandingRowArray(CActivityBase &_activity, IRowInterfaces *_rowIf, bool _allowNulls, StableSortFlag _stableSort, bool _throwOnOom, rowidx_t initialSize) : activity(_activity)
 {
     init(initialSize, _stableSort);
     setup(_rowIf, _allowNulls, _stableSort, _throwOnOom);
@@ -462,7 +487,7 @@ CThorExpandingRowArray::~CThorExpandingRowArray()
     ReleaseThorRow(stableSortTmp);
 }

-void CThorExpandingRowArray::setup(IRowInterfaces *_rowIf, bool _allowNulls, bool _stableSort, bool _throwOnOom)
+void CThorExpandingRowArray::setup(IRowInterfaces *_rowIf, bool _allowNulls, StableSortFlag _stableSort, bool _throwOnOom)
 {
     rowIf = _rowIf;
     stableSort = _stableSort;
@@ -506,7 +531,7 @@ void CThorExpandingRowArray::swap(CThorExpandingRowArray &other)
     const void **otherRows = other.rows;
     void **otherStableSortTmp = other.stableSortTmp;
     bool otherAllowNulls = other.allowNulls;
-    bool otherStableSort = other.stableSort;
+    StableSortFlag otherStableSort = other.stableSort;
     bool otherThrowOnOom = other.throwOnOom;
     rowidx_t otherMaxRows = other.maxRows;
     rowidx_t otherNumRows = other.numRows;
@@ -542,7 +567,7 @@ void CThorExpandingRowArray::transferFrom(CThorExpandingRowArray &donor)
     kill();
     donor.transferRows(numRows, rows);
     maxRows = numRows;
-    if (stableSort && maxRows)
+    if (maxRows && (stableSort_earlyAlloc == stableSort))
         ensure(maxRows);
 }

@@ -574,21 +599,31 @@ void CThorExpandingRowArray::clearUnused()

 bool CThorExpandingRowArray::ensure(rowidx_t requiredRows)
 {
-    OwnedConstThorRow newStableSortTmp;
-    OwnedConstThorRow newRows = allocateNewRows(requiredRows, newStableSortTmp);
-    if (!newRows)
-        throw MakeActivityException(&activity, 0, "Out of memory, allocating row array, had %"RIPF"d, trying to allocate %"RIPF"d elements", ordinality(), requiredRows);
-
-    const void **oldRows = rows;
-    void **oldStableSortTmp = stableSortTmp;
-
-    memcpy((void *)newRows.get(), rows, numRows * sizeof(void*));
+    if (getRowsCapacity() < requiredRows) // check, because may have expanded previously, but failed to allocate stableSortTmp and set new maxRows
+    {
+        OwnedConstThorRow newRows = allocateNewRows(requiredRows);
+        if (!newRows)
+        {
+            if (throwOnOom)
+                throw MakeActivityException(&activity, 0, "Out of memory, allocating row array, had %"RIPF"d, trying to allocate %"RIPF"d elements", ordinality(), requiredRows);
+            return false;
+        }

-    rows = (const void **)newRows.getClear();
-    maxRows = RoxieRowCapacity(rows) / sizeof(void *);
-    stableSortTmp = (void **)newStableSortTmp.getClear();
-    ReleaseThorRow(oldRows);
-    ReleaseThorRow(oldStableSortTmp);
+        const void **oldRows = rows;
+        memcpy((void *)newRows.get(), rows, numRows * sizeof(void*));
+        rows = (const void **)newRows.getClear();
+        ReleaseThorRow(oldRows);
+    }
+    if (stableSort_earlyAlloc == stableSort)
+    {
+        OwnedConstThorRow newStableSortTmp = allocateStableTable(throwOnOom);
+        if (!newStableSortTmp)
+            return false;
+        void **oldStableSortTmp = stableSortTmp;
+        stableSortTmp = (void **)newStableSortTmp.getClear();
+        ReleaseThorRow(oldStableSortTmp);
+    }
+    maxRows = getRowsCapacity();

     return true;
 }
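
ensure() now asks getRowsCapacity() for the truth instead of trusting maxRows, because an earlier call may have grown the row table and then failed on the stableSortTmp allocation before maxRows was updated. getRowsCapacity() is not part of this patch; judging by the removed RoxieRowCapacity(rows) / sizeof(void *) lines it presumably converts the byte size of the current allocation into a slot count, and that size can exceed what was requested if the row manager rounds allocations up. A small standalone sketch of that bytes-to-slots arithmetic:

    #include <cstdio>
    #include <cstddef>

    typedef unsigned rowidx_t;   // assumption for this sketch only

    // RoxieRowCapacity(rows) reports the usable byte size of the allocation backing
    // the pointer table; dividing by sizeof(void *) turns it into a row-slot count.
    static rowidx_t capacityFromBytes(size_t allocatedBytes)
    {
        return (rowidx_t)(allocatedBytes / sizeof(void *));
    }

    int main()
    {
        // e.g. a request for 1000 pointers that the allocator satisfies with an 8192-byte chunk
        printf("%u\n", capacityFromBytes(8192));   // 1024 slots on a 64-bit build
        return 0;
    }
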
@@ -599,7 +634,7 @@ void CThorExpandingRowArray::sort(ICompare &compare, unsigned maxCores)
         doSort(numRows, (void **const)rows, compare, maxCores);
 }

-void CThorExpandingRowArray::reorder(rowidx_t start, rowidx_t num, unsigned *neworder)
+void CThorExpandingRowArray::reorder(rowidx_t start, rowidx_t num, rowidx_t *neworder)
 {
     if (start>=numRows)
         return;
@@ -611,17 +646,17 @@ void CThorExpandingRowArray::reorder(rowidx_t start, rowidx_t num, unsigned *new
     void **tmp = (void **)ma.allocate(num*sizeof(void *));
     const void **p = rows + start;
     memcpy(tmp, p, num*sizeof(void *));
-    for (unsigned i=0; i<num; i++)
+    for (rowidx_t i=0; i<num; i++)
         p[i] = tmp[neworder[i]];
 }

 bool CThorExpandingRowArray::equal(ICompare *icmp, CThorExpandingRowArray &other)
 {
     // slow but better than prev!
-    unsigned n = other.ordinality();
+    rowidx_t n = other.ordinality();
     if (n!=ordinality())
         return false;
-    for (unsigned i=0;i<n;i++)
+    for (rowidx_t i=0;i<n;i++)
     {
         const void *p1 = rows[i];
         const void *p2 = other.query(i);
@@ -633,9 +668,8 @@ bool CThorExpandingRowArray::equal(ICompare *icmp, CThorExpandingRowArray &other

 bool CThorExpandingRowArray::checkSorted(ICompare *icmp)
 {
-    unsigned i;
-    unsigned n=ordinality();
-    for (i=1; i<n; i++)
+    rowidx_t n=ordinality();
+    for (rowidx_t i=1; i<n; i++)
     {
         if (icmp->docompare(rows[i-1], rows[i])>0)
             return false;
@@ -685,15 +719,16 @@ IRowStream *CThorExpandingRowArray::createRowStream(rowidx_t start, rowidx_t num

 void CThorExpandingRowArray::partition(ICompare &compare, unsigned num, UnsignedArray &out)
 {
-    unsigned p=0;
-    unsigned n = ordinality();
+    rowidx_t p=0;
+    rowidx_t n = ordinality();
     while (num)
     {
         out.append(p);
         if (p<n)
         {
-            unsigned q = p+(n-p)/num;
-            if (p==q){ // skip to next group
+            rowidx_t q = p+(n-p)/num;
+            if (p==q) // skip to next group
+            {
                 while (q<n)
                 {
                     q++;
@@ -718,7 +753,7 @@ offset_t CThorExpandingRowArray::serializedSize()
     rowidx_t c = ordinality();
     assertex(serializer);
     offset_t total = 0;
-    for (unsigned i=0; i<c; i++)
+    for (rowidx_t i=0; i<c; i++)
     {
         CSizingSerializer ssz;
         serializer->serialize(ssz, (const byte *)rows[i]);
@@ -780,18 +815,18 @@ void CThorExpandingRowArray::serializeCompress(MemoryBuffer &mb)
     fastLZCompressToBuffer(mb,exp.length(), exp.toByteArray());
 }

-unsigned CThorExpandingRowArray::serializeBlock(MemoryBuffer &mb, size32_t dstmax, unsigned idx, unsigned count)
+rowidx_t CThorExpandingRowArray::serializeBlock(MemoryBuffer &mb, size32_t dstmax, rowidx_t idx, rowidx_t count)
 {
     assertex(serializer);
     CMemoryRowSerializer out(mb);
     bool warnnull = true;
-    unsigned num=ordinality();
+    rowidx_t num=ordinality();
     if (idx>=num)
         return 0;
     if (num-idx<count)
         count = num-idx;
-    unsigned ret = 0;
-    for (unsigned i=0;i<count;i++)
+    rowidx_t ret = 0;
+    for (rowidx_t i=0;i<count;i++)
     {
         size32_t ln = mb.length();
         const void *row = query(i+idx);
@@ -866,8 +901,8 @@ void CThorSpillableRowArray::unregisterWriteCallback(IWritePosCallback &cb)
     writeCallbacks.zap(cb);
 }

-CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity, IRowInterfaces *rowIf, bool allowNulls, bool stable, rowidx_t initialSize, size32_t _commitDelta)
-    : CThorExpandingRowArray(activity, rowIf, false, stable, false, initialSize), commitDelta(_commitDelta)
+CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity, IRowInterfaces *rowIf, bool allowNulls, StableSortFlag stableSort, rowidx_t initialSize, size32_t _commitDelta)
+    : CThorExpandingRowArray(activity, rowIf, false, stableSort, false, initialSize), commitDelta(_commitDelta)
 {
     commitRows = 0;
     firstRow = 0;
@@ -897,35 +932,47 @@ bool CThorSpillableRowArray::ensure(rowidx_t requiredRows)
 {
     //Only the writer is allowed to reallocate rows (otherwise append can't be optimized), so rows is valid outside the lock

-    OwnedConstThorRow newStableSortTmp;
-    OwnedConstThorRow newRows = allocateNewRows(requiredRows, newStableSortTmp);
-    if (!newRows)
-        return false;
+    OwnedConstThorRow newRows;
+    if (getRowsCapacity() < requiredRows) // check, because may have expanded previously, but failed to allocate stableSortTmp and set new maxRows
+    {
+        newRows.setown(allocateNewRows(requiredRows));
+        if (!newRows)
+            return false;
+    }

-    const void **oldRows;
-    void **oldStableSortTmp;
     {
         CThorSpillableRowArrayLock block(*this);
+        if (newRows)
+        {
+            const void **oldRows = rows;
+            memcpy((void *)newRows.get(), rows+firstRow, (numRows - firstRow) * sizeof(void*));
+            numRows -= firstRow;
+            commitRows -= firstRow;
+            firstRow = 0;

-        oldRows = rows;
-        oldStableSortTmp = stableSortTmp;
-        memcpy((void *)newRows.get(), rows+firstRow, (numRows - firstRow) * sizeof(void*));
-        numRows -= firstRow;
-        commitRows -= firstRow;
-        firstRow = 0;
+            rows = (const void **)newRows.getClear();
+            ReleaseThorRow(oldRows);
+        }

-        rows = (const void **)newRows.getClear();
-        maxRows = RoxieRowCapacity(rows) / sizeof(void *);
-        stableSortTmp = (void **)newStableSortTmp.getClear();
+        // NB: can't release lock, or change maxRows, until know this succeeds
+        if (stableSort_earlyAlloc == stableSort)
+        {
+            OwnedConstThorRow newStableSortTmp = allocateStableTable(false);
+            if (!newStableSortTmp)
+                return false;
+            void **oldStableSortTmp = stableSortTmp;
+            stableSortTmp = (void **)newStableSortTmp.getClear();
+            ReleaseThorRow(oldStableSortTmp);
+        }
+        maxRows = getRowsCapacity();
     }
-    ReleaseThorRow(oldRows);
-    ReleaseThorRow(oldStableSortTmp);
     return true;
 }

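Only the writer reallocates the table, so the enlarged allocation (and any out-of-memory failure) happens before CThorSpillableRowArrayLock is taken; the copy/compaction and pointer swap are done under the lock, and maxRows is not raised until the stable table has been secured as well. A generic sketch of that allocate-outside, splice-inside pattern using a std::mutex; this is illustrative only, not the HPCC classes:

    #include <mutex>
    #include <cstring>
    #include <cstdlib>

    // Toy stand-in for the spillable array; firstRow marks rows already consumed.
    struct SpillableSketch
    {
        std::mutex cs;                 // plays the role of CThorSpillableRowArrayLock
        const void **rows = nullptr;
        size_t maxRows = 0, firstRow = 0, numRows = 0;

        bool ensure(size_t requiredRows)
        {
            if (maxRows >= requiredRows)
                return true;
            // allocate before locking, so a failure never happens while the lock is held
            const void **newRows = (const void **)calloc(requiredRows, sizeof(void *));
            if (!newRows)
                return false;
            {
                std::lock_guard<std::mutex> block(cs);
                // compact out already-consumed rows while copying into the new table
                if (rows)
                    memcpy((void *)newRows, rows + firstRow, (numRows - firstRow) * sizeof(void *));
                numRows -= firstRow;
                firstRow = 0;
                const void **oldRows = rows;
                rows = newRows;        // readers only ever observe a consistent table
                maxRows = requiredRows;
                free((void *)oldRows);
            }
            return true;
        }
    };
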
 void CThorSpillableRowArray::sort(ICompare &compare, unsigned maxCores)
 {
-    unsigned n = numCommitted();
+    // NB: only to be called inside lock
+    rowidx_t n = numCommitted();
     if (n>1)
     {
         void **const rows = (void **const)getBlock(n);
@@ -933,7 +980,7 @@ void CThorSpillableRowArray::sort(ICompare &compare, unsigned maxCores)
     }
 }

-unsigned CThorSpillableRowArray::save(IFile &iFile, rowidx_t watchRecNum, offset_t *watchFilePosResult)
+rowidx_t CThorSpillableRowArray::save(IFile &iFile, rowidx_t watchRecNum, offset_t *watchFilePosResult)
 {
     rowidx_t n = numCommitted();
     if (0 == n)
@@ -1233,7 +1280,7 @@ public:
         }
         maxCores = activity.queryMaxCores();

-        spillableRows.setup(rowIf, false, isStable);
+        spillableRows.setup(rowIf, false, isStable?stableSort_earlyAlloc:stableSort_none);
     }
     ~CThorRowCollectorBase()
     {
@@ -1284,7 +1331,7 @@ public:
             mmRegistered = false;
            activity.queryJob().queryRowManager()->removeRowBuffer(this);
         }
-        spillableRows.setup(rowIf, false, isStable);
+        spillableRows.setup(rowIf, false, isStable?stableSort_earlyAlloc:stableSort_none);
     }
     // IBufferedRowCallback
     virtual unsigned getPriority() const