浏览代码

HPCC-13791 Use keys in sort partitioning

Previously the sort partitioning used in global sort/join was
serializing whole rows to and from the slaves/master, instead of
just the key fields it needed. This was costly in terms of
transmission time, but also required an excessive amount of
memory where rows were big, ultimately limiting the size that
sort could cope with before running out of memory on the master
whilst holding all the partition points.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 10 年之前
父节点
当前提交
313a50815d

+ 60 - 26
thorlcr/activities/join/thjoin.cpp

@@ -75,8 +75,6 @@ class JoinActivityMaster : public CMasterActivity
                 return -1;
             return 0;
         }
-
-
     } *climitedcmp;
 public:
     JoinActivityMaster(CMasterGraphElement * info, bool local) : CMasterActivity(info)
@@ -128,7 +126,8 @@ public:
     {
         ActPrintLog("process");
         CMasterActivity::process();
-        if (!islocal) {
+        if (!islocal)
+        {
             helper = (IHThorJoinArg *)queryHelper();
             StringBuffer skewV;
             double skewError;
@@ -151,30 +150,61 @@ public:
             unsigned __int64 skewThreshold = container.queryJob().getWorkUnitValueInt("overrideSkewThreshold", 0);
             if (!skewThreshold)
             {
-                skewThreshold = helper->getThreshold();         
+                skewThreshold = helper->getThreshold();
                 if (!skewThreshold)
                     skewThreshold = container.queryJob().getWorkUnitValueInt("defaultSkewThreshold", 0);
             }
             try
             {
                 size32_t maxdeviance = getOptUInt(THOROPT_SORT_MAX_DEVIANCE, 10*1024*1024);
-                rightpartition = (container.getKind() == TAKjoin)&&((helper->getJoinFlags()&JFpartitionright)!=0);
+                rightpartition = (container.getKind() == TAKjoin) && ((helper->getJoinFlags()&JFpartitionright)!=0);
+
+                CGraphElementBase *primaryInput = NULL;
+                CGraphElementBase *secondaryInput = NULL;
+                ICompare *primaryCompare = NULL, *secondaryCompare = NULL;
+                ISortKeySerializer *primaryKeySerializer = NULL;
+                if (!rightpartition)
+                {
+                    primaryInput = container.queryInput(0);
+                    primaryCompare = helper->queryCompareLeft();
+                    primaryKeySerializer = helper->querySerializeLeft();
+                    if (container.getKind() != TAKselfjoin)
+                    {
+                        secondaryInput = container.queryInput(1);
+                        secondaryCompare = helper->queryCompareRight();
+                    }
+                }
+                else
+                {
+                    primaryInput = container.queryInput(1);
+                    secondaryInput = container.queryInput(0);
+                    primaryCompare = helper->queryCompareRight();
+                    secondaryCompare = helper->queryCompareLeft();
+                    primaryKeySerializer = helper->querySerializeRight();
+                }
+                if (helper->getJoinFlags()&JFslidingmatch) // JCSMORE shouldn't be necessary
+                    primaryKeySerializer = NULL;
+                Owned<IRowInterfaces> primaryRowIf = createRowInterfaces(primaryInput->queryHelper()->queryOutputMeta(), queryActivityId(), queryCodeContext());
+                Owned<IRowInterfaces> secondaryRowIf;
+                if (secondaryInput)
+                    secondaryRowIf.setown(createRowInterfaces(secondaryInput->queryHelper()->queryOutputMeta(), queryActivityId(), queryCodeContext()));
+
                 bool betweenjoin = (helper->getJoinFlags()&JFslidingmatch)!=0;
-                if (!container.queryLocalOrGrouped() && container.getKind() == TAKselfjoin)
+                if (container.getKind() == TAKselfjoin)
                 {
-                    if (betweenjoin) {
+                    if (betweenjoin)
                         throw MakeActivityException(this, -1, "SELF BETWEEN JOIN not supported"); // Gavin shouldn't generate
-                    }
-                    Owned<IRowInterfaces> rowif = createRowInterfaces(container.queryInput(0)->queryHelper()->queryOutputMeta(),queryActivityId(),queryCodeContext());
-                    ICompare *cmpleft = helper->queryCompareLeft();
-                    if ((helper->getJoinFlags()&JFlimitedprefixjoin)&&(helper->getJoinLimit())) {
+                    ICompare *cmpleft = primaryCompare;
+                    if ((helper->getJoinFlags()&JFlimitedprefixjoin)&&(helper->getJoinLimit()))
+                    {
                         delete climitedcmp;
                         climitedcmp = new cLimitedCmp(helper->queryCompareLeftRight(),helper->queryPrefixCompare());
                         cmpleft = climitedcmp;
                         // partition by L/R
                     }
-                    imaster->SortSetup(rowif,cmpleft, helper->querySerializeLeft(), false, true, NULL, NULL);
-                    if (barrier->wait(false)) { // local sort complete
+                    imaster->SortSetup(primaryRowIf, cmpleft, primaryKeySerializer, false, true, NULL, NULL);
+                    if (barrier->wait(false)) // local sort complete
+                    {
                         try
                         {
                             imaster->Sort(skewThreshold,skewWarning,skewError,maxdeviance,false,false,false,0);
@@ -197,11 +227,14 @@ public:
                     }
                     imaster->SortDone();
                 }
-                else if (!nosortPrimary()||betweenjoin) {
-                    Owned<IRowInterfaces> rowif = createRowInterfaces(container.queryInput(rightpartition?1:0)->queryHelper()->queryOutputMeta(),queryActivityId(),queryCodeContext());
-                    imaster->SortSetup(rowif,rightpartition?helper->queryCompareRight():helper->queryCompareLeft(),rightpartition?helper->querySerializeRight():helper->querySerializeLeft(),false,true, NULL, NULL);
+                else if (!nosortPrimary()||betweenjoin)
+                {
+                    Owned<IRowInterfaces> secondaryRowIf = createRowInterfaces(secondaryInput->queryHelper()->queryOutputMeta(), queryActivityId(), queryCodeContext());
+
+                    imaster->SortSetup(primaryRowIf, primaryCompare, primaryKeySerializer, false, true, NULL, NULL);
                     ActPrintLog("JOIN waiting for barrier.1");
-                    if (barrier->wait(false)) {
+                    if (barrier->wait(false))
+                    {
                         ActPrintLog("JOIN barrier.1 raised");
                         try
                         {
@@ -220,14 +253,15 @@ public:
                                 throw;
                         }
                         ActPrintLog("JOIN waiting for barrier.2");
-                        if (barrier->wait(false)) { // merge complete
+                        if (barrier->wait(false)) // merge complete
+                        {
                             ActPrintLog("JOIN barrier.2 raised");
                             imaster->SortDone();
                             // NB on the cosort should use same serializer as sort (but in fact it only gets used when 0 rows on primary side)
-                            Owned<IRowInterfaces> rowif2 = createRowInterfaces(container.queryInput(rightpartition?0:1)->queryHelper()->queryOutputMeta(),queryActivityId(),queryCodeContext());
-                            imaster->SortSetup(rowif2,rightpartition?helper->queryCompareLeft():helper->queryCompareRight(),rightpartition?helper->querySerializeRight():helper->querySerializeLeft(),true,false, NULL, NULL); //serializers OK
+                            imaster->SortSetup(secondaryRowIf, secondaryCompare, primaryKeySerializer, true, false, NULL, NULL); //serializers OK
                             ActPrintLog("JOIN waiting for barrier.3");
-                            if (barrier->wait(false)) { // local sort complete
+                            if (barrier->wait(false)) // local sort complete
+                            {
                                 ActPrintLog("JOIN barrier.3 raised");
                                 try
                                 {
@@ -254,12 +288,12 @@ public:
                         imaster->SortDone();
                     }
                 }
-                else { // only sort non-partition side
-                    Owned<IRowInterfaces> rowif = createRowInterfaces(container.queryInput(rightpartition?0:1)->queryHelper()->queryOutputMeta(),queryActivityId(),queryCodeContext());
-                    Owned<IRowInterfaces> auxrowif = createRowInterfaces(container.queryInput(rightpartition?1:0)->queryHelper()->queryOutputMeta(),queryActivityId(),queryCodeContext());
-                    imaster->SortSetup(rowif,rightpartition?helper->queryCompareLeft():helper->queryCompareRight(),rightpartition?helper->querySerializeLeft():helper->querySerializeRight(),true,true, NULL, auxrowif);
+                else // only sort non-partition side
+                {
+                    imaster->SortSetup(secondaryRowIf, secondaryCompare, primaryKeySerializer, true, true, NULL, primaryRowIf);
                     ActPrintLog("JOIN waiting for barrier.1");
-                    if (barrier->wait(false)) { // local sort complete
+                    if (barrier->wait(false)) // local sort complete
+                    {
                         ActPrintLog("JOIN barrier.1 raised");
                         try
                         {

+ 58 - 30
thorlcr/activities/join/thjoinslave.cpp

@@ -52,8 +52,8 @@ class JoinSlaveActivity : public CSlaveActivity, public CThorDataLink, implement
     ICompare *rightCompare;
     ISortKeySerializer *leftKeySerializer;
     ISortKeySerializer *rightKeySerializer;
-    ICompare *collate;
-    ICompare *collateupper; // if non-null then between join
+    ICompare *primarySecondaryCompare;
+    ICompare *primarySecondaryUpperCompare; // if non-null then between join
 
     Owned<IRowStream> leftStream, rightStream;
     Semaphore secondaryStartSem;
@@ -122,6 +122,16 @@ class JoinSlaveActivity : public CSlaveActivity, public CThorDataLink, implement
 
     };  
 
+    struct CompareReverse : public ICompare
+    {
+        CompareReverse() { compare = NULL; }
+        ICompare *compare;
+        int docompare(const void *a,const void *b) const
+        {
+            return -compare->docompare(b,a);
+        }
+    } compareReverse, compareReverseUpper;
+
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
@@ -149,7 +159,8 @@ public:
 
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        if (!islocal) {
+        if (!islocal)
+        {
             mpTagRPC = container.queryJob().deserializeMPTag(data);
             mptag_t barrierTag = container.queryJob().deserializeMPTag(data);
             barrier.setown(container.queryJob().createBarrier(barrierTag));
@@ -184,14 +195,6 @@ public:
         rightCompare = helper->queryCompareRight();
         leftKeySerializer = helper->querySerializeLeft();
         rightKeySerializer = helper->querySerializeRight();
-        if (helper->getJoinFlags()&JFslidingmatch) {
-            collate = helper->queryCompareLeftRightLower();
-            collateupper = helper->queryCompareLeftRightUpper();
-        }
-        else {
-            collate = helper->queryCompareLeftRight();
-            collateupper = NULL;
-        }
     }
     virtual void onInputStarted(IException *except)
     {
@@ -443,19 +446,11 @@ public:
     }
     bool doglobaljoin()
     {
+        rightpartition = (container.getKind()==TAKjoin)&&((helper->getJoinFlags()&JFpartitionright)!=0);
+
         Linked<IRowInterfaces> primaryRowIf, secondaryRowIf;
         ICompare *primaryCompare, *secondaryCompare;
-        ISortKeySerializer *primaryKeySerializer, *secondaryKeySerializer;
-        ICompare *primaryCollate, *primaryCollateUpper;
-        struct cCollateReverse: public ICompare
-        {
-            ICompare *collate;
-            cCollateReverse(ICompare *_collate) : collate(_collate) { }
-            int docompare(const void *a,const void *b) const
-            {
-                return -collate->docompare(b,a);
-            }
-        } collateRev(collate), collateRevUpper(collateupper);
+        ISortKeySerializer *primaryKeySerializer;
 
         Owned<IRowStream> secondaryStream, primaryStream;
         if (rightpartition)
@@ -463,22 +458,54 @@ public:
             primaryCompare = rightCompare;
             primaryKeySerializer = rightKeySerializer;
             secondaryCompare = leftCompare;
-            secondaryKeySerializer = leftKeySerializer;
-            primaryCollate = &collateRev;
-            primaryCollateUpper = collateupper?&collateRevUpper:NULL;
         }
         else
         {
             primaryCompare = leftCompare;
             primaryKeySerializer = leftKeySerializer;
             secondaryCompare = rightCompare;
-            secondaryKeySerializer = rightKeySerializer;
-            primaryCollate = collate;
-            primaryCollateUpper = collateupper;
         }
         primaryRowIf.set(queryRowInterfaces(primaryInput));
         secondaryRowIf.set(queryRowInterfaces(secondaryInput));
 
+        primarySecondaryCompare = NULL;
+        if (helper->getJoinFlags()&JFslidingmatch)
+        {
+            if (primaryKeySerializer) // JCSMORE shouldn't be generated
+                primaryKeySerializer = NULL;
+            primarySecondaryCompare = helper->queryCompareLeftRightLower();
+            primarySecondaryUpperCompare = helper->queryCompareLeftRightUpper();
+            if (rightpartition)
+            {
+                compareReverse.compare = primarySecondaryCompare;
+                compareReverseUpper.compare = primarySecondaryUpperCompare;
+                primarySecondaryCompare = &compareReverse;
+                primarySecondaryUpperCompare = &compareReverseUpper;
+            }
+        }
+        else
+        {
+            primarySecondaryUpperCompare = NULL;
+            if (rightpartition)
+            {
+                if (rightKeySerializer)
+                    primarySecondaryCompare = helper->queryCompareRightKeyLeftRow();
+                else
+                {
+                    compareReverse.compare = helper->queryCompareLeftRight();
+                    primarySecondaryCompare = &compareReverse;
+                }
+            }
+            else
+            {
+                if (leftKeySerializer)
+                    primarySecondaryCompare = helper->queryCompareLeftKeyRightRow();
+                else
+                    primarySecondaryCompare = helper->queryCompareLeftRight();
+            }
+        }
+        dbgassertex(primarySecondaryCompare);
+
         OwnedConstThorRow partitionRow;
         rowcount_t totalrows;
 
@@ -489,7 +516,7 @@ public:
         }
         else
         {
-            sorter->Gather(primaryRowIf,primaryInput,primaryCompare,NULL,NULL,primaryKeySerializer,NULL,false,isUnstable(),abortSoon,NULL);
+            sorter->Gather(primaryRowIf, primaryInput, primaryCompare, NULL, NULL, primaryKeySerializer, NULL, false, isUnstable(), abortSoon, NULL);
             stopPartitionInput();
             if (abortSoon)
             {
@@ -513,7 +540,8 @@ public:
             ActPrintLog("JOIN barrier.2 raised");
             sorter->stopMerge();
         }
-        sorter->Gather(secondaryRowIf,secondaryInput,secondaryCompare,primaryCollate,primaryCollateUpper,primaryKeySerializer,partitionRow,noSortOtherSide(),isUnstable(),abortSoon,primaryRowIf); // primaryKeySerializer *is* correct
+        // NB: on secondary sort, the primaryKeySerializer is used
+        sorter->Gather(secondaryRowIf, secondaryInput, secondaryCompare, primarySecondaryCompare, primarySecondaryUpperCompare, primaryKeySerializer, partitionRow, noSortOtherSide(), isUnstable(), abortSoon, primaryRowIf); // primaryKeySerializer *is* correct
         partitionRow.clear();
         stopOtherInput();
         if (abortSoon)

+ 1 - 0
thorlcr/graph/thgraph.cpp

@@ -2960,6 +2960,7 @@ void CActivityBase::logRow(const char * prefix, IOutputMetaData & meta, const vo
     {
         StringBuffer xml;
         appendRowXml(xml, meta, row);
+        ActPrintLog("%s: %s", prefix, xml.str());
     }
 }
 

+ 1 - 0
thorlcr/msort/tsorta.cpp

@@ -570,3 +570,4 @@ void CThorKeyArray::traceKey(const char *prefix,unsigned idx)
     IOutputRowSerializer *serializer = keyserializer?keyif->queryRowSerializer():rowif->queryRowSerializer();
     ::traceKey(serializer,s.str(),queryKey(idx));
 }
+

+ 223 - 217
thorlcr/msort/tsortm.cpp

@@ -29,6 +29,7 @@
 #include <limits.h>
 
 #include "jlib.hpp"
+#include "jflz.hpp"
 #include <mpbase.hpp>
 #include <mpcomm.hpp>
 #include "thorport.hpp"
@@ -260,14 +261,14 @@ struct PartitionInfo
             throw MakeStringException(-1,"SORT: PartitionInfo meta info mismatch(%d,%d)",guard,dsguard);
         splitkeys.kill();
         splitkeys.deserialize(left, mb.readDirect(left));
-    }   
+    }
 };
-    
+
 
 
 typedef CopyReferenceArrayOf<CSortNode> NodeArray;
 
-class CSortMaster: public IThorSorterMaster, public CSimpleInterface
+class CSortMaster : public IThorSorterMaster, public CSimpleInterface
 { 
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
@@ -290,6 +291,7 @@ public:
     size32_t maxdeviance;
     Linked<IRowInterfaces> rowif;
     Linked<IRowInterfaces> auxrowif;
+    Linked<IRowInterfaces> keyIf;
 
     int AddSlave(ICommunicator *comm,rank_t rank,SocketEndpoint &endpoint,mptag_t mpTagRPC)
     {
@@ -301,14 +303,13 @@ public:
     void ConnectSlaves()
     {
         ActPrintLog(activity, "CSortMaster::ConnectSlaves");
-#ifdef CONNECT_IN_PARALLEL
         class casyncfor: public CAsyncFor
         {
         public:
             casyncfor(CSortMaster &_owner, NodeArray &_slaves) : owner(_owner), slaves(_slaves) { }
             void Do(unsigned i)
             {
-                CSortNode &slave = slaves.item(i);          
+                CSortNode &slave = slaves.item(i);
                 if (!slave.doConnect(i,slaves.ordinality())) {
                     char url[100];
                     slave.endpoint.getUrlStr(url,sizeof(url));
@@ -320,21 +321,10 @@ public:
             CSortMaster &owner;
         } afor(*this,slaves);
         afor.For(slaves.ordinality(), CONNECT_IN_PARALLEL);
-#else
-        ForEachItemIn(i,slaves) {
-            CSortNode &slave = slaves.item(i);          
-            if (!slave.doConnect(i,slaves.ordinality())) {
-                char url[100];
-                slave.endpoint.getUrlStr(url,sizeof(url));
-                throw MakeActivityException(activity,TE_CannotConnectToSlave,"CSortMaster::ConnectSlaves: Could not connect to %s",url);
-            }
-        }
-#endif
     }
 
     void InitSlaves()
     {
-#ifdef INIT_IN_PARALLEL
         class casyncfor: public CAsyncFor
         {
         public:
@@ -348,12 +338,6 @@ public:
             NodeArray &slaves;
         } afor(slaves);
         afor.For(slaves.ordinality(), INIT_IN_PARALLEL);
-#else
-        ForEachItemIn(i,slaves) {
-            CSortNode &slave = slaves.item(i);          
-            slave.init();
-        }
-#endif
     }
 
 
@@ -400,14 +384,24 @@ public:
     void SortSetup(IRowInterfaces *_rowif,ICompare *_icompare,ISortKeySerializer *_keyserializer,bool cosort,bool needconnect,const char *_cosortfilenames,IRowInterfaces *_auxrowif)
     {
         ActPrintLog(activity, "Sort setup cosort=%s, needconnect=%s %s",cosort?"true":"false",needconnect?"true":"false",_keyserializer?"has key serializer":"");
+        assertex(_icompare);
         rowif.set(_rowif);
         if (_auxrowif&&_auxrowif->queryRowMetaData())
             auxrowif.set(_auxrowif);
         else
             auxrowif.set(_rowif);
         synchronized proc(slavemutex);
-        icompare = _icompare;
         keyserializer = _keyserializer;
+        if (keyserializer)
+        {
+            keyIf.setown(createRowInterfaces(keyserializer->queryRecordSize(), activity->queryContainer().queryId(), activity->queryCodeContext()));
+            icompare = keyserializer->queryCompareKey();
+        }
+        else
+        {
+            keyIf.set(auxrowif);
+            icompare = _icompare;
+        }
         sorted = false;
         total = 0;
         stotal = 0;
@@ -416,13 +410,11 @@ public:
         maxrecsonnode = 0;
         numnodes = slaves.ordinality();
         estrecsize = 100;
-        if (!partitioninfo) { // if cosort use aux
-            if (cosort) {
+        if (!partitioninfo) // if cosort use aux
+        {
+            if (cosort)
                 ActPrintLog(activity, "Cosort with no prior partition");
-                partitioninfo = new PartitionInfo(activity, auxrowif);
-            }
-            else
-                partitioninfo = new PartitionInfo(activity, rowif);
+            partitioninfo = new PartitionInfo(activity, keyIf);
         }
         free(partitioninfo->nodes);
         free(partitioninfo->mpports);
@@ -440,11 +432,6 @@ public:
             *mpp = slave.mpport;
             mpp++;
         }
-        if (keyserializer&&cosort&&!partitioninfo->IsOK()) {
-            keyserializer = NULL; // when joining to 0 rows can't use (LHS) serializer getMinMax will tell slave
-            ActPrintLog(activity, "Suppressing key serializer on master");
-        }
-        assertex(icompare);
         if (needconnect) // if cosort set, already done!
             ConnectSlaves();
         InitSlaves();
@@ -477,7 +464,6 @@ public:
         synchronized proc(slavemutex);
         if (activity->queryAbortSoon())
             return;
-#ifdef CLOSE_IN_PARALLEL
         class casyncfor: public CAsyncFor
         {
             CActivityBase &activity;
@@ -502,11 +488,6 @@ public:
             return;
         afor.wait = true;
         afor.For(slaves.ordinality(), CLOSE_IN_PARALLEL);
-#else
-        ForEachItemInRev(i,slaves) {
-            slaves.item(i).Close();
-        }
-#endif
         ActPrintLog(activity, "Sort Done");
     }
 
@@ -541,26 +522,30 @@ public:
             CSortNode &slave = slaves.item(i);
             if (slave.numrecs==0)
                 continue;
-            CThorExpandingRowArray minmax(*activity, rowif, true);
             void *p = NULL;
             size32_t retlen = 0;
             size32_t avrecsize=0;
             rowcount_t num=slave.GetMinMax(retlen,p,avrecsize);
-            if (avrecsize) {
+            if (avrecsize)
+            {
                 ers += avrecsize;       // should probably do mode but this is OK
                 ersn++;
             }
             tot += num;
             if (num>0)
             {
-                minmax.deserialize(retlen, p);
+                OwnedConstThorRow slaveMin, slaveMax;
+                RtlDynamicRowBuilder rowBuilder(keyIf->queryRowAllocator());
+                CThorStreamDeserializerSource dsz(retlen, p);
+                size32_t sz = keyIf->queryRowDeserializer()->deserialize(rowBuilder, dsz);
+                slaveMin.setown(rowBuilder.finalizeRowClear(sz));
+                sz = keyIf->queryRowDeserializer()->deserialize(rowBuilder, dsz);
+                slaveMax.setown(rowBuilder.finalizeRowClear(sz));
                 free(p);
-                const void *p = minmax.query(0);
-                if (!min.get()||(icompare->docompare(min,p)>0)) 
-                    min.set(p);
-                p = minmax.query(1);
-                if (!max.get()||(icompare->docompare(max,p)<0)) 
-                    max.set(p);
+                if (!min.get()||(icompare->docompare(min, slaveMin)>0))
+                    min.setown(slaveMin.getClear());
+                if (!max.get()||(icompare->docompare(max, slaveMax)<0))
+                    max.setown(slaveMax.getClear());
             }
         }
         if (ersn)
@@ -569,10 +554,11 @@ public:
             estrecsize = 100;
 #ifdef _TRACE
         if (min)
-            traceKey(rowif->queryRowSerializer(),"Min",min);
+            traceKey(keyIf->queryRowSerializer(),"Min",min);
         if (max)
-            traceKey(rowif->queryRowSerializer(),"Max",max);
-        if (min&&max) {
+            traceKey(keyIf->queryRowSerializer(),"Max",max);
+        if (min&&max)
+        {
             int cmp=icompare->docompare(min,max);
             if (cmp==0) 
                 ActPrintLog(activity, "Min == Max : All keys equal!");
@@ -608,18 +594,18 @@ public:
         unsigned averagesamples = OVERSAMPLE*numnodes;  
         rowcount_t averagerecspernode = (rowcount_t)(total/numnodes);
         CriticalSection asect;
-        CThorExpandingRowArray sample(*activity, rowif, true);
-#ifdef ASYNC_PARTIONING
+        CThorExpandingRowArray sample(*activity, keyIf, true);
         class casyncfor1: public CAsyncFor
         {
+            CSortMaster &owner;
             NodeArray &slaves;
             CThorExpandingRowArray &sample;
             CriticalSection &asect;
             unsigned averagesamples;
             rowcount_t averagerecspernode;
         public:
-            casyncfor1(NodeArray &_slaves, CThorExpandingRowArray &_sample, unsigned _averagesamples, rowcount_t _averagerecspernode, CriticalSection &_asect)
-                : slaves(_slaves), sample(_sample), asect(_asect)
+            casyncfor1(CSortMaster &_owner, NodeArray &_slaves, CThorExpandingRowArray &_sample, unsigned _averagesamples, rowcount_t _averagerecspernode, CriticalSection &_asect)
+                : owner(_owner), slaves(_slaves), sample(_sample), asect(_asect)
             { 
                 averagesamples = _averagesamples;
                 averagerecspernode = _averagerecspernode;
@@ -629,32 +615,26 @@ public:
                 CSortNode &slave = slaves.item(i);
                 unsigned slavesamples = averagerecspernode?((unsigned)((averagerecspernode/2+averagesamples*slave.numrecs)/averagerecspernode)):1;
                 //PrintLog("%d samples for %d",slavesamples,i);
-                if (slavesamples) {
+                if (slavesamples)
+                {
                     size32_t samplebufsize;
                     void *samplebuf=NULL;
                     slave.GetMultiNthRow(slavesamples, samplebufsize, samplebuf);
-                    CriticalBlock block(asect);
-                    sample.deserializeExpand(samplebufsize, samplebuf);
+                    MemoryBuffer mb;
+                    fastLZDecompressToBuffer(mb, samplebuf);
                     free(samplebuf);
+                    CriticalBlock block(asect);
+                    CThorStreamDeserializerSource d(mb.length(), mb.toByteArray());
+                    while (!d.eos())
+                    {
+                        RtlDynamicRowBuilder rowBuilder(owner.keyIf->queryRowAllocator());
+                        size32_t sz = owner.keyIf->queryRowDeserializer()->deserialize(rowBuilder, d);
+                        sample.append(rowBuilder.finalizeRowClear(sz));
+                    }
                 }
             }
-        } afor1(slaves,sample,averagesamples,averagerecspernode,asect);
+        } afor1(*this, slaves,sample,averagesamples,averagerecspernode,asect);
         afor1.For(numnodes, 20, true);
-#else
-        unsigned i;
-        for (i=0;i<numnodes;i++) {
-            CSortNode &slave = slaves.item(i);
-            unsigned slavesamples = (unsigned)((count_t)averagesamples*slave.numrecs/(count_t)averagerecspernode);
-            PrintLog("%d samples for %d",slavesamples,i);
-            if (!slavesamples)
-                continue;
-            size32_t samplebufsize;
-            void *samplebuf=NULL;
-            slave.GetMultiNthRow(slavesamples,samplebufsize,samplebuf);
-            sample.deserializeExpand(samplebufsize, samplebuf);
-            free(samplebuf);
-        }   
-#endif
 #ifdef TRACE_PARTITION2
         {
             ActPrintLog(activity, "partition points");
@@ -670,9 +650,11 @@ public:
         offset_t ts=sample.serializedSize();
         estrecsize = numsamples?((size32_t)(ts/numsamples)):100;
         sample.sort(*icompare, activity->queryMaxCores());
-        CThorExpandingRowArray mid(*activity, rowif, true);
-        if (numsamples) { // could shuffle up empty nodes here
-            for (unsigned i=0;i<numsplits;i++) {
+        CThorExpandingRowArray mid(*activity, keyIf, true);
+        if (numsamples) // could shuffle up empty nodes here
+        {
+            for (unsigned i=0;i<numsplits;i++)
+            {
                 unsigned pos = (unsigned)(((count_t)numsamples*(i+1))/((count_t)numsplits+1));
                 const void *r = sample.get(pos);
                 mid.append(r);
@@ -681,11 +663,12 @@ public:
 #ifdef TRACE_PARTITION2
         {
             ActPrintLog(activity, "merged partitions");
-            for (unsigned i=0;i<mid.ordinality();i++) {
+            for (unsigned i=0;i<mid.ordinality();i++)
+            {
                 const void *k = mid.query(i);
                 StringBuffer str;
                 str.appendf("%d: ",i);
-                traceKey(rowif->queryRowSerializer(),str.str(),(const byte *)k);
+                traceKey(keyIf->queryRowSerializer(),str.str(),(const byte *)k);
             }
         }
 #endif
@@ -695,9 +678,7 @@ public:
         mid.serializeCompress(mdmb);
         mdl = mdmb.length();
         const byte *mdp=(const byte *)mdmb.bufferBase();
-        unsigned i;
-#ifdef ASYNC_PARTIONING
-        i = 0;
+        unsigned i = 0;
         class casyncfor2: public CAsyncFor
         {
             NodeArray &slaves;
@@ -718,14 +699,6 @@ public:
             }
         } afor2(slaves,mdl,mdp);
         afor2.For(numnodes, 20, true);
-#else
-        for (i=0;i<numnodes;i++) {
-            CSortNode &slave = slaves.item(i);
-            if (slave.numrecs!=0)
-                slave.MultiBinChopStart(mdl,mdp,CMPFN_NORMAL);
-        }
-#endif
-#ifdef ASYNC_PARTIONING
         class casyncfor3: public CAsyncFor
         {
             NodeArray &slaves;
@@ -751,16 +724,6 @@ public:
             }
         } afor3(slaves, splitMap, numnodes, numsplits);
         afor3.For(numnodes, 20, true);
-#else
-        for (i=0;i<numnodes;i++) {
-            CSortNode &slave = slaves.item(i);
-            if (slave.numrecs!=0) {
-                rowcount_t *res=splitMap+(i*numnodes);
-                slave.MultiBinChopStop(numsplits,res);
-                res[numnodes-1] = slave.numrecs;
-            }
-        }
-#endif
 #ifdef _TRACE
 #ifdef TRACE_PARTITION
         for (i=0;i<numnodes;i++) {
@@ -779,45 +742,48 @@ public:
 
     rowcount_t *CalcPartition(bool logging)
     {
-        CriticalBlock block(ECFcrit);       
+        CriticalBlock block(ECFcrit);
         // this is a bit long winded
 
         OwnedConstThorRow mink;
         OwnedConstThorRow maxk;
         // so as won't overflow
         OwnedMalloc<rowcount_t> splitmap(numnodes*numnodes, true);
-        if (CalcMinMax(mink,maxk)==0) {
+        if (CalcMinMax(mink, maxk)==0)
+        {
             // no partition info!
             partitioninfo->kill();
             return splitmap.getClear();
         }
         unsigned numsplits=numnodes-1;
-        CThorExpandingRowArray emin(*activity, rowif, true);
-        CThorExpandingRowArray emax(*activity, rowif, true);
-        CThorExpandingRowArray totmid(*activity, rowif, true);
+        CThorExpandingRowArray emin(*activity, keyIf, true);
+        CThorExpandingRowArray emax(*activity, keyIf, true);
+        CThorExpandingRowArray totmid(*activity, keyIf, true);
         ECFarray = &totmid;
         ECFcompare = icompare;
-        CThorExpandingRowArray mid(*activity, rowif, true);
+        CThorExpandingRowArray mid(*activity, keyIf, true);
         unsigned i;
         unsigned j;
-        for(i=0;i<numsplits;i++) {
+        for(i=0;i<numsplits;i++)
+        {
             emin.append(mink.getLink());
             emax.append(maxk.getLink());
         }
         UnsignedArray amid;
         unsigned iter=0;
-        try {
+        try
+        {
             MemoryBuffer mbmn;
             MemoryBuffer mbmx;
             MemoryBuffer mbmd;
-            loop {
+            loop
+            {
 #ifdef _TRACE
                 iter++;
                 ActPrintLog(activity, "Split: %d",iter);
 #endif
                 emin.serializeCompress(mbmn.clear());
                 emax.serializeCompress(mbmx.clear());
-#ifdef ASYNC_PARTIONING
                 class casyncfor: public CAsyncFor
                 {
                     NodeArray &slaves;
@@ -836,14 +802,6 @@ public:
                     }
                 } afor(slaves,mbmn,mbmx);
                 afor.For(numnodes, 20, true);
-#else
-                for (i=0;i<numnodes;i++) {
-                    CSortNode &slave = slaves.item(i);
-                    if (slave.numrecs!=0)
-                        slave.GetMultiMidPointStart(mbmn.length(),mbmn.bufferBase(),mbmx.length(),mbmx.bufferBase());               
-                }
-#endif
-#ifdef ASYNC_PARTIONING
                 Semaphore *nextsem = new Semaphore[numnodes];
                 CriticalSection nextsect;
 
@@ -854,9 +812,10 @@ public:
                     CThorExpandingRowArray &totmid;
                     Semaphore *nextsem;
                     unsigned numsplits;
+                    IRowInterfaces *keyIf;
                 public:
-                    casyncfor2(NodeArray &_slaves, CThorExpandingRowArray &_totmid, unsigned _numsplits, Semaphore *_nextsem)
-                        : slaves(_slaves), totmid(_totmid)
+                    casyncfor2(NodeArray &_slaves, CThorExpandingRowArray &_totmid, unsigned _numsplits, Semaphore *_nextsem, IRowInterfaces *_keyIf)
+                        : slaves(_slaves), totmid(_totmid), keyIf(_keyIf)
                     { 
                         nextsem = _nextsem;
                         numsplits = _numsplits;
@@ -875,8 +834,23 @@ public:
                             unsigned base = totmid.ordinality();
                             if (p)
                             {
-                                totmid.deserializeExpand(retlen, p);
+                                MemoryBuffer mb;
+                                fastLZDecompressToBuffer(mb, p);
                                 free(p);
+                                CThorStreamDeserializerSource d(mb.length(), mb.toByteArray());
+                                while (!d.eos())
+                                {
+                                    RtlDynamicRowBuilder rowBuilder(keyIf->queryRowAllocator());
+                                    bool nullRow;
+                                    d.read(sizeof(bool),&nullRow);
+                                    if (nullRow)
+                                        totmid.append(NULL);
+                                    else
+                                    {
+                                        size32_t sz = keyIf->queryRowDeserializer()->deserialize(rowBuilder, d);
+                                        totmid.append(rowBuilder.finalizeRowClear(sz));
+                                    }
+                                }
                             }
                             while (totmid.ordinality()-base<numsplits)
                                 totmid.append(NULL);
@@ -889,42 +863,20 @@ public:
                         }
                         nextsem[i].signal();
                     }
-                } afor2(slaves, totmid, numsplits, nextsem);
+                } afor2(slaves, totmid, numsplits, nextsem, keyIf);
                 afor2.For(numnodes, 20);
+
                 delete [] nextsem;
-#else
-                for (i=0;i<numnodes;i++) {
-                    CSortNode &slave = slaves.item(i);
-                    unsigned base = totmid.ordinality();
-                    if (slave.numrecs!=0) {
-                        void *p = NULL;
-                        size32_t retlen = 0;
-                        slave.GetMultiMidPointStop(retlen,p);               
-                        totmid.deserializeExpand(retlen, p);
-                        free(p);
-#ifdef _DEBUG
-                        if (logging) {
-                            MemoryBuffer buf;
-                            for (j=0;j<numsplits;j++) {
-                                ActPrintLog(activity, "Min(%d): ",j); traceKey(rowif->queryRowSerializer(),"    ",emin.query(j));
-                                ActPrintLog(activity, "Mid(%d): ",j); traceKey(rowif->queryRowSerializer(),"    ",totmid.query(j+base));
-                                ActPrintLog(activity, "Max(%d): ",j); traceKey(rowif->queryRowSerializer(),"    ",emax.query(j));
-                            }
-                        }
-#endif
-                    }
-                    while (totmid.ordinality()-base<numsplits)
-                        totmid.append(NULL);
-                }
-#endif
                 mid.kill();
                 mbmn.clear();
                 mbmx.clear();
-                for (i=0;i<numsplits;i++) {
+                for (i=0;i<numsplits;i++)
+                {
                     amid.kill();
                     unsigned k;
                     unsigned t = i;
-                    for (k=0;k<numsplits;k++) {
+                    for (k=0;k<numsplits;k++)
+                    {
                         const void *row = totmid.query(t);
                         if (row)
                             amid.append(t);
@@ -938,11 +890,12 @@ public:
                     if (amid.ordinality()) {
                         unsigned mi = amid.item(amid.ordinality()/2);
 #ifdef _DEBUG
-                        if (logging) {
+                        if (logging)
+                        {
                             MemoryBuffer buf;
-                            const void *b =totmid.query(mi);
+                            const void *b = totmid.query(mi);
                             ActPrintLog(activity, "%d: %d %d",i,mi,amid.ordinality()/2);
-                            traceKey(rowif->queryRowSerializer(),"mid",b);
+                            traceKey(keyIf->queryRowSerializer(),"mid",b);
                         }
 #endif
                         mid.append(totmid.get(mi));
@@ -953,29 +906,33 @@ public:
 
                 // calculate split map
                 mid.serializeCompress(mbmd.clear());
-                for (i=0;i<numnodes;i++) {
+                for (i=0;i<numnodes;i++)
+                {
                     CSortNode &slave = slaves.item(i);
                     if (slave.numrecs!=0)
                         slave.MultiBinChopStart(mbmd.length(),(const byte *)mbmd.bufferBase(),CMPFN_NORMAL);
                 }
                 mbmd.clear();
-                for (i=0;i<numnodes;i++) {
+                for (i=0;i<numnodes;i++)
+                {
                     CSortNode &slave = slaves.item(i);
-                    if (slave.numrecs!=0) {
+                    if (slave.numrecs!=0)
+                    {
                         rowcount_t *res=splitmap+(i*numnodes);
                         slave.MultiBinChopStop(numsplits,res);
                         res[numnodes-1] = slave.numrecs;
                     }
                 }
 
-                CThorExpandingRowArray newmin(*activity, rowif, true);
-                CThorExpandingRowArray newmax(*activity, rowif, true);
+                CThorExpandingRowArray newmin(*activity, keyIf, true);
+                CThorExpandingRowArray newmax(*activity, keyIf, true);
                 unsigned __int64 maxerror=0;
                 unsigned __int64 nodewanted = (stotal/numnodes); // Note scaled total
                 unsigned __int64 variancelimit = estrecsize?maxdeviance/estrecsize:0;
                 if (variancelimit>nodewanted/50)
                     variancelimit=nodewanted/50; // 2%
-                for (i=0;i<numsplits;i++) {
+                for (i=0;i<numsplits;i++)
+                {
                     unsigned __int64 tot = 0;
                     unsigned __int64 loc = 0;
                     for (j=0;j<numnodes;j++) {
@@ -996,23 +953,26 @@ public:
                     unsigned __int64 error = (loc>nodewanted)?(loc-nodewanted):(nodewanted-loc);
                     if (error>maxerror)
                         maxerror = error;
-                    if (wanted<tot) {
+                    if (wanted<tot)
+                    {
                         newmin.append(emin.get(i));
                         newmax.append(mid.get(i));
                     }
-                    else if (wanted>tot) {
+                    else if (wanted>tot)
+                    {
                         newmin.append(mid.get(i));
                         newmax.append(emax.get(i));
                     }
-                    else {
+                    else
+                    {
                         newmin.append(emin.get(i));
                         newmax.append(emax.get(i));
                     }
                 }
-                if (emin.equal(icompare,newmin)&&emax.equal(icompare,newmax)) {
+                if (emin.equal(icompare,newmin)&&emax.equal(icompare,newmax))
                     break; // reached steady state 
-                }
-                if ((maxerror*10000<nodewanted)||((iter>3)&&(maxerror<variancelimit))) { // within .01% or within variancelimit 
+                if ((maxerror*10000<nodewanted)||((iter>3)&&(maxerror<variancelimit))) // within .01% or within variancelimit
+                {
                     ActPrintLog(activity, "maxerror = %" CF "d, nodewanted = %" CF "d, variancelimit=%" CF "d, estrecsize=%u, maxdeviance=%u",
                              maxerror,nodewanted,variancelimit,estrecsize,maxdeviance);
                     break;
@@ -1032,11 +992,14 @@ public:
         partitioninfo->splitkeys.transfer(mid);
         partitioninfo->numnodes = numnodes;
 #ifdef _DEBUG
-        if (logging) {
-            for (unsigned i=0;i<numnodes;i++) {
+        if (logging)
+        {
+            for (unsigned i=0;i<numnodes;i++)
+            {
                 StringBuffer str;
                 str.appendf("%d: ",i);
-                for (j=0;j<numnodes;j++) {
+                for (j=0;j<numnodes;j++)
+                {
                     str.appendf("%" RCPF "d, ",splitmap[j+i*numnodes]);
                 }
                 ActPrintLog(activity, "%s",str.str());
@@ -1053,7 +1016,8 @@ public:
 #ifdef _TRACE
 #ifdef TRACE_PARTITION
         ActPrintLog(activity, "UsePartitionInfo %s",uppercmp?"upper":"");
-        for (i=0;i<pi.splitkeys.ordinality();i++) {
+        for (i=0;i<pi.splitkeys.ordinality();i++)
+        {
             StringBuffer s;
             s.appendf("%d: ",i);
             traceKey(pi.prowif->queryRowSerializer(), s.str(), pi.splitkeys.query(i));
@@ -1067,22 +1031,26 @@ public:
         OwnedMalloc<rowcount_t> res(numsplits);
         unsigned j;
         rowcount_t *mapp=splitMap;
-        for (i=0;i<numnodes;i++) {
+        for (i=0;i<numnodes;i++)
+        {
             CSortNode &slave = slaves.item(i);
-            if (numsplits>0) {
+            if (numsplits>0)
+            {
                 MemoryBuffer mb;
                 pi.splitkeys.serialize(mb);
                 assertex(pi.splitkeys.ordinality()==numsplits);
-                slave.MultiBinChop(mb.length(),(const byte *)mb.bufferBase(),numsplits,res,uppercmp?CMPFN_UPPER:CMPFN_COLLATE,true);
+                slave.MultiBinChop(mb.length(),(const byte *)mb.bufferBase(),numsplits,res,uppercmp?CMPFN_UPPER:CMPFN_COLLATE);
                 rowcount_t *resp = res;
                 rowcount_t p=*resp;
                 *mapp = p;
                 resp++;
                 mapp++;
-                for (j=1;j<numsplits;j++) {
+                for (j=1;j<numsplits;j++)
+                {
                     rowcount_t n = *resp;
                     *mapp = n;
-                    if (p>n) {
+                    if (p>n)
+                    {
                         ActPrintLog(activity, "ERROR: Split positions out of order!");
                         throw MakeActivityException(activity, TE_SplitPostionsOutOfOrder,"CSortMaster::UsePartitionInfo: Split positions out of order!");
                     }
@@ -1098,10 +1066,12 @@ public:
 #ifdef TRACE_PARTITION
         ActPrintLog(activity, "UsePartitionInfo result");
         rowcount_t *p = splitMap;
-        for (i=0;i<numnodes;i++) {
+        for (i=0;i<numnodes;i++)
+        {
             StringBuffer s;
             s.appendf("%d: ",i);
-            for (j=0;j<numnodes;j++) {
+            for (j=0;j<numnodes;j++)
+            {
                 s.appendf(" %" RCPF "d,",*p);
                 p++;
             }
@@ -1114,26 +1084,28 @@ public:
 
     void CalcExtPartition()
     {
-        // I think this dependant on row being same format as meta
+        // I think this dependent on row being same format as meta
 
         unsigned numsplits=numnodes-1;
-        CThorExpandingRowArray splits(*activity, auxrowif, true);
+        CThorExpandingRowArray splits(*activity, keyIf, true);
         char *s=cosortfilenames;
         unsigned i;
-        for(i=0;i<numnodes;i++) {
+        for(i=0;i<numnodes;i++)
+        {
             char *e=strchr(s,'|');
             if (e) 
                 *e = 0;
             else if (i!=numnodes-1)
                 return;
-            if (i) {
+            if (i)
+            {
                 CSortNode &slave = slaves.item(i);
                 byte *rowmem;
                 size32_t rowsize;
                 if (!slave.FirstRowOfFile(s,rowsize,rowmem))
                     return;
                 OwnedConstThorRow row;
-                row.deserialize(auxrowif,rowsize,rowmem);
+                row.deserialize(keyIf,rowsize,rowmem);
                 splits.append(row.getClear());
                 free(rowmem);
             }
@@ -1149,20 +1121,22 @@ public:
     {
         ActPrintLog(activity, "Previous partition");
         unsigned numsplits=numnodes-1;
-        CThorExpandingRowArray splits(*activity, auxrowif, true);
+        CThorExpandingRowArray splits(*activity, keyIf, true);
         unsigned i;
-        for(i=1;i<numnodes;i++) {
+        for(i=1;i<numnodes;i++)
+        {
             CSortNode &slave = slaves.item(i);
             byte *rowmem;
             size32_t rowsize;
             if (!slave.FirstRowOfFile("",rowsize,rowmem))
                 return;
             OwnedConstThorRow row;
-            row.deserialize(auxrowif,rowsize,rowmem);
-            if (row&&rowsize) {
+            row.deserialize(keyIf, rowsize, rowmem);
+            if (row&&rowsize)
+            {
                 StringBuffer n;
                 n.append(i).append(": ");
-                traceKey(auxrowif->queryRowSerializer(),n,row);
+                traceKey(keyIf->queryRowSerializer(),n,row);
             }
             splits.append(row.getClear());
             free(rowmem);
@@ -1244,39 +1218,48 @@ public:
         bool usesampling = true;        
 #endif
         bool useAux = false; // JCSMORE using existing partioning and auxillary rowIf (only used if overflow)
-        loop {
+        loop
+        {
             OwnedMalloc<rowcount_t> splitMap, splitMapUpper;
             CTimer timer;
-            if (numnodes>1) {
+            if (numnodes>1)
+            {
                 timer.start();
-                if (cosortfilenames) {
+                if (cosortfilenames)
+                {
                     useAux = true;
                     CalcExtPartition();
                     canoptimizenullcolumns = false;
                 }
-                if (usepartitionrow) {
+                if (usepartitionrow)
+                {
                     useAux = true;
                     CalcPreviousPartition();
                     canoptimizenullcolumns = false;
                 }
-                if (partitioninfo->IsOK()) {
+                if (partitioninfo->IsOK())
+                {
                     useAux = true;
                     splitMap.setown(UsePartitionInfo(*partitioninfo, betweensort));
-                    if (betweensort) {
+                    if (betweensort)
+                    {
                         splitMapUpper.setown(UsePartitionInfo(*partitioninfo, false));
                         canoptimizenullcolumns = false;
                     }
                 }
-                else {
+                else
+                {
                     // check for small sort here
-                    if ((skewError<0.0)&&!betweensort) {
+                    if ((skewError<0.0)&&!betweensort)
+                    {
                         splitMap.setown(CalcPartitionUsingSampling());
                         skewError = -skewError;
 #ifdef USE_SAMPLE_PARTITIONING
                         usesampling = false;
 #endif
                     }
-                    else {
+                    else
+                    {
                         if (skewError<0.0)
                             skewError = -skewError;
 
@@ -1291,7 +1274,8 @@ public:
                             splitMap.setown(CalcPartition(false));
 #endif
                     }
-                    if (!partitioninfo->splitkeys.checkSorted(icompare)) {
+                    if (!partitioninfo->splitkeys.checkSorted(icompare))
+                    {
                         ActPrintLog(activity, "ERROR: Split keys out of order!");
                         partitioninfo->splitkeys.sort(*icompare, activity->queryMaxCores());
                     }
@@ -1300,18 +1284,22 @@ public:
             }
             OwnedMalloc<SocketEndpoint> endpoints(numnodes);
             SocketEndpoint *epp = endpoints;
-            for (i=0;i<numnodes;i++) {
+            for (i=0;i<numnodes;i++)
+            {
                 CSortNode &slave = slaves.item(i);
                 *epp = slave.endpoint;
                 epp++;
             }
-            if (numnodes>1) {
+            if (numnodes>1)
+            {
                 // minimize logging
                 unsigned numspilt = 0;
                 UnsignedArray spilln;
-                for (i=0;i<numnodes;i++) {
+                for (i=0;i<numnodes;i++)
+                {
                     CSortNode &slave = slaves.item(i);
-                    if (slave.overflow) {
+                    if (slave.overflow)
+                    {
                         while (spilln.ordinality()<slave.scale)
                             spilln.append(0);
                         spilln.replace(spilln.item(slave.scale-1)+1,slave.scale-1);
@@ -1324,14 +1312,19 @@ public:
                     unsigned mostspilt = 0;
                     unsigned spiltmax = 0;
                     ForEachItemIn(smi,spilln)
-                        if (spilln.item(smi)>spiltmax) {
+                    {
+                        if (spilln.item(smi)>spiltmax)
+                        {
                             spiltmax = spilln.item(smi);
                             mostspilt = smi;
                         }
+                    }
                     ActPrintLog(activity, "Gather - %d nodes spilt to disk, most %d times",numspilt,mostspilt);
-                    for (i=0;i<numnodes;i++) {
+                    for (i=0;i<numnodes;i++)
+                    {
                         CSortNode &slave = slaves.item(i);
-                        if (slave.scale!=mostspilt+1) {
+                        if (slave.scale!=mostspilt+1)
+                        {
                             char url[100];
                             slave.endpoint.getUrlStr(url,sizeof(url));
                             ActPrintLog(activity, "Gather - node %s spilled %d times to disk",url,slave.scale-1);
@@ -1339,23 +1332,28 @@ public:
                     }
                     MemoryBuffer mbsk;
                     partitioninfo->splitkeys.serialize(mbsk);
-                    for (i=0;i<numnodes;i++) {
+                    for (i=0;i<numnodes;i++)
+                    {
                         CSortNode &slave = slaves.item(i);
                         if (slave.overflow) 
                             slave.OverflowAdjustMapStart(numnodes,splitMap+i*numnodes,mbsk.length(),(const byte *)mbsk.bufferBase(),CMPFN_COLLATE,useAux);
                     }
-                    for (i=0;i<numnodes;i++) {
+                    for (i=0;i<numnodes;i++)
+                    {
                         CSortNode &slave = slaves.item(i);
                         if (slave.overflow) 
                             slave.AdjustNumRecs(slave.OverflowAdjustMapStop(numnodes,splitMap+i*numnodes));
                     }
-                    if (splitMapUpper.get()) {
-                        for (i=0;i<numnodes;i++) {
+                    if (splitMapUpper.get())
+                    {
+                        for (i=0;i<numnodes;i++)
+                        {
                             CSortNode &slave = slaves.item(i);
                             if (slave.overflow) 
                                 slave.OverflowAdjustMapStart(numnodes,splitMapUpper+i*numnodes,mbsk.length(),(const byte *)mbsk.bufferBase(),CMPFN_UPPER,useAux);
                         }
-                        for (i=0;i<numnodes;i++) {
+                        for (i=0;i<numnodes;i++)
+                        {
                             CSortNode &slave = slaves.item(i);
                             if (slave.overflow) 
                                 slave.OverflowAdjustMapStop(numnodes,splitMapUpper+i*numnodes);
@@ -1366,9 +1364,10 @@ public:
                 OwnedMalloc<rowcount_t> tot(numnodes, true);
                 rowcount_t max=0;
                 unsigned imax=numnodes;
-                for (i=0;i<imax;i++) {
+                for (i=0;i<imax;i++){
                     unsigned j;
-                    for (j=0;j<numnodes;j++) {
+                    for (j=0;j<numnodes;j++)
+                    {
                         if (splitMapUpper)
                             tot[i]+=splitMapUpper[i+j*numnodes];
                         else
@@ -1378,9 +1377,12 @@ public:
                     }
                     if (tot[i]>max)
                         max = tot[i];
-                    if (!betweensort&&canoptimizenullcolumns&&(tot[i]==0)) {
-                        for (j=0;j<numnodes;j++) {
-                            for (unsigned k=i+1;k<numnodes;k++) {
+                    if (!betweensort&&canoptimizenullcolumns&&(tot[i]==0))
+                    {
+                        for (j=0;j<numnodes;j++)
+                        {
+                            for (unsigned k=i+1;k<numnodes;k++)
+                            {
                                 splitMap[k+j*numnodes-1] = splitMap[k+j*numnodes];
                             }
                         }
@@ -1388,7 +1390,8 @@ public:
                         i--;
                     }
                 }
-                for (i=0;i<numnodes;i++) {
+                for (i=0;i<numnodes;i++)
+                {
                     CSortNode &slave = slaves.item(i);
                     char url[100];
                     slave.endpoint.getUrlStr(url,sizeof(url));
@@ -1402,7 +1405,8 @@ public:
                     splitMap.setown(CalcPartition(true));
 #endif
 #ifdef USE_SAMPLE_PARTITIONING
-                    if (usesampling) {
+                    if (usesampling)
+                    {
                         ActPrintLog(activity, "Partioning using sampling failed, trying iterative partitioning"); 
                         usesampling = false;
                         continue;
@@ -1411,7 +1415,8 @@ public:
                     throw e.getClear();
                 }
                 ActPrintLog(activity, "Starting Merge of %" RCPF "d records",total);
-                for (i=0;i<numnodes;i++) {
+                for (i=0;i<numnodes;i++)
+                {
                     CSortNode &slave = slaves.item(i);
                     char url[100];
                     slave.endpoint.getUrlStr(url,sizeof(url));
@@ -1422,7 +1427,8 @@ public:
     //              ActPrintLog(activity, "Merge %d started: %d rows on %s",i,tot[i],url);
                 }
             }
-            else {
+            else
+            {
                 CSortNode &slave = slaves.item(0);
                 slave.SingleMerge();
                 ActPrintLog(activity, "Merge started");

+ 4 - 62
thorlcr/msort/tsortmp.cpp

@@ -15,8 +15,6 @@ enum MPSlaveFunctions
     FN_StartGather,
     FN_GetGatherInfo,
     FN_GetMinMax,
-    FN_GetMidPoint,
-    FN_GetMultiMidPoint,
     FN_GetMultiMidPointStart,
     FN_GetMultiMidPointStop,
     FN_MultiBinChop,
@@ -126,29 +124,6 @@ rowcount_t SortSlaveMP::GetMinMax(size32_t &keybuffsize,void *&keybuff, size32_t
     return ret;
 }
 
-bool SortSlaveMP::GetMidPoint (size32_t lkeysize, const byte * lkey,size32_t hkeysize, const byte * hkey,  size32_t &mkeysize, byte * &mkey)
-{
-    CMessageBuffer mb;
-    mb.append((byte)FN_GetMidPoint );
-    serializeblk(mb,lkeysize,lkey);
-    serializeblk(mb,hkeysize,hkey);
-    sendRecv(mb);
-    deserializeblk(mb,mkeysize,mkey);
-    bool ret;
-    mb.read(ret);
-    return ret;
-}
-
-void SortSlaveMP::GetMultiMidPoint(size32_t lkeybuffsize, const void * lkeybuff,size32_t hkeybuffsize, const void * hkeybuff,  size32_t &mkeybuffsize, void * &mkeybuf)
-{
-    CMessageBuffer mb;
-    mb.append((byte)FN_GetMultiMidPoint);
-    serializeblk(mb,lkeybuffsize,lkeybuff);
-    serializeblk(mb,hkeybuffsize,hkeybuff);
-    sendRecv(mb);
-    deserializeblk(mb,mkeybuffsize,mkeybuf);
-}
-
 void SortSlaveMP::GetMultiMidPointStart(size32_t lkeybuffsize, const void * lkeybuff, size32_t hkeybuffsize, const void * hkeybuff) /* async */
 {
     CMessageBuffer mb;
@@ -166,11 +141,11 @@ void SortSlaveMP::GetMultiMidPointStop(size32_t &mkeybuffsize, void * &mkeybuf)
     deserializeblk(mb,mkeybuffsize,mkeybuf);
 }
 
-void SortSlaveMP::MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowcount_t *pos,byte cmpfn,bool useaux)
+void SortSlaveMP::MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowcount_t *pos,byte cmpfn)
 {
     CMessageBuffer mb;
     mb.append((byte)FN_MultiBinChop);
-    serializeblk(mb,keybuffsize,keybuff).append(num).append(cmpfn).append(useaux);
+    serializeblk(mb,keybuffsize,keybuff).append(num).append(cmpfn);
     sendRecv(mb);
     mb.read(num*sizeof(rowcount_t),pos);
 }
@@ -354,22 +329,6 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
                 free(keybuff);
             }
             break;
-            case FN_GetMidPoint : {
-                size32_t lkeysize;
-                byte * lkey;
-                size32_t hkeysize;
-                byte * hkey;
-                deserializeblk(mb,lkeysize,lkey);
-                deserializeblk(mb,hkeysize,hkey);
-                size32_t mkeysize=0;
-                byte * mkey=NULL;
-                bool ret = slave.GetMidPoint(lkeysize,lkey,hkeysize,hkey,mkeysize,mkey);
-                free(lkey);
-                free(hkey);
-                serializeblk(mbout,mkeysize,mkey).append(ret);
-                free(mkey);
-            }
-            break;
             case FN_GetMultiMidPointStart: {
                 replydone = true;
                 comm->reply(mbout);
@@ -384,22 +343,6 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
                 free(hkeybuff);
             }
             break;
-            case FN_GetMultiMidPoint: {
-                size32_t lkeybuffsize;
-                void * lkeybuff;
-                size32_t hkeybuffsize;
-                void * hkeybuff;
-                deserializeblk(mb,lkeybuffsize,lkeybuff);
-                deserializeblk(mb,hkeybuffsize,hkeybuff);
-                size32_t mkeybuffsize=0;
-                void * mkeybuff = NULL;
-                slave.GetMultiMidPoint(lkeybuffsize,lkeybuff,hkeybuffsize,hkeybuff,mkeybuffsize,mkeybuff);
-                free(lkeybuff);
-                free(hkeybuff);
-                serializeblk(mbout,mkeybuffsize,mkeybuff);
-                free(mkeybuff);
-            }
-            break;
             case FN_MultiBinChopStop: {
                 unsigned num;
                 mb.read(num);
@@ -433,10 +376,9 @@ bool SortSlaveMP::marshall(ISortSlaveMP &slave, ICommunicator* comm, mptag_t tag
                 deserializeblk(mb,keybuffsize,keybuff);
                 unsigned num;
                 byte cmpfn;
-                bool useaux;
-                mb.read(num).read(cmpfn).read(useaux);
+                mb.read(num).read(cmpfn);
                 void *out = mbout.reserveTruncate(num*sizeof(rowcount_t));
-                slave.MultiBinChop(keybuffsize,(const byte *)keybuff,num,(rowcount_t *)out,cmpfn,useaux);
+                slave.MultiBinChop(keybuffsize,(const byte *)keybuff,num,(rowcount_t *)out,cmpfn);
                 free(keybuff);
             }
             break;

+ 2 - 6
thorlcr/msort/tsortmp.hpp

@@ -17,11 +17,9 @@ interface ISortSlaveMP
     virtual void StartGather()=0;
     virtual void GetGatherInfo(rowcount_t &numlocal, offset_t &totalsize, unsigned &overflowscale, bool hasserializer)=0;
     virtual rowcount_t GetMinMax(size32_t &keybuffsize,void *&keybuff, size32_t &avrecsizesize)=0;
-    virtual bool GetMidPoint     (size32_t lkeysize, const byte * lkey, size32_t hkeysize, const byte * hkey, size32_t &mkeysize, byte * &mkey)=0;
-    virtual void GetMultiMidPoint(size32_t lkeybuffsize, const void * lkeybuff, size32_t hkeybuffsize, const void * hkeybuff, size32_t &mkeybuffsize, void * &mkeybuf)=0;
     virtual void GetMultiMidPointStart(size32_t lkeybuffsize, const void * lkeybuff, size32_t hkeybuffsize, const void * hkeybuff)=0; /* async */
     virtual void GetMultiMidPointStop(size32_t &mkeybuffsize, void * &mkeybuf)=0;
-    virtual void MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowcount_t *pos,byte cmpfn,bool useaux)=0;
+    virtual void MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowcount_t *pos,byte cmpfn)=0;
     virtual void MultiBinChopStart(size32_t keybuffsize,const byte *keybuff, byte cmpfn)=0; /* async */
     virtual void MultiBinChopStop(unsigned num,rowcount_t *pos)=0;
     virtual void OverflowAdjustMapStart(unsigned mapsize,rowcount_t *map,size32_t keybuffsize,const byte *keybuff, byte cmpfn, bool useaux)=0; /* async */
@@ -53,11 +51,9 @@ public:
     void StartGather();
     void GetGatherInfo(rowcount_t &numlocal, offset_t &totalsize, unsigned &overflowscale, bool hasserializer);
     rowcount_t GetMinMax(size32_t &keybuffsize,void *&keybuff, size32_t &avrecsizesize);
-    bool GetMidPoint     (size32_t lkeysize, const byte * lkey, size32_t hkeysize, const byte * hkey, size32_t &mkeysize, byte * &mkey);
-    void GetMultiMidPoint(size32_t lkeybuffsize, const void * lkeybuff, size32_t hkeybuffsize, const void * hkeybuff, size32_t &mkeybuffsize, void * &mkeybuf);
     void GetMultiMidPointStart(size32_t lkeybuffsize, const void * lkeybuff, size32_t hkeybuffsize, const void * hkeybuff); /* async */
     void GetMultiMidPointStop(size32_t &mkeybuffsize, void * &mkeybuf);
-    void MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowcount_t *pos,byte cmpfn,bool useaux);
+    void MultiBinChop(size32_t keybuffsize,const byte *keybuff, unsigned num,rowcount_t *pos,byte cmpfn);
     void MultiBinChopStart(size32_t keybuffsize,const byte *keybuff, byte cmpfn); /* async */
     void MultiBinChopStop(unsigned num,rowcount_t *pos);
     void OverflowAdjustMapStart(unsigned mapsize,rowcount_t *map,size32_t keybuffsize,const byte *keybuff, byte cmpfn,bool useaux); /* async */

+ 180 - 145
thorlcr/msort/tsorts.cpp

@@ -18,6 +18,7 @@
 #include "platform.h"
 #include <limits.h>
 #include "jlib.hpp"
+#include "jflz.hpp"
 #include <mpbase.hpp>
 #include <mpcomm.hpp>
 #include "thorport.hpp"
@@ -584,15 +585,38 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
     offset_t grandtotalsize;
     rowcount_t *overflowmap, *multibinchoppos;
     bool stopping, gatherdone, nosort, isstable;
-    ICompare *icompare;
-    ICompare *icollate; // used for co-sort
-    ICompare *icollateupper; // used in between join
+    ICompare *rowCompare, *keyRowCompare;
+    ICompare *primarySecondaryCompare; // used for co-sort
+    ICompare *primarySecondaryUpperCompare; // used in between join
     ISortKeySerializer *keyserializer;      // used on partition calculation
+    Owned<IRowInterfaces> keyIf;
+    Owned<IOutputRowSerializer> rowToKeySerializer;
     void *midkeybuf;
     Semaphore startgathersem, finishedmergesem, closedownsem;
     InterruptableSemaphore startmergesem;
     size32_t transferblocksize, midkeybufsize;
 
+    class CRowToKeySerializer : public CSimpleInterfaceOf<IOutputRowSerializer>
+    {
+        ISortKeySerializer *keyConverter;
+        IRowInterfaces *rowIf, *keyIf;
+    public:
+        CRowToKeySerializer(IRowInterfaces *_rowIf, IRowInterfaces *_keyIf, ISortKeySerializer *_keyConverter)
+            : rowIf(_rowIf), keyIf(_keyIf), keyConverter(_keyConverter)
+        {
+        }
+        // IOutputRowSerializer impl.
+        virtual void serialize(IRowSerializerTarget & out, const byte *row)
+        {
+            CSizingSerializer ssz;
+            rowIf->queryRowSerializer()->serialize(ssz, (const byte *)row);
+            size32_t recSz = ssz.size();
+            RtlDynamicRowBuilder k(keyIf->queryRowAllocator());
+            size32_t keySz = keyConverter->recordToKey(k, row, recSz);
+            OwnedConstThorRow keyRow =  k.finalizeRowClear(keySz);
+            keyIf->queryRowSerializer()->serialize(out, (const byte *)keyRow.get());
+        }
+    };
     void main()
     {
         try
@@ -626,10 +650,17 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
         stopping = true;
     }
 #ifdef _TRACE
-    void TraceKey(const char *s, const void *k)
+    void TraceRow(const char *s, const void *k)
     {
         traceKey(rowif->queryRowSerializer(), s, k);
     }
+    void TraceKey(const char *s, const void *k)
+    {
+        traceKey(keyIf->queryRowSerializer(), s, k);
+    }
+#else
+#define TraceRow(msg, row)
+#define TraceKey(msg, key)
 #endif
 
     void stop()
@@ -646,33 +677,36 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
     ICompare *queryCmpFn(byte cmpfn)
     {
         switch (cmpfn) {
-        case CMPFN_NORMAL: return icompare;
-        case CMPFN_COLLATE: return icollate;
-        case CMPFN_UPPER: return icollateupper;
+        case CMPFN_NORMAL: return keyRowCompare;
+        case CMPFN_COLLATE: return primarySecondaryCompare;
+        case CMPFN_UPPER: return primarySecondaryUpperCompare;
         }
         return NULL;
     }
-    rowidx_t BinChop(const void *row, bool lesseq, bool firstdup, byte cmpfn)
+    rowidx_t BinChop(const void *key, bool lesseq, bool firstdup, byte cmpfn)
     {
         rowidx_t n = rowArray.ordinality();
         rowidx_t l=0;
         rowidx_t r=n;
         ICompare* icmp=queryCmpFn(cmpfn);
-        while (l<r) {
+        while (l<r)
+        {
             rowidx_t m = (l+r)/2;
-            const void *p = rowArray.query(m);
-            int cmp = icmp->docompare(row, p);
+            int cmp = icmp->docompare(key, rowArray.query(m));
             if (cmp < 0)
                 r = m;
             else if (cmp > 0)
                 l = m+1;
-            else {
-                if (firstdup) {
-                    while ((m>0)&&(icmp->docompare(row, rowArray.query(m-1))==0))
+            else
+            {
+                if (firstdup)
+                {
+                    while ((m>0)&&(icmp->docompare(key, rowArray.query(m-1))==0))
                         m--;
                 }
-                else {
-                    while ((m+1<n)&&(icmp->docompare(row, rowArray.query(m+1))==0))
+                else
+                {
+                    while ((m+1<n)&&(icmp->docompare(key, rowArray.query(m+1))==0))
                         m++;
                 }
                 return m;
@@ -688,7 +722,8 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
         for (unsigned n=0;n<num;n++)
         {
             unsigned i = n;
-            loop {                                      // adjustment for empty keys
+            loop                                      // adjustment for empty keys
+            {
                 if (i>=keys.ordinality())
                 {
                     pos[n] = rowArray.ordinality();
@@ -741,10 +776,10 @@ public:
     {
         numnodes = 0;
         partno = 0;
-        icompare = NULL;
+        rowCompare = keyRowCompare = NULL;
         nosort = false;
-        icollate = NULL;
-        icollateupper = NULL;
+        primarySecondaryCompare = NULL;
+        primarySecondaryUpperCompare = NULL;
         midkeybuf = NULL;
         multibinchoppos = NULL;
         transferblocksize = TRANSFERBLOCKSIZE;
@@ -782,78 +817,40 @@ public:
     {
         if (!gatherdone)
             ERRLOG("GetGatherInfo:***Error called before gather complete");
-        if (!haskeyserializer&&keyserializer) {
-            ActPrintLog(activity, "Suppressing key serializer on slave");
-            keyserializer = NULL;       // when cosorting and LHS empty
-        }
-        else if (haskeyserializer&&!keyserializer) {
-            WARNLOG("Mismatched key serializer (master has, slave doesn't");
-        }
+        if (haskeyserializer != (NULL != keyserializer))
+            throwUnexpected();
         numlocal = rowArray.ordinality(); // JCSMORE - this is sample total, why not return actual spill total?
         _overflowscale = overflowinterval;
         totalsize = grandtotalsize; // used by master, if nothing overflowed to see if can MiniSort
     }
     virtual rowcount_t GetMinMax(size32_t &keybufsize,void *&keybuf,size32_t &avrecsize)
     {
-        CThorExpandingRowArray ret(*activity, rowif, true);
         avrecsize = 0;
-        if (rowArray.ordinality()>0) {
-            const void *kp = rowArray.get(0);
-#ifdef _TRACE
-            TraceKey("Min =", kp);
-#endif
-            ret.append(kp);
-            kp = rowArray.get(rowArray.ordinality()-1);
-#ifdef _TRACE
-            TraceKey("Max =", kp);
-#endif
-            ret.append(kp);
-            avrecsize = (size32_t)(grandtotalsize/grandtotal);
-#ifdef _TRACE
-            ActPrintLog(activity, "Ave Rec Size = %u",avrecsize);
-#endif
+        if (0 == rowArray.ordinality())
+        {
+            keybufsize = 0;
+            keybuf = NULL;
+            return 0;
         }
+
         MemoryBuffer mb;
-        ret.serialize(mb);
+        CMemoryRowSerializer msz(mb);
+
+        const void *kp = rowArray.get(0);
+        TraceRow("Min =", kp);
+        rowToKeySerializer->serialize(msz, (const byte *)kp);
+
+        kp = rowArray.get(rowArray.ordinality()-1);
+        TraceRow("Max =", kp);
+        rowToKeySerializer->serialize(msz, (const byte *)kp);
+
+        avrecsize = (size32_t)(grandtotalsize/grandtotal);
+#ifdef _TRACE
+        ActPrintLog(activity, "Ave Rec Size = %u", avrecsize);
+#endif
         keybufsize = mb.length();
         keybuf = mb.detach();
-        return rowArray.ordinality();
-    }
-    virtual bool GetMidPoint(size32_t lsize,const byte *lkeymem,
-                    size32_t hsize,const byte *hkeymem,
-                    size32_t &msize,byte *&mkeymem)
-    {
-        // finds the keys within the ranges specified
-        // uses empty keys (0 size) if none found
-        // try to avoid endpoints if possible
-        if (rowArray.ordinality()!=0) {
-            OwnedConstThorRow lkey;
-            lkey.deserialize(rowif,lsize,lkeymem);
-            OwnedConstThorRow hkey;
-            hkey.deserialize(rowif,hsize,hkeymem);
-            unsigned p1 = BinChop(lkey.get(),false,false,false);
-            if (p1==(unsigned)-1)
-                p1 = 0;
-            unsigned p2 = BinChop(hkey.get(),true,true,false);
-            if (p2>=rowArray.ordinality()) 
-                p2 = rowArray.ordinality()-1;
-            if (p1<=p2) {
-                unsigned pm=(p1+p2+1)/2;
-                const void *kp=rowArray.query(pm);
-                if ((icompare->docompare(lkey,kp)<=0)&&
-                        (icompare->docompare(hkey,kp)>=0)) { // paranoia
-                    MemoryBuffer mb;
-                    CMemoryRowSerializer mbsz(mb);
-                    rowif->queryRowSerializer()->serialize(mbsz, (const byte *)kp);
-                    msize = mb.length();
-                    mkeymem = (byte *)mb.detach();;
-                    return true;
-                }
-            }
-        }
-        mkeymem = NULL;
-        msize = 0;
-        return false;
+        return 2;
     }
     virtual void GetMultiMidPoint(size32_t lbufsize,const void *lkeybuf,
                           size32_t hbufsize,const void *hkeybuf,
@@ -861,27 +858,31 @@ public:
     {
         // finds the keys within the ranges specified
         // uses empty keys (0 size) if none found
-        CThorExpandingRowArray low(*activity, rowif, true);
-        CThorExpandingRowArray high(*activity, rowif, true);
-        CThorExpandingRowArray mid(*activity, rowif, true);
+        CThorExpandingRowArray low(*activity, keyIf, true);
+        CThorExpandingRowArray high(*activity, keyIf, true);
+        CThorExpandingRowArray mid(*activity, keyIf, true);
         low.deserializeExpand(lbufsize, lkeybuf);
         high.deserializeExpand(hbufsize, hkeybuf);
         unsigned n=low.ordinality();
         assertex(n==high.ordinality());
         unsigned i;
-        for (i=0;i<n;i++) {
-            if (rowArray.ordinality()!=0) {
-                unsigned p1 = BinChop(low.query(i), false, false, false);
+        for (i=0;i<n;i++)
+        {
+            if (rowArray.ordinality()!=0)
+            {
+                unsigned p1 = BinChop(low.query(i), false, false, CMPFN_NORMAL);
                 if (p1==(unsigned)-1)
                     p1 = 0;
-                unsigned p2 = BinChop(high.query(i), true, true, false);
+                unsigned p2 = BinChop(high.query(i), true, true, CMPFN_NORMAL);
                 if (p2>=rowArray.ordinality()) 
                     p2 = rowArray.ordinality()-1;
-                if (p1<=p2) { 
+                if (p1<=p2)
+                {
                     unsigned pm=(p1+p2+1)/2;
                     OwnedConstThorRow kp = rowArray.get(pm);
-                    if ((icompare->docompare(low.query(i), kp)<=0)&&
-                        (icompare->docompare(high.query(i), kp)>=0)) { // paranoia
+                    if ((keyRowCompare->docompare(low.query(i), kp)<=0)&&
+                        (keyRowCompare->docompare(high.query(i), kp)>=0)) // paranoia
+                    {
                         mid.append(kp.getClear());
                     }
                     else
@@ -894,9 +895,22 @@ public:
                 mid.append(NULL);
         }
         MemoryBuffer mb;
-        mid.serializeCompress(mb);
-        mbufsize = mb.length();
-        mkeybuf = mb.detach();
+        CMemoryRowSerializer s(mb);
+        for (rowidx_t i=0; i<mid.ordinality(); i++)
+        {
+            const void *row = mid.query(i);
+            if (row)
+            {
+                mb.append(false);
+                rowToKeySerializer->serialize(s, (const byte *)row);
+            }
+            else
+                mb.append(true);
+        }
+        MemoryBuffer compressedMb;
+        fastLZCompressToBuffer(compressedMb, mb.length(), mb.toByteArray());
+        mbufsize = compressedMb.length();
+        mkeybuf = compressedMb.detach();
     }
     virtual void GetMultiMidPointStart(size32_t lbufsize,const void *lkeybuf,
                                size32_t hbufsize,const void *hkeybuf)
@@ -911,15 +925,15 @@ public:
         mbufsize = midkeybufsize;
         midkeybuf = NULL;
     }
-    virtual void MultiBinChop(size32_t keybufsize, const byte * keybuf, unsigned num, rowcount_t * pos, byte cmpfn, bool useaux)
+    virtual void MultiBinChop(size32_t keybufsize, const byte * keybuf, unsigned num, rowcount_t * pos, byte cmpfn)
     {
-        CThorExpandingRowArray keys(*activity, useaux?auxrowif:rowif, true);
+        CThorExpandingRowArray keys(*activity, keyIf, true);
         keys.deserialize(keybufsize, keybuf);
         doBinChop(keys, pos, num, cmpfn);
     }
     virtual void MultiBinChopStart(size32_t keybufsize, const byte * keybuf, byte cmpfn)
     {
-        CThorExpandingRowArray keys(*activity, rowif, true);
+        CThorExpandingRowArray keys(*activity, keyIf, true);
         keys.deserializeExpand(keybufsize, keybuf);
         assertex(multibinchoppos==NULL); // check for reentrancy
         multibinchopnum = keys.ordinality();
@@ -948,7 +962,7 @@ public:
         for (i=0;i<mapsize;i++)
             ActPrintLog(activity, "%" RCPF "d ",overflowmap[i]);
 #endif
-        CThorExpandingRowArray keys(*activity, useaux?auxrowif:rowif, true);
+        CThorExpandingRowArray keys(*activity, keyIf, true);
         keys.deserialize(keybufsize, keybuf);
         for (i=0;i<mapsize-1;i++)
             AdjustOverflow(overflowmap[i], keys.query(i), cmpfn);
@@ -992,32 +1006,27 @@ public:
     virtual bool FirstRowOfFile(const char *filename,
                         size32_t &rowbufsize, byte * &rowbuf)
     {
-        if (!*filename) { // partition row wanted
-            if (partitionrow) {
-                MemoryBuffer mb;
-                CMemoryRowSerializer ssz(mb);
-                auxrowif->queryRowSerializer()->serialize(ssz,(const byte *)partitionrow.get());
-                rowbufsize = mb.length();
-                rowbuf = (byte *)mb.detach();   
-                partitionrow.clear(); // only one attempt! 
-            }
-            else {
-                rowbuf = NULL;
-                rowbufsize = 0;
-            }
-            return true;
+        OwnedConstThorRow row;
+        MemoryBuffer mb;
+        CMemoryRowSerializer msz(mb);
+        if (!*filename) // partition row wanted
+        {
+            if (partitionrow)
+                row.set(partitionrow.getClear());
         }
-        Owned<IFile> file = createIFile(filename);
-        Owned<IExtRowStream> rowstream = createRowStream(file, auxrowif);
-        OwnedConstThorRow row = rowstream->nextRow();
-        if (!row) {
+        else
+        {
+            Owned<IFile> file = createIFile(filename);
+            Owned<IExtRowStream> rowstream = createRowStream(file, auxrowif);
+            row.setown(rowstream->nextRow());
+        }
+        if (!row)
+        {
             rowbuf = NULL;
             rowbufsize = 0;
             return true;
         }
-        MemoryBuffer mb;
-        CMemoryRowSerializer msz(mb);
-        auxrowif->queryRowSerializer()->serialize(msz,(const byte *)row.get());
+        rowToKeySerializer->serialize(msz, (const byte *)row.get());
         rowbufsize = mb.length();
         rowbuf = (byte *)mb.detach();
         return true;
@@ -1026,21 +1035,27 @@ public:
     {
         // actually doesn't get Nth row but numsplits samples distributed evenly through the rows
         assertex(numsplits);
-        CThorExpandingRowArray ret(*activity, rowif, true);
         unsigned numrows = rowArray.ordinality();
-        if (numrows) {
-            for (unsigned i=0;i<numsplits;i++) {
-                count_t pos = ((i*2+1)*(count_t)numrows)/(2*(count_t)numsplits);
-                if (pos>=numrows) 
-                    pos = numrows-1;
-                const void *kp = rowArray.get((unsigned)pos);
-                ret.append(kp);
-            }
+        if (0 == numrows)
+        {
+            outbufsize = 0;
+            outkeybuf = NULL;
+            return;
         }
         MemoryBuffer mb;
-        ret.serializeCompress(mb);
-        outbufsize = mb.length();
-        outkeybuf = mb.detach();
+        CMemoryRowSerializer msz(mb);
+        for (unsigned i=0;i<numsplits;i++)
+        {
+            count_t pos = ((i*2+1)*(count_t)numrows)/(2*(count_t)numsplits);
+            if (pos>=numrows)
+                pos = numrows-1;
+            const void *row = rowArray.query((unsigned)pos);
+            rowToKeySerializer->serialize(msz, (const byte *)row);
+        }
+        MemoryBuffer exp;
+        fastLZCompressToBuffer(mb, exp.length(), exp.toByteArray());
+        outbufsize = exp.length();
+        outkeybuf = exp.detach();
     }
     virtual void StartMiniSort(rowcount_t globalTotal)
     {
@@ -1050,7 +1065,7 @@ public:
         Owned<IRowStream> sortedStream;
         try
         {
-            sortedStream.setown(miniSort.sort(rowArray, globalTotal, *icompare, isstable, totalrows));
+            sortedStream.setown(miniSort.sort(rowArray, globalTotal, *rowCompare, isstable, totalrows));
         }
         catch (IException *e)
         {
@@ -1134,7 +1149,7 @@ public:
         else
         {
             Owned<IRowLinkCounter> linkcounter = new CThorRowLinkCounter;
-            merger.setown(createRowStreamMerger(readers.ordinality(), readers.getArray(), icompare, false, linkcounter));
+            merger.setown(createRowStreamMerger(readers.ordinality(), readers.getArray(), rowCompare, false, linkcounter));
         }
         ActPrintLog(activity, "Global Merger Created: %d streams", readers.ordinality());
         startmergesem.signal();
@@ -1150,9 +1165,9 @@ public:
     virtual void Gather(
         IRowInterfaces *_rowif,
         IRowStream *in,
-        ICompare *_icompare,
-        ICompare *_icollate,
-        ICompare *_icollateupper,
+        ICompare *_rowCompare,
+        ICompare *_primarySecondaryCompare,
+        ICompare *_primarySecondaryUpperCompare,
         ISortKeySerializer *_keyserializer,
         const void *_partitionrow,
         bool _nosort,
@@ -1173,10 +1188,8 @@ public:
         rowif.set(_rowif);
         rowArray.kill();
         rowArray.setup(rowif);
-        if (transferserver)
-            transferserver->setRowIF(rowif);
-        else
-            WARNLOG("SORT: transfer server not started!");
+        dbgassertex(transferserver);
+        transferserver->setRowIF(rowif);
         if (_auxrowif&&_auxrowif->queryRowMetaData())
             auxrowif.set(_auxrowif);
         else
@@ -1186,19 +1199,41 @@ public:
             auxrowif.set(_rowif);
         }
         keyserializer = _keyserializer;
-        if (!keyserializer)
+        rowCompare = _rowCompare;
+        if (keyserializer)
+        {
+            keyIf.setown(createRowInterfaces(keyserializer->queryRecordSize(), activity->queryContainer().queryId(), activity->queryCodeContext()));
+            rowToKeySerializer.setown(new CRowToKeySerializer(auxrowif, keyIf, keyserializer));
+            keyRowCompare = keyserializer->queryCompareKeyRow();
+        }
+        else
+        {
             ActPrintLog(activity, "No key serializer");
+            keyIf.set(auxrowif);
+            rowToKeySerializer.set(keyIf->queryRowSerializer());
+            keyRowCompare = rowCompare;
+        }
         nosort = _nosort;
         if (nosort)
             ActPrintLog(activity, "SORT: Gather not sorting");
         isstable = !_unstable;
         if (_unstable)
             ActPrintLog(activity, "SORT: UNSTABLE");
-        icompare = _icompare;
-        icollate = _icollate?_icollate:_icompare;
-        icollateupper = _icollateupper?_icollateupper:icollate;
 
-        Owned<IThorRowLoader> sortedloader = createThorRowLoader(*activity, rowif, nosort?NULL:icompare, isstable ? stableSort_earlyAlloc : stableSort_none, rc_allDiskOrAllMem, SPILL_PRIORITY_SELFJOIN);
+        if (_primarySecondaryCompare)
+            primarySecondaryCompare = _primarySecondaryCompare;
+        else
+            primarySecondaryCompare = keyRowCompare;
+        if (_primarySecondaryUpperCompare)
+        {
+            if (keyserializer)
+                throwUnexpected();
+            primarySecondaryUpperCompare = _primarySecondaryUpperCompare;
+        }
+        else
+            primarySecondaryUpperCompare = primarySecondaryCompare;
+
+        Owned<IThorRowLoader> sortedloader = createThorRowLoader(*activity, rowif, nosort?NULL:rowCompare, isstable ? stableSort_earlyAlloc : stableSort_none, rc_allDiskOrAllMem, SPILL_PRIORITY_SELFJOIN);
         Owned<IRowStream> overflowstream;
         memsize_t inMemUsage = 0;
         try

+ 1 - 0
thorlcr/thorutil/thmem.cpp

@@ -755,6 +755,7 @@ void CThorExpandingRowArray::transferFrom(CThorExpandingRowArray &donor)
 void CThorExpandingRowArray::transferFrom(CThorSpillableRowArray &donor)
 {
     transferFrom((CThorExpandingRowArray &)donor);
+    donor.kill();
 }
 
 void CThorExpandingRowArray::removeRows(rowidx_t start, rowidx_t n)

+ 0 - 1
thorlcr/thorutil/thmem.hpp

@@ -144,7 +144,6 @@ public:
         else
             clear();
     }
-    
 private:
     const void * ptr;
 };