Browse Source

Merge pull request #7549 from jakesmith/hpcc-13791

HPCC-13791 Use keys in sort partitioning

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 10 years ago
parent
commit
e10759156a

+ 60 - 26
thorlcr/activities/join/thjoin.cpp

@@ -75,8 +75,6 @@ class JoinActivityMaster : public CMasterActivity
                 return -1;
             return 0;
         }
-
-
     } *climitedcmp;
 public:
     JoinActivityMaster(CMasterGraphElement * info, bool local) : CMasterActivity(info)
@@ -128,7 +126,8 @@ public:
     {
         ActPrintLog("process");
         CMasterActivity::process();
-        if (!islocal) {
+        if (!islocal)
+        {
             helper = (IHThorJoinArg *)queryHelper();
             StringBuffer skewV;
             double skewError;
@@ -151,30 +150,61 @@ public:
             unsigned __int64 skewThreshold = container.queryJob().getWorkUnitValueInt("overrideSkewThreshold", 0);
             if (!skewThreshold)
             {
-                skewThreshold = helper->getThreshold();         
+                skewThreshold = helper->getThreshold();
                 if (!skewThreshold)
                     skewThreshold = container.queryJob().getWorkUnitValueInt("defaultSkewThreshold", 0);
             }
             try
             {
                 size32_t maxdeviance = getOptUInt(THOROPT_SORT_MAX_DEVIANCE, 10*1024*1024);
-                rightpartition = (container.getKind() == TAKjoin)&&((helper->getJoinFlags()&JFpartitionright)!=0);
+                rightpartition = (container.getKind() == TAKjoin) && ((helper->getJoinFlags()&JFpartitionright)!=0);
+
+                CGraphElementBase *primaryInput = NULL;
+                CGraphElementBase *secondaryInput = NULL;
+                ICompare *primaryCompare = NULL, *secondaryCompare = NULL;
+                ISortKeySerializer *primaryKeySerializer = NULL;
+                if (!rightpartition)
+                {
+                    primaryInput = container.queryInput(0);
+                    primaryCompare = helper->queryCompareLeft();
+                    primaryKeySerializer = helper->querySerializeLeft();
+                    if (container.getKind() != TAKselfjoin)
+                    {
+                        secondaryInput = container.queryInput(1);
+                        secondaryCompare = helper->queryCompareRight();
+                    }
+                }
+                else
+                {
+                    primaryInput = container.queryInput(1);
+                    secondaryInput = container.queryInput(0);
+                    primaryCompare = helper->queryCompareRight();
+                    secondaryCompare = helper->queryCompareLeft();
+                    primaryKeySerializer = helper->querySerializeRight();
+                }
+                if (helper->getJoinFlags()&JFslidingmatch) // JCSMORE shouldn't be necessary
+                    primaryKeySerializer = NULL;
+                Owned<IRowInterfaces> primaryRowIf = createRowInterfaces(primaryInput->queryHelper()->queryOutputMeta(), queryActivityId(), queryCodeContext());
+                Owned<IRowInterfaces> secondaryRowIf;
+                if (secondaryInput)
+                    secondaryRowIf.setown(createRowInterfaces(secondaryInput->queryHelper()->queryOutputMeta(), queryActivityId(), queryCodeContext()));
+
                 bool betweenjoin = (helper->getJoinFlags()&JFslidingmatch)!=0;
-                if (!container.queryLocalOrGrouped() && container.getKind() == TAKselfjoin)
+                if (container.getKind() == TAKselfjoin)
                 {
-                    if (betweenjoin) {
+                    if (betweenjoin)
                         throw MakeActivityException(this, -1, "SELF BETWEEN JOIN not supported"); // Gavin shouldn't generate
-                    }
-                    Owned<IRowInterfaces> rowif = createRowInterfaces(container.queryInput(0)->queryHelper()->queryOutputMeta(),queryActivityId(),queryCodeContext());
-                    ICompare *cmpleft = helper->queryCompareLeft();
-                    if ((helper->getJoinFlags()&JFlimitedprefixjoin)&&(helper->getJoinLimit())) {
+                    ICompare *cmpleft = primaryCompare;
+                    if ((helper->getJoinFlags()&JFlimitedprefixjoin)&&(helper->getJoinLimit()))
+                    {
                         delete climitedcmp;
                         climitedcmp = new cLimitedCmp(helper->queryCompareLeftRight(),helper->queryPrefixCompare());
                         cmpleft = climitedcmp;
                         // partition by L/R
                     }
-                    imaster->SortSetup(rowif,cmpleft, helper->querySerializeLeft(), false, true, NULL, NULL);
-                    if (barrier->wait(false)) { // local sort complete
+                    imaster->SortSetup(primaryRowIf, cmpleft, primaryKeySerializer, false, true, NULL, NULL);
+                    if (barrier->wait(false)) // local sort complete
+                    {
                         try
                         {
                             imaster->Sort(skewThreshold,skewWarning,skewError,maxdeviance,false,false,false,0);
@@ -197,11 +227,14 @@ public:
                     }
                     imaster->SortDone();
                 }
-                else if (!nosortPrimary()||betweenjoin) {
-                    Owned<IRowInterfaces> rowif = createRowInterfaces(container.queryInput(rightpartition?1:0)->queryHelper()->queryOutputMeta(),queryActivityId(),queryCodeContext());
-                    imaster->SortSetup(rowif,rightpartition?helper->queryCompareRight():helper->queryCompareLeft(),rightpartition?helper->querySerializeRight():helper->querySerializeLeft(),false,true, NULL, NULL);
+                else if (!nosortPrimary()||betweenjoin)
+                {
+                    Owned<IRowInterfaces> secondaryRowIf = createRowInterfaces(secondaryInput->queryHelper()->queryOutputMeta(), queryActivityId(), queryCodeContext());
+
+                    imaster->SortSetup(primaryRowIf, primaryCompare, primaryKeySerializer, false, true, NULL, NULL);
                     ActPrintLog("JOIN waiting for barrier.1");
-                    if (barrier->wait(false)) {
+                    if (barrier->wait(false))
+                    {
                         ActPrintLog("JOIN barrier.1 raised");
                         try
                         {
@@ -220,14 +253,15 @@ public:
                                 throw;
                         }
                         ActPrintLog("JOIN waiting for barrier.2");
-                        if (barrier->wait(false)) { // merge complete
+                        if (barrier->wait(false)) // merge complete
+                        {
                             ActPrintLog("JOIN barrier.2 raised");
                             imaster->SortDone();
                             // NB: the cosort should use the same serializer as the sort (but in fact it only gets used when there are 0 rows on the primary side)
-                            Owned<IRowInterfaces> rowif2 = createRowInterfaces(container.queryInput(rightpartition?0:1)->queryHelper()->queryOutputMeta(),queryActivityId(),queryCodeContext());
-                            imaster->SortSetup(rowif2,rightpartition?helper->queryCompareLeft():helper->queryCompareRight(),rightpartition?helper->querySerializeRight():helper->querySerializeLeft(),true,false, NULL, NULL); //serializers OK
+                            imaster->SortSetup(secondaryRowIf, secondaryCompare, primaryKeySerializer, true, false, NULL, NULL); //serializers OK
                             ActPrintLog("JOIN waiting for barrier.3");
-                            if (barrier->wait(false)) { // local sort complete
+                            if (barrier->wait(false)) // local sort complete
+                            {
                                 ActPrintLog("JOIN barrier.3 raised");
                                 try
                                 {
@@ -254,12 +288,12 @@ public:
                         imaster->SortDone();
                     }
                 }
-                else { // only sort non-partition side
-                    Owned<IRowInterfaces> rowif = createRowInterfaces(container.queryInput(rightpartition?0:1)->queryHelper()->queryOutputMeta(),queryActivityId(),queryCodeContext());
-                    Owned<IRowInterfaces> auxrowif = createRowInterfaces(container.queryInput(rightpartition?1:0)->queryHelper()->queryOutputMeta(),queryActivityId(),queryCodeContext());
-                    imaster->SortSetup(rowif,rightpartition?helper->queryCompareLeft():helper->queryCompareRight(),rightpartition?helper->querySerializeLeft():helper->querySerializeRight(),true,true, NULL, auxrowif);
+                else // only sort non-partition side
+                {
+                    imaster->SortSetup(secondaryRowIf, secondaryCompare, primaryKeySerializer, true, true, NULL, primaryRowIf);
                     ActPrintLog("JOIN waiting for barrier.1");
-                    if (barrier->wait(false)) { // local sort complete
+                    if (barrier->wait(false)) // local sort complete
+                    {
                         ActPrintLog("JOIN barrier.1 raised");
                         try
                         {

+ 58 - 30
thorlcr/activities/join/thjoinslave.cpp

@@ -52,8 +52,8 @@ class JoinSlaveActivity : public CSlaveActivity, public CThorDataLink, implement
     ICompare *rightCompare;
     ISortKeySerializer *leftKeySerializer;
     ISortKeySerializer *rightKeySerializer;
-    ICompare *collate;
-    ICompare *collateupper; // if non-null then between join
+    ICompare *primarySecondaryCompare;
+    ICompare *primarySecondaryUpperCompare; // if non-null then between join
 
     Owned<IRowStream> leftStream, rightStream;
     Semaphore secondaryStartSem;
@@ -122,6 +122,16 @@ class JoinSlaveActivity : public CSlaveActivity, public CThorDataLink, implement
 
     };  
 
+    struct CompareReverse : public ICompare
+    {
+        CompareReverse() { compare = NULL; }
+        ICompare *compare;
+        int docompare(const void *a,const void *b) const
+        {
+            return -compare->docompare(b,a);
+        }
+    } compareReverse, compareReverseUpper;
+
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
@@ -149,7 +159,8 @@ public:
 
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        if (!islocal) {
+        if (!islocal)
+        {
             mpTagRPC = container.queryJob().deserializeMPTag(data);
             mptag_t barrierTag = container.queryJob().deserializeMPTag(data);
             barrier.setown(container.queryJob().createBarrier(barrierTag));
@@ -184,14 +195,6 @@ public:
         rightCompare = helper->queryCompareRight();
         leftKeySerializer = helper->querySerializeLeft();
         rightKeySerializer = helper->querySerializeRight();
-        if (helper->getJoinFlags()&JFslidingmatch) {
-            collate = helper->queryCompareLeftRightLower();
-            collateupper = helper->queryCompareLeftRightUpper();
-        }
-        else {
-            collate = helper->queryCompareLeftRight();
-            collateupper = NULL;
-        }
     }
     virtual void onInputStarted(IException *except)
     {
@@ -443,19 +446,11 @@ public:
     }
     bool doglobaljoin()
     {
+        rightpartition = (container.getKind()==TAKjoin)&&((helper->getJoinFlags()&JFpartitionright)!=0);
+
         Linked<IRowInterfaces> primaryRowIf, secondaryRowIf;
         ICompare *primaryCompare, *secondaryCompare;
-        ISortKeySerializer *primaryKeySerializer, *secondaryKeySerializer;
-        ICompare *primaryCollate, *primaryCollateUpper;
-        struct cCollateReverse: public ICompare
-        {
-            ICompare *collate;
-            cCollateReverse(ICompare *_collate) : collate(_collate) { }
-            int docompare(const void *a,const void *b) const
-            {
-                return -collate->docompare(b,a);
-            }
-        } collateRev(collate), collateRevUpper(collateupper);
+        ISortKeySerializer *primaryKeySerializer;
 
         Owned<IRowStream> secondaryStream, primaryStream;
         if (rightpartition)
@@ -463,22 +458,54 @@ public:
             primaryCompare = rightCompare;
             primaryKeySerializer = rightKeySerializer;
             secondaryCompare = leftCompare;
-            secondaryKeySerializer = leftKeySerializer;
-            primaryCollate = &collateRev;
-            primaryCollateUpper = collateupper?&collateRevUpper:NULL;
         }
         else
         {
             primaryCompare = leftCompare;
             primaryKeySerializer = leftKeySerializer;
             secondaryCompare = rightCompare;
-            secondaryKeySerializer = rightKeySerializer;
-            primaryCollate = collate;
-            primaryCollateUpper = collateupper;
         }
         primaryRowIf.set(queryRowInterfaces(primaryInput));
         secondaryRowIf.set(queryRowInterfaces(secondaryInput));
 
+        primarySecondaryCompare = NULL;
+        if (helper->getJoinFlags()&JFslidingmatch)
+        {
+            if (primaryKeySerializer) // JCSMORE shouldn't be generated
+                primaryKeySerializer = NULL;
+            primarySecondaryCompare = helper->queryCompareLeftRightLower();
+            primarySecondaryUpperCompare = helper->queryCompareLeftRightUpper();
+            if (rightpartition)
+            {
+                compareReverse.compare = primarySecondaryCompare;
+                compareReverseUpper.compare = primarySecondaryUpperCompare;
+                primarySecondaryCompare = &compareReverse;
+                primarySecondaryUpperCompare = &compareReverseUpper;
+            }
+        }
+        else
+        {
+            primarySecondaryUpperCompare = NULL;
+            if (rightpartition)
+            {
+                if (rightKeySerializer)
+                    primarySecondaryCompare = helper->queryCompareRightKeyLeftRow();
+                else
+                {
+                    compareReverse.compare = helper->queryCompareLeftRight();
+                    primarySecondaryCompare = &compareReverse;
+                }
+            }
+            else
+            {
+                if (leftKeySerializer)
+                    primarySecondaryCompare = helper->queryCompareLeftKeyRightRow();
+                else
+                    primarySecondaryCompare = helper->queryCompareLeftRight();
+            }
+        }
+        dbgassertex(primarySecondaryCompare);
+
         OwnedConstThorRow partitionRow;
         rowcount_t totalrows;
 
@@ -489,7 +516,7 @@ public:
         }
         else
         {
-            sorter->Gather(primaryRowIf,primaryInput,primaryCompare,NULL,NULL,primaryKeySerializer,NULL,false,isUnstable(),abortSoon,NULL);
+            sorter->Gather(primaryRowIf, primaryInput, primaryCompare, NULL, NULL, primaryKeySerializer, NULL, false, isUnstable(), abortSoon, NULL);
             stopPartitionInput();
             if (abortSoon)
             {
@@ -513,7 +540,8 @@ public:
             ActPrintLog("JOIN barrier.2 raised");
             sorter->stopMerge();
         }
-        sorter->Gather(secondaryRowIf,secondaryInput,secondaryCompare,primaryCollate,primaryCollateUpper,primaryKeySerializer,partitionRow,noSortOtherSide(),isUnstable(),abortSoon,primaryRowIf); // primaryKeySerializer *is* correct
+        // NB: on secondary sort, the primaryKeySerializer is used
+        sorter->Gather(secondaryRowIf, secondaryInput, secondaryCompare, primarySecondaryCompare, primarySecondaryUpperCompare, primaryKeySerializer, partitionRow, noSortOtherSide(), isUnstable(), abortSoon, primaryRowIf); // primaryKeySerializer *is* correct
         partitionRow.clear();
         stopOtherInput();
         if (abortSoon)

+ 1 - 0
thorlcr/graph/thgraph.cpp

@@ -2960,6 +2960,7 @@ void CActivityBase::logRow(const char * prefix, IOutputMetaData & meta, const vo
     {
         StringBuffer xml;
         appendRowXml(xml, meta, row);
+        ActPrintLog("%s: %s", prefix, xml.str());
     }
 }
 

+ 1 - 0
thorlcr/msort/tsorta.cpp

@@ -570,3 +570,4 @@ void CThorKeyArray::traceKey(const char *prefix,unsigned idx)
     IOutputRowSerializer *serializer = keyserializer?keyif->queryRowSerializer():rowif->queryRowSerializer();
     ::traceKey(serializer,s.str(),queryKey(idx));
 }
+

+ 223 - 217
thorlcr/msort/tsortm.cpp

@@ -29,6 +29,7 @@
 #include <limits.h>
 
 #include "jlib.hpp"
+#include "jflz.hpp"
 #include <mpbase.hpp>
 #include <mpcomm.hpp>
 #include "thorport.hpp"
@@ -260,14 +261,14 @@ struct PartitionInfo
             throw MakeStringException(-1,"SORT: PartitionInfo meta info mismatch(%d,%d)",guard,dsguard);
         splitkeys.kill();
         splitkeys.deserialize(left, mb.readDirect(left));
-    }   
+    }
 };
-    
+
 
 
 typedef CopyReferenceArrayOf<CSortNode> NodeArray;
 
-class CSortMaster: public IThorSorterMaster, public CSimpleInterface
+class CSortMaster : public IThorSorterMaster, public CSimpleInterface
 { 
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
@@ -290,6 +291,7 @@ public:
     size32_t maxdeviance;
     Linked<IRowInterfaces> rowif;
     Linked<IRowInterfaces> auxrowif;
+    Linked<IRowInterfaces> keyIf;
 
     int AddSlave(ICommunicator *comm,rank_t rank,SocketEndpoint &endpoint,mptag_t mpTagRPC)
     {
@@ -301,14 +303,13 @@ public:
     void ConnectSlaves()
     {
         ActPrintLog(activity, "CSortMaster::ConnectSlaves");
-#ifdef CONNECT_IN_PARALLEL
         class casyncfor: public CAsyncFor
         {
         public:
             casyncfor(CSortMaster &_owner, NodeArray &_slaves) : owner(_owner), slaves(_slaves) { }
             void Do(unsigned i)
             {
-                CSortNode &slave = slaves.item(i);          
+                CSortNode &slave = slaves.item(i);
                 if (!slave.doConnect(i,slaves.ordinality())) {
                     char url[100];
                     slave.endpoint.getUrlStr(url,sizeof(url));
@@ -320,21 +321,10 @@ public:
             CSortMaster &owner;
         } afor(*this,slaves);
         afor.For(slaves.ordinality(), CONNECT_IN_PARALLEL);
-#else
-        ForEachItemIn(i,slaves) {
-            CSortNode &slave = slaves.item(i);          
-            if (!slave.doConnect(i,slaves.ordinality())) {
-                char url[100];
-                slave.endpoint.getUrlStr(url,sizeof(url));
-                throw MakeActivityException(activity,TE_CannotConnectToSlave,"CSortMaster::ConnectSlaves: Could not connect to %s",url);
-            }
-        }
-#endif
     }
 
     void InitSlaves()
     {
-#ifdef INIT_IN_PARALLEL
         class casyncfor: public CAsyncFor
         {
         public:
@@ -348,12 +338,6 @@ public:
             NodeArray &slaves;
         } afor(slaves);
         afor.For(slaves.ordinality(), INIT_IN_PARALLEL);
-#else
-        ForEachItemIn(i,slaves) {
-            CSortNode &slave = slaves.item(i);          
-            slave.init();
-        }
-#endif
     }
 
 
@@ -400,14 +384,24 @@ public:
     void SortSetup(IRowInterfaces *_rowif,ICompare *_icompare,ISortKeySerializer *_keyserializer,bool cosort,bool needconnect,const char *_cosortfilenames,IRowInterfaces *_auxrowif)
     {
         ActPrintLog(activity, "Sort setup cosort=%s, needconnect=%s %s",cosort?"true":"false",needconnect?"true":"false",_keyserializer?"has key serializer":"");
+        assertex(_icompare);
         rowif.set(_rowif);
         if (_auxrowif&&_auxrowif->queryRowMetaData())
             auxrowif.set(_auxrowif);
         else
             auxrowif.set(_rowif);
         synchronized proc(slavemutex);
-        icompare = _icompare;
         keyserializer = _keyserializer;
+        if (keyserializer)
+        {
+            keyIf.setown(createRowInterfaces(keyserializer->queryRecordSize(), activity->queryContainer().queryId(), activity->queryCodeContext()));
+            icompare = keyserializer->queryCompareKey();
+        }
+        else
+        {
+            keyIf.set(auxrowif);
+            icompare = _icompare;
+        }
         sorted = false;
         total = 0;
         stotal = 0;
@@ -416,13 +410,11 @@ public:
         maxrecsonnode = 0;
         numnodes = slaves.ordinality();
         estrecsize = 100;
-        if (!partitioninfo) { // if cosort use aux
-            if (cosort) {
+        if (!partitioninfo) // if cosort use aux
+        {
+            if (cosort)
                 ActPrintLog(activity, "Cosort with no prior partition");
-                partitioninfo = new PartitionInfo(activity, auxrowif);
-            }
-            else
-                partitioninfo = new PartitionInfo(activity, rowif);
+            partitioninfo = new PartitionInfo(activity, keyIf);
         }
         free(partitioninfo->nodes);
         free(partitioninfo->mpports);
@@ -440,11 +432,6 @@ public:
             *mpp = slave.mpport;
             mpp++;
         }
-        if (keyserializer&&cosort&&!partitioninfo->IsOK()) {
-            keyserializer = NULL; // when joining to 0 rows can't use (LHS) serializer getMinMax will tell slave
-            ActPrintLog(activity, "Suppressing key serializer on master");
-        }
-        assertex(icompare);
         if (needconnect) // if cosort set, already done!
             ConnectSlaves();
         InitSlaves();
@@ -477,7 +464,6 @@ public:
         synchronized proc(slavemutex);
         if (activity->queryAbortSoon())
             return;
-#ifdef CLOSE_IN_PARALLEL
         class casyncfor: public CAsyncFor
         {
             CActivityBase &activity;
@@ -502,11 +488,6 @@ public:
             return;
         afor.wait = true;
         afor.For(slaves.ordinality(), CLOSE_IN_PARALLEL);
-#else
-        ForEachItemInRev(i,slaves) {
-            slaves.item(i).Close();
-        }
-#endif
         ActPrintLog(activity, "Sort Done");
     }
 
@@ -541,26 +522,30 @@ public:
             CSortNode &slave = slaves.item(i);
             if (slave.numrecs==0)
                 continue;
-            CThorExpandingRowArray minmax(*activity, rowif, true);
             void *p = NULL;
             size32_t retlen = 0;
             size32_t avrecsize=0;
             rowcount_t num=slave.GetMinMax(retlen,p,avrecsize);
-            if (avrecsize) {
+            if (avrecsize)
+            {
                 ers += avrecsize;       // should probably do mode but this is OK
                 ersn++;
             }
             tot += num;
             if (num>0)
             {
-                minmax.deserialize(retlen, p);
+                OwnedConstThorRow slaveMin, slaveMax;
+                RtlDynamicRowBuilder rowBuilder(keyIf->queryRowAllocator());
+                CThorStreamDeserializerSource dsz(retlen, p);
+                size32_t sz = keyIf->queryRowDeserializer()->deserialize(rowBuilder, dsz);
+                slaveMin.setown(rowBuilder.finalizeRowClear(sz));
+                sz = keyIf->queryRowDeserializer()->deserialize(rowBuilder, dsz);
+                slaveMax.setown(rowBuilder.finalizeRowClear(sz));
                 free(p);
-                const void *p = minmax.query(0);
-                if (!min.get()||(icompare->docompare(min,p)>0)) 
-                    min.set(p);
-                p = minmax.query(1);
-                if (!max.get()||(icompare->docompare(max,p)<0)) 
-                    max.set(p);
+                if (!min.get()||(icompare->docompare(min, slaveMin)>0))
+                    min.setown(slaveMin.getClear());
+                if (!max.get()||(icompare->docompare(max, slaveMax)<0))
+                    max.setown(slaveMax.getClear());
             }
         }
         if (ersn)
@@ -569,10 +554,11 @@ public:
             estrecsize = 100;
 #ifdef _TRACE
         if (min)
-            traceKey(rowif->queryRowSerializer(),"Min",min);
+            traceKey(keyIf->queryRowSerializer(),"Min",min);
         if (max)
-            traceKey(rowif->queryRowSerializer(),"Max",max);
-        if (min&&max) {
+            traceKey(keyIf->queryRowSerializer(),"Max",max);
+        if (min&&max)
+        {
             int cmp=icompare->docompare(min,max);
             if (cmp==0) 
                 ActPrintLog(activity, "Min == Max : All keys equal!");
@@ -608,18 +594,18 @@ public:
         unsigned averagesamples = OVERSAMPLE*numnodes;  
         rowcount_t averagerecspernode = (rowcount_t)(total/numnodes);
         CriticalSection asect;
-        CThorExpandingRowArray sample(*activity, rowif, true);
-#ifdef ASYNC_PARTIONING
+        CThorExpandingRowArray sample(*activity, keyIf, true);
         class casyncfor1: public CAsyncFor
         {
+            CSortMaster &owner;
             NodeArray &slaves;
             CThorExpandingRowArray &sample;
             CriticalSection &asect;
             unsigned averagesamples;
             rowcount_t averagerecspernode;
         public:
-            casyncfor1(NodeArray &_slaves, CThorExpandingRowArray &_sample, unsigned _averagesamples, rowcount_t _averagerecspernode, CriticalSection &_asect)
-                : slaves(_slaves), sample(_sample), asect(_asect)
+            casyncfor1(CSortMaster &_owner, NodeArray &_slaves, CThorExpandingRowArray &_sample, unsigned _averagesamples, rowcount_t _averagerecspernode, CriticalSection &_asect)
+                : owner(_owner), slaves(_slaves), sample(_sample), asect(_asect)
             { 
                 averagesamples = _averagesamples;
                 averagerecspernode = _averagerecspernode;
@@ -629,32 +615,26 @@ public:
                 CSortNode &slave = slaves.item(i);
                 unsigned slavesamples = averagerecspernode?((unsigned)((averagerecspernode/2+averagesamples*slave.numrecs)/averagerecspernode)):1;
                 //PrintLog("%d samples for %d",slavesamples,i);
-                if (slavesamples) {
+                if (slavesamples)
+                {
                     size32_t samplebufsize;
                     void *samplebuf=NULL;
                     slave.GetMultiNthRow(slavesamples, samplebufsize, samplebuf);
-                    CriticalBlock block(asect);
-                    sample.deserializeExpand(samplebufsize, samplebuf);
+                    MemoryBuffer mb;
+                    fastLZDecompressToBuffer(mb, samplebuf);
                     free(samplebuf);
+                    CriticalBlock block(asect);
+                    CThorStreamDeserializerSource d(mb.length(), mb.toByteArray());
+                    while (!d.eos())
+                    {
+                        RtlDynamicRowBuilder rowBuilder(owner.keyIf->queryRowAllocator());
+                        size32_t sz = owner.keyIf->queryRowDeserializer()->deserialize(rowBuilder, d);
+                        sample.append(rowBuilder.finalizeRowClear(sz));
+                    }
                 }
             }
-        } afor1(slaves,sample,averagesamples,averagerecspernode,asect);
+        } afor1(*this, slaves,sample,averagesamples,averagerecspernode,asect);
         afor1.For(numnodes, 20, true);
-#else
-        unsigned i;
-        for (i=0;i<numnodes;i++) {
-            CSortNode &slave = slaves.item(i);
-            unsigned slavesamples = (unsigned)((count_t)averagesamples*slave.numrecs/(count_t)averagerecspernode);
-            PrintLog("%d samples for %d",slavesamples,i);
-            if (!slavesamples)
-                continue;
-            size32_t samplebufsize;
-            void *samplebuf=NULL;
-            slave.GetMultiNthRow(slavesamples,samplebufsize,samplebuf);
-            sample.deserializeExpand(samplebufsize, samplebuf);
-            free(samplebuf);
-        }   
-#endif
 #ifdef TRACE_PARTITION2
         {
             ActPrintLog(activity, "partition points");
@@ -670,9 +650,11 @@ public:
         offset_t ts=sample.serializedSize();
         estrecsize = numsamples?((size32_t)(ts/numsamples)):100;
         sample.sort(*icompare, activity->queryMaxCores());
-        CThorExpandingRowArray mid(*activity, rowif, true);
-        if (numsamples) { // could shuffle up empty nodes here
-            for (unsigned i=0;i<numsplits;i++) {
+        CThorExpandingRowArray mid(*activity, keyIf, true);
+        if (numsamples) // could shuffle up empty nodes here
+        {
+            for (unsigned i=0;i<numsplits;i++)
+            {
                 unsigned pos = (unsigned)(((count_t)numsamples*(i+1))/((count_t)numsplits+1));
                 const void *r = sample.get(pos);
                 mid.append(r);
@@ -681,11 +663,12 @@ public:
 #ifdef TRACE_PARTITION2
         {
             ActPrintLog(activity, "merged partitions");
-            for (unsigned i=0;i<mid.ordinality();i++) {
+            for (unsigned i=0;i<mid.ordinality();i++)
+            {
                 const void *k = mid.query(i);
                 StringBuffer str;
                 str.appendf("%d: ",i);
-                traceKey(rowif->queryRowSerializer(),str.str(),(const byte *)k);
+                traceKey(keyIf->queryRowSerializer(),str.str(),(const byte *)k);
             }
         }
 #endif
@@ -695,9 +678,7 @@ public:
         mid.serializeCompress(mdmb);
         mdl = mdmb.length();
         const byte *mdp=(const byte *)mdmb.bufferBase();
-        unsigned i;
-#ifdef ASYNC_PARTIONING
-        i = 0;
+        unsigned i = 0;
         class casyncfor2: public CAsyncFor
         {
             NodeArray &slaves;
@@ -718,14 +699,6 @@ public:
             }
         } afor2(slaves,mdl,mdp);
         afor2.For(numnodes, 20, true);
-#else
-        for (i=0;i<numnodes;i++) {
-            CSortNode &slave = slaves.item(i);
-            if (slave.numrecs!=0)
-                slave.MultiBinChopStart(mdl,mdp,CMPFN_NORMAL);
-        }
-#endif
-#ifdef ASYNC_PARTIONING
         class casyncfor3: public CAsyncFor
         {
             NodeArray &slaves;
@@ -751,16 +724,6 @@ public:
             }
         } afor3(slaves, splitMap, numnodes, numsplits);
         afor3.For(numnodes, 20, true);
-#else
-        for (i=0;i<numnodes;i++) {
-            CSortNode &slave = slaves.item(i);
-            if (slave.numrecs!=0) {
-                rowcount_t *res=splitMap+(i*numnodes);
-                slave.MultiBinChopStop(numsplits,res);
-                res[numnodes-1] = slave.numrecs;
-            }
-        }
-#endif
 #ifdef _TRACE
 #ifdef TRACE_PARTITION
         for (i=0;i<numnodes;i++) {
@@ -779,45 +742,48 @@ public:
 
     rowcount_t *CalcPartition(bool logging)
     {
-        CriticalBlock block(ECFcrit);       
+        CriticalBlock block(ECFcrit);
         // this is a bit long winded
 
         OwnedConstThorRow mink;
         OwnedConstThorRow maxk;
         // so that it won't overflow
         OwnedMalloc<rowcount_t> splitmap(numnodes*numnodes, true);
-        if (CalcMinMax(mink,maxk)==0) {
+        if (CalcMinMax(mink, maxk)==0)
+        {
             // no partition info!
             partitioninfo->kill();
             return splitmap.getClear();
         }
         unsigned numsplits=numnodes-1;
-        CThorExpandingRowArray emin(*activity, rowif, true);
-        CThorExpandingRowArray emax(*activity, rowif, true);
-        CThorExpandingRowArray totmid(*activity, rowif, true);
+        CThorExpandingRowArray emin(*activity, keyIf, true);
+        CThorExpandingRowArray emax(*activity, keyIf, true);
+        CThorExpandingRowArray totmid(*activity, keyIf, true);
         ECFarray = &totmid;
         ECFcompare = icompare;
-        CThorExpandingRowArray mid(*activity, rowif, true);
+        CThorExpandingRowArray mid(*activity, keyIf, true);
         unsigned i;
         unsigned j;
-        for(i=0;i<numsplits;i++) {
+        for(i=0;i<numsplits;i++)
+        {
             emin.append(mink.getLink());
             emax.append(maxk.getLink());
         }
         UnsignedArray amid;
         unsigned iter=0;
-        try {
+        try
+        {
             MemoryBuffer mbmn;
             MemoryBuffer mbmx;
             MemoryBuffer mbmd;
-            loop {
+            loop
+            {
 #ifdef _TRACE
                 iter++;
                 ActPrintLog(activity, "Split: %d",iter);
 #endif
                 emin.serializeCompress(mbmn.clear());
                 emax.serializeCompress(mbmx.clear());
-#ifdef ASYNC_PARTIONING
                 class casyncfor: public CAsyncFor
                 {
                     NodeArray &slaves;
@@ -836,14 +802,6 @@ public:
                     }
                 } afor(slaves,mbmn,mbmx);
                 afor.For(numnodes, 20, true);
-#else
-                for (i=0;i<numnodes;i++) {
-                    CSortNode &slave = slaves.item(i);
-                    if (slave.numrecs!=0)
-                        slave.GetMultiMidPointStart(mbmn.length(),mbmn.bufferBase(),mbmx.length(),mbmx.bufferBase());               
-                }
-#endif
-#ifdef ASYNC_PARTIONING
                 Semaphore *nextsem = new Semaphore[numnodes];
                 CriticalSection nextsect;
 
@@ -854,9 +812,10 @@ public:
                     CThorExpandingRowArray &totmid;
                     Semaphore *nextsem;
                     unsigned numsplits;
+                    IRowInterfaces *keyIf;
                 public:
-                    casyncfor2(NodeArray &_slaves, CThorExpandingRowArray &_totmid, unsigned _numsplits, Semaphore *_nextsem)
-                        : slaves(_slaves), totmid(_totmid)
+                    casyncfor2(NodeArray &_slaves, CThorExpandingRowArray &_totmid, unsigned _numsplits, Semaphore *_nextsem, IRowInterfaces *_keyIf)
+                        : slaves(_slaves), totmid(_totmid), keyIf(_keyIf)
                     { 
                         nextsem = _nextsem;
                         numsplits = _numsplits;
@@ -875,8 +834,23 @@ public:
                             unsigned base = totmid.ordinality();
                             if (p)
                             {
-                                totmid.deserializeExpand(retlen, p);
+                                MemoryBuffer mb;
+                                fastLZDecompressToBuffer(mb, p);
                                 free(p);
+                                CThorStreamDeserializerSource d(mb.length(), mb.toByteArray());
+                                while (!d.eos())
+                                {
+                                    RtlDynamicRowBuilder rowBuilder(keyIf->queryRowAllocator());
+                                    bool nullRow;
+                                    d.read(sizeof(bool),&nullRow);
+                                    if (nullRow)
+                                        totmid.append(NULL);
+                                    else
+                                    {
+                                        size32_t sz = keyIf->queryRowDeserializer()->deserialize(rowBuilder, d);
+                                        totmid.append(rowBuilder.finalizeRowClear(sz));
+                                    }
+                                }
                             }
                             while (totmid.ordinality()-base<numsplits)
                                 totmid.append(NULL);
@@ -889,42 +863,20 @@ public:
                         }
                         nextsem[i].signal();
                     }
-                } afor2(slaves, totmid, numsplits, nextsem);
+                } afor2(slaves, totmid, numsplits, nextsem, keyIf);
                 afor2.For(numnodes, 20);
+
                 delete [] nextsem;
-#else
-                for (i=0;i<numnodes;i++) {
-                    CSortNode &slave = slaves.item(i);
-                    unsigned base = totmid.ordinality();
-                    if (slave.numrecs!=0) {
-                        void *p = NULL;
-                        size32_t retlen = 0;
-                        slave.GetMultiMidPointStop(retlen,p);               
-                        totmid.deserializeExpand(retlen, p);
-                        free(p);
-#ifdef _DEBUG
-                        if (logging) {
-                            MemoryBuffer buf;
-                            for (j=0;j<numsplits;j++) {
-                                ActPrintLog(activity, "Min(%d): ",j); traceKey(rowif->queryRowSerializer(),"    ",emin.query(j));
-                                ActPrintLog(activity, "Mid(%d): ",j); traceKey(rowif->queryRowSerializer(),"    ",totmid.query(j+base));
-                                ActPrintLog(activity, "Max(%d): ",j); traceKey(rowif->queryRowSerializer(),"    ",emax.query(j));
-                            }
-                        }
-#endif
-                    }
-                    while (totmid.ordinality()-base<numsplits)
-                        totmid.append(NULL);
-                }
-#endif
                 mid.kill();
                 mbmn.clear();
                 mbmx.clear();
-                for (i=0;i<numsplits;i++) {
+                for (i=0;i<numsplits;i++)
+                {
                     amid.kill();
                     unsigned k;
                     unsigned t = i;
-                    for (k=0;k<numsplits;k++) {
+                    for (k=0;k<numsplits;k++)
+                    {
                         const void *row = totmid.query(t);
                         if (row)
                             amid.append(t);
@@ -938,11 +890,12 @@ public:
                     if (amid.ordinality()) {
                         unsigned mi = amid.item(amid.ordinality()/2);
 #ifdef _DEBUG
-                        if (logging) {
+                        if (logging)
+                        {
                             MemoryBuffer buf;
-                            const void *b =totmid.query(mi);
+                            const void *b = totmid.query(mi);
                             ActPrintLog(activity, "%d: %d %d",i,mi,amid.ordinality()/2);
-                            traceKey(rowif->queryRowSerializer(),"mid",b);
+                            traceKey(keyIf->queryRowSerializer(),"mid",b);
                         }
 #endif
                         mid.append(totmid.get(mi));
@@ -953,29 +906,33 @@ public:
 
                 // calculate split map
                 mid.serializeCompress(mbmd.clear());
-                for (i=0;i<numnodes;i++) {
+                for (i=0;i<numnodes;i++)
+                {
                     CSortNode &slave = slaves.item(i);
                     if (slave.numrecs!=0)
                         slave.MultiBinChopStart(mbmd.length(),(const byte *)mbmd.bufferBase(),CMPFN_NORMAL);
                 }
                 mbmd.clear();
-                for (i=0;i<numnodes;i++) {
+                for (i=0;i<numnodes;i++)
+                {
                     CSortNode &slave = slaves.item(i);
-                    if (slave.numrecs!=0) {
+                    if (slave.numrecs!=0)
+                    {
                         rowcount_t *res=splitmap+(i*numnodes);
                         slave.MultiBinChopStop(numsplits,res);
                         res[numnodes-1] = slave.numrecs;
                     }
                 }
 
-                CThorExpandingRowArray newmin(*activity, rowif, true);
-                CThorExpandingRowArray newmax(*activity, rowif, true);
+                CThorExpandingRowArray newmin(*activity, keyIf, true);
+                CThorExpandingRowArray newmax(*activity, keyIf, true);
                 unsigned __int64 maxerror=0;
                 unsigned __int64 nodewanted = (stotal/numnodes); // Note scaled total
                 unsigned __int64 variancelimit = estrecsize?maxdeviance/estrecsize:0;
                 if (variancelimit>nodewanted/50)
                     variancelimit=nodewanted/50; // 2%
-                for (i=0;i<numsplits;i++) {
+                for (i=0;i<numsplits;i++)
+                {
                     unsigned __int64 tot = 0;
                     unsigned __int64 loc = 0;
                     for (j=0;j<numnodes;j++) {
@@ -996,23 +953,26 @@ public:
                     unsigned __int64 error = (loc>nodewanted)?(loc-nodewanted):(nodewanted-loc);
                     if (error>maxerror)
                         maxerror = error;
-                    if (wanted<tot) {
+                    if (wanted<tot)
+                    {
                         newmin.append(emin.get(i));
                         newmax.append(mid.get(i));
                     }
-                    else if (wanted>tot) {
+                    else if (wanted>tot)
+                    {
                         newmin.append(mid.get(i));
                         newmax.append(emax.get(i));
                     }
-                    else {
+                    else
+                    {
                         newmin.append(emin.get(i));
                         newmax.append(emax.get(i));
                     }
                 }
-                if (emin.equal(icompare,newmin)&&emax.equal(icompare,newmax)) {
+                if (emin.equal(icompare,newmin)&&emax.equal(icompare,newmax))
                     break; // reached steady state 
-                }
-                if ((maxerror*10000<nodewanted)||((iter>3)&&(maxerror<variancelimit))) { // within .01% or within variancelimit 
+                if ((maxerror*10000<nodewanted)||((iter>3)&&(maxerror<variancelimit))) // within .01% or within variancelimit
+                {
                     ActPrintLog(activity, "maxerror = %" CF "d, nodewanted = %" CF "d, variancelimit=%" CF "d, estrecsize=%u, maxdeviance=%u",
                              maxerror,nodewanted,variancelimit,estrecsize,maxdeviance);
                     break;
@@ -1032,11 +992,14 @@ public:
         partitioninfo->splitkeys.transfer(mid);
         partitioninfo->numnodes = numnodes;
 #ifdef _DEBUG
-        if (logging) {
-            for (unsigned i=0;i<numnodes;i++) {
+        if (logging)
+        {
+            for (unsigned i=0;i<numnodes;i++)
+            {
                 StringBuffer str;
                 str.appendf("%d: ",i);
-                for (j=0;j<numnodes;j++) {
+                for (j=0;j<numnodes;j++)
+                {
                     str.appendf("%" RCPF "d, ",splitmap[j+i*numnodes]);
                 }
                 ActPrintLog(activity, "%s",str.str());
@@ -1053,7 +1016,8 @@ public:
 #ifdef _TRACE
 #ifdef TRACE_PARTITION
         ActPrintLog(activity, "UsePartitionInfo %s",uppercmp?"upper":"");
-        for (i=0;i<pi.splitkeys.ordinality();i++) {
+        for (i=0;i<pi.splitkeys.ordinality();i++)
+        {
             StringBuffer s;
             s.appendf("%d: ",i);
             traceKey(pi.prowif->queryRowSerializer(), s.str(), pi.splitkeys.query(i));
@@ -1067,22 +1031,26 @@ public:
         OwnedMalloc<rowcount_t> res(numsplits);
         unsigned j;
         rowcount_t *mapp=splitMap;
-        for (i=0;i<numnodes;i++) {
+        for (i=0;i<numnodes;i++)
+        {
             CSortNode &slave = slaves.item(i);
-            if (numsplits>0) {
+            if (numsplits>0)
+            {
                 MemoryBuffer mb;
                 pi.splitkeys.serialize(mb);
                 assertex(pi.splitkeys.ordinality()==numsplits);
-                slave.MultiBinChop(mb.length(),(const byte *)mb.bufferBase(),numsplits,res,uppercmp?CMPFN_UPPER:CMPFN_COLLATE,true);
+                slave.MultiBinChop(mb.length(),(const byte *)mb.bufferBase(),numsplits,res,uppercmp?CMPFN_UPPER:CMPFN_COLLATE);
                 rowcount_t *resp = res;
                 rowcount_t p=*resp;
                 *mapp = p;
                 resp++;
                 mapp++;
-                for (j=1;j<numsplits;j++) {
+                for (j=1;j<numsplits;j++)
+                {
                     rowcount_t n = *resp;
                     *mapp = n;
-                    if (p>n) {
+                    if (p>n)
+                    {
                         ActPrintLog(activity, "ERROR: Split positions out of order!");
                         throw MakeActivityException(activity, TE_SplitPostionsOutOfOrder,"CSortMaster::UsePartitionInfo: Split positions out of order!");
                     }
@@ -1098,10 +1066,12 @@ public:
 #ifdef TRACE_PARTITION
         ActPrintLog(activity, "UsePartitionInfo result");
         rowcount_t *p = splitMap;
-        for (i=0;i<numnodes;i++) {
+        for (i=0;i<numnodes;i++)
+        {
             StringBuffer s;
             s.appendf("%d: ",i);
-            for (j=0;j<numnodes;j++) {
+            for (j=0;j<numnodes;j++)
+            {
                 s.appendf(" %" RCPF "d,",*p);
                 p++;
             }
@@ -1114,26 +1084,28 @@ public:
 
     void CalcExtPartition()
     {
-        // I think this dependant on row being same format as meta
+        // I think this is dependent on the row being in the same format as the meta
 
         unsigned numsplits=numnodes-1;
-        CThorExpandingRowArray splits(*activity, auxrowif, true);
+        CThorExpandingRowArray splits(*activity, keyIf, true);
         char *s=cosortfilenames;
         unsigned i;
-        for(i=0;i<numnodes;i++) {
+        for(i=0;i<numnodes;i++)
+        {
             char *e=strchr(s,'|');
             if (e) 
                 *e = 0;
             else if (i!=numnodes-1)
                 return;
-            if (i) {
+            if (i)
+            {
                 CSortNode &slave = slaves.item(i);
                 byte *rowmem;
                 size32_t rowsize;
                 if (!slave.FirstRowOfFile(s,rowsize,rowmem))
                     return;
                 OwnedConstThorRow row;
-                row.deserialize(auxrowif,rowsize,rowmem);
+                row.deserialize(keyIf,rowsize,rowmem);
                 splits.append(row.getClear());
                 free(rowmem);
             }
@@ -1149,20 +1121,22 @@ public:
     {
         ActPrintLog(activity, "Previous partition");
         unsigned numsplits=numnodes-1;
-        CThorExpandingRowArray splits(*activity, auxrowif, true);
+        CThorExpandingRowArray splits(*activity, keyIf, true);
         unsigned i;
-        for(i=1;i<numnodes;i++) {
+        for(i=1;i<numnodes;i++)
+        {
             CSortNode &slave = slaves.item(i);
             byte *rowmem;
             size32_t rowsize;
             if (!slave.FirstRowOfFile("",rowsize,rowmem))
                 return;
             OwnedConstThorRow row;
-            row.deserialize(auxrowif,rowsize,rowmem);
-            if (row&&rowsize) {
+            row.deserialize(keyIf, rowsize, rowmem);
+            if (row&&rowsize)
+            {
                 StringBuffer n;
                 n.append(i).append(": ");
-                traceKey(auxrowif->queryRowSerializer(),n,row);
+                traceKey(keyIf->queryRowSerializer(),n,row);
             }
             splits.append(row.getClear());
             free(rowmem);
@@ -1244,39 +1218,48 @@ public:
         bool usesampling = true;        
 #endif
         bool useAux = false; // JCSMORE using existing partioning and auxillary rowIf (only used if overflow)
-        loop {
+        loop
+        {
             OwnedMalloc<rowcount_t> splitMap, splitMapUpper;
             CTimer timer;
-            if (numnodes>1) {
+            if (numnodes>1)
+            {
                 timer.start();
-                if (cosortfilenames) {
+                if (cosortfilenames)
+                {
                     useAux = true;
                     CalcExtPartition();
                     canoptimizenullcolumns = false;
                 }
-                if (usepartitionrow) {
+                if (usepartitionrow)
+                {
                     useAux = true;
                     CalcPreviousPartition();
                     canoptimizenullcolumns = false;
                 }
-                if (partitioninfo->IsOK()) {
+                if (partitioninfo->IsOK())
+                {
                     useAux = true;
                     splitMap.setown(UsePartitionInfo(*partitioninfo, betweensort));
-                    if (betweensort) {
+                    if (betweensort)
+                    {
                         splitMapUpper.setown(UsePartitionInfo(*partitioninfo, false));
                         canoptimizenullcolumns = false;
                     }
                 }
-                else {
+                else
+                {
                     // check for small sort here
-                    if ((skewError<0.0)&&!betweensort) {
+                    if ((skewError<0.0)&&!betweensort)
+                    {
                         splitMap.setown(CalcPartitionUsingSampling());
                         skewError = -skewError;
 #ifdef USE_SAMPLE_PARTITIONING
                         usesampling = false;
 #endif
                     }
-                    else {
+                    else
+                    {
                         if (skewError<0.0)
                             skewError = -skewError;
 
@@ -1291,7 +1274,8 @@ public:
                             splitMap.setown(CalcPartition(false));
 #endif
                     }
-                    if (!partitioninfo->splitkeys.checkSorted(icompare)) {
+                    if (!partitioninfo->splitkeys.checkSorted(icompare))
+                    {
                         ActPrintLog(activity, "ERROR: Split keys out of order!");
                         partitioninfo->splitkeys.sort(*icompare, activity->queryMaxCores());
                     }
@@ -1300,18 +1284,22 @@ public:
             }
             OwnedMalloc<SocketEndpoint> endpoints(numnodes);
             SocketEndpoint *epp = endpoints;
-            for (i=0;i<numnodes;i++) {
+            for (i=0;i<numnodes;i++)
+            {
                 CSortNode &slave = slaves.item(i);
                 *epp = slave.endpoint;
                 epp++;
             }
-            if (numnodes>1) {
+            if (numnodes>1)
+            {
                 // minimize logging
                 unsigned numspilt = 0;
                 UnsignedArray spilln;
-                for (i=0;i<numnodes;i++) {
+                for (i=0;i<numnodes;i++)
+                {
                     CSortNode &slave = slaves.item(i);
-                    if (slave.overflow) {
+                    if (slave.overflow)
+                    {
                         while (spilln.ordinality()<slave.scale)
                             spilln.append(0);
                         spilln.replace(spilln.item(slave.scale-1)+1,slave.scale-1);
@@ -1324,14 +1312,19 @@ public:
                     unsigned mostspilt = 0;
                     unsigned spiltmax = 0;
                     ForEachItemIn(smi,spilln)
-                        if (spilln.item(smi)>spiltmax) {
+                    {
+                        if (spilln.item(smi)>spiltmax)
+                        {
                             spiltmax = spilln.item(smi);
                             mostspilt = smi;
                         }
+                    }
                     ActPrintLog(activity, "Gather - %d nodes spilt to disk, most %d times",numspilt,mostspilt);
-                    for (i=0;i<numnodes;i++) {
+                    for (i=0;i<numnodes;i++)
+                    {
                         CSortNode &slave = slaves.item(i);
-                        if (slave.scale!=mostspilt+1) {
+                        if (slave.scale!=mostspilt+1)
+                        {
                             char url[100];
                             slave.endpoint.getUrlStr(url,sizeof(url));
                             ActPrintLog(activity, "Gather - node %s spilled %d times to disk",url,slave.scale-1);
@@ -1339,23 +1332,28 @@ public:
                     }
                     MemoryBuffer mbsk;
                     partitioninfo->splitkeys.serialize(mbsk);
-                    for (i=0;i<numnodes;i++) {
+                    for (i=0;i<numnodes;i++)
+                    {
                         CSortNode &slave = slaves.item(i);
                         if (slave.overflow) 
                             slave.OverflowAdjustMapStart(numnodes,splitMap+i*numnodes,mbsk.length(),(const byte *)mbsk.bufferBase(),CMPFN_COLLATE,useAux);
                     }
-                    for (i=0;i<numnodes;i++) {
+                    for (i=0;i<numnodes;i++)
+                    {
                         CSortNode &slave = slaves.item(i);
                         if (slave.overflow) 
                             slave.AdjustNumRecs(slave.OverflowAdjustMapStop(numnodes,splitMap+i*numnodes));
                     }
-                    if (splitMapUpper.get()) {
-                        for (i=0;i<numnodes;i++) {
+                    if (splitMapUpper.get())
+                    {
+                        for (i=0;i<numnodes;i++)
+                        {
                             CSortNode &slave = slaves.item(i);
                             if (slave.overflow) 
                                 slave.OverflowAdjustMapStart(numnodes,splitMapUpper+i*numnodes,mbsk.length(),(const byte *)mbsk.bufferBase(),CMPFN_UPPER,useAux);
                         }
-                        for (i=0;i<numnodes;i++) {
+                        for (i=0;i<numnodes;i++)
+                        {
                             CSortNode &slave = slaves.item(i);
                             if (slave.overflow) 
                                 slave.OverflowAdjustMapStop(numnodes,splitMapUpper+i*numnodes);
@@ -1366,9 +1364,10 @@ public:
                 OwnedMalloc<rowcount_t> tot(numnodes, true);
                 rowcount_t max=0;
                 unsigned imax=numnodes;
-                for (i=0;i<imax;i++) {
+                for (i=0;i<imax;i++){
                     unsigned j;
-                    for (j=0;j<numnodes;j++) {
+                    for (j=0;j<numnodes;j++)
+                    {
                         if (splitMapUpper)
                             tot[i]+=splitMapUpper[i+j*numnodes];
                         else
@@ -1378,9 +1377,12 @@ public:
                     }
                     if (tot[i]>max)
                         max = tot[i];
-                    if (!betweensort&&canoptimizenullcolumns&&(tot[i]==0)) {
-                        for (j=0;j<numnodes;j++) {
-                            for (unsigned k=i+1;k<numnodes;k++) {
+                    if (!betweensort&&canoptimizenullcolumns&&(tot[i]==0))
+                    {
+                        for (j=0;j<numnodes;j++)
+                        {
+                            for (unsigned k=i+1;k<numnodes;k++)
+                            {
                                 splitMap[k+j*numnodes-1] = splitMap[k+j*numnodes];
                             }
                         }
@@ -1388,7 +1390,8 @@ public:
                         i--;
                     }
                 }
-                for (i=0;i<numnodes;i++) {
+                for (i=0;i<numnodes;i++)
+                {
                     CSortNode &slave = slaves.item(i);
                     char url[100];
                     slave.endpoint.getUrlStr(url,sizeof(url));
@@ -1402,7 +1405,8 @@ public:
                     splitMap.setown(CalcPartition(true));
 #endif
 #ifdef USE_SAMPLE_PARTITIONING
-                    if (usesampling) {
+                    if (usesampling)
+                    {
                         ActPrintLog(activity, "Partioning using sampling failed, trying iterative partitioning"); 
                         usesampling = false;
                         continue;
@@ -1411,7 +1415,8 @@ public:
                     throw e.getClear();
                 }
                 ActPrintLog(activity, "Starting Merge of %" RCPF "d records",total);
-                for (i=0;i<numnodes;i++) {
+                for (i=0;i<numnodes;i++)
+                {
                     CSortNode &slave = slaves.item(i);
                     char url[100];
                     slave.endpoint.getUrlStr(url,sizeof(url));
@@ -1422,7 +1427,8 @@ public:
     //              ActPrintLog(activity, "Merge %d started: %d rows on %s",i,tot[i],url);
                 }
             }
-            else {
+            else
+            {
                 CSortNode &slave = slaves.item(0);
                 slave.SingleMerge();
                 ActPrintLog(activity, "Merge started");

+ 4 - 62
thorlcr/msort/tsortmp.cpp

@@ -15,8 +15,6 @@ enum MPSlaveFunctions
     FN_StartGather,
     FN_GetGatherInfo,
     FN_GetMinMax,
-    FN_GetMidPoint,
-    FN_GetMultiMidPoint,
     FN_GetMultiMidPointStart,
     FN_GetMultiMidPointStop,
     FN_MultiBinChop,
@@ -126,29 +124,6 @@ rowcount_t SortSlaveMP::GetMinMax(size32_t &keybuffsize,void *&keybuff, size32_t
     return ret;
 }
 
-bool SortSlaveMP::GetMidPoint (size32_t lkeysize, const byte * lkey,size32_t hkeysize, const byte * hkey,  size32_t &mkeysize, byte * &mkey)
-{
-    CMessageBuffer mb;
-    mb.append((byte)FN_GetMidPoint );
-    serializeblk(mb,lkeysize,lkey);
-    serializeblk(mb,hkeysize,hkey);
-    sendRecv(mb);
-    deserializeblk(mb,mkeysize,mkey);
-    bool ret;
-    mb.read(ret);
-    return ret;
-}
-
-void SortSlaveMP::GetMultiMidPoint(size32_t lkeybuffsize, const void * lkeybuff,size32_t hkeybuffsize, const void * hkeybuff,  size32_t &mkeybuffsize, void * &mkeybuf)
-{
-    CMessageBuffer mb;
-    mb.append((byte)FN_GetMultiMidPoint);
-    serializeblk(mb,lkeybuffsize,lkeybuff);
-    serializeblk(mb,hkeybuffsize,hkeybuff);
-    sendRecv(mb);
-    deserializeblk(mb,mkeybuffsize,mkeybuf);
-}
-
 void SortSlaveMP::GetMultiMidPointStart(size32_t lkeybuffsize, const void * lkeybuff, size32_t hkeybuffsize, const void * hkeybuff) /* async */
 {
     CMessageBuffer mb;
@@ -166,11 +141,11 @@ void SortSlaveMP::GetMultiMidPointStop(size32_t &mkeybuffsize, void * &mkeybuf)
     deserializeblk(mb,mkeybuffsize,mkeybuf);
 }
 
-void SortSlaveMP::MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowcount_t *pos,byte cmpfn,bool useaux)
+void SortSlaveMP::MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowcount_t *pos,byte cmpfn) // sends an FN_MultiBinChop request via sendRecv, then reads num rowcount_t values from the reply into pos
 {
     CMessageBuffer mb;
     mb.append((byte)FN_MultiBinChop);
-    serializeblk(mb,keybuffsize,keybuff).append(num).append(cmpfn).append(useaux);
+    serializeblk(mb,keybuffsize,keybuff).append(num).append(cmpfn); // request payload order: serialized key buffer, num, cmpfn (the useaux flag is no longer appended)
     sendRecv(mb);
     mb.read(num*sizeof(rowcount_t),pos);
 }
@@ -354,22 +329,6 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
                 free(keybuff);
             }
             break;
-            case FN_GetMidPoint : {
-                size32_t lkeysize;
-                byte * lkey;
-                size32_t hkeysize;
-                byte * hkey;
-                deserializeblk(mb,lkeysize,lkey);
-                deserializeblk(mb,hkeysize,hkey);
-                size32_t mkeysize=0;
-                byte * mkey=NULL;
-                bool ret = slave.GetMidPoint(lkeysize,lkey,hkeysize,hkey,mkeysize,mkey);
-                free(lkey);
-                free(hkey);
-                serializeblk(mbout,mkeysize,mkey).append(ret);
-                free(mkey);
-            }
-            break;
             case FN_GetMultiMidPointStart: {
                 replydone = true;
                 comm->reply(mbout);
@@ -384,22 +343,6 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
                 free(hkeybuff);
             }
             break;
-            case FN_GetMultiMidPoint: {
-                size32_t lkeybuffsize;
-                void * lkeybuff;
-                size32_t hkeybuffsize;
-                void * hkeybuff;
-                deserializeblk(mb,lkeybuffsize,lkeybuff);
-                deserializeblk(mb,hkeybuffsize,hkeybuff);
-                size32_t mkeybuffsize=0;
-                void * mkeybuff = NULL;
-                slave.GetMultiMidPoint(lkeybuffsize,lkeybuff,hkeybuffsize,hkeybuff,mkeybuffsize,mkeybuff);
-                free(lkeybuff);
-                free(hkeybuff);
-                serializeblk(mbout,mkeybuffsize,mkeybuff);
-                free(mkeybuff);
-            }
-            break;
             case FN_MultiBinChopStop: {
                 unsigned num;
                 mb.read(num);
@@ -433,10 +376,9 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
                 deserializeblk(mb,keybuffsize,keybuff);
                 unsigned num;
                 byte cmpfn;
-                bool useaux;
-                mb.read(num).read(cmpfn).read(useaux);
+                mb.read(num).read(cmpfn);
                 void *out = mbout.reserveTruncate(num*sizeof(rowcount_t));
-                slave.MultiBinChop(keybuffsize,(const byte *)keybuff,num,(rowcount_t *)out,cmpfn,useaux);
+                slave.MultiBinChop(keybuffsize,(const byte *)keybuff,num,(rowcount_t *)out,cmpfn);
                 free(keybuff);
             }
             break;

+ 2 - 6
thorlcr/msort/tsortmp.hpp

@@ -17,11 +17,9 @@ interface ISortSlaveMP
     virtual void StartGather()=0;
     virtual void GetGatherInfo(rowcount_t &numlocal, offset_t &totalsize, unsigned &overflowscale, bool hasserializer)=0;
     virtual rowcount_t GetMinMax(size32_t &keybuffsize,void *&keybuff, size32_t &avrecsizesize)=0;
-    virtual bool GetMidPoint     (size32_t lkeysize, const byte * lkey, size32_t hkeysize, const byte * hkey, size32_t &mkeysize, byte * &mkey)=0;
-    virtual void GetMultiMidPoint(size32_t lkeybuffsize, const void * lkeybuff, size32_t hkeybuffsize, const void * hkeybuff, size32_t &mkeybuffsize, void * &mkeybuf)=0;
     virtual void GetMultiMidPointStart(size32_t lkeybuffsize, const void * lkeybuff, size32_t hkeybuffsize, const void * hkeybuff)=0; /* async */
     virtual void GetMultiMidPointStop(size32_t &mkeybuffsize, void * &mkeybuf)=0;
-    virtual void MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowcount_t *pos,byte cmpfn,bool useaux)=0;
+    virtual void MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowcount_t *pos,byte cmpfn)=0;
     virtual void MultiBinChopStart(size32_t keybuffsize,const byte *keybuff, byte cmpfn)=0; /* async */
     virtual void MultiBinChopStop(unsigned num,rowcount_t *pos)=0;
     virtual void OverflowAdjustMapStart(unsigned mapsize,rowcount_t *map,size32_t keybuffsize,const byte *keybuff, byte cmpfn, bool useaux)=0; /* async */
@@ -53,11 +51,9 @@ public:
     void StartGather();
     void GetGatherInfo(rowcount_t &numlocal, offset_t &totalsize, unsigned &overflowscale, bool hasserializer);
     rowcount_t GetMinMax(size32_t &keybuffsize,void *&keybuff, size32_t &avrecsizesize);
-    bool GetMidPoint     (size32_t lkeysize, const byte * lkey, size32_t hkeysize, const byte * hkey, size32_t &mkeysize, byte * &mkey);
-    void GetMultiMidPoint(size32_t lkeybuffsize, const void * lkeybuff, size32_t hkeybuffsize, const void * hkeybuff, size32_t &mkeybuffsize, void * &mkeybuf);
     void GetMultiMidPointStart(size32_t lkeybuffsize, const void * lkeybuff, size32_t hkeybuffsize, const void * hkeybuff); /* async */
     void GetMultiMidPointStop(size32_t &mkeybuffsize, void * &mkeybuf);
-    void MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowcount_t *pos,byte cmpfn,bool useaux);
+    void MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowcount_t *pos,byte cmpfn);
     void MultiBinChopStart(size32_t keybuffsize,const byte *keybuff, byte cmpfn); /* async */
     void MultiBinChopStop(unsigned num,rowcount_t *pos);
     void OverflowAdjustMapStart(unsigned mapsize,rowcount_t *map,size32_t keybuffsize,const byte *keybuff, byte cmpfn,bool useaux); /* async */

+ 180 - 145
thorlcr/msort/tsorts.cpp

@@ -18,6 +18,7 @@
 #include "platform.h"
 #include <limits.h>
 #include "jlib.hpp"
+#include "jflz.hpp"
 #include <mpbase.hpp>
 #include <mpcomm.hpp>
 #include "thorport.hpp"
@@ -584,15 +585,38 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
     offset_t grandtotalsize;
     rowcount_t *overflowmap, *multibinchoppos;
     bool stopping, gatherdone, nosort, isstable;
-    ICompare *icompare;
-    ICompare *icollate; // used for co-sort
-    ICompare *icollateupper; // used in between join
+    ICompare *rowCompare, *keyRowCompare;
+    ICompare *primarySecondaryCompare; // used for co-sort
+    ICompare *primarySecondaryUpperCompare; // used in between join
     ISortKeySerializer *keyserializer;      // used on partition calculation
+    Owned<IRowInterfaces> keyIf;
+    Owned<IOutputRowSerializer> rowToKeySerializer;
     void *midkeybuf;
     Semaphore startgathersem, finishedmergesem, closedownsem;
     InterruptableSemaphore startmergesem;
     size32_t transferblocksize, midkeybufsize;
 
+    class CRowToKeySerializer : public CSimpleInterfaceOf<IOutputRowSerializer> // IOutputRowSerializer adapter: converts a row to its sort key, then serializes the key
+    {
+        ISortKeySerializer *keyConverter; // performs the row->key conversion (recordToKey)
+        IRowInterfaces *rowIf, *keyIf; // rowIf: sizes the incoming row; keyIf: allocates and serializes the key row
+    public:
+        CRowToKeySerializer(IRowInterfaces *_rowIf, IRowInterfaces *_keyIf, ISortKeySerializer *_keyConverter)
+            : keyConverter(_keyConverter), rowIf(_rowIf), keyIf(_keyIf) // initializer order matches member declaration order
+        {
+        }
+        // IOutputRowSerializer impl.
+        virtual void serialize(IRowSerializerTarget & out, const byte *row)
+        {
+            CSizingSerializer ssz;
+            rowIf->queryRowSerializer()->serialize(ssz, row); // sizing pass only: measures the serialized row
+            size32_t recSz = ssz.size(); // NOTE(review): this is the serialized size - confirm recordToKey expects that rather than the in-memory size
+            RtlDynamicRowBuilder k(keyIf->queryRowAllocator());
+            size32_t keySz = keyConverter->recordToKey(k, row, recSz);
+            OwnedConstThorRow keyRow = k.finalizeRowClear(keySz);
+            keyIf->queryRowSerializer()->serialize(out, (const byte *)keyRow.get()); // only the key is written to 'out'
+        }
+    };
     void main()
     {
         try
@@ -626,10 +650,17 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
         stopping = true;
     }
 #ifdef _TRACE
-    void TraceKey(const char *s, const void *k)
+    void TraceRow(const char *s, const void *k) // traces 'k' using the row serializer (rowif)
     {
         traceKey(rowif->queryRowSerializer(), s, k);
     }
+    void TraceKey(const char *s, const void *k) // traces 'k' using the key serializer (keyIf)
+    {
+        traceKey(keyIf->queryRowSerializer(), s, k);
+    }
+#else // without _TRACE both helpers expand to nothing, so call sites need no #ifdef
+#define TraceRow(msg, row)
+#define TraceKey(msg, key)
 #endif
 
     void stop()
@@ -646,33 +677,36 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
     ICompare *queryCmpFn(byte cmpfn)
     {
         switch (cmpfn) {
-        case CMPFN_NORMAL: return icompare;
-        case CMPFN_COLLATE: return icollate;
-        case CMPFN_UPPER: return icollateupper;
+        case CMPFN_NORMAL: return keyRowCompare; // key-vs-row comparer (same as rowCompare when there is no key serializer)
+        case CMPFN_COLLATE: return primarySecondaryCompare; // used for co-sort
+        case CMPFN_UPPER: return primarySecondaryUpperCompare; // used in between join
         }
         return NULL;
     }
-    rowidx_t BinChop(const void *row, bool lesseq, bool firstdup, byte cmpfn)
+    rowidx_t BinChop(const void *key, bool lesseq, bool firstdup, byte cmpfn)
     {
         rowidx_t n = rowArray.ordinality();
         rowidx_t l=0;
         rowidx_t r=n;
         ICompare* icmp=queryCmpFn(cmpfn);
-        while (l<r) {
+        while (l<r)
+        {
             rowidx_t m = (l+r)/2;
-            const void *p = rowArray.query(m);
-            int cmp = icmp->docompare(row, p);
+            int cmp = icmp->docompare(key, rowArray.query(m));
             if (cmp < 0)
                 r = m;
             else if (cmp > 0)
                 l = m+1;
-            else {
-                if (firstdup) {
-                    while ((m>0)&&(icmp->docompare(row, rowArray.query(m-1))==0))
+            else
+            {
+                if (firstdup)
+                {
+                    while ((m>0)&&(icmp->docompare(key, rowArray.query(m-1))==0))
                         m--;
                 }
-                else {
-                    while ((m+1<n)&&(icmp->docompare(row, rowArray.query(m+1))==0))
+                else
+                {
+                    while ((m+1<n)&&(icmp->docompare(key, rowArray.query(m+1))==0))
                         m++;
                 }
                 return m;
@@ -688,7 +722,8 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
         for (unsigned n=0;n<num;n++)
         {
             unsigned i = n;
-            loop {                                      // adjustment for empty keys
+            loop                                      // adjustment for empty keys
+            {
                 if (i>=keys.ordinality())
                 {
                     pos[n] = rowArray.ordinality();
@@ -741,10 +776,10 @@ public:
     {
         numnodes = 0;
         partno = 0;
-        icompare = NULL;
+        rowCompare = keyRowCompare = NULL;
         nosort = false;
-        icollate = NULL;
-        icollateupper = NULL;
+        primarySecondaryCompare = NULL;
+        primarySecondaryUpperCompare = NULL;
         midkeybuf = NULL;
         multibinchoppos = NULL;
         transferblocksize = TRANSFERBLOCKSIZE;
@@ -782,78 +817,40 @@ public:
     {
         if (!gatherdone)
             ERRLOG("GetGatherInfo:***Error called before gather complete");
-        if (!haskeyserializer&&keyserializer) {
-            ActPrintLog(activity, "Suppressing key serializer on slave");
-            keyserializer = NULL;       // when cosorting and LHS empty
-        }
-        else if (haskeyserializer&&!keyserializer) {
-            WARNLOG("Mismatched key serializer (master has, slave doesn't");
-        }
+        if (haskeyserializer != (NULL != keyserializer))
+            throwUnexpected();
         numlocal = rowArray.ordinality(); // JCSMORE - this is sample total, why not return actual spill total?
         _overflowscale = overflowinterval;
         totalsize = grandtotalsize; // used by master, if nothing overflowed to see if can MiniSort
     }
     virtual rowcount_t GetMinMax(size32_t &keybufsize,void *&keybuf,size32_t &avrecsize)
     {
-        CThorExpandingRowArray ret(*activity, rowif, true);
         avrecsize = 0;
-        if (rowArray.ordinality()>0) {
-            const void *kp = rowArray.get(0);
-#ifdef _TRACE
-            TraceKey("Min =", kp);
-#endif
-            ret.append(kp);
-            kp = rowArray.get(rowArray.ordinality()-1);
-#ifdef _TRACE
-            TraceKey("Max =", kp);
-#endif
-            ret.append(kp);
-            avrecsize = (size32_t)(grandtotalsize/grandtotal);
-#ifdef _TRACE
-            ActPrintLog(activity, "Ave Rec Size = %u",avrecsize);
-#endif
+        if (0 == rowArray.ordinality()) // no local rows: hand back an empty key buffer and a zero count
+        {
+            keybufsize = 0;
+            keybuf = NULL;
+            return 0;
         }
+
         MemoryBuffer mb;
-        ret.serialize(mb);
+        CMemoryRowSerializer msz(mb); // both keys are serialized back-to-back into mb
+
+        const void *kp = rowArray.query(0); // query(), not get(): the row is only borrowed here, get() would leave a link that is never released
+        TraceRow("Min =", kp);
+        rowToKeySerializer->serialize(msz, (const byte *)kp);
+
+        kp = rowArray.query(rowArray.ordinality()-1); // borrowed as above
+        TraceRow("Max =", kp);
+        rowToKeySerializer->serialize(msz, (const byte *)kp);
+
+        avrecsize = (size32_t)(grandtotalsize/grandtotal);
+#ifdef _TRACE
+        ActPrintLog(activity, "Ave Rec Size = %u", avrecsize);
+#endif
         keybufsize = mb.length();
         keybuf = mb.detach();
-        return rowArray.ordinality();
-    }
-    virtual bool GetMidPoint(size32_t lsize,const byte *lkeymem,
-                    size32_t hsize,const byte *hkeymem,
-                    size32_t &msize,byte *&mkeymem)
-    {
-        // finds the keys within the ranges specified
-        // uses empty keys (0 size) if none found
-        // try to avoid endpoints if possible
-        if (rowArray.ordinality()!=0) {
-            OwnedConstThorRow lkey;
-            lkey.deserialize(rowif,lsize,lkeymem);
-            OwnedConstThorRow hkey;
-            hkey.deserialize(rowif,hsize,hkeymem);
-            unsigned p1 = BinChop(lkey.get(),false,false,false);
-            if (p1==(unsigned)-1)
-                p1 = 0;
-            unsigned p2 = BinChop(hkey.get(),true,true,false);
-            if (p2>=rowArray.ordinality()) 
-                p2 = rowArray.ordinality()-1;
-            if (p1<=p2) {
-                unsigned pm=(p1+p2+1)/2;
-                const void *kp=rowArray.query(pm);
-                if ((icompare->docompare(lkey,kp)<=0)&&
-                        (icompare->docompare(hkey,kp)>=0)) { // paranoia
-                    MemoryBuffer mb;
-                    CMemoryRowSerializer mbsz(mb);
-                    rowif->queryRowSerializer()->serialize(mbsz, (const byte *)kp);
-                    msize = mb.length();
-                    mkeymem = (byte *)mb.detach();;
-                    return true;
-                }
-            }
-        }
-        mkeymem = NULL;
-        msize = 0;
-        return false;
+        return 2; // number of keys placed in keybuf (min and max)
     }
     virtual void GetMultiMidPoint(size32_t lbufsize,const void *lkeybuf,
                           size32_t hbufsize,const void *hkeybuf,
@@ -861,27 +858,31 @@ public:
     {
         // finds the keys within the ranges specified
         // uses empty keys (0 size) if none found
-        CThorExpandingRowArray low(*activity, rowif, true);
-        CThorExpandingRowArray high(*activity, rowif, true);
-        CThorExpandingRowArray mid(*activity, rowif, true);
+        CThorExpandingRowArray low(*activity, keyIf, true);
+        CThorExpandingRowArray high(*activity, keyIf, true);
+        CThorExpandingRowArray mid(*activity, keyIf, true);
         low.deserializeExpand(lbufsize, lkeybuf);
         high.deserializeExpand(hbufsize, hkeybuf);
         unsigned n=low.ordinality();
         assertex(n==high.ordinality());
         unsigned i;
-        for (i=0;i<n;i++) {
-            if (rowArray.ordinality()!=0) {
-                unsigned p1 = BinChop(low.query(i), false, false, false);
+        for (i=0;i<n;i++)
+        {
+            if (rowArray.ordinality()!=0)
+            {
+                unsigned p1 = BinChop(low.query(i), false, false, CMPFN_NORMAL);
                 if (p1==(unsigned)-1)
                     p1 = 0;
-                unsigned p2 = BinChop(high.query(i), true, true, false);
+                unsigned p2 = BinChop(high.query(i), true, true, CMPFN_NORMAL);
                 if (p2>=rowArray.ordinality()) 
                     p2 = rowArray.ordinality()-1;
-                if (p1<=p2) { 
+                if (p1<=p2)
+                {
                     unsigned pm=(p1+p2+1)/2;
                     OwnedConstThorRow kp = rowArray.get(pm);
-                    if ((icompare->docompare(low.query(i), kp)<=0)&&
-                        (icompare->docompare(high.query(i), kp)>=0)) { // paranoia
+                    if ((keyRowCompare->docompare(low.query(i), kp)<=0)&&
+                        (keyRowCompare->docompare(high.query(i), kp)>=0)) // paranoia
+                    {
                         mid.append(kp.getClear());
                     }
                     else
@@ -894,9 +895,22 @@ public:
                 mid.append(NULL);
         }
         MemoryBuffer mb;
-        mid.serializeCompress(mb);
-        mbufsize = mb.length();
-        mkeybuf = mb.detach();
+        CMemoryRowSerializer s(mb);
+        for (rowidx_t i=0; i<mid.ordinality(); i++)
+        {
+            const void *row = mid.query(i);
+            if (row)
+            {
+                mb.append(false);
+                rowToKeySerializer->serialize(s, (const byte *)row);
+            }
+            else
+                mb.append(true);
+        }
+        MemoryBuffer compressedMb;
+        fastLZCompressToBuffer(compressedMb, mb.length(), mb.toByteArray());
+        mbufsize = compressedMb.length();
+        mkeybuf = compressedMb.detach();
     }
     virtual void GetMultiMidPointStart(size32_t lbufsize,const void *lkeybuf,
                                size32_t hbufsize,const void *hkeybuf)
@@ -911,15 +925,15 @@ public:
         mbufsize = midkeybufsize;
         midkeybuf = NULL;
     }
-    virtual void MultiBinChop(size32_t keybufsize, const byte * keybuf, unsigned num, rowcount_t * pos, byte cmpfn, bool useaux)
+    virtual void MultiBinChop(size32_t keybufsize, const byte * keybuf, unsigned num, rowcount_t * pos, byte cmpfn) // deserializes the keys in keybuf and passes them to doBinChop with pos/num/cmpfn
     {
-        CThorExpandingRowArray keys(*activity, useaux?auxrowif:rowif, true);
+        CThorExpandingRowArray keys(*activity, keyIf, true); // incoming buffer holds keys, so it is deserialized with the key interfaces
         keys.deserialize(keybufsize, keybuf);
         doBinChop(keys, pos, num, cmpfn);
     }
     virtual void MultiBinChopStart(size32_t keybufsize, const byte * keybuf, byte cmpfn)
     {
-        CThorExpandingRowArray keys(*activity, rowif, true);
+        CThorExpandingRowArray keys(*activity, keyIf, true);
         keys.deserializeExpand(keybufsize, keybuf);
         assertex(multibinchoppos==NULL); // check for reentrancy
         multibinchopnum = keys.ordinality();
@@ -948,7 +962,7 @@ public:
         for (i=0;i<mapsize;i++)
             ActPrintLog(activity, "%" RCPF "d ",overflowmap[i]);
 #endif
-        CThorExpandingRowArray keys(*activity, useaux?auxrowif:rowif, true);
+        CThorExpandingRowArray keys(*activity, keyIf, true);
         keys.deserialize(keybufsize, keybuf);
         for (i=0;i<mapsize-1;i++)
             AdjustOverflow(overflowmap[i], keys.query(i), cmpfn);
@@ -992,32 +1006,27 @@ public:
     virtual bool FirstRowOfFile(const char *filename,
                         size32_t &rowbufsize, byte * &rowbuf)
     {
-        if (!*filename) { // partition row wanted
-            if (partitionrow) {
-                MemoryBuffer mb;
-                CMemoryRowSerializer ssz(mb);
-                auxrowif->queryRowSerializer()->serialize(ssz,(const byte *)partitionrow.get());
-                rowbufsize = mb.length();
-                rowbuf = (byte *)mb.detach();   
-                partitionrow.clear(); // only one attempt! 
-            }
-            else {
-                rowbuf = NULL;
-                rowbufsize = 0;
-            }
-            return true;
+        OwnedConstThorRow row; // the row whose key is returned: the partition row, or the first row of 'filename'
+        MemoryBuffer mb;
+        CMemoryRowSerializer msz(mb);
+        if (!*filename) // partition row wanted
+        {
+            if (partitionrow)
+                row.setown(partitionrow.getClear()); // setown: getClear() already hands over the reference, set() would add a link that is never released
         }
-        Owned<IFile> file = createIFile(filename);
-        Owned<IExtRowStream> rowstream = createRowStream(file, auxrowif);
-        OwnedConstThorRow row = rowstream->nextRow();
-        if (!row) {
+        else
+        {
+            Owned<IFile> file = createIFile(filename);
+            Owned<IExtRowStream> rowstream = createRowStream(file, auxrowif);
+            row.setown(rowstream->nextRow());
+        }
+        if (!row) // no partition row / empty file: return an empty buffer
+        {
             rowbuf = NULL;
             rowbufsize = 0;
             return true;
         }
-        MemoryBuffer mb;
-        CMemoryRowSerializer msz(mb);
-        auxrowif->queryRowSerializer()->serialize(msz,(const byte *)row.get());
+        rowToKeySerializer->serialize(msz, (const byte *)row.get()); // serialized via rowToKeySerializer (the key form when a key serializer is configured)
         rowbufsize = mb.length();
         rowbuf = (byte *)mb.detach();
         return true;
@@ -1026,21 +1035,27 @@ public:
     {
         // actually doesn't get Nth row but numsplits samples distributed evenly through the rows
         assertex(numsplits);
-        CThorExpandingRowArray ret(*activity, rowif, true);
         unsigned numrows = rowArray.ordinality();
-        if (numrows) {
-            for (unsigned i=0;i<numsplits;i++) {
-                count_t pos = ((i*2+1)*(count_t)numrows)/(2*(count_t)numsplits);
-                if (pos>=numrows) 
-                    pos = numrows-1;
-                const void *kp = rowArray.get((unsigned)pos);
-                ret.append(kp);
-            }
+        if (0 == numrows) // nothing to sample: return an empty buffer
+        {
+            outbufsize = 0;
+            outkeybuf = NULL;
+            return;
         }
         MemoryBuffer mb;
-        ret.serializeCompress(mb);
-        outbufsize = mb.length();
-        outkeybuf = mb.detach();
+        CMemoryRowSerializer msz(mb); // sampled keys are serialized back-to-back into mb
+        for (unsigned i=0;i<numsplits;i++)
+        {
+            count_t pos = ((i*2+1)*(count_t)numrows)/(2*(count_t)numsplits); // midpoint of the i-th of numsplits equal slices
+            if (pos>=numrows)
+                pos = numrows-1;
+            const void *row = rowArray.query((unsigned)pos);
+            rowToKeySerializer->serialize(msz, (const byte *)row);
+        }
+        MemoryBuffer exp; // receives the compressed form of mb
+        fastLZCompressToBuffer(exp, mb.length(), mb.toByteArray()); // destination first, then source length/data (arguments were swapped, yielding an empty result)
+        outbufsize = exp.length();
+        outkeybuf = exp.detach();
     }
     virtual void StartMiniSort(rowcount_t globalTotal)
     {
@@ -1050,7 +1065,7 @@ public:
         Owned<IRowStream> sortedStream;
         try
         {
-            sortedStream.setown(miniSort.sort(rowArray, globalTotal, *icompare, isstable, totalrows));
+            sortedStream.setown(miniSort.sort(rowArray, globalTotal, *rowCompare, isstable, totalrows));
         }
         catch (IException *e)
         {
@@ -1134,7 +1149,7 @@ public:
         else
         {
             Owned<IRowLinkCounter> linkcounter = new CThorRowLinkCounter;
-            merger.setown(createRowStreamMerger(readers.ordinality(), readers.getArray(), icompare, false, linkcounter));
+            merger.setown(createRowStreamMerger(readers.ordinality(), readers.getArray(), rowCompare, false, linkcounter));
         }
         ActPrintLog(activity, "Global Merger Created: %d streams", readers.ordinality());
         startmergesem.signal();
@@ -1150,9 +1165,9 @@ public:
     virtual void Gather(
         IRowInterfaces *_rowif,
         IRowStream *in,
-        ICompare *_icompare,
-        ICompare *_icollate,
-        ICompare *_icollateupper,
+        ICompare *_rowCompare,
+        ICompare *_primarySecondaryCompare,
+        ICompare *_primarySecondaryUpperCompare,
         ISortKeySerializer *_keyserializer,
         const void *_partitionrow,
         bool _nosort,
@@ -1173,10 +1188,8 @@ public:
         rowif.set(_rowif);
         rowArray.kill();
         rowArray.setup(rowif);
-        if (transferserver)
-            transferserver->setRowIF(rowif);
-        else
-            WARNLOG("SORT: transfer server not started!");
+        dbgassertex(transferserver);
+        transferserver->setRowIF(rowif);
         if (_auxrowif&&_auxrowif->queryRowMetaData())
             auxrowif.set(_auxrowif);
         else
@@ -1186,19 +1199,41 @@ public:
             auxrowif.set(_rowif);
         }
         keyserializer = _keyserializer;
-        if (!keyserializer)
+        rowCompare = _rowCompare;
+        if (keyserializer)
+        {
+            keyIf.setown(createRowInterfaces(keyserializer->queryRecordSize(), activity->queryContainer().queryId(), activity->queryCodeContext()));
+            rowToKeySerializer.setown(new CRowToKeySerializer(auxrowif, keyIf, keyserializer));
+            keyRowCompare = keyserializer->queryCompareKeyRow();
+        }
+        else
+        {
             ActPrintLog(activity, "No key serializer");
+            keyIf.set(auxrowif);
+            rowToKeySerializer.set(keyIf->queryRowSerializer());
+            keyRowCompare = rowCompare;
+        }
         nosort = _nosort;
         if (nosort)
             ActPrintLog(activity, "SORT: Gather not sorting");
         isstable = !_unstable;
         if (_unstable)
             ActPrintLog(activity, "SORT: UNSTABLE");
-        icompare = _icompare;
-        icollate = _icollate?_icollate:_icompare;
-        icollateupper = _icollateupper?_icollateupper:icollate;
 
-        Owned<IThorRowLoader> sortedloader = createThorRowLoader(*activity, rowif, nosort?NULL:icompare, isstable ? stableSort_earlyAlloc : stableSort_none, rc_allDiskOrAllMem, SPILL_PRIORITY_SELFJOIN);
+        if (_primarySecondaryCompare)
+            primarySecondaryCompare = _primarySecondaryCompare;
+        else
+            primarySecondaryCompare = keyRowCompare;
+        if (_primarySecondaryUpperCompare)
+        {
+            if (keyserializer)
+                throwUnexpected();
+            primarySecondaryUpperCompare = _primarySecondaryUpperCompare;
+        }
+        else
+            primarySecondaryUpperCompare = primarySecondaryCompare;
+
+        Owned<IThorRowLoader> sortedloader = createThorRowLoader(*activity, rowif, nosort?NULL:rowCompare, isstable ? stableSort_earlyAlloc : stableSort_none, rc_allDiskOrAllMem, SPILL_PRIORITY_SELFJOIN);
         Owned<IRowStream> overflowstream;
         memsize_t inMemUsage = 0;
         try

+ 1 - 0
thorlcr/thorutil/thmem.cpp

@@ -755,6 +755,7 @@ void CThorExpandingRowArray::transferFrom(CThorExpandingRowArray &donor)
 void CThorExpandingRowArray::transferFrom(CThorSpillableRowArray &donor)
 {
     transferFrom((CThorExpandingRowArray &)donor);
+    donor.kill(); // kill() the donor once its rows have been transferred - presumably to reset its spillable-specific state; confirm against CThorSpillableRowArray::kill
 }
 
 void CThorExpandingRowArray::removeRows(rowidx_t start, rowidx_t n)

+ 0 - 1
thorlcr/thorutil/thmem.hpp

@@ -144,7 +144,6 @@ public:
         else
             clear();
     }
-    
 private:
     const void * ptr;
 };