Explorar el Código

Up Thor version number

Jake Smith hace 13 años
padre
commit
f514acfcb9

+ 4 - 4
roxie/roxiemem/roxiemem.cpp

@@ -1801,7 +1801,7 @@ class CChunkingRowManager : public CInterface, implements IRowManager
 public:
     IMPLEMENT_IINTERFACE;
 
-    CChunkingRowManager (unsigned _memLimit, ITimeLimiter *_tl, const IContextLogger &_logctx, const IRowAllocatorCache *_allocatorCache, bool _ignoreLeaks)
+    CChunkingRowManager(memsize_t _memLimit, ITimeLimiter *_tl, const IContextLogger &_logctx, const IRowAllocatorCache *_allocatorCache, bool _ignoreLeaks)
         : callbacks(this), logctx(_logctx), allocatorCache(_allocatorCache), hugeHeap(this, _logctx, _allocatorCache)
     {
         logctx.Link();
@@ -1816,7 +1816,7 @@ public:
             normalHeaps.append(*new CFixedChunkingHeap(this, _logctx, _allocatorCache, thisSize, 0));
             prevSize = thisSize;
         }
-        pageLimit = _memLimit / HEAP_ALIGNMENT_SIZE;
+        pageLimit = (unsigned) PAGES(_memLimit, HEAP_ALIGNMENT_SIZE);
         spillPageLimit = 0;
         timeLimit = _tl;
         peakPages = 0;
@@ -1832,7 +1832,7 @@ public:
         trackMemoryByActivity = false;
 #endif
         if (memTraceLevel >= 2)
-            logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager c-tor memLimit=%u pageLimit=%u rowMgr=%p", _memLimit, pageLimit, this);
+            logctx.CTXLOG("RoxieMemMgr: CChunkingRowManager c-tor memLimit=%"I64F"u pageLimit=%u rowMgr=%p", (unsigned __int64)_memLimit, pageLimit, this);
         if (timeLimit)
         {
             cyclesChecked = get_cycles_now();
@@ -2883,7 +2883,7 @@ void DataBufferBottom::_setDestructorFlag(const void *ptr) { throwUnexpected();
 #define new new(_NORMAL_BLOCK, __FILE__, __LINE__)
 #endif
 
-extern IRowManager *createRowManager(unsigned memLimit, ITimeLimiter *tl, const IContextLogger &logctx, const IRowAllocatorCache *allocatorCache, bool ignoreLeaks)
+extern IRowManager *createRowManager(memsize_t memLimit, ITimeLimiter *tl, const IContextLogger &logctx, const IRowAllocatorCache *allocatorCache, bool ignoreLeaks)
 {
     return new CChunkingRowManager(memLimit, tl, logctx, allocatorCache, ignoreLeaks);
 }

+ 1 - 1
roxie/roxiemem/roxiemem.hpp

@@ -448,7 +448,7 @@ interface IActivityMemoryUsageMap : public IInterface
     virtual void report(const IContextLogger &logctx, const IRowAllocatorCache *allocatorCache) = 0;
 };
 
-extern roxiemem_decl IRowManager *createRowManager(unsigned memLimit, ITimeLimiter *tl, const IContextLogger &logctx, const IRowAllocatorCache *allocatorCache, bool ignoreLeaks = false);
+extern roxiemem_decl IRowManager *createRowManager(memsize_t memLimit, ITimeLimiter *tl, const IContextLogger &logctx, const IRowAllocatorCache *allocatorCache, bool ignoreLeaks = false);
 
 // Fixed size aggregated link-counted zero-overhead data Buffer manager
 

+ 16 - 11
system/jlib/jdebug.cpp

@@ -2209,10 +2209,13 @@ void getHardwareInfo(HardwareInfo &hdwInfo, const char *primDiskPath, const char
 
     ULARGE_INTEGER diskAvailStruct;
     ULARGE_INTEGER diskTotalStruct;
-    diskTotalStruct.QuadPart = 0;
-    GetDiskFreeSpaceEx(primDiskPath, &diskAvailStruct, &diskTotalStruct, 0);
-    hdwInfo.primDiskSize = (unsigned)(diskTotalStruct.QuadPart / (1024*1024*1024));  // in GB
-    hdwInfo.primFreeSize = (unsigned)(diskAvailStruct.QuadPart / (1024*1024*1024));  // in GB
+    if (primDiskPath)
+    {
+        diskTotalStruct.QuadPart = 0;
+        GetDiskFreeSpaceEx(primDiskPath, &diskAvailStruct, &diskTotalStruct, 0);
+        hdwInfo.primDiskSize = (unsigned)(diskTotalStruct.QuadPart / (1024*1024*1024));  // in GB
+        hdwInfo.primFreeSize = (unsigned)(diskAvailStruct.QuadPart / (1024*1024*1024));  // in GB
+    }
     if (secDiskPath)
     {
         diskTotalStruct.QuadPart = 0;
@@ -2224,16 +2227,18 @@ void getHardwareInfo(HardwareInfo &hdwInfo, const char *primDiskPath, const char
     // MORE: Find win32 call for NIC speed
 
 #else  // linux
-
     unsigned memUsed, memActive, memSwap, memSwapUsed; 
-    unsigned __int64 diskSize;
-    unsigned __int64 diskUsed;
-
     getMemUsage(memUsed, memActive, hdwInfo.totalMemory, memSwap, memSwapUsed);
     hdwInfo.totalMemory /= 1024; // in MB
-    getDiskUsage(primDiskPath, diskSize, diskUsed);
-    hdwInfo.primDiskSize = diskSize / (1024*1024*1024);   // in GB
-    hdwInfo.primFreeSize = (diskSize - diskUsed) / (1024*1024*1024);   // in GB
+
+    unsigned __int64 diskSize;
+    unsigned __int64 diskUsed;
+    if (primDiskPath)
+    {
+        getDiskUsage(primDiskPath, diskSize, diskUsed);
+        hdwInfo.primDiskSize = diskSize / (1024*1024*1024);   // in GB
+        hdwInfo.primFreeSize = (diskSize - diskUsed) / (1024*1024*1024);   // in GB
+    }
     if (secDiskPath)
     {
         getDiskUsage(secDiskPath, diskSize, diskUsed);

+ 1 - 1
system/jlib/jdebug.hpp

@@ -253,7 +253,7 @@ unsigned jlib_decl setAllocHook(bool on);  // bwd compat returns unsigned
  #define USE_JLIB_ALLOC_HOOK
 #endif
 
-extern jlib_decl void getHardwareInfo(HardwareInfo &hdwInfo, const char *primDiskPath, const char *secDiskPath = NULL);
+extern jlib_decl void getHardwareInfo(HardwareInfo &hdwInfo, const char *primDiskPath = NULL, const char *secDiskPath = NULL);
 extern jlib_decl void getCpuInfo(unsigned &numCPUs, unsigned &CPUSpeed);
 extern jlib_decl unsigned getAffinityCpus();
 extern jlib_decl void printProcMap(const char *fn, bool printbody, bool printsummary, StringBuffer *lnout, MemoryBuffer *mb, bool useprintf);

+ 32 - 0
system/jlib/jscm.hpp

@@ -41,6 +41,38 @@ template <class X> inline void Release(X * ptr) { if (ptr) ptr->Release(); }
 
 #define QUERYINTERFACE(ptr, TYPE)   (dynamic_cast<TYPE *>(ptr))
 
+//A simple object container/smart pointer
+template <class CLASS> class OwnedPtr
+{
+public:
+    inline OwnedPtr()                        { ptr = NULL; }
+    inline OwnedPtr(CLASS * _ptr)            { ptr = _ptr; }
+    inline ~OwnedPtr()                       { delete ptr; }
+
+    void operator = (CLASS * _ptr)
+    {
+        if (ptr)
+            delete ptr;
+        ptr = _ptr;
+    }
+    inline CLASS * operator -> () const         { return ptr; }
+    inline operator CLASS *() const             { return ptr; }
+
+    inline void clear()                         { CLASS *temp=ptr; ptr=NULL; delete temp; }
+    inline CLASS * get() const                  { return ptr; }
+    inline CLASS * getClear()                   { CLASS * temp = ptr; ptr = NULL; return temp; }
+    inline void setown(CLASS * _ptr)            { CLASS * temp = ptr; ptr = _ptr; delete temp; }
+
+private:
+    inline OwnedPtr(const OwnedPtr<CLASS> & other);
+    void operator = (const OwnedPtr<CLASS> & other);
+    void setown(const OwnedPtr<CLASS> &other);
+
+private:
+    CLASS * ptr;
+};
+
+
 //This base class implements a shared pointer based on a link count held in the object.
 //The two derived classes Owned and Linked should be used as the concrete types to construct a shared object
 //from a pointer.

+ 8 - 4
thorlcr/activities/filter/thfilterslave.cpp

@@ -279,13 +279,17 @@ public:
                 return NULL;
             }
             CThorExpandingRowArray rows(*this);
-            groupStream.setown(groupLoader->loadGroup(input, abortSoon, &rows));
+            groupLoader->loadGroup(input, abortSoon, &rows);
             if (rows.ordinality())
             {
                 // JCSMORE - if isValid would take a stream, group wouldn't need to be in mem.
-                if (!helper->isValid(rows.ordinality(), rows.getRowArray()))
-                    groupStream.clear();
-                // read next group
+                if (helper->isValid(rows.ordinality(), rows.getRowArray()))
+                {
+                    CThorSpillableRowArray spillableRows(*this);
+                    spillableRows.transferFrom(rows);
+                    groupStream.setown(spillableRows.createRowStream());
+                }
+                // else read next group
             }
             else
                 abortSoon = true; // eof

+ 9 - 1
thorlcr/activities/group/thgroupslave.cpp

@@ -101,8 +101,16 @@ public:
                         break; //always send group even when aborting
                     sentRecs++;
                     OwnedConstThorRow next2 = getNext();
-                    if (!next2 || !helper->isSameGroup(next2, next))
+                    if (!next2)
+                    {
+                        eof = true;
                         break;
+                    }
+                    else if (!helper->isSameGroup(next2, next))
+                    {
+                        next.setown(next2.getClear());
+                        break;
+                    }
                     next.setown(next2.getClear());
                 }
             }

+ 12 - 1
thorlcr/activities/loop/thloopslave.cpp

@@ -211,7 +211,12 @@ public:
         helper = (IHThorLoopArg *) queryHelper();
         flags = helper->getFlags();
     }
-
+    virtual void kill()
+    {
+        CLoopSlaveActivityBase::kill();
+        loopPending.clear();
+        curInput.clear();
+    }
 // IThorDataLink
     virtual void start()
     {
@@ -368,6 +373,11 @@ public:
         helper = (IHThorGraphLoopArg *)queryHelper();
         flags = helper->getFlags();
     }
+    virtual void kill()
+    {
+        CLoopSlaveActivityBase::kill();
+        finalResultStream.clear();
+    }
     virtual void start()
     {
         ActivityTimer s(totalCycles, timeActivities, NULL);
@@ -473,6 +483,7 @@ public:
     }
     virtual void kill()
     {
+        CSlaveActivity::kill();
         resultStream.clear();
     }
     CATCH_NEXTROW()

+ 3 - 8
thorlcr/activities/msort/thgroupsortslave.cpp

@@ -60,7 +60,7 @@ public:
         ActivityTimer s(totalCycles, timeActivities, NULL);
         dataLinkStart(activityKindStr(queryContainer().getKind()), container.queryId());
         input = inputs.item(0);
-        unsigned spillPriority = container.queryGrouped() ? 50 : 20;
+        unsigned spillPriority = container.queryGrouped() ? SPILL_PRIORITY_GROUPSORT : SPILL_PRIORITY_LARGESORT;
         iLoader.setown(createThorRowLoader(*this, queryRowInterfaces(input), iCompare, !unstable, rc_mixed, spillPriority));
         startInput(input);
         eoi = false;
@@ -93,18 +93,13 @@ public:
             }
             out.setown(iLoader->loadGroup(input, abortSoon));
             if (0 == iLoader->numRows())
-            {
                 eoi = true;
-                return NULL;
-            }
-            row.setown(out->nextRow());
-            if (!row)
-                return NULL;
+            return NULL; // eog marker
         }
         dataLinkIncrement();
         return row.getClear();
     }
-    virtual bool isGrouped() { return false; }
+    virtual bool isGrouped() { return container.queryGrouped(); }
     void getMetaInfo(ThorDataLinkMetaInfo &info)
     {
         initMetaInfo(info);

+ 13 - 9
thorlcr/activities/msort/thsortu.cpp

@@ -280,7 +280,7 @@ void swapRows(RtlDynamicRowBuilder &row1, RtlDynamicRowBuilder &row2)
 class CJoinHelper : public IJoinHelper, public CSimpleInterface
 {
     CActivityBase &activity;
-	ICompare *compareLR;
+    ICompare *compareLR;
     ICompare *compareL; 
     ICompare *compareR; 
 
@@ -356,7 +356,7 @@ public:
     CJoinHelper(CActivityBase &_activity, IHThorJoinArg *_helper, IEngineRowAllocator *_allocator)
         : activity(_activity), allocator(_allocator), denormTmp(NULL), rightgroup(_activity), denormRows(_activity)
     {
-		kind = activity.queryContainer().getKind();
+        kind = activity.queryContainer().getKind();
         helper = _helper; 
         denormCount = 0;
         outSz = 0;
@@ -575,6 +575,7 @@ public:
                             denormRows.kill();
                             break;
                         case TAKjoin:
+                        case TAKselfjoin:
                             gotsz = helper->transform(ret, defaultLeft, nextright);
                             nextR();
                             break;
@@ -623,6 +624,7 @@ public:
                         denormCount = 0;
                         break;
                     case TAKjoin:
+                    case TAKselfjoin:
                         if (!rightgroupmatched[rightidx]) 
                             gotsz = helper->transform(ret, defaultLeft, rightgroup.query(rightidx));
                         rightidx++;
@@ -645,6 +647,7 @@ public:
                         gotsz = helper->transform(ret, nextleft, NULL, 0, (const void **)NULL);
                         break;
                     case TAKjoin:
+                    case TAKselfjoin:
                         gotsz = helper->transform(ret, nextleft, defaultRight);
                         break;
                     default:
@@ -699,6 +702,7 @@ public:
                             break;
                         }
                         case TAKjoin:
+                        case TAKselfjoin:
                             gotsz = helper->transform(ret,nextleft,rightgroup.query(rightidx));
                             break;
                         default:
@@ -1383,7 +1387,7 @@ ILimitedCompareHelper *createLimitedCompareHelper()
 class CMultiCoreJoinHelperBase: extends CInterface, implements IJoinHelper, implements IMulticoreIntercept
 {
 public:
-	CActivityBase &activity;
+    CActivityBase &activity;
     IJoinHelper *jhelper;
     bool leftouter;  
     bool rightouter;  
@@ -1413,17 +1417,17 @@ public:
 
     class cWorkItem
     {
-		CActivityBase &activity;
+        CActivityBase &activity;
     public:
         CThorExpandingRowArray lgroup;
         CThorExpandingRowArray rgroup;
         const void *row;
         inline cWorkItem(CActivityBase &_activity, CThorExpandingRowArray *_lgroup, CThorExpandingRowArray *_rgroup)
-			: activity(_activity), lgroup(_activity), rgroup(_activity)
+            : activity(_activity), lgroup(_activity), rgroup(_activity)
         {
             set(_lgroup,_rgroup);
         }
-		inline cWorkItem(CActivityBase &_activity) : activity(_activity), lgroup(_activity), rgroup(_activity)
+        inline cWorkItem(CActivityBase &_activity) : activity(_activity), lgroup(_activity), rgroup(_activity)
         {
             clear();
         }
@@ -1511,7 +1515,7 @@ public:
     CMultiCoreJoinHelperBase(CActivityBase &_activity, unsigned numthreads, IJoinHelper *_jhelper, IHThorJoinArg *_helper, IEngineRowAllocator *_allocator)
         : activity(_activity), allocator(_allocator)
     {
-		kind = activity.queryContainer().getKind();
+        kind = activity.queryContainer().getKind();
         jhelper = _jhelper;
         helper = _helper;
         unsigned flags = helper->getJoinFlags();
@@ -1603,7 +1607,7 @@ class CMultiCoreJoinHelper: public CMultiCoreJoinHelperBase
         Semaphore workwait;
         SimpleInterThreadQueueOf<cOutItem,false> outqueue;
 
-		cWorker(CActivityBase &activity, CMultiCoreJoinHelper *_parent)
+        cWorker(CActivityBase &activity, CMultiCoreJoinHelper *_parent)
             : Thread("CMultiCoreJoinHelper::cWorker"), parent(_parent), work(activity)
         {
         }
@@ -1658,7 +1662,7 @@ public:
         curin = 0;
         curout = 0;
         for (unsigned i=0;i<numthreads;i++)
-			workers[i] = new cWorker(activity, this);
+            workers[i] = new cWorker(activity, this);
     }
 
     ~CMultiCoreJoinHelper()

+ 0 - 1
thorlcr/activities/msort/thsortu.hpp

@@ -22,7 +22,6 @@
 #include "slave.hpp"
 #include "jio.hpp"
 #include "thcrc.hpp"
-//#include "thmem.hpp"
 
 
 interface IHThorJoinArg;

+ 7 - 7
thorlcr/activities/nsplitter/thnsplitterslave.cpp

@@ -196,13 +196,13 @@ class NSplitterSlaveActivity : public CSlaveActivity
     class CDelayedInput : public CSimpleInterface, public CThorDataLink
     {
         Owned<CSplitterOutputBase> input;
-        NSplitterSlaveActivity &activity;
+        Linked<NSplitterSlaveActivity> activity;
         unsigned id;
 
     public:
         IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-        CDelayedInput(NSplitterSlaveActivity &_activity) : CThorDataLink(&_activity), activity(_activity), id(0) { }
+        CDelayedInput(NSplitterSlaveActivity &_activity) : CThorDataLink(&_activity), activity(&_activity), id(0) { }
         void setInput(CSplitterOutputBase *_input, unsigned _id=0)
         {
             input.setown(_input);
@@ -223,19 +223,19 @@ class NSplitterSlaveActivity : public CSlaveActivity
     // IThorDataLink impl.
         virtual void start()
         {
-            activity.ensureInputsConfigured();
+            activity->ensureInputsConfigured();
             input->start();
-            dataLinkStart("SPLITTEROUTPUT", activity.queryContainer().queryId(), id);
+            dataLinkStart("SPLITTEROUTPUT", activity->queryContainer().queryId(), id);
         }
-        virtual bool isGrouped() { return activity.inputs.item(0)->isGrouped(); }
+        virtual bool isGrouped() { return activity->inputs.item(0)->isGrouped(); }
         virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
         {
             initMetaInfo(info);
             if (input)
                 input->getMetaInfo(info);
             else
-                activity.inputs.item(0)->getMetaInfo(info);
-            info.canStall = !activity.spill;
+                activity->inputs.item(0)->getMetaInfo(info);
+            info.canStall = !activity->spill;
         }
     };
 public:

+ 6 - 1
thorlcr/graph/thgraph.cpp

@@ -66,7 +66,7 @@ public:
     CThorGraphResult(CActivityBase &_activity, IRowInterfaces *_rowIf, bool _local) : activity(_activity), rowIf(_rowIf), local(_local)
     {
         init();
-        rowBuffer.setown(createOverflowableBuffer(activity, rowIf, false, true));
+        rowBuffer.setown(createOverflowableBuffer(activity, rowIf, true, true));
     }
 
 // IRowWriter
@@ -444,6 +444,8 @@ IGraphTempHandler *CGraphElementBase::queryTempHandler() const
 
 void CGraphElementBase::releaseIOs()
 {
+    loopGraph.clear();
+    associatedChildGraphs.kill();
     if (activity)
         activity->releaseIOs();
     connectedInputs.kill();
@@ -1077,6 +1079,9 @@ void CGraphBase::clean()
     ::Release(startBarrier);
     ::Release(waitBarrier);
     ::Release(doneBarrier);
+    localResults.clear();
+    graphLoopResults.clear();
+    childGraphs.kill();
     disconnectActivities();
     containers.kill();
     sinks.kill();

+ 4 - 1
thorlcr/master/thactivitymaster.cpp

@@ -360,7 +360,10 @@ public:
                 throw MakeThorException(0, "Thor currently, does not support a dataset loop condition, activity id: %"ACTPF"d", queryId());
             case TAKlocalresultspill:
             case TAKlocalresultwrite:
-                ret = createLocalResultActivityMaster(this);
+                if (!queryOwner().queryOwner() || queryOwner().isGlobal()) // don't need result in master if in local child query
+                    ret = createLocalResultActivityMaster(this);
+                else
+                    ret = new CMasterActivity(this);
                 break;
             case TAKgraphloopresultwrite:
                 ret = createGraphLoopResultActivityMaster(this);

+ 0 - 3
thorlcr/master/thgraphmanager.cpp

@@ -854,9 +854,6 @@ void closeThorServerStatus()
 
 void thorMain()
 {
-    memsize_t gmemSize = globals->getPropInt("@globalMemorySize"); // in MB
-    roxiemem::setTotalMemoryLimit(gmemSize * 0x100000, 0, NULL);
-
     aborting = 0;
     unsigned multiThorMemoryThreshold = globals->getPropInt("@multiThorMemoryThreshold")*0x100000;
     try

+ 31 - 0
thorlcr/master/thmastermain.cpp

@@ -606,6 +606,37 @@ int main( int argc, char *argv[]  )
             return globals->getPropBool("@validateDAFSretCode"); // default is no recycle!
         }
         
+        HardwareInfo hdwInfo;
+        getHardwareInfo(hdwInfo);
+        globals->setPropInt("@masterTotalMem", hdwInfo.totalMemory);
+        unsigned gmemSize = globals->getPropInt("@globalMemorySize"); // in MB
+        if (0 == gmemSize)
+        {
+            unsigned maxMem = hdwInfo.totalMemory;
+#ifdef _WIN32
+            if (maxMem > 2048)
+                maxMem = 2048;
+#else
+#ifndef __64BIT__
+            if (maxMem > 4096)
+                maxMem = 4096;
+#endif
+#endif
+            gmemSize = maxMem * 3 / 4; // default to 75% of total
+            unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
+            if (slavesPerNode>1)
+            {
+                PROGLOG("Sharing globalMemorySize(%d MB), between %d slave. %d MB each", gmemSize, slavesPerNode, gmemSize / slavesPerNode);
+                gmemSize /= slavesPerNode;
+            }
+            globals->setPropInt("@globalMemorySize", gmemSize);
+        }
+        else if (gmemSize >= hdwInfo.totalMemory)
+        {
+            // should prob. error here
+        }
+        roxiemem::setTotalMemoryLimit(((memsize_t)gmemSize) * 0x100000, 0, NULL);
+
         const char * overrideBaseDirectory = globals->queryProp("@thorDataDirectory");
         const char * overrideReplicateDirectory = globals->queryProp("@thorReplicateDirectory");
         StringBuffer datadir;

+ 103 - 78
thorlcr/msort/tsorta.cpp

@@ -55,8 +55,6 @@ CThorKeyArray::CThorKeyArray(
     ICompare *_irowkeycompare) : activity(_activity), keys(_activity, _rowif)
 {
     rowif.set(_rowif);
-    sizes = NULL;
-    filepos = NULL;
     clear();
     maxsamplesize = 0;
     divisor = 1;
@@ -73,37 +71,29 @@ CThorKeyArray::CThorKeyArray(
 void CThorKeyArray::clear()
 {
     keys.kill();
-    delete filepos;
-    filepos = NULL;
     totalserialsize = 0;
     serialrowsize = 0;
     totalfilesize = 0;
     filerecsize = 0;
     filerecnum = 0;
     index = 0;
-    delete sizes;
-    sizes = NULL;
-}
-
-CThorKeyArray::~CThorKeyArray()
-{
-    delete filepos;
-    delete sizes;
 }
 
 void CThorKeyArray::setSampling(size32_t _maxsamplesize, unsigned _divisor)
 {
     maxsamplesize = _maxsamplesize;
     serialrowsize = 0;
-    sizes = NULL;
+    sizes.clear();
     index = 0;
     divisor = _divisor?_divisor:1;
 }
 
 void CThorKeyArray::expandfpos()
 {
-    if (!filepos) {
-        filepos = new Int64Array;
+    if (!filepos)
+    {
+        filepos.setown(new Int64Array);
+        filepos->ensure(filerecnum);
         for (unsigned i=0;i<=filerecnum;i++)
             filepos->append(i*(offset_t)filerecsize);
         filerecsize = 0;
@@ -121,26 +111,31 @@ void CThorKeyArray::add(const void *row)
         filepos->append(totalfilesize);
     else if (filerecnum==0)
         filerecsize=recsz;
-    else if (filerecsize!=recsz) {
+    else if (filerecsize!=recsz)
+    {
         expandfpos();
         filepos->append(totalfilesize);
     }
     filerecnum++;
-    if (maxsamplesize&&(index%divisor!=(divisor-1))) {
+    if (maxsamplesize&&(index%divisor!=(divisor-1)))
+    {
         index++;
         return;
     }
     size32_t sz;
-    if (keyserializer) {
+    if (keyserializer)
+    {
         RtlDynamicRowBuilder k(keyif->queryRowAllocator());
         sz = keyserializer->recordToKey(k,row,recsz);
         row = k.finalizeRowClear(sz);
     }
-    else {
+    else
+    {
         sz = recsz;
         LinkThorRow(row);
     }
-    if (maxsamplesize) {
+    if (maxsamplesize)
+    {
         while (keys.ordinality()&&(totalserialsize+sz>maxsamplesize))
             split();
     }
@@ -148,8 +143,9 @@ void CThorKeyArray::add(const void *row)
        sizes->append(sz);
     else if (keys.ordinality()==0)
         serialrowsize = sz;
-    else if (serialrowsize!=sz) {
-        sizes = new UnsignedArray;
+    else if (serialrowsize!=sz)
+    {
+        sizes.setown(new UnsignedArray);
         for (unsigned i=0;i<keys.ordinality();i++)
             sizes->append(serialrowsize);
        sizes->append(sz);
@@ -166,7 +162,8 @@ void CThorKeyArray::serialize(MemoryBuffer &mb)
     unsigned i;
     mb.append(n);
     mb.append(serialrowsize);
-    if (sizes) {
+    if (sizes)
+    {
         assertex(n==sizes->ordinality());
         mb.append(n);
         for (i=0;i<n;i++)
@@ -199,27 +196,33 @@ void CThorKeyArray::deserialize(MemoryBuffer &mb,bool append)
     mb.read(rss);
     unsigned nsz;
     mb.read(nsz);
-    if (n&&(rss!=serialrowsize)) {
-        if (rss==0) {
-            if (nsz) {
-                sizes = new UnsignedArray;
+    if (n&&(rss!=serialrowsize))
+    {
+        if (rss==0)
+        {
+            if (nsz)
+            {
+                sizes.setown(new UnsignedArray);
                 for (i=0;i<keys.ordinality();i++)
                     sizes->append(serialrowsize);
                 serialrowsize = 0;
             }
         }
-        else {
+        else
+        {
             if (!sizes)
-                sizes = new UnsignedArray;
+                sizes.setown(new UnsignedArray);
             for (i=0;i<n;i++)
                 sizes->append(rss);
             rss = 0;
         }
     }
-    if (nsz) {
+    if (nsz)
+    {
         if (!sizes)
-            sizes = new UnsignedArray;
-        for (i=0;i<nsz;i++) {
+            sizes.setown(new UnsignedArray);
+        for (i=0;i<nsz;i++)
+        {
             unsigned s;
             mb.read(s);
             sizes->append(s);
@@ -268,18 +271,19 @@ void CThorKeyArray::sort()
         kcthis = this;
         qsortarray<unsigned>(ra,n,::keyCompare);
     }
-    if (sizes) {
-        UnsignedArray *newsizes = new UnsignedArray;
+    if (sizes)
+    {
+        OwnedPtr<UnsignedArray> newsizes(new UnsignedArray);
+        newsizes->ensure(n);
         for (i = 0; i<n; i++)
             newsizes->append(sizes->item(ra[i]));
-        delete sizes;
-        sizes = newsizes;
+        sizes.setown(newsizes.getClear());
     }
-    Int64Array *newpos = new Int64Array;
+    OwnedPtr<Int64Array> newpos(new Int64Array);
+    newpos->ensure(n);
     for (i = 0; i<n; i++)
         newpos->append(filepos?filepos->item(ra[i]):filerecsize*(offset_t)ra[i]);
-    delete filepos;
-    filepos = newpos;
+    filepos.setown(newpos.getClear());
     keys.reorder(0,n,ra);
     delete [] ra;
 }
@@ -288,12 +292,14 @@ void CThorKeyArray::sort()
 void CThorKeyArray::createSortedPartition(unsigned pn) 
 {   
     // reduces to pn-1 keys to split into pn equal parts
-    if (pn<=1) {
+    if (pn<=1)
+    {
         clear();
         return;
     }
     unsigned n = ordinality();
-    if (n<pn) {
+    if (n<pn)
+    {
         sort();
         return;
     }
@@ -304,24 +310,28 @@ void CThorKeyArray::createSortedPartition(unsigned pn)
     for (i = 0; i<n; i++)
         ra[i] = i;
     qsortarray<unsigned>(ra, n, ::keyCompare);
-    if (sizes) {
-        UnsignedArray *newsizes = new UnsignedArray;
+    if (sizes)
+    {
+        OwnedPtr<UnsignedArray> newsizes(new UnsignedArray);
+        newsizes->ensure(pn);
         for (i = 1; i<pn; i++) {
             unsigned p = i*n/pn;
             newsizes->append(sizes->item(ra[p]));
         }
-        delete sizes;
-        sizes = newsizes;
+        sizes.setown(newsizes.getClear());
     }
-    Int64Array *newpos = new Int64Array;
-    for (i = 0; i<pn; i++) {
+    OwnedPtr<Int64Array> newpos(new Int64Array);
+    newpos->ensure(pn);
+    for (i = 0; i<pn; i++)
+    {
         unsigned p = i*n/pn;
         newpos->append(filepos?filepos->item(ra[p]):filerecsize*(offset_t)ra[p]);
     }
-    delete filepos;
-    filepos = newpos;
+    filepos.setown(newpos.getClear());
     CThorExpandingRowArray newrows(activity);
-    for (i = 1; i<pn; i++) {
+    newrows.ensure(pn);
+    for (i = 1; i<pn; i++)
+    {
         unsigned p = i*n/pn;
         newrows.append(keys.get(ra[p]));
     }
@@ -339,13 +349,16 @@ int CThorKeyArray::binchopPartition(const void * row,bool lt)
 #ifdef _TESTING
 try {
 #endif
-    while (a<b) {
+    while (a<b)
+    {
         unsigned m = (a+b)/2;
         cmp = keyRowCompare((unsigned)m,row);
         if (cmp>0) 
             b = m;
-        else {
-            if (cmp==0) {
+        else
+        {
+            if (cmp==0)
+            {
 #ifdef _TESTING
                 a = m;
                 while ((a<n)&&(keyCompare(m,a)==0))
@@ -367,27 +380,31 @@ try {
         }
     }
 #ifdef _TESTING
-    if (lt) {
+    if (lt)
+    {
         if (a<n) 
             assertex(keyRowCompare((unsigned)a,row)>=0);
         if (a>0)
             assertex(keyRowCompare((unsigned)a-1,row)<0);
     }
-    else {
+    else
+    {
         if (a<n) 
             assertex(keyRowCompare((unsigned)a,row)>0);
         if (a>0)
             assertex(keyRowCompare((unsigned)a-1,row)<=0);
     }
 }
-catch (IException *e) {
+catch (IException *e)
+{
     EXCLOG(e,"binchopPartition");
     StringBuffer s("row: ");
     unsigned i;
     for (i=0;i<10;i++)
         s.appendf(" %02x",(int)*((const byte *)row+i));
     PROGLOG("%s",s.str());
-    for (i=0;i<(unsigned)n;i++) {
+    for (i=0;i<(unsigned)n;i++)
+    {
         s.clear().appendf("k%d:",i);
         const byte *k=(const byte *)queryKey(i);
         for (unsigned j=0;j<10;j++) 
@@ -427,19 +444,21 @@ void CThorKeyArray::calcPositions(IFile *file,CThorKeyArray &sample)
 {
     // calculates positions based on sample
     // not fast!
-    delete filepos;
-    filepos = new Int64Array;
+    filepos.setown(new Int64Array);
+    filepos->ensure(ordinality());
     for (unsigned i0=0;i0<ordinality();i0++)
         filepos->append(-1);
     filerecsize = 0;
-    ForEachItemIn(i,*filepos) {
+    ForEachItemIn(i,*filepos)
+    {
         OwnedConstThorRow row = getRow(i);
         offset_t pos = sample.findLessRowPos(row);
         if (pos==(offset_t)-1) 
             pos = 0;
         // should do bin-chop for fixed length but initially do sequential search
         Owned<IRowStream> s = createRowStream(file,rowif,pos,(offset_t)-1,RCUNBOUND,false,false);
-        loop {
+        loop
+        {
             OwnedConstThorRow rowcmp = s->nextRow();
             if (!rowcmp)
                 break;
@@ -490,30 +509,38 @@ void CThorKeyArray::split()
     // not that fast!
     unsigned n = ordinality();
     CThorExpandingRowArray newkeys(activity, rowif);
-    UnsignedArray *newsizes = sizes?new UnsignedArray:NULL;
-    Int64Array *newfilepos = filepos?new Int64Array:NULL;
+    newkeys.ensure(n);
+    OwnedPtr<UnsignedArray> newSizes;
+    OwnedPtr<Int64Array> newFilePos;
+    if (sizes)
+    {
+        newSizes.setown(new UnsignedArray);
+        newSizes->ensure(n);
+    }
+    if (filepos)
+    {
+        newFilePos.setown(new Int64Array);
+        newFilePos->ensure(n);
+    }
     unsigned newss = 0;
-    for (unsigned i=0;i<n;i+=2) {
+    for (unsigned i=0;i<n;i+=2)
+    {
         const void *k = keys.query(i);
         LinkThorRow(k);
         newkeys.append(k);
         size32_t sz = sizes?sizes->item(i):serialrowsize;
-        if (newsizes)
-            newsizes->append(sz);
+        if (newSizes)
+            newSizes->append(sz);
         newss += sz;
-        if (newfilepos)
-            newfilepos->append(filepos->item(i));
+        if (newFilePos)
+            newFilePos->append(filepos->item(i));
     }
     keys.swap(newkeys);
-    if (newsizes) {
-        delete sizes;
-        sizes = newsizes;
-    }
+    if (newSizes)
+        sizes.setown(newSizes.getClear());
     totalserialsize = newss;
-    if (newfilepos) {
-        delete filepos;
-        filepos = newfilepos;
-    }
+    if (newFilePos)
+        filepos.setown(newFilePos.getClear());
 }
 
 offset_t CThorKeyArray::getFilePos(unsigned idx)
@@ -536,5 +563,3 @@ void CThorKeyArray::traceKey(const char *prefix,unsigned idx)
     IOutputRowSerializer *serializer = keyserializer?keyif->queryRowSerializer():rowif->queryRowSerializer();
     ::traceKey(serializer,s.str(),queryKey(idx));
 }
-
-        

+ 2 - 3
thorlcr/msort/tsorta.hpp

@@ -73,8 +73,8 @@ class CThorKeyArray
     ICompare *icompare;
     ICompare *ikeycompare;
     ICompare *irowkeycompare;
-    UnsignedArray *sizes;       // serial sizes (needed if keysize==0)
-    Int64Array *filepos;         
+    OwnedPtr<UnsignedArray> sizes;       // serial sizes (needed if keysize==0)
+    OwnedPtr<Int64Array> filepos;
     size32_t filerecsize;
     size32_t filerecnum;
     offset_t totalfilesize;
@@ -98,7 +98,6 @@ public:
         ICompare *_icompare,
         ICompare *_ikeycompare,
         ICompare *_irowkeycompare); 
-    ~CThorKeyArray();
     void clear();
     void add(const void *row);
     unsigned ordinality() { return keys.ordinality(); }

+ 2 - 2
thorlcr/msort/tsortm.cpp

@@ -805,8 +805,8 @@ public:
         unsigned i;
         unsigned j;
         for(i=0;i<numsplits;i++) {
-            emin.append(mink.getClear());
-            emax.append(maxk.getClear());
+            emin.append(mink.getLink());
+            emax.append(maxk.getLink());
         }
         UnsignedArray amid;
         unsigned iter=0;

+ 11 - 9
thorlcr/msort/tsorts.cpp

@@ -72,7 +72,7 @@ class CWriteIntercept : public CSimpleInterface
     Owned<IFileIO> dataFileIO, idxFileIO;
     Owned<ISerialStream> dataFileStream;
     Linked<IFileIOStream> idxFileStream;
-    CThorStreamDeserializerSource dataFileDeserialzierSource;
+    CThorStreamDeserializerSource dataFileDeserializerSource;
     unsigned interval;
     unsigned idx;
     CThorExpandingRowArray sampleRows;
@@ -99,14 +99,14 @@ class CWriteIntercept : public CSimpleInterface
                 StringBuffer tempname;
                 GetTempName(tempname.clear(),"srtidx",false);
                 idxFile.setown(createIFile(tempname.str()));
-                idxFileIO.setown(idxFile->open(IFOcreate));
+                idxFileIO.setown(idxFile->open(IFOcreaterw));
                 idxFileStream.setown(idxFileIO?createBufferedIOStream(idxFileIO,0x100000):NULL);
                 if (idxFileStream.get()==NULL)
                 {
                     StringBuffer err;
                     err.append("Cannot create ").append(idxFile->queryFilename());
                     LOG(MCerror, thorJob, "%s", err.str());
-                    throw MakeStringException(-1, "%s", err.str());
+                    throw MakeActivityException(&activity, -1, "%s", err.str());
                 }
                 offset_t s = 0;
                 while (s<=lastofs)
@@ -199,8 +199,8 @@ public:
                 break;
             ret++;
 
-            offset_t start = output->getPosition();
             OwnedConstThorRow row = _row;
+            offset_t start = output->getPosition();
             output->putRow(row.getLink());
             idx++;
             if (idx==interval)
@@ -225,7 +225,10 @@ public:
         offset_t end = output->getPosition();
         writeidxofs(end);
         if (idxFileStream)
+        {
             idxFileStream->flush();
+            idxFileStream.clear();
+        }
         output.clear();
         if (overflowed)
             WARNLOG("Overflowed by %"I64F"d", overflowsize);
@@ -240,7 +243,7 @@ public:
     {
         dataFileStream.clear();
         dataFileIO.clear();
-        dataFileDeserialzierSource.setStream(NULL);
+        dataFileDeserializerSource.setStream(NULL);
         idxFileIO.clear();
     }
     size32_t readOverflowPos(rowmap_t pos, unsigned n, offset_t *ofs, bool closeIO)
@@ -259,12 +262,11 @@ public:
         {
             dataFileIO.setown(dataFile->open(IFOread));
             dataFileStream.setown(createFileSerialStream(dataFileIO));
-            dataFileDeserialzierSource.setStream(dataFileStream);
+            dataFileDeserializerSource.setStream(dataFileStream);
         }
-        dataFileStream->reset(ofs[0], (offset_t)-1);
+        dataFileStream->reset(ofs[0], idxSz);
         RtlDynamicRowBuilder rowBuilder(rowIf->queryRowAllocator());
-        size32_t sz = rowIf->queryRowDeserializer()->deserialize(rowBuilder, dataFileDeserialzierSource);
-        assertex(sz == idxSz);
+        size32_t sz = rowIf->queryRowDeserializer()->deserialize(rowBuilder, dataFileDeserializerSource);
         return rowBuilder.finalizeRowClear(sz);
     }
     void transferRows(CThorExpandingRowArray &rows)

+ 1 - 1
thorlcr/shared/thor.hpp

@@ -20,7 +20,7 @@
 #define __THOR__
 
 #define THOR_VERSION_MAJOR 4
-#define THOR_VERSION_MINOR 0
+#define THOR_VERSION_MINOR 1
 
 typedef unsigned activity_id;
 typedef unsigned graph_id;

+ 11 - 2
thorlcr/slave/slavmain.cpp

@@ -536,8 +536,17 @@ public:
 
 void slaveMain()
 {
-    memsize_t gmemSize = globals->getPropInt("@globalMemorySize"); // in MB
-    roxiemem::setTotalMemoryLimit(gmemSize * 0x100000, 0, NULL);
+    unsigned masterMemMB = globals->getPropInt("@masterTotalMem");
+    HardwareInfo hdwInfo;
+    getHardwareInfo(hdwInfo);
+    if (hdwInfo.totalMemory < masterMemMB)
+        WARNLOG("Slave has less memory than master node"); // JCSMORE, error?
+    unsigned gmemSize = globals->getPropInt("@globalMemorySize");
+    if (gmemSize >= hdwInfo.totalMemory)
+    {
+        // should prob. error here
+    }
+    roxiemem::setTotalMemoryLimit(((memsize_t)gmemSize) * 0x100000, 0, NULL);
 
     CJobListener jobListener;
     CThorResourceSlave slaveResource;

+ 1 - 3
thorlcr/thorutil/thalloc.cpp

@@ -74,9 +74,7 @@ size32_t thorRowMemoryFootprint(IOutputRowSerializer *serializer, const void *ro
 {
     if (!row)
         return 0;
-    // JCSMORE
-    if (!serializer)
-        return 100;
+    assertex(serializer);
     CSizingSerializer ssz;
     serializer->serialize(ssz, (const byte *)row);
     return ssz.size();

+ 21 - 24
thorlcr/thorutil/thmem.cpp

@@ -214,6 +214,7 @@ protected:
 
     bool spillRows()
     {
+        // NB: Should always be called whilst 'rows' is locked (with CThorSpillableRowArrayLock)
         rowcount_t numRows = rows.numCommitted();
         if (0 == numRows)
             return false;
@@ -381,7 +382,6 @@ public:
             const void **toRead = rows.getBlock(fetch);
             memcpy(readRows, toRead, fetch * sizeof(void *));
             rows.noteSpilled(fetch);
-            rows.flush();
             numReadRows = fetch;
             pos = 0;
         }
@@ -588,7 +588,7 @@ void CThorExpandingRowArray::transferFrom(CThorSpillableRowArray &donor)
 void CThorExpandingRowArray::removeRows(rowcount_t start, rowcount_t n)
 {
     assertex(numRows-start >= n);
-    assertex(!n || !rows);
+    assertex(!n || rows);
     if (rows)
     {
         for (rowcount_t i = start; i < n; i++)
@@ -1036,15 +1036,6 @@ void CThorSpillableRowArray::flush()
     commitRows = numRows;
 }
 
-void CThorSpillableRowArray::transferRows(rowcount_t & outNumRows, const void * * & outRows)
-{
-    assertex(firstRow == 0);  // could allow that to be transferred as well
-    CThorSpillableRowArrayLock block(*this);
-    CThorExpandingRowArray::transferRows(outNumRows, outRows);
-    //firstRows = 0;
-    commitRows = 0;
-}
-
 void CThorSpillableRowArray::transferFrom(CThorExpandingRowArray &src)
 {
     CThorSpillableRowArrayLock block(*this);
@@ -1052,11 +1043,6 @@ void CThorSpillableRowArray::transferFrom(CThorExpandingRowArray &src)
     commitRows = numRows;
 }
 
-void CThorSpillableRowArray::transferFrom(CThorSpillableRowArray &donor)
-{
-	transferFrom((CThorExpandingRowArray &)donor);
-}
-
 void CThorSpillableRowArray::swap(CThorSpillableRowArray &other)
 {
     CThorSpillableRowArrayLock block(*this);
@@ -1235,7 +1221,7 @@ protected:
                 totalRows += spillableRows.numCommitted();
                 if (iCompare && (1 == outStreams))
                     spillableRows.sort(*iCompare, maxCores);
-                if (rc_allDiskOrAllMem == diskMemMix)
+                if (rc_allDiskOrAllMem == diskMemMix || (NULL!=allMemRows && (rc_allMem == diskMemMix)))
                 {
                     assertex(allMemRows);
                     assertex(1 == outStreams);
@@ -1312,6 +1298,7 @@ public:
     void transferRowsOut(CThorExpandingRowArray &out, bool sort)
     {
         CThorSpillableRowArray::CThorSpillableRowArrayLock block(spillableRows);
+        spillableRows.flush();
         totalRows += spillableRows.numCommitted();
         if (sort && iCompare)
             spillableRows.sort(*iCompare, maxCores);
@@ -1695,9 +1682,10 @@ public:
 // IRowAllocatorCache
     virtual unsigned getActivityId(unsigned cacheId) const
     {
+        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
         SpinBlock b(allAllocatorsLock);
-        if (allAllocators.isItem(cacheId))
-            return allAllocators.item(cacheId).queryActivityId();
+        if (allAllocators.isItem(allocatorIndex))
+            return allAllocators.item(allocatorIndex).queryActivityId();
         else
         {
             //assert(false);
@@ -1706,9 +1694,10 @@ public:
     }
     virtual StringBuffer &getActivityDescriptor(unsigned cacheId, StringBuffer &out) const
     {
+        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
         SpinBlock b(allAllocatorsLock);
-        if (allAllocators.isItem(cacheId))
-            return allAllocators.item(cacheId).getId(out);
+        if (allAllocators.isItem(allocatorIndex))
+            return allAllocators.item(allocatorIndex).getId(out);
         else
         {
             assert(false);
@@ -1718,21 +1707,29 @@ public:
     virtual void onDestroy(unsigned cacheId, void *row) const
     {
         IEngineRowAllocator *allocator;
+        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
         {
             SpinBlock b(allAllocatorsLock); // just protect the access to the array - don't keep locked for the call of destruct or may deadlock
-            if (allAllocators.isItem(cacheId))
-                allocator = &allAllocators.item(cacheId);
+            if (allAllocators.isItem(allocatorIndex))
+                allocator = &allAllocators.item(allocatorIndex);
             else
             {
                 assert(false);
                 return;
             }
         }
+        if (!RoxieRowCheckValid(cacheId, row))
+        {
+            //MORE: Give an error, but don't throw an exception!
+        }
         allocator->queryOutputMeta()->destruct((byte *) row);
     }
     virtual void checkValid(unsigned cacheId, const void *row) const
     {
-        // JCSMORE
+        if (!RoxieRowCheckValid(cacheId, row))
+        {
+            //MORE: Throw an exception?
+        }
     }
 // IRtlRowCallback
     virtual void releaseRow(const void * row) const

+ 4 - 6
thorlcr/thorutil/thmem.hpp

@@ -249,7 +249,7 @@ public:
     CThorExpandingRowArray(CActivityBase &activity, bool allowNulls=false, bool stableSort=false, bool throwOnOom=true, rowcount_t initialSize=InitialSortElements);
     CThorExpandingRowArray(CActivityBase &activity, IRowInterfaces *rowIf, bool allowNulls=false, bool stableSort=false, bool throwOnOom=true, rowcount_t initialSize=InitialSortElements);
     ~CThorExpandingRowArray();
-	CActivityBase &queryActivity() { return activity; }
+    CActivityBase &queryActivity() { return activity; }
     // NB: throws error on OOM by default
     void setup(IRowInterfaces *rowIf, bool allowNulls=false, bool stableSort=false, bool throwOnOom=true);
     inline void setAllowNulls(bool b) { allowNulls = b; }
@@ -312,8 +312,8 @@ public:
         swap(from);
     }
     void transferRows(rowcount_t & outNumRows, const void * * & outRows);
-	void transferFrom(CThorExpandingRowArray &src); 
-	void transferFrom(CThorSpillableRowArray &src);
+    void transferFrom(CThorExpandingRowArray &src); 
+    void transferFrom(CThorSpillableRowArray &src);
     void removeRows(rowcount_t start, rowcount_t n);
     void clearUnused();
     void sort(ICompare &compare, unsigned maxCores);
@@ -379,7 +379,6 @@ public:
     inline void setAllowNulls(bool b) { CThorExpandingRowArray::setAllowNulls(b); }
     void kill();
     void clearRows();
-    void transferRows(rowcount_t & outNumRows, const void * * & outRows);
     void flush();
     inline bool append(const void *row)
     {
@@ -441,8 +440,7 @@ public:
         kill();
         swap(from);
     }
-	void transferFrom(CThorExpandingRowArray &src); 
-    void transferFrom(CThorSpillableRowArray &src);
+    void transferFrom(CThorExpandingRowArray &src); 
 
     IRowStream *createRowStream();