Quellcode durchsuchen

Fix #1083, use 64bit row count on 32/64 systems

And make ThorExpandingRowArray max row count 32bit (in mem only)
Remove a couple of 32bit restrictions, in particular, sort's local count was
restricted to 2^32

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith vor 13 Jahren
Ursprung
Commit
90c5352c61

+ 1 - 1
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -1082,7 +1082,7 @@ public:
         if (exception.get())
         {
             StringBuffer errStr(joinStr);
-            errStr.append("(").append(container.queryId()).appendf(") right-hand side is too large (%"I64F"u bytes in %"RCPF"d rows) for %s : (",(unsigned __int64) rhs.serializedSize(),rhs.ordinality(),joinStr.get());
+            errStr.append("(").append(container.queryId()).appendf(") right-hand side is too large (%"I64F"u bytes in %"RIPF"d rows) for %s : (",(unsigned __int64) rhs.serializedSize(),rhs.ordinality(),joinStr.get());
             errStr.append(exception->errorCode()).append(", ");
             exception->errorMessage(errStr);
             errStr.append(")");

+ 1 - 1
thorlcr/activities/loop/thloop.cpp

@@ -175,7 +175,7 @@ class CLoopActivityMaster : public CLoopActivityMasterBase
             bool overLimit = slaveEmptyIterations > maxEmptyLoopIterations;
             emptyIterations->set(sender-1, overLimit);
             if (emptyIterations->scan(0, 0) >= nodes) // all empty
-                throw MakeActivityException(this, 0, "Executed LOOP with empty input and output > %maxEmptyLoopIterations times on all nodes", maxEmptyLoopIterations);
+                throw MakeActivityException(this, 0, "Executed LOOP with empty input and output > %d maxEmptyLoopIterations times on all nodes", maxEmptyLoopIterations);
         }
     }
 public:

+ 1 - 1
thorlcr/activities/loop/thloopslave.cpp

@@ -219,7 +219,7 @@ class CLoopSlaveActivity : public CLoopSlaveActivityBase
     Owned<IRowStream> curInput;
     Owned<CNextRowFeeder> nextRowFeeder;
     Owned<IRowWriterMultiReader> loopPending;
-    unsigned loopPendingCount;
+    rowcount_t loopPendingCount;
     unsigned flags, lastMs;
     IHThorLoopArg *helper;
     bool eof, finishedLooping;

+ 4 - 3
thorlcr/graph/thgraph.cpp

@@ -77,7 +77,7 @@ class CThorGraphResult : public CInterface, implements IThorResult, implements I
         virtual void flush() { }
         virtual IRowStream *getReader()
         {
-            return rows.createRowStream(0, (rowcount_t)-1, false);
+            return rows.createRowStream(0, (rowidx_t)-1, false);
         }
     };
 public:
@@ -171,9 +171,10 @@ public:
     }
     virtual void getLinkedResult(unsigned &countResult, byte * * & result)
     {
+        assertex(rowStreamCount==((unsigned)rowStreamCount)); // catch, just in case
         Owned<IRowStream> stream = getRowStream();
         countResult = 0;
-        OwnedConstThorRow _rowset = allocator->createRowset(rowStreamCount);
+        OwnedConstThorRow _rowset = allocator->createRowset((unsigned)rowStreamCount);
         const void **rowset = (const void **)_rowset.get();
         loop
         {
@@ -243,7 +244,7 @@ public:
         IThorResult *loopResult = results->createResult(activity, 0, resultRowIf, !activity.queryGraph().isLocalChild()); // loop output
         IThorResult *inputResult = results->createResult(activity, 1, resultRowIf, !activity.queryGraph().isLocalChild());
     }
-    virtual IRowStream *execute(CActivityBase &activity, unsigned counter, IRowWriterMultiReader *inputStream, unsigned rowStreamCount, size32_t parentExtractSz, const byte *parentExtract)
+    virtual IRowStream *execute(CActivityBase &activity, unsigned counter, IRowWriterMultiReader *inputStream, rowcount_t rowStreamCount, size32_t parentExtractSz, const byte *parentExtract)
     {
         Owned<IThorGraphResults> results = graph->createThorGraphResults(3);
         prepareLoopResults(activity, results);

+ 1 - 1
thorlcr/graph/thgraph.hpp

@@ -142,7 +142,7 @@ interface IThorBoundLoopGraph : extends IInterface
 {
     virtual void prepareLoopResults(CActivityBase &activity, IThorGraphResults *results) = 0;
     virtual void prepareCounterResult(CActivityBase &activity, IThorGraphResults *results, unsigned loopCounter, unsigned pos) = 0;
-    virtual IRowStream *execute(CActivityBase &activity, unsigned counter, IRowWriterMultiReader *rowStream, unsigned rowStreamCount, size32_t parentExtractSz, const byte * parentExtract) = 0;
+    virtual IRowStream *execute(CActivityBase &activity, unsigned counter, IRowWriterMultiReader *rowStream, rowcount_t rowStreamCount, size32_t parentExtractSz, const byte * parentExtract) = 0;
     virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults * graphLoopResults, size32_t parentExtractSz, const byte * parentExtract) = 0;
     virtual CGraphBase *queryGraph() = 0;
 };

+ 1 - 1
thorlcr/graph/thgraphslave.cpp

@@ -1052,7 +1052,7 @@ void CJobSlave::startJob()
     unsigned __int64 freeSpaceRep = getFreeSpace(queryBaseDirectory(true));
     PROGLOG("Disk space: %s = %"I64F"d, %s = %"I64F"d", queryBaseDirectory(), freeSpace/0x100000, queryBaseDirectory(true), freeSpaceRep/0x100000);
 
-    unsigned minFreeSpace = getWorkUnitValueInt("MINIMUM_DISK_SPACE", 0);
+    unsigned minFreeSpace = (unsigned)getWorkUnitValueInt("MINIMUM_DISK_SPACE", 0);
     if (minFreeSpace)
     {
         if (freeSpace < ((unsigned __int64)minFreeSpace)*0x100000)

+ 8 - 8
thorlcr/msort/tsortl.cpp

@@ -63,11 +63,11 @@ public:
 
 struct TransferStreamHeader
 {
-    rowmap_t numrecs;
+    rowcount_t numrecs;
     rowcount_t pos;
-    size32_t   recsize;
+    size32_t recsize;
     unsigned id;
-    TransferStreamHeader(rowcount_t _pos, rowmap_t _numrecs, unsigned _recsize, unsigned _id) 
+    TransferStreamHeader(rowcount_t _pos, rowcount_t _numrecs, unsigned _recsize, unsigned _id)
         : pos(_pos), numrecs(_numrecs), recsize(_recsize), id(_id)
     {
     }
@@ -281,22 +281,22 @@ public:
 };
 
 
-IRowStream *ConnectMergeRead(unsigned id,IRowInterfaces *rowif,SocketEndpoint &nodeaddr,rowcount_t startrec,rowmap_t numrecs)
+IRowStream *ConnectMergeRead(unsigned id,IRowInterfaces *rowif,SocketEndpoint &nodeaddr,rowcount_t startrec,rowcount_t numrecs)
 {
     Owned<ISocket> socket = DoConnect(nodeaddr);
     TransferStreamHeader hdr(startrec,numrecs,0,id);
 #ifdef _FULL_TRACE
     StringBuffer s;
     nodeaddr.getUrlStr(s);
-    PROGLOG("ConnectMergeRead(%d,%s,%x,%"RCPF"d,%"RMF"u)",id,s.str(),(unsigned)(memsize_t)socket.get(),startrec,numrecs);
+    PROGLOG("ConnectMergeRead(%d,%s,%x,%"RCPF"d,%"RCPF"u)",id,s.str(),(unsigned)(memsize_t)socket.get(),startrec,numrecs);
 #endif
     hdr.winrev();
     socket->write(&hdr,sizeof(hdr));
-    return  new CSocketRowStream(id,rowif->queryRowAllocator(),rowif->queryRowDeserializer(),socket);
+    return new CSocketRowStream(id,rowif->queryRowAllocator(),rowif->queryRowDeserializer(),socket);
 }
 
 
-ISocketRowWriter *ConnectMergeWrite(IRowInterfaces *rowif,ISocket *socket,size32_t bufsize,rowcount_t &startrec,rowmap_t &numrecs)
+ISocketRowWriter *ConnectMergeWrite(IRowInterfaces *rowif,ISocket *socket,size32_t bufsize,rowcount_t &startrec,rowcount_t &numrecs)
 {
     TransferStreamHeader hdr;
     socket->read(&hdr,sizeof(hdr));
@@ -306,7 +306,7 @@ ISocketRowWriter *ConnectMergeWrite(IRowInterfaces *rowif,ISocket *socket,size32
 #ifdef _FULL_TRACE
     char name[100];
     int port = socket->peer_name(name,sizeof(name));
-    PROGLOG("ConnectMergeWrite(%d,%s:%d,%x,%"RCPF"d,%"RMF"u)",hdr.id,name,port,(unsigned)(memsize_t)socket,startrec,numrecs);
+    PROGLOG("ConnectMergeWrite(%d,%s:%d,%x,%"RCPF"d,%"RCPF"u)",hdr.id,name,port,(unsigned)(memsize_t)socket,startrec,numrecs);
 #endif
     return new CSocketRowWriter(hdr.id,rowif,socket,bufsize);
 }

+ 36 - 39
thorlcr/msort/tsortm.cpp

@@ -85,7 +85,7 @@ public:
     unsigned short  mpport;
     mptag_t         mpTagRPC;
     unsigned        beat;
-    rowmap_t        numrecs;
+    rowcount_t      numrecs;
     offset_t        slavesize;
     bool            overflow;
     unsigned        scale;     // num times overflowed
@@ -157,8 +157,7 @@ public:
         scale = 1;
     }
 
-    void AdjustNumRecs(rowmap_t n);
-
+    void AdjustNumRecs(rowcount_t n);
 };
 
 class CTimer
@@ -414,7 +413,7 @@ public:
         total = 0;
         stotal = 0;
         totalmem = 0;
-        minrecsonnode = UINT_MAX;
+        minrecsonnode = RCMAX;
         maxrecsonnode = 0;
         numnodes = slaves.ordinality();
         estrecsize = 100;
@@ -534,11 +533,12 @@ public:
     unsigned __int64 CalcMinMax(OwnedConstThorRow &min, OwnedConstThorRow &max)
     {
         // initialize min/max keys
-        unsigned __int64 tot=0;
+        rowcount_t tot=0;
         unsigned i;
         size32_t ers = 0;
         unsigned ersn=0;
-        for (i=0;i<numnodes;i++) {
+        for (i=0;i<numnodes;i++)
+        {
             CSortNode &slave = slaves.item(i);
             if (slave.numrecs==0)
                 continue;
@@ -546,13 +546,14 @@ public:
             void *p = NULL;
             size32_t retlen = 0;
             size32_t avrecsize=0;
-            rowmap_t num=slave.GetMinMax(retlen,p,avrecsize);
+            rowcount_t num=slave.GetMinMax(retlen,p,avrecsize);
             if (avrecsize) {
                 ers += avrecsize;       // should probably do mode but this is OK
                 ersn++;
             }
             tot += num;
-            if (num>0) {
+            if (num>0)
+            {
                 minmax.deserialize(retlen, p);
                 free(p);
                 const void *p = minmax.query(0);
@@ -595,12 +596,10 @@ public:
     }
 
 
-    rowmap_t *CalcPartitionUsingSampling()
+    rowcount_t *CalcPartitionUsingSampling()
     {   // doesn't support between
 #define OVERSAMPLE 16
-        OwnedMalloc<rowmap_t> splitMap(numnodes*numnodes, true);
-        if (sizeof(rowmap_t)<=4) 
-            assertex(total/numnodes<INT_MAX); // keep record numbers on individual nodes in 31 bits
+        OwnedMalloc<rowcount_t> splitMap(numnodes*numnodes, true);
         unsigned numsplits=numnodes-1;
         if (total==0) {
             // no partition info!
@@ -608,7 +607,7 @@ public:
             return splitMap.getClear();
         }
         unsigned averagesamples = OVERSAMPLE*numnodes;  
-        rowmap_t averagerecspernode = (rowmap_t)(total/numnodes);
+        rowcount_t averagerecspernode = (rowcount_t)(total/numnodes);
         CriticalSection asect;
         CThorExpandingRowArray sample(*activity, rowif, true);
 #ifdef ASYNC_PARTIONING
@@ -618,9 +617,9 @@ public:
             CThorExpandingRowArray &sample;
             CriticalSection &asect;
             unsigned averagesamples;
-            rowmap_t averagerecspernode;
+            rowcount_t averagerecspernode;
         public:
-            casyncfor1(NodeArray &_slaves, CThorExpandingRowArray &_sample, unsigned _averagesamples, rowmap_t _averagerecspernode, CriticalSection &_asect)
+            casyncfor1(NodeArray &_slaves, CThorExpandingRowArray &_sample, unsigned _averagesamples, rowcount_t _averagerecspernode, CriticalSection &_asect)
                 : slaves(_slaves), sample(_sample), asect(_asect)
             { 
                 averagesamples = _averagesamples;
@@ -629,7 +628,7 @@ public:
             void Do(unsigned i)
             {
                 CSortNode &slave = slaves.item(i);
-                unsigned slavesamples = averagerecspernode?((unsigned)((averagerecspernode/2+averagesamples*(count_t)slave.numrecs)/averagerecspernode)):1;  
+                unsigned slavesamples = averagerecspernode?((unsigned)((averagerecspernode/2+averagesamples*slave.numrecs)/averagerecspernode)):1;
                 //PrintLog("%d samples for %d",slavesamples,i);
                 if (slavesamples) {
                     size32_t samplebufsize;
@@ -646,7 +645,7 @@ public:
         unsigned i;
         for (i=0;i<numnodes;i++) {
             CSortNode &slave = slaves.item(i);
-            unsigned slavesamples = (unsigned)((count_t)averagesamples*(count_t)slave.numrecs/(count_t)averagerecspernode);
+            unsigned slavesamples = (unsigned)((count_t)averagesamples*slave.numrecs/(count_t)averagerecspernode);
             PrintLog("%d samples for %d",slavesamples,i);
             if (!slavesamples)
                 continue;
@@ -731,11 +730,11 @@ public:
         class casyncfor3: public CAsyncFor
         {
             NodeArray &slaves;
-            rowmap_t *splitmap;
+            rowcount_t *splitmap;
             unsigned numnodes;
             unsigned numsplits;
         public:
-            casyncfor3(NodeArray &_slaves,rowmap_t *_splitmap,unsigned _numnodes,unsigned _numsplits)
+            casyncfor3(NodeArray &_slaves,rowcount_t *_splitmap,unsigned _numnodes,unsigned _numsplits)
                 : slaves(_slaves)
             { 
                 splitmap = _splitmap;
@@ -746,7 +745,7 @@ public:
             {
                 CSortNode &slave = slaves.item(i);
                 if (slave.numrecs!=0) {
-                    rowmap_t *res=splitmap+(i*numnodes);
+                    rowcount_t *res=splitmap+(i*numnodes);
                     slave.MultiBinChopStop(numsplits,res);
                     res[numnodes-1] = slave.numrecs;
                 }
@@ -757,7 +756,7 @@ public:
         for (i=0;i<numnodes;i++) {
             CSortNode &slave = slaves.item(i);
             if (slave.numrecs!=0) {
-                rowmap_t *res=splitMap+(i*numnodes);
+                rowcount_t *res=splitMap+(i*numnodes);
                 slave.MultiBinChopStop(numsplits,res);
                 res[numnodes-1] = slave.numrecs;
             }
@@ -779,17 +778,15 @@ public:
     }
 
 
-    rowmap_t *CalcPartition(bool logging)
+    rowcount_t *CalcPartition(bool logging)
     {
         CriticalBlock block(ECFcrit);       
         // this is a bit long winded
-        if (sizeof(rowmap_t)<=4) 
-            assertex(stotal/numnodes<INT_MAX); // keep record numbers on individual nodes in 31 bits
 
         OwnedConstThorRow mink;
         OwnedConstThorRow maxk;
         // so as won't overflow
-        OwnedMalloc<rowmap_t> splitmap(numnodes*numnodes, true);
+        OwnedMalloc<rowcount_t> splitmap(numnodes*numnodes, true);
         if (CalcMinMax(mink,maxk)==0) {
             // no partition info!
             partitioninfo->kill();
@@ -956,7 +953,7 @@ public:
                 for (i=0;i<numnodes;i++) {
                     CSortNode &slave = slaves.item(i);
                     if (slave.numrecs!=0) {
-                        rowmap_t *res=splitmap+(i*numnodes);
+                        rowcount_t *res=splitmap+(i*numnodes);
                         slave.MultiBinChopStop(numsplits,res);
                         res[numnodes-1] = slave.numrecs;
                     }
@@ -1041,7 +1038,7 @@ public:
     }
 
 
-    rowmap_t *UsePartitionInfo(PartitionInfo &pi, bool uppercmp)
+    rowcount_t *UsePartitionInfo(PartitionInfo &pi, bool uppercmp)
     {
         unsigned i;
 #ifdef _TRACE
@@ -1057,10 +1054,10 @@ public:
         // first find split points
         unsigned numnodes = pi.numnodes;
         unsigned numsplits = numnodes-1;
-        OwnedMalloc<rowmap_t> splitMap(numnodes*numnodes, true);
-        OwnedMalloc<rowmap_t> res(numsplits);
+        OwnedMalloc<rowcount_t> splitMap(numnodes*numnodes, true);
+        OwnedMalloc<rowcount_t> res(numsplits);
         unsigned j;
-        rowmap_t *mapp=splitMap;
+        rowcount_t *mapp=splitMap;
         for (i=0;i<numnodes;i++) {
             CSortNode &slave = slaves.item(i);
             if (numsplits>0) {
@@ -1068,13 +1065,13 @@ public:
                 pi.splitkeys.serialize(mb);
                 assertex(pi.splitkeys.ordinality()==numsplits);
                 slave.MultiBinChop(mb.length(),(const byte *)mb.bufferBase(),numsplits,res,uppercmp?CMPFN_UPPER:CMPFN_COLLATE,true);
-                rowmap_t *resp = res;
-                rowmap_t p=*resp;
+                rowcount_t *resp = res;
+                rowcount_t p=*resp;
                 *mapp = p;
                 resp++;
                 mapp++;
                 for (j=1;j<numsplits;j++) {
-                    rowmap_t n = *resp;
+                    rowcount_t n = *resp;
                     *mapp = n;
                     if (p>n) {
                         ActPrintLog(activity, "ERROR: Split positions out of order!");
@@ -1091,7 +1088,7 @@ public:
 #ifdef _TRACE
 #ifdef TRACE_PARTITION
         ActPrintLog(activity, "UsePartitionInfo result");
-        rowmap_t *p = splitMap;
+        rowcount_t *p = splitMap;
         for (i=0;i<numnodes;i++) {
             StringBuffer s;
             s.appendf("%d: ",i);
@@ -1165,7 +1162,7 @@ public:
         partitioninfo->numnodes = numnodes;
     }
 
-    IThorException *CheckSkewed(unsigned __int64 threshold, double skewWarning, double skewError, rowmap_t n, unsigned __int64 total, rowcount_t max)
+    IThorException *CheckSkewed(unsigned __int64 threshold, double skewWarning, double skewError, unsigned n, rowcount_t total, rowcount_t max)
     {
         if (n<=0)
             return NULL;
@@ -1238,7 +1235,7 @@ public:
 #endif
         bool useAux = false; // JCSMORE using existing partioning and auxillary rowIf (only used if overflow)
         loop {
-            OwnedMalloc<rowmap_t> splitMap, splitMapUpper;
+            OwnedMalloc<rowcount_t> splitMap, splitMapUpper;
             CTimer timer;
             if (numnodes>1) {
                 timer.start();
@@ -1356,7 +1353,7 @@ public:
                     }
                 }
 
-                OwnedMalloc<rowmap_t> tot(numnodes, true);
+                OwnedMalloc<rowcount_t> tot(numnodes, true);
                 rowcount_t max=0;
                 unsigned imax=numnodes;
                 for (i=0;i<imax;i++) {
@@ -1461,9 +1458,9 @@ IThorSorterMaster *CreateThorSorterMaster(CActivityBase *activity)
 
 
 
-void CSortNode::AdjustNumRecs(rowmap_t num)
+void CSortNode::AdjustNumRecs(rowcount_t num)
 {
-    rowmap_t old = numrecs;
+    rowcount_t old = numrecs;
     numrecs = num;
     sorter.total += num-old;
     if (num>sorter.maxrecsonnode)

+ 33 - 58
thorlcr/msort/tsortmp.cpp

@@ -19,7 +19,6 @@ enum MPSlaveFunctions
     FN_GetMultiMidPoint,
     FN_GetMultiMidPointStart,
     FN_GetMultiMidPointStop,
-    FN_SingleBinChop,
     FN_MultiBinChop,
     FN_MultiBinChopStart,
     FN_MultiBinChopStop,
@@ -106,7 +105,7 @@ void SortSlaveMP::StartGather()
     sendRecv(mb);
 }
 
-void SortSlaveMP::GetGatherInfo(rowmap_t &numlocal, offset_t &totalsize, unsigned &overflowscale, bool hasserializer)
+void SortSlaveMP::GetGatherInfo(rowcount_t &numlocal, offset_t &totalsize, unsigned &overflowscale, bool hasserializer)
 {
     CMessageBuffer mb;
     mb.append((byte)FN_GetGatherInfo);
@@ -115,14 +114,14 @@ void SortSlaveMP::GetGatherInfo(rowmap_t &numlocal, offset_t &totalsize, unsigne
     mb.read(numlocal).read(totalsize).read(overflowscale);
 }
 
-rowmap_t SortSlaveMP::GetMinMax(size32_t &keybuffsize,void *&keybuff, size32_t &avrecsizesize)
+rowcount_t SortSlaveMP::GetMinMax(size32_t &keybuffsize,void *&keybuff, size32_t &avrecsizesize)
 {
     CMessageBuffer mb;
     mb.append((byte)FN_GetMinMax);
     sendRecv(mb);
     deserializeblk(mb,keybuffsize,keybuff);
     mb.read(avrecsizesize);
-    rowmap_t ret;
+    rowcount_t ret;
     mb.read(ret);
     return ret;
 }
@@ -167,28 +166,15 @@ void SortSlaveMP::GetMultiMidPointStop(size32_t &mkeybuffsize, void * &mkeybuf)
     deserializeblk(mb,mkeybuffsize,mkeybuf);
 }
 
-rowmap_t SortSlaveMP::SingleBinChop(size32_t keysize,const byte *key,byte cmpfn)
-{
-    CMessageBuffer mb;
-    mb.append((byte)FN_SingleBinChop);
-    serializeblk(mb,keysize,key).append(cmpfn);
-    sendRecv(mb);
-    rowmap_t ret;
-    mb.read(ret);
-    return ret;
-}
-
-void SortSlaveMP::MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowmap_t *pos,byte cmpfn,bool useaux)
+void SortSlaveMP::MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowcount_t *pos,byte cmpfn,bool useaux)
 {
     CMessageBuffer mb;
     mb.append((byte)FN_MultiBinChop);
     serializeblk(mb,keybuffsize,keybuff).append(num).append(cmpfn).append(useaux);
     sendRecv(mb);
-    mb.read(num*sizeof(rowmap_t),pos);
+    mb.read(num*sizeof(rowcount_t),pos);
 }
 
-
-
 void SortSlaveMP::MultiBinChopStart(size32_t keybuffsize,const byte *keybuff, byte cmpfn) /* async */
 {
     CMessageBuffer mb;
@@ -197,41 +183,41 @@ void SortSlaveMP::MultiBinChopStart(size32_t keybuffsize,const byte *keybuff, by
     sendRecv(mb);
 }
 
-void SortSlaveMP::MultiBinChopStop(unsigned num,rowmap_t *pos)
+void SortSlaveMP::MultiBinChopStop(unsigned num,rowcount_t *pos)
 {
     CMessageBuffer mb;
     mb.append((byte)FN_MultiBinChopStop);
     mb.append(num);
     sendRecv(mb);
-    mb.read(num*sizeof(rowmap_t),pos);
+    mb.read(num*sizeof(rowcount_t),pos);
 }
 
-void SortSlaveMP::OverflowAdjustMapStart( unsigned mapsize,rowmap_t *map,size32_t keybuffsize,const byte *keybuff, byte cmpfn,bool useaux) /* async */
+void SortSlaveMP::OverflowAdjustMapStart(unsigned mapsize,rowcount_t *map,size32_t keybuffsize,const byte *keybuff, byte cmpfn,bool useaux) /* async */
 {
     CMessageBuffer mb;
     mb.append((byte)FN_OverflowAdjustMapStart);
-    mb.append(mapsize).append(mapsize*sizeof(rowmap_t),map);
+    mb.append(mapsize).append(mapsize*sizeof(rowcount_t),map);
     serializeblk(mb,keybuffsize,keybuff).append(cmpfn).append(useaux);
     sendRecv(mb);
 
 }
 
-rowmap_t SortSlaveMP::OverflowAdjustMapStop( unsigned mapsize, rowmap_t *map)
+rowcount_t SortSlaveMP::OverflowAdjustMapStop( unsigned mapsize, rowcount_t *map)
 {
     CMessageBuffer mb;
     mb.append((byte)FN_OverflowAdjustMapStop);
     mb.append(mapsize);
     sendRecv(mb);
-    rowmap_t ret;
-    mb.read(ret).read(mapsize*sizeof(rowmap_t),map);
+    rowcount_t ret;
+    mb.read(ret).read(mapsize*sizeof(rowcount_t),map);
     return ret;
 }
 
-void SortSlaveMP::MultiMerge(unsigned mapsize,rowmap_t *map,unsigned num,SocketEndpoint* endpoints) /* async */
+void SortSlaveMP::MultiMerge(unsigned mapsize,rowcount_t *map,unsigned num,SocketEndpoint* endpoints) /* async */
 {
     CMessageBuffer mb;
     mb.append((byte)FN_MultiMerge);
-    mb.append(mapsize).append(mapsize*sizeof(rowmap_t),map);
+    mb.append(mapsize).append(mapsize*sizeof(rowcount_t),map);
     mb.append(num);
     while (num--) {
         endpoints->serialize(mb);
@@ -241,12 +227,12 @@ void SortSlaveMP::MultiMerge(unsigned mapsize,rowmap_t *map,unsigned num,SocketE
 }
 
 
-void SortSlaveMP::MultiMergeBetween(unsigned mapsize,rowmap_t *map,rowmap_t *mapupper,unsigned num,SocketEndpoint* endpoints) /* async */
+void SortSlaveMP::MultiMergeBetween(unsigned mapsize,rowcount_t *map,rowcount_t *mapupper,unsigned num,SocketEndpoint* endpoints) /* async */
 {
     CMessageBuffer mb;
     mb.append((byte)FN_MultiMergeBetween);
-    mb.append(mapsize).append(mapsize*sizeof(rowmap_t),map);
-    mb.append(mapsize*sizeof(rowmap_t),mapupper);
+    mb.append(mapsize).append(mapsize*sizeof(rowcount_t),map);
+    mb.append(mapsize*sizeof(rowcount_t),mapupper);
     mb.append(num);
     while (num--) {
         endpoints->serialize(mb);
@@ -352,7 +338,7 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
             case FN_GetGatherInfo: {
                 bool hasserializer;
                 mb.read(hasserializer);
-                rowmap_t numlocal;
+                rowcount_t numlocal;
                 unsigned overflowscale;
                 offset_t totalsize;
                 slave.GetGatherInfo(numlocal,totalsize,overflowscale,hasserializer);
@@ -363,7 +349,7 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
                 size32_t keybuffsize;
                 void *keybuff;
                 size32_t avrecsize;
-                rowmap_t ret = slave.GetMinMax(keybuffsize,keybuff,avrecsize);
+                rowcount_t ret = slave.GetMinMax(keybuffsize,keybuff,avrecsize);
                 serializeblk(mbout,keybuffsize,keybuff).append(avrecsize).append(ret);
                 free(keybuff);
             }
@@ -417,8 +403,8 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
             case FN_MultiBinChopStop: {
                 unsigned num;
                 mb.read(num);
-                void *out = mbout.reserveTruncate(num*sizeof(rowmap_t));
-                slave.MultiBinChopStop(num,(rowmap_t *)out);
+                void *out = mbout.reserveTruncate(num*sizeof(rowcount_t));
+                slave.MultiBinChopStop(num,(rowcount_t *)out);
             }
             break;
             case FN_GetMultiMidPointStop: {
@@ -429,17 +415,6 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
                 free(mkeybuff);
             }
             break;
-            case FN_SingleBinChop: {
-                size32_t keysize;
-                byte * key;
-                deserializeblk(mb,keysize,key);
-                byte cmpfn;
-                mb.read(cmpfn);
-                rowmap_t ret = slave.SingleBinChop(keysize,key,cmpfn);
-                mbout.append(ret);
-                free(key);
-            }
-            break;
             case FN_MultiBinChopStart: {
                 replydone = true;
                 comm->reply(mbout);
@@ -460,8 +435,8 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
                 byte cmpfn;
                 bool useaux;
                 mb.read(num).read(cmpfn).read(useaux);
-                void *out = mbout.reserveTruncate(num*sizeof(rowmap_t));
-                slave.MultiBinChop(keybuffsize,(const byte *)keybuff,num,(rowmap_t *)out,cmpfn,useaux);
+                void *out = mbout.reserveTruncate(num*sizeof(rowcount_t));
+                slave.MultiBinChop(keybuffsize,(const byte *)keybuff,num,(rowcount_t *)out,cmpfn,useaux);
                 free(keybuff);
             }
             break;
@@ -470,7 +445,7 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
                 comm->reply(mbout);
                 unsigned mapsize;
                 mb.read(mapsize);
-                const void * map = mb.readDirect(mapsize*sizeof(rowmap_t));
+                const void * map = mb.readDirect(mapsize*sizeof(rowcount_t));
                 size32_t keybuffsize;
                 void * keybuff;
                 deserializeblk(mb,keybuffsize,keybuff);
@@ -478,18 +453,18 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
                 mb.read(cmpfn);
                 bool useaux;
                 mb.read(useaux);
-                slave.OverflowAdjustMapStart(mapsize,(rowmap_t *)map,keybuffsize,(const byte *)keybuff,cmpfn,useaux);
+                slave.OverflowAdjustMapStart(mapsize,(rowcount_t *)map,keybuffsize,(const byte *)keybuff,cmpfn,useaux);
                 free(keybuff);
             }
             break;
             case FN_OverflowAdjustMapStop: {
                 unsigned mapsize;
                 mb.read(mapsize);
-                rowmap_t ret=0;
+                rowcount_t ret=0;
                 size32_t retofs = mbout.length();
                 mbout.append(ret);
-                void *map=mbout.reserveTruncate(mapsize*sizeof(rowmap_t));
-                ret = slave.OverflowAdjustMapStop(mapsize,(rowmap_t *)map);     // could avoid copy here if passed mb
+                void *map=mbout.reserveTruncate(mapsize*sizeof(rowcount_t));
+                ret = slave.OverflowAdjustMapStop(mapsize,(rowcount_t *)map);     // could avoid copy here if passed mb
                 mbout.writeDirect(retofs,sizeof(ret),&ret);
             }
             break;
@@ -498,7 +473,7 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
                 comm->reply(mbout);
                 unsigned mapsize;
                 mb.read(mapsize);
-                const void *map = mb.readDirect(mapsize*sizeof(rowmap_t));
+                const void *map = mb.readDirect(mapsize*sizeof(rowcount_t));
                 unsigned num;
                 mb.read(num);
                 SocketEndpointArray epa;
@@ -507,7 +482,7 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
                     ep.deserialize(mb);
                     epa.append(ep);
                 }
-                slave.MultiMerge(mapsize,(rowmap_t *)map,num,epa.getArray());
+                slave.MultiMerge(mapsize,(rowcount_t *)map,num,epa.getArray());
             }
             break;
             case FN_MultiMergeBetween: {
@@ -515,8 +490,8 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
                 comm->reply(mbout);
                 unsigned mapsize;
                 mb.read(mapsize);
-                const void *map = mb.readDirect(mapsize*sizeof(rowmap_t));
-                const void *mapupper = mb.readDirect(mapsize*sizeof(rowmap_t));
+                const void *map = mb.readDirect(mapsize*sizeof(rowcount_t));
+                const void *mapupper = mb.readDirect(mapsize*sizeof(rowcount_t));
                 unsigned num;
                 mb.read(num);
                 SocketEndpointArray epa;
@@ -525,7 +500,7 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
                     ep.deserialize(mb);
                     epa.append(ep);
                 }
-                slave.MultiMergeBetween(mapsize,(rowmap_t *)map,(rowmap_t *)mapupper,num,epa.getArray());
+                slave.MultiMergeBetween(mapsize,(rowcount_t *)map,(rowcount_t *)mapupper,num,epa.getArray());
             }
             break;
             case FN_SingleMerge: {

+ 16 - 18
thorlcr/msort/tsortmp.hpp

@@ -15,20 +15,19 @@ interface ISortSlaveMP
 {
     virtual bool Connect(unsigned _part, unsigned _numnodes)=0;
     virtual void StartGather()=0;
-    virtual void GetGatherInfo(rowmap_t &numlocal, offset_t &totalsize, unsigned &overflowscale, bool hasserializer)=0;
-    virtual rowmap_t GetMinMax(size32_t &keybuffsize,void *&keybuff, size32_t &avrecsizesize)=0;
+    virtual void GetGatherInfo(rowcount_t &numlocal, offset_t &totalsize, unsigned &overflowscale, bool hasserializer)=0;
+    virtual rowcount_t GetMinMax(size32_t &keybuffsize,void *&keybuff, size32_t &avrecsizesize)=0;
     virtual bool GetMidPoint     (size32_t lkeysize, const byte * lkey, size32_t hkeysize, const byte * hkey, size32_t &mkeysize, byte * &mkey)=0;
     virtual void GetMultiMidPoint(size32_t lkeybuffsize, const void * lkeybuff, size32_t hkeybuffsize, const void * hkeybuff, size32_t &mkeybuffsize, void * &mkeybuf)=0;
     virtual void GetMultiMidPointStart(size32_t lkeybuffsize, const void * lkeybuff, size32_t hkeybuffsize, const void * hkeybuff)=0; /* async */
     virtual void GetMultiMidPointStop(size32_t &mkeybuffsize, void * &mkeybuf)=0;
-    virtual rowmap_t SingleBinChop(size32_t keysize,const byte *key,byte cmpfn)=0;
-    virtual void MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowmap_t *pos,byte cmpfn,bool useaux)=0;
+    virtual void MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowcount_t *pos,byte cmpfn,bool useaux)=0;
     virtual void MultiBinChopStart(size32_t keybuffsize,const byte *keybuff, byte cmpfn)=0; /* async */
-    virtual void MultiBinChopStop(unsigned num,rowmap_t *pos)=0;
-    virtual void OverflowAdjustMapStart( unsigned mapsize,rowmap_t *map,size32_t keybuffsize,const byte *keybuff, byte cmpfn, bool useaux)=0; /* async */
-    virtual rowmap_t OverflowAdjustMapStop( unsigned mapsize, rowmap_t *map)=0;
-    virtual void MultiMerge(unsigned mapsize,rowmap_t *map,unsigned num,SocketEndpoint* endpoints)=0; /* async */
-    virtual void MultiMergeBetween(unsigned mapsize,rowmap_t *map,rowmap_t *mapupper,unsigned num,SocketEndpoint* endpoints)=0; /* async */
+    virtual void MultiBinChopStop(unsigned num,rowcount_t *pos)=0;
+    virtual void OverflowAdjustMapStart(unsigned mapsize,rowcount_t *map,size32_t keybuffsize,const byte *keybuff, byte cmpfn, bool useaux)=0; /* async */
+    virtual rowcount_t OverflowAdjustMapStop(unsigned mapsize, rowcount_t *map)=0;
+    virtual void MultiMerge(unsigned mapsize,rowcount_t *map,unsigned num,SocketEndpoint* endpoints)=0; /* async */
+    virtual void MultiMergeBetween(unsigned mapsize,rowcount_t *map,rowcount_t *mapupper,unsigned num,SocketEndpoint* endpoints)=0; /* async */
     virtual void SingleMerge()=0; /* async */
     virtual bool FirstRowOfFile(const char *filename,size32_t &rowbuffsize, byte * &rowbuf)=0;
     virtual void GetMultiNthRow(unsigned numsplits,size32_t &mkeybuffsize, void * &mkeybuf)=0;              
@@ -52,20 +51,19 @@ public:
     void init(ICommunicator *_comm, rank_t _rank,mptag_t _tag);
     bool Connect(unsigned _part, unsigned _numnodes);
     void StartGather();
-    void GetGatherInfo(rowmap_t &numlocal, offset_t &totalsize, unsigned &overflowscale, bool hasserializer);
-    rowmap_t GetMinMax(size32_t &keybuffsize,void *&keybuff, size32_t &avrecsizesize);
+    void GetGatherInfo(rowcount_t &numlocal, offset_t &totalsize, unsigned &overflowscale, bool hasserializer);
+    rowcount_t GetMinMax(size32_t &keybuffsize,void *&keybuff, size32_t &avrecsizesize);
     bool GetMidPoint     (size32_t lkeysize, const byte * lkey, size32_t hkeysize, const byte * hkey, size32_t &mkeysize, byte * &mkey);
     void GetMultiMidPoint(size32_t lkeybuffsize, const void * lkeybuff, size32_t hkeybuffsize, const void * hkeybuff, size32_t &mkeybuffsize, void * &mkeybuf);
     void GetMultiMidPointStart(size32_t lkeybuffsize, const void * lkeybuff, size32_t hkeybuffsize, const void * hkeybuff); /* async */
     void GetMultiMidPointStop(size32_t &mkeybuffsize, void * &mkeybuf);
-    rowmap_t SingleBinChop(size32_t keysize,const byte *key,byte cmpfn);
-    void MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowmap_t *pos,byte cmpfn,bool useaux);
+    void MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowcount_t *pos,byte cmpfn,bool useaux);
     void MultiBinChopStart(size32_t keybuffsize,const byte *keybuff, byte cmpfn); /* async */
-    void MultiBinChopStop(unsigned num,rowmap_t *pos);
-    void OverflowAdjustMapStart( unsigned mapsize,rowmap_t *map,size32_t keybuffsize,const byte *keybuff, byte cmpfn,bool useaux); /* async */
-    rowmap_t OverflowAdjustMapStop( unsigned mapsize, rowmap_t *map);
-    void MultiMerge(unsigned mapsize,rowmap_t *map,unsigned num,SocketEndpoint* endpoints); /* async */
-    void MultiMergeBetween(unsigned mapsize,rowmap_t *map,rowmap_t *mapupper,unsigned num,SocketEndpoint* endpoints); /* async */
+    void MultiBinChopStop(unsigned num,rowcount_t *pos);
+    void OverflowAdjustMapStart(unsigned mapsize,rowcount_t *map,size32_t keybuffsize,const byte *keybuff, byte cmpfn,bool useaux); /* async */
+    rowcount_t OverflowAdjustMapStop(unsigned mapsize, rowcount_t *map);
+    void MultiMerge(unsigned mapsize,rowcount_t *map,unsigned num,SocketEndpoint* endpoints); /* async */
+    void MultiMergeBetween(unsigned mapsize,rowcount_t *map,rowcount_t *mapupper,unsigned num,SocketEndpoint* endpoints); /* async */
     void SingleMerge(); /* async */
     bool FirstRowOfFile(const char *filename,size32_t &rowbuffsize, byte * &rowbuf);
     void GetMultiNthRow(unsigned numsplits,size32_t &mkeybuffsize, void * &mkeybuf);                

+ 34 - 40
thorlcr/msort/tsorts.cpp

@@ -124,7 +124,7 @@ class CWriteIntercept : public CSimpleInterface
             fixedsize = (size32_t)(o-lastofs);
         lastofs = o;
     }
-    size32_t _readOverflowPos(rowmap_t pos, unsigned n, offset_t *ofs, bool closeIO)
+    size32_t _readOverflowPos(rowcount_t pos, unsigned n, offset_t *ofs, bool closeIO)
     {
         if (fixedsize)
         {
@@ -247,12 +247,12 @@ public:
         dataFileDeserializerSource.setStream(NULL);
         idxFileIO.clear();
     }
-    size32_t readOverflowPos(rowmap_t pos, unsigned n, offset_t *ofs, bool closeIO)
+    size32_t readOverflowPos(rowcount_t pos, unsigned n, offset_t *ofs, bool closeIO)
     {
         CriticalBlock block(crit);
         return _readOverflowPos(pos, n, ofs, closeIO);
     }
-    const void *getRow(rowmap_t pos)
+    const void *getRow(rowcount_t pos)
     {
         CriticalBlock block(crit);
         offset_t ofs[2]; // JCSMORE doesn't really need 2, only to verify read right amount below
@@ -574,7 +574,7 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
     unsigned partno, numnodes; // JCSMORE - shouldn't be necessary
     rowcount_t totalrows, grandtotal;
     offset_t grandtotalsize;
-    rowmap_t *overflowmap, *multibinchoppos;
+    rowcount_t *overflowmap, *multibinchoppos;
     bool stopping, gatherdone, nosort, isstable;
     ICompare *icompare;
     ICompare *icollate; // used for co-sort
@@ -643,14 +643,14 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
         }
         return NULL;
     }
-    unsigned BinChop(const void *row, bool lesseq, bool firstdup, byte cmpfn)
+    rowidx_t BinChop(const void *row, bool lesseq, bool firstdup, byte cmpfn)
     {
-        unsigned n = rowArray.ordinality();
-        unsigned l=0;
-        unsigned r=n;
+        rowidx_t n = rowArray.ordinality();
+        rowidx_t l=0;
+        rowidx_t r=n;
         ICompare* icmp=queryCmpFn(cmpfn);
         while (l<r) {
-            unsigned m = (l+r)/2;
+            rowidx_t m = (l+r)/2;
             const void *p = rowArray.query(m);
             int cmp = icmp->docompare(row, p);
             if (cmp < 0)
@@ -673,7 +673,7 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
             return l-1;
         return l;
     }
-    void doBinChop(CThorExpandingRowArray &keys, rowmap_t * pos, unsigned num, byte cmpfn)
+    void doBinChop(CThorExpandingRowArray &keys, rowcount_t * pos, unsigned num, byte cmpfn)
     {
         MemoryBuffer tmp;
         for (unsigned n=0;n<num;n++)
@@ -695,15 +695,15 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
             }
         }
     }
-    void AdjustOverflow(rowmap_t &apos, const void *key, byte cmpfn)
+    void AdjustOverflow(rowcount_t &apos, const void *key, byte cmpfn)
     {
 #ifdef TRACE_PARTITION_OVERFLOW
         ActPrintLog(activity, "AdjustOverflow: in (%"RCPF"d)",apos);
         TraceKey(" ",(byte *)key);
 #endif
-        rowmap_t pos = (rowmap_t)(apos+1)*(rowmap_t)overflowinterval;
+        rowcount_t pos = (apos+1)*(rowcount_t)overflowinterval;
         if (pos>grandtotal)
-            pos = (rowmap_t)grandtotal;
+            pos = grandtotal;
         assertex(intercept);
         MemoryBuffer bufma;
         while (pos>0)
@@ -769,7 +769,7 @@ public:
         gatherdone = false;
         startgathersem.signal();
     }
-    virtual void GetGatherInfo(rowmap_t &numlocal, offset_t &totalsize, unsigned &_overflowscale, bool haskeyserializer)
+    virtual void GetGatherInfo(rowcount_t &numlocal, offset_t &totalsize, unsigned &_overflowscale, bool haskeyserializer)
     {
         if (!gatherdone)
             ERRLOG("GetGatherInfo:***Error called before gather complete");
@@ -784,7 +784,7 @@ public:
         _overflowscale = overflowinterval;
         totalsize = grandtotalsize; // used by master, if nothing overflowed to see if can MiniSort
     }
-    virtual rowmap_t GetMinMax(size32_t &keybufsize,void *&keybuf,size32_t &avrecsize)
+    virtual rowcount_t GetMinMax(size32_t &keybufsize,void *&keybuf,size32_t &avrecsize)
     {
         CThorExpandingRowArray ret(*activity, rowif, true);
         avrecsize = 0;
@@ -902,13 +902,7 @@ public:
         mbufsize = midkeybufsize;
         midkeybuf = NULL;
     }
-    virtual rowmap_t SingleBinChop(size32_t keysize, const byte * key,byte cmpfn)
-    {
-        OwnedConstThorRow row;
-        row.deserialize(rowif,keysize,key);
-        return BinChop(row.get(),false,true,cmpfn);
-    }
-    virtual void MultiBinChop(size32_t keybufsize, const byte * keybuf, unsigned num, rowmap_t * pos, byte cmpfn, bool useaux)
+    virtual void MultiBinChop(size32_t keybufsize, const byte * keybuf, unsigned num, rowcount_t * pos, byte cmpfn, bool useaux)
     {
         CThorExpandingRowArray keys(*activity, useaux?auxrowif:rowif, true);
         keys.deserialize(keybufsize, keybuf);
@@ -920,23 +914,23 @@ public:
         keys.deserializeExpand(keybufsize, keybuf);
         assertex(multibinchoppos==NULL); // check for reentrancy
         multibinchopnum = keys.ordinality();
-        multibinchoppos = (rowmap_t *)malloc(sizeof(rowmap_t)*multibinchopnum);
+        multibinchoppos = (rowcount_t *)malloc(sizeof(rowcount_t)*multibinchopnum);
         doBinChop(keys, multibinchoppos, multibinchopnum, cmpfn);
     }
-    virtual void MultiBinChopStop(unsigned num, rowmap_t * pos)
+    virtual void MultiBinChopStop(unsigned num, rowcount_t * pos)
     {
         assertex(multibinchoppos);
         assertex(multibinchopnum==num);
-        memcpy(pos,multibinchoppos,num*sizeof(rowmap_t));
+        memcpy(pos,multibinchoppos,num*sizeof(rowcount_t));
         free(multibinchoppos);
         multibinchoppos = NULL;
     }
-    virtual void OverflowAdjustMapStart(unsigned mapsize, rowmap_t * map,
+    virtual void OverflowAdjustMapStart(unsigned mapsize, rowcount_t * map,
                                size32_t keybufsize, const byte * keybuf, byte cmpfn, bool useaux)
     {
         assertex(intercept);
-        overflowmap = (rowmap_t *)malloc(mapsize*sizeof(rowmap_t));
-        memcpy(overflowmap,map,mapsize*sizeof(rowmap_t));
+        overflowmap = (rowcount_t *)malloc(mapsize*sizeof(rowcount_t));
+        memcpy(overflowmap,map,mapsize*sizeof(rowcount_t));
         unsigned i;
 #ifdef TRACE_PARTITION_OVERFLOW
 
@@ -949,27 +943,25 @@ public:
         keys.deserialize(keybufsize, keybuf);
         for (i=0;i<mapsize-1;i++)
             AdjustOverflow(overflowmap[i], keys.query(i), cmpfn);
-        assertex(grandtotal==(unsigned)grandtotal);
-        overflowmap[mapsize-1] = (unsigned)grandtotal;
+        overflowmap[mapsize-1] = grandtotal;
 #ifdef TRACE_PARTITION_OVERFLOW
         ActPrintLog(activity, "Out: ");
         for (i=0;i<mapsize;i++)
             ActPrintLog(activity, "%"RCPF"u ",overflowmap[i]);
 #endif
     }
-    virtual rowmap_t OverflowAdjustMapStop(unsigned mapsize, rowmap_t * map)
+    virtual rowcount_t OverflowAdjustMapStop(unsigned mapsize, rowcount_t * map)
     {
-        memcpy(map,overflowmap,mapsize*sizeof(rowmap_t));
+        memcpy(map,overflowmap,mapsize*sizeof(rowcount_t));
         free(overflowmap);
-        assertex(grandtotal==(rowmap_t)grandtotal);
-        return (rowmap_t)grandtotal;
+        return grandtotal;
     }
-    virtual void MultiMerge(unsigned mapsize,rowmap_t *map,
+    virtual void MultiMerge(unsigned mapsize,rowcount_t *map,
                     unsigned num,SocketEndpoint* endpoints)
     {
         MultiMergeBetween(mapsize,map,NULL,num,endpoints);
     }
-    virtual void MultiMergeBetween(unsigned mapsize, rowmap_t * map, rowmap_t * mapupper, unsigned num, SocketEndpoint * endpoints)
+    virtual void MultiMergeBetween(unsigned mapsize, rowcount_t * map, rowcount_t * mapupper, unsigned num, SocketEndpoint * endpoints)
     {
         assertex(transferserver.get()!=NULL);
         if (intercept)
@@ -1090,10 +1082,8 @@ public:
     }
 
 // ISortSlaveBase
-    virtual IRowStream *createMergeInputStream(rowmap_t sstart, rowcount_t snum)
+    virtual IRowStream *createMergeInputStream(rowcount_t sstart, rowcount_t snum)
     {
-        unsigned _snum = (unsigned)snum;    // only support 2^32 rows locally
-        assertex(snum==_snum);
         if (intercept)
         {
             offset_t startofs;  
@@ -1102,7 +1092,11 @@ public:
             return intercept->getStream(startofs, snum);
         }
         else
-            return rowArray.createRowStream((unsigned)sstart, _snum, false); // must be false as rows may overlap (between join)
+        {
+            unsigned _snum = (rowidx_t)snum; // only support 2^32 rows in memory
+            assertex(snum==_snum);
+            return rowArray.createRowStream((rowidx_t)sstart, _snum, false); // must be false as rows may overlap (between join)
+        }
     }
     virtual size32_t getTransferBlockSize()
     {

+ 4 - 6
thorlcr/msort/tsorts.hpp

@@ -28,8 +28,6 @@
 #include "mptag.hpp"
 #include "mpbase.hpp"
 
-typedef rowcount_t rowmap_t;
-
 interface ISortKeySerializer;
 interface IRowInterfaces;
 interface IThorDataLink;
@@ -67,8 +65,8 @@ interface ISocketRowWriter: extends IRowWriter
 
 class CActivityBase;
 IThorSorter *CreateThorSorter(CActivityBase *activity, SocketEndpoint &ep,IDiskUsage *iDiskUsage,ICommunicator *clusterComm, mptag_t _mpTagRPC);
-IRowStream *ConnectMergeRead(unsigned id,IRowInterfaces *rowif,SocketEndpoint &nodeaddr,rowcount_t startrec,rowmap_t numrecs);
-ISocketRowWriter *ConnectMergeWrite(IRowInterfaces *rowif,ISocket *socket,size32_t bufsize,rowcount_t &startrec,rowmap_t &numrecs);
+IRowStream *ConnectMergeRead(unsigned id,IRowInterfaces *rowif,SocketEndpoint &nodeaddr,rowcount_t startrec,rowcount_t numrecs);
+ISocketRowWriter *ConnectMergeWrite(IRowInterfaces *rowif,ISocket *socket,size32_t bufsize,rowcount_t &startrec,rowcount_t &numrecs);
 #define SOCKETSERVERINC                    1
 #define NUMSLAVESOCKETS                    2
 
@@ -84,7 +82,7 @@ interface ISortedInput: extends IInterface // reads rows from sorted local data
 
 interface ISortSlaveBase  // for global merging 
 {
-    virtual IRowStream *createMergeInputStream(rowmap_t sstart, rowcount_t _snum) = 0;
+    virtual IRowStream *createMergeInputStream(rowcount_t sstart, rowcount_t _snum) = 0;
     virtual size32_t getTransferBlockSize() = 0;
     virtual unsigned getTransferPort() = 0;
     virtual void startMerging(IArrayOf<IRowStream> &readers, rowcount_t _totalrows) = 0;
@@ -95,7 +93,7 @@ interface ISortSlaveBase  // for global merging
 interface IMergeTransferServer: extends IInterface
 {
     virtual void start() = 0;
-    virtual rowmap_t merge(unsigned mapsize,rowmap_t *map,rowmap_t *mapupper,
+    virtual rowcount_t merge(unsigned mapsize,rowcount_t *map,rowcount_t *mapupper,
                             unsigned num,SocketEndpoint* endpoints,
                             unsigned partno
                            ) = 0;

+ 25 - 20
thorlcr/msort/tsorts1.cpp

@@ -58,7 +58,7 @@ protected:
     }
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-    CMergeReadStream(IRowInterfaces *rowif, unsigned streamno,SocketEndpoint &targetep, rowcount_t startrec, rowmap_t numrecs)
+    CMergeReadStream(IRowInterfaces *rowif, unsigned streamno,SocketEndpoint &targetep, rowcount_t startrec, rowcount_t numrecs)
     {
         endpoint = targetep;
 #ifdef _TRACE
@@ -127,7 +127,7 @@ class CSortMerge: public CSimpleInterface, implements ISocketSelectNotify
     ISortSlaveBase &src;
     Owned<ISocketRowWriter> out;
     rowcount_t poscount;
-    rowmap_t numrecs;
+    rowcount_t numrecs;
 //  unsigned pos;
 //  unsigned endpos;
     unsigned ndone;
@@ -144,7 +144,7 @@ protected:
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CSortMerge(CSortTransferServerThread *_parent,ISocket* _socket,ISocketRowWriter *_out,rowcount_t _poscount,rowmap_t _numrecs,ISocketSelectHandler *_selecthandler);
+    CSortMerge(CSortTransferServerThread *_parent,ISocket* _socket,ISocketRowWriter *_out,rowcount_t _poscount,rowcount_t _numrecs,ISocketSelectHandler *_selecthandler);
     ~CSortMerge()
     {
 #ifdef _FULL_TRACE
@@ -166,12 +166,13 @@ public:
         url.append(name).append(':').append(port);
         PrintLog("SORT Merge WRITE: start %s, pos=%"RCPF"d, len=%"RCPF"d",url.str(),poscount,numrecs);
 #endif
-        rowmap_t pos=(rowmap_t)poscount;
-        assertex(pos==poscount);
-        try {
+        rowcount_t pos=poscount;
+        try
+        {
             iseq.setown(src.createMergeInputStream(pos,numrecs));
         }
-        catch (IException *e) {
+        catch (IException *e)
+        {
             PrintExceptionLog(e,"**Exception(4a)");
             throw;
         }
@@ -343,11 +344,11 @@ public:
                 }
 
                 rowcount_t poscount=0;
-                rowmap_t numrecs=0;
+                rowcount_t numrecs=0;
                 ISocketRowWriter *strm=NULL;
                 try {
                     waitRowIF();
-                    strm = ConnectMergeWrite(rowif,socket,0x100000,poscount,numrecs);   
+                    strm = ConnectMergeWrite(rowif,socket,0x100000,poscount,numrecs);
                 }
                 catch (IJSOCK_Exception *e) { // retry if failed
                     PrintExceptionLog(e,"WARNING: Exception(ConnectMergeWrite)");
@@ -407,7 +408,7 @@ public:
 
     }
 
-    void add(ISocketRowWriter *strm,ISocket *socket,rowcount_t poscount,rowmap_t numrecs) // takes ownership of sock
+    void add(ISocketRowWriter *strm,ISocket *socket,rowcount_t poscount,rowcount_t numrecs) // takes ownership of sock
     {
         CriticalBlock proc(childsect);
         if (!selecthandler) {
@@ -429,7 +430,7 @@ public:
 
 
 
-    rowmap_t merge(unsigned mapsize,rowmap_t *map,rowmap_t *mapupper,
+    rowcount_t merge(unsigned mapsize,rowcount_t *map,rowcount_t *mapupper,
                    unsigned numnodes,SocketEndpoint* endpoints,
                    unsigned partno)
     {
@@ -458,7 +459,7 @@ public:
             }
         }
 #endif  
-        rowmap_t resnum=0;
+        rowcount_t resnum=0;
         for (i=0;i<numnodes;i++) 
             resnum += vMAPU(i,partno)-vMAPL(i,partno-1);
         // calculate start position
@@ -467,18 +468,22 @@ public:
             for (j=0;j<numnodes;j++) 
                 respos += vMAPL(j,i)-vMAPL(j,i-1);      // note we are adding up all of the lower as we want start
 
-        rowmap_t totalrows = resnum;
+        rowcount_t totalrows = resnum;
         PrintLog("Output start = %"RCPF"d, num = %"RCPF"u",respos,resnum);
 
         IArrayOf<IRowStream> readers;
         IException *exc = NULL;
-        try {
-            for (j=0;j<numnodes;j++) {
+        try
+        {
+            for (j=0;j<numnodes;j++)
+            {
                 unsigned i=j;
-                rowmap_t sstart=vMAPL(i,partno-1);
-                rowmap_t snum=vMAPU(i,partno)-sstart; 
-                if (snum>0) {
-                    if (i==partno) {
+                rowcount_t sstart=vMAPL(i,partno-1);
+                rowcount_t snum=vMAPU(i,partno)-sstart;
+                if (snum>0)
+                {
+                    if (i==partno)
+                    {
                         PrintLog("SORT Merge READ: Stream(%u) local, pos=%"RCPF"u len=%"RCPF"u",i,sstart,snum);
                         readers.append(*slave.createMergeInputStream(sstart,snum));
                     }
@@ -508,7 +513,7 @@ public:
     }
 };
 
-CSortMerge::CSortMerge(CSortTransferServerThread *_parent,ISocket* _socket,ISocketRowWriter *_out,rowcount_t _poscount,rowmap_t _numrecs,ISocketSelectHandler *_selecthandler)
+CSortMerge::CSortMerge(CSortTransferServerThread *_parent,ISocket* _socket,ISocketRowWriter *_out,rowcount_t _poscount,rowcount_t _numrecs,ISocketSelectHandler *_selecthandler)
     : src(_parent->slave),socket(_socket),out(_out)
 {
     parent = _parent;

+ 2 - 6
thorlcr/shared/thor.hpp

@@ -27,17 +27,13 @@ typedef unsigned graph_id;
 #define ACTPF
 #define GIDPF
 
-#ifdef __64BIT__
 typedef unsigned __int64 rowcount_t;
 #define RCPF I64F
 #define RCMAX ((rowcount_t)(__int64)-1)
-#else
-typedef unsigned rowcount_t;
-#define RCPF ""
-#define RCMAX UINT_MAX
-#endif
 #define RCUNBOUND RCMAX
 #define RCUNSET RCMAX
+typedef size32_t rowidx_t;
+#define RIPF ""
 
 template <class T>
 inline rowcount_t validRC(T X)

+ 43 - 47
thorlcr/thorutil/thmem.cpp

@@ -205,7 +205,6 @@ class CSpillableStreamBase : public CSimpleInterface, implements roxiemem::IBuff
 {
 protected:
     CActivityBase &activity;
-    rowcount_t pos;
     IRowInterfaces *rowIf;
     bool preserveNulls, ownsRows;
     CThorSpillableRowArray rows;
@@ -215,7 +214,7 @@ protected:
     bool spillRows()
     {
         // NB: Should always be called whilst 'rows' is locked (with CThorSpillableRowArrayLock)
-        rowcount_t numRows = rows.numCommitted();
+        rowidx_t numRows = rows.numCommitted();
         if (0 == numRows)
             return false;
 
@@ -235,8 +234,6 @@ public:
         : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf, _preserveNulls), preserveNulls(_preserveNulls)
     {
         rows.swap(inRows);
-        pos = 0;
-
         activity.queryJob().queryRowManager()->addRowBuffer(this);
     }
     ~CSpillableStreamBase()
@@ -265,7 +262,7 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
 {
     class CStream : public CSimpleInterface, implements IRowStream, implements IWritePosCallback
     {
-        rowcount_t pos;
+        rowidx_t pos;
         offset_t outputOffset;
         Owned<IRowStream> spillStream;
         Linked<CSharedSpillableRowSet> owner;
@@ -302,7 +299,7 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
         }
         virtual void stop() { }
     // IWritePosCallback
-        virtual rowcount_t queryRecordNumber()
+        virtual rowidx_t queryRecordNumber()
         {
             return pos;
         }
@@ -335,7 +332,7 @@ public:
 // NB: A single unshared spillable stream
 class CSpillableStream : public CSpillableStreamBase, implements IRowStream
 {
-    rowcount_t numReadRows, granularity;
+    rowidx_t pos, numReadRows, granularity;
     const void **readRows;
 
 public:
@@ -344,7 +341,7 @@ public:
     CSpillableStream(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls)
         : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls)
     {
-        numReadRows = 0;
+        pos = numReadRows = 0;
         granularity = 500; // JCSMORE - rows
 
         // a small amount of rows to read from swappable rows
@@ -373,7 +370,7 @@ public:
                 spillStream.setown(createRowStream(spillFile, rowIf, 0, (offset_t)-1, (unsigned __int64)-1, false, preserveNulls));
                 return spillStream->nextRow();
             }
-            rowcount_t fetch = rows.numCommitted();
+            rowidx_t fetch = rows.numCommitted();
             if (0 == fetch)
                 return NULL;
             if (fetch >= granularity)
@@ -395,7 +392,7 @@ public:
 
 //====
 
-void CThorExpandingRowArray::init(rowcount_t initialSize, bool _stableSort)
+void CThorExpandingRowArray::init(rowidx_t initialSize, bool _stableSort)
 {
     rowManager = activity.queryJob().queryRowManager();
     stableSort = _stableSort;
@@ -417,7 +414,7 @@ void CThorExpandingRowArray::init(rowcount_t initialSize, bool _stableSort)
     numRows = 0;
 }
 
-const void *CThorExpandingRowArray::allocateNewRows(rowcount_t requiredRows, OwnedConstThorRow &newStableSortTmp)
+const void *CThorExpandingRowArray::allocateNewRows(rowidx_t requiredRows, OwnedConstThorRow &newStableSortTmp)
 {
     unsigned newSize = maxRows;
     //This condition must be <= at least 1/scaling factor below otherwise you'll get an infinite loop.
@@ -477,7 +474,7 @@ void CThorExpandingRowArray::doSort(unsigned n, void **const rows, ICompare &com
         parqsortvec((void **const)rows, n, compare, maxCores);
 }
 
-CThorExpandingRowArray::CThorExpandingRowArray(CActivityBase &_activity, IRowInterfaces *_rowIf, bool _allowNulls, bool _stableSort, bool _throwOnOom, rowcount_t initialSize) : activity(_activity)
+CThorExpandingRowArray::CThorExpandingRowArray(CActivityBase &_activity, IRowInterfaces *_rowIf, bool _allowNulls, bool _stableSort, bool _throwOnOom, rowidx_t initialSize) : activity(_activity)
 {
     init(initialSize, _stableSort);
     setup(_rowIf, _allowNulls, _stableSort, _throwOnOom);
@@ -512,7 +509,7 @@ void CThorExpandingRowArray::setup(IRowInterfaces *_rowIf, bool _allowNulls, boo
 
 void CThorExpandingRowArray::clearRows()
 {
-    for (rowcount_t i = 0; i < numRows; i++)
+    for (rowidx_t i = 0; i < numRows; i++)
         ReleaseThorRow(rows[i]);
     numRows = 0;
 }
@@ -536,8 +533,8 @@ void CThorExpandingRowArray::swap(CThorExpandingRowArray &other)
     bool otherAllowNulls = other.allowNulls;
     bool otherStableSort = other.stableSort;
     bool otherThrowOnOom = other.throwOnOom;
-    rowcount_t otherMaxRows = other.maxRows;
-    rowcount_t otherNumRows = other.numRows;
+    rowidx_t otherMaxRows = other.maxRows;
+    rowidx_t otherNumRows = other.numRows;
 
     other.rowManager = rowManager;
     other.setup(rowIf, allowNulls, stableSort, throwOnOom);
@@ -554,7 +551,7 @@ void CThorExpandingRowArray::swap(CThorExpandingRowArray &other)
     numRows = otherNumRows;
 }
 
-void CThorExpandingRowArray::transferRows(rowcount_t & outNumRows, const void * * & outRows)
+void CThorExpandingRowArray::transferRows(rowidx_t & outNumRows, const void * * & outRows)
 {
     outNumRows = numRows;
     outRows = rows;
@@ -579,13 +576,13 @@ void CThorExpandingRowArray::transferFrom(CThorSpillableRowArray &donor)
 	transferFrom((CThorExpandingRowArray &)donor);
 }
 
-void CThorExpandingRowArray::removeRows(rowcount_t start, rowcount_t n)
+void CThorExpandingRowArray::removeRows(rowidx_t start, rowidx_t n)
 {
     assertex(numRows-start >= n);
     assertex(!n || rows);
     if (rows)
     {
-        for (rowcount_t i = start; i < start+n; i++)
+        for (rowidx_t i = start; i < start+n; i++)
             ReleaseThorRow(rows[i]);
         //firstRow = 0;
         numRows -= n;
@@ -600,12 +597,12 @@ void CThorExpandingRowArray::clearUnused()
         memset(rows+numRows, 0, (maxRows-numRows) * sizeof(void *));
 }
 
-bool CThorExpandingRowArray::ensure(rowcount_t requiredRows)
+bool CThorExpandingRowArray::ensure(rowidx_t requiredRows)
 {
     OwnedConstThorRow newStableSortTmp;
     OwnedConstThorRow newRows = allocateNewRows(requiredRows, newStableSortTmp);
     if (!newRows)
-        throw MakeActivityException(&activity, 0, "Out of memory, allocating row array, had %"RCPF"d, trying to allocate %"RCPF"d elements", ordinality(), requiredRows);
+        throw MakeActivityException(&activity, 0, "Out of memory, allocating row array, had %"RIPF"d, trying to allocate %"RIPF"d elements", ordinality(), requiredRows);
 
     const void **oldRows = rows;
     void **oldStableSortTmp = stableSortTmp;
@@ -627,7 +624,7 @@ void CThorExpandingRowArray::sort(ICompare &compare, unsigned maxCores)
         doSort(numRows, (void **const)rows, compare, maxCores);
 }
 
-void CThorExpandingRowArray::reorder(rowcount_t start, rowcount_t num, unsigned *neworder)
+void CThorExpandingRowArray::reorder(rowidx_t start, rowidx_t num, unsigned *neworder)
 {
     if (start>=numRows)
         return;
@@ -671,19 +668,18 @@ bool CThorExpandingRowArray::checkSorted(ICompare *icmp)
     return true;
 }
 
-IRowStream *CThorExpandingRowArray::createRowStream(rowcount_t start, rowcount_t num, bool streamOwns)
+IRowStream *CThorExpandingRowArray::createRowStream(rowidx_t start, rowidx_t num, bool streamOwns)
 {
     class CStream : public CSimpleInterface, implements IRowStream
     {
-        rowcount_t pos;
         CThorExpandingRowArray &parent;
+        rowidx_t pos, lastRow;
         bool owns;
-        rowcount_t lastRow;
 
     public:
         IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-        CStream(CThorExpandingRowArray &_parent, rowcount_t firstRow, rowcount_t _lastRow, bool _owns)
+        CStream(CThorExpandingRowArray &_parent, rowidx_t firstRow, rowidx_t _lastRow, bool _owns)
             : parent(_parent), pos(firstRow), lastRow(_lastRow), owns(_owns)
         {
         }
@@ -703,8 +699,8 @@ IRowStream *CThorExpandingRowArray::createRowStream(rowcount_t start, rowcount_t
 
     if (start>ordinality())
         start = ordinality();
-    rowcount_t lastRow;
-    if ((num==(rowcount_t)-1)||(start+num>ordinality()))
+    rowidx_t lastRow;
+    if ((num==(rowidx_t)-1)||(start+num>ordinality()))
         lastRow = ordinality();
     else
         lastRow = start+num;
@@ -744,7 +740,7 @@ void CThorExpandingRowArray::partition(ICompare &compare, unsigned num, Unsigned
 
 offset_t CThorExpandingRowArray::serializedSize()
 {
-    rowcount_t c = ordinality();
+    rowidx_t c = ordinality();
     assertex(serializer);
     offset_t total = 0;
     for (unsigned i=0; i<c; i++)
@@ -760,10 +756,10 @@ void CThorExpandingRowArray::serialize(IRowSerializerTarget &out)
 {
     bool warnnull = true;
     assertex(serializer);
-    rowcount_t n = ordinality();
+    rowidx_t n = ordinality();
     if (n)
     {
-        for (rowcount_t i = 0; i < n; i++)
+        for (rowidx_t i = 0; i < n; i++)
         {
             const void *row = query(i);
             if (row)
@@ -787,10 +783,10 @@ void CThorExpandingRowArray::serialize(MemoryBuffer &mb)
     {
         unsigned short guard = 0x7631;
         mb.append(guard);
-        rowcount_t n = ordinality();
+        rowidx_t n = ordinality();
         if (n)
         {
-            for (rowcount_t i = 0; i < n; i++)
+            for (rowidx_t i = 0; i < n; i++)
             {
                 const void *row = query(i);
                 bool isnull = (row==NULL);
@@ -895,7 +891,7 @@ void CThorSpillableRowArray::unregisterWriteCallback(IWritePosCallback &cb)
     writeCallbacks.zap(cb);
 }
 
-CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity, IRowInterfaces *rowIf, bool allowNulls, bool stable, rowcount_t initialSize, size32_t _commitDelta)
+CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity, IRowInterfaces *rowIf, bool allowNulls, bool stable, rowidx_t initialSize, size32_t _commitDelta)
     : CThorExpandingRowArray(activity, rowIf, false, stable, false, initialSize), commitDelta(_commitDelta)
 {
     commitRows = 0;
@@ -909,7 +905,7 @@ CThorSpillableRowArray::~CThorSpillableRowArray()
 
 void CThorSpillableRowArray::clearRows()
 {
-    for (rowcount_t i = firstRow; i < numRows; i++)
+    for (rowidx_t i = firstRow; i < numRows; i++)
         ReleaseThorRow(rows[i]);
     numRows = 0;
     firstRow = 0;
@@ -922,7 +918,7 @@ void CThorSpillableRowArray::kill()
     CThorExpandingRowArray::kill();
 }
 
-bool CThorSpillableRowArray::ensure(rowcount_t requiredRows)
+bool CThorSpillableRowArray::ensure(rowidx_t requiredRows)
 {
     //Only the writer is allowed to reallocate rows (otherwise append can't be optimized), so rows is valid outside the lock
 
@@ -962,16 +958,16 @@ void CThorSpillableRowArray::sort(ICompare &compare, unsigned maxCores)
     }
 }
 
-unsigned CThorSpillableRowArray::save(IFile &iFile, rowcount_t watchRecNum, offset_t *watchFilePosResult)
+unsigned CThorSpillableRowArray::save(IFile &iFile, rowidx_t watchRecNum, offset_t *watchFilePosResult)
 {
-    rowcount_t n = numCommitted();
+    rowidx_t n = numCommitted();
     if (0 == n)
         return 0;
     const void **rows = getBlock(n);
     Owned<IExtRowWriter> writer = createRowWriter(&iFile, rowIf->queryRowSerializer(), rowIf->queryRowAllocator(), allowNulls, false, true);
-    ActPrintLog(&activity, "CThorSpillableRowArray::save %"RCPF"d rows", numRows);
+    ActPrintLog(&activity, "CThorSpillableRowArray::save %"RIPF"d rows", numRows);
     offset_t startPos = writer->getPosition();
-    for (rowcount_t i=0; i < n; i++)
+    for (rowidx_t i=0; i < n; i++)
     {
         const void *row = rows[i];
         assertex(row || allowNulls);
@@ -996,7 +992,7 @@ unsigned CThorSpillableRowArray::save(IFile &iFile, rowcount_t watchRecNum, offs
 
 
 // JCSMORE - these methods are essentially borrowed from RoxieOutputRowArray, would be good to unify
-const void **CThorSpillableRowArray::getBlock(rowcount_t readRows)
+const void **CThorSpillableRowArray::getBlock(rowidx_t readRows)
 {
     dbgassertex(firstRow+readRows <= commitRows);
     return rows + firstRow;
@@ -1029,8 +1025,8 @@ void CThorSpillableRowArray::swap(CThorSpillableRowArray &other)
 {
     CThorSpillableRowArrayLock block(*this);
     CThorExpandingRowArray::swap(other);
-    rowcount_t otherFirstRow = other.firstRow;
-    rowcount_t otherCommitRows = other.commitRows;
+    rowidx_t otherFirstRow = other.firstRow;
+    rowidx_t otherCommitRows = other.commitRows;
 
     other.firstRow = firstRow;
     other.commitRows = commitRows;
@@ -1068,8 +1064,8 @@ protected:
     PointerIArrayOf<CFileOwner> spillFiles;
     Owned<IOutputRowSerializer> serializer;
     RowCollectorFlags diskMemMix;
+    rowcount_t totalRows;
     unsigned spillPriority;
-    unsigned totalRows;
     unsigned overflowCount;
     unsigned maxCores;
     unsigned outStreams;
@@ -1084,7 +1080,7 @@ protected:
     {
         if (rc_allMem == diskMemMix)
             return false;
-        rowcount_t numRows = spillableRows.numCommitted();
+        rowidx_t numRows = spillableRows.numCommitted();
         if (numRows == 0)
             return false;
 
@@ -1240,7 +1236,8 @@ protected:
     {
         spillableRows.kill();
         spillFiles.kill();
-        totalRows = overflowCount = outStreams = 0;
+        totalRows = 0;
+        overflowCount = outStreams = 0;
     }
 public:
     CThorRowCollectorBase(CActivityBase &_activity, IRowInterfaces *_rowIf, ICompare *_iCompare, bool _isStable, RowCollectorFlags _diskMemMix, unsigned _spillPriority)
@@ -1250,8 +1247,7 @@ public:
     {
         preserveGrouping = false;
         totalRows = 0;
-        overflowCount = 0;
-        outStreams = 0;
+        overflowCount = outStreams = 0;
         mmRegistered = false;
         if (rc_allMem == diskMemMix)
             spillPriority = SPILL_PRIORITY_DISABLE; // all mem, implies no spilling

+ 28 - 28
thorlcr/thorutil/thmem.hpp

@@ -241,16 +241,16 @@ protected:
     const void **rows;
     void **stableSortTmp;
     bool stableSort, throwOnOom, allowNulls;
-    rowcount_t maxRows;  // Number of rows that can fit in the allocated memory.
-    rowcount_t numRows;  // rows that have been added can only be updated by writing thread.
+    rowidx_t maxRows;  // Number of rows that can fit in the allocated memory.
+    rowidx_t numRows;  // rows that have been added can only be updated by writing thread.
 
-    void init(rowcount_t initialSize, bool stable);
-    const void *allocateNewRows(rowcount_t requiredRows, OwnedConstThorRow &newStableSortTmp);
+    void init(rowidx_t initialSize, bool stable);
+    const void *allocateNewRows(rowidx_t requiredRows, OwnedConstThorRow &newStableSortTmp);
     void serialize(IRowSerializerTarget &out);
     void doSort(unsigned n, void **const rows, ICompare &compare, unsigned maxCores);
 
 public:
-    CThorExpandingRowArray(CActivityBase &activity, IRowInterfaces *rowIf, bool allowNulls=false, bool stableSort=false, bool throwOnOom=true, rowcount_t initialSize=InitialSortElements);
+    CThorExpandingRowArray(CActivityBase &activity, IRowInterfaces *rowIf, bool allowNulls=false, bool stableSort=false, bool throwOnOom=true, rowidx_t initialSize=InitialSortElements);
     ~CThorExpandingRowArray();
     CActivityBase &queryActivity() { return activity; }
     // NB: throws error on OOM by default
@@ -260,7 +260,7 @@ public:
     void clearRows();
     void kill();
 
-    void setRow(rowcount_t idx, const void *row) // NB: takes ownership
+    void setRow(rowidx_t idx, const void *row) // NB: takes ownership
     {
         OwnedConstThorRow _row = row;
         assertex(idx < maxRows);
@@ -282,13 +282,13 @@ public:
         rows[numRows++] = row;
         return true;
     }
-    inline const void *query(rowcount_t i) const
+    inline const void *query(rowidx_t i) const
     {
         if (i>=numRows)
             return NULL;
         return rows[i];
     }
-    inline const void *get(rowcount_t i) const
+    inline const void *get(rowidx_t i) const
     {
         if (i>=numRows)
             return NULL;
@@ -297,7 +297,7 @@ public:
             LinkThorRow(row);
         return row;
     }
-    inline const void *getClear(rowcount_t i)
+    inline const void *getClear(rowidx_t i)
     {
         if (i>=numRows)
             return NULL;
@@ -305,7 +305,7 @@ public:
         rows[i] = NULL;
         return row;
     }
-    inline rowcount_t ordinality() const { return numRows; }
+    inline rowidx_t ordinality() const { return numRows; }
 
     inline const void **getRowArray() { return rows; }
     void swap(CThorExpandingRowArray &src);
@@ -314,18 +314,18 @@ public:
         kill();
         swap(from);
     }
-    void transferRows(rowcount_t & outNumRows, const void * * & outRows);
+    void transferRows(rowidx_t & outNumRows, const void * * & outRows);
     void transferFrom(CThorExpandingRowArray &src); 
     void transferFrom(CThorSpillableRowArray &src);
-    void removeRows(rowcount_t start, rowcount_t n);
+    void removeRows(rowidx_t start, rowidx_t n);
     void clearUnused();
     void sort(ICompare &compare, unsigned maxCores);
-    void reorder(rowcount_t start, rowcount_t num, unsigned *neworder);
+    void reorder(rowidx_t start, rowidx_t num, unsigned *neworder);
 
     bool equal(ICompare *icmp, CThorExpandingRowArray &other);
     bool checkSorted(ICompare *icmp);
 
-    IRowStream *createRowStream(rowcount_t start=0, rowcount_t num=(rowcount_t)-1, bool streamOwns=true);
+    IRowStream *createRowStream(rowidx_t start=0, rowidx_t num=(rowidx_t)-1, bool streamOwns=true);
 
     void partition(ICompare &compare, unsigned num, UnsignedArray &out); // returns num+1 points
 
@@ -337,25 +337,25 @@ public:
     void deserialize(size32_t sz, const void *buf);
     void deserializeExpand(size32_t sz, const void *data);
 
-    virtual bool ensure(rowcount_t requiredRows);
+    virtual bool ensure(rowidx_t requiredRows);
 };
 
 interface IWritePosCallback : extends IInterface
 {
-    virtual rowcount_t queryRecordNumber() = 0;
+    virtual rowidx_t queryRecordNumber() = 0;
     virtual void filePosition(offset_t pos) = 0;
 };
 
 class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray
 {
     const size32_t commitDelta;  // How many rows need to be written before they are added to the committed region?
-    rowcount_t firstRow; // Only rows firstRow..numRows are considered initialized.  Only read/write within cs.
-    rowcount_t commitRows;  // can only be updated by writing thread within a critical section
+    rowidx_t firstRow; // Only rows firstRow..numRows are considered initialized.  Only read/write within cs.
+    rowidx_t commitRows;  // can only be updated by writing thread within a critical section
     mutable CriticalSection cs;
     ICopyArrayOf<IWritePosCallback> writeCallbacks;
 
 protected:
-    virtual bool ensure(rowcount_t requiredRows);
+    virtual bool ensure(rowidx_t requiredRows);
 
 public:
 
@@ -368,7 +368,7 @@ public:
         inline ~CThorSpillableRowArrayLock() { rows.unlock(); }
     };
 
-    CThorSpillableRowArray(CActivityBase &activity, IRowInterfaces *rowIf, bool allowNulls=false, bool stableSort=false, rowcount_t initialSize=InitialSortElements, size32_t commitDelta=CommitStep);
+    CThorSpillableRowArray(CActivityBase &activity, IRowInterfaces *rowIf, bool allowNulls=false, bool stableSort=false, rowidx_t initialSize=InitialSortElements, size32_t commitDelta=CommitStep);
     ~CThorSpillableRowArray();
     // NB: throwOnOom false
     void setup(IRowInterfaces *rowIf, bool allowNulls=false, bool stableSort=false, bool throwOnOom=false)
@@ -400,17 +400,17 @@ public:
     }
 
     //The following can be accessed from the reader without any need to lock
-    inline const void *query(rowcount_t i) const
+    inline const void *query(rowidx_t i) const
     {
         CThorSpillableRowArrayLock block(*this);
         return CThorExpandingRowArray::query(i);
     }
-    inline const void *get(rowcount_t i) const
+    inline const void *get(rowidx_t i) const
     {
         CThorSpillableRowArrayLock block(*this);
         return CThorExpandingRowArray::get(i);
     }
-    inline const void *getClear(rowcount_t i)
+    inline const void *getClear(rowidx_t i)
     {
         CThorSpillableRowArrayLock block(*this);
         return CThorExpandingRowArray::getClear(i);
@@ -418,17 +418,17 @@ public:
 
     //A thread calling the following functions must own the lock, or guarantee no other thread will access
     void sort(ICompare & compare, unsigned maxcores);
-    unsigned save(IFile &file, rowcount_t watchRecNum=(rowcount_t)-1, offset_t *watchFilePosResult=NULL);
-    const void **getBlock(rowcount_t readRows);
-    inline void noteSpilled(rowcount_t spilledRows)
+    unsigned save(IFile &file, rowidx_t watchRecNum=(rowidx_t)-1, offset_t *watchFilePosResult=NULL);
+    const void **getBlock(rowidx_t readRows);
+    inline void noteSpilled(rowidx_t spilledRows)
     {
         firstRow += spilledRows;
     }
 
     //The block returned is only valid until the critical section is released
 
-    inline rowcount_t firstCommitted() const { return firstRow; }
-    inline rowcount_t numCommitted() const { return commitRows - firstRow; }
+    inline rowidx_t firstCommitted() const { return firstRow; }
+    inline rowidx_t numCommitted() const { return commitRows - firstRow; }
 
     //Locking functions - use CThorSpillableRowArrayLock above
     inline void lock() const { cs.enter(); }