Browse Source

Merge branch 'candidate-6.0.4'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 years ago
parent
commit
58b7311f18

+ 1 - 1
cmake_modules/dependencies/el5.cmake

@@ -1 +1 @@
-SET_DEPENDENCIES ( CPACK_RPM_PACKAGE_REQUIRES m4 libtool gcc-c++ openssh-server openssh-clients expect rsync zip )
+SET_DEPENDENCIES ( CPACK_RPM_PACKAGE_REQUIRES m4 libtool gcc-c++ openssh-server openssh-clients expect rsync zip psmisc )

+ 1 - 1
cmake_modules/dependencies/el6.cmake

@@ -1 +1 @@
-SET_DEPENDENCIES ( CPACK_RPM_PACKAGE_REQUIRES m4 libtool gcc-c++ openssh-server openssh-clients expect rsync zip )
+SET_DEPENDENCIES ( CPACK_RPM_PACKAGE_REQUIRES m4 libtool gcc-c++ openssh-server openssh-clients expect rsync zip psmisc )

+ 1 - 1
cmake_modules/dependencies/el7.cmake

@@ -1 +1 @@
-SET_DEPENDENCIES ( CPACK_RPM_PACKAGE_REQUIRES m4 libtool gcc-c++ openssh-server openssh-clients expect rsync zip )
+SET_DEPENDENCIES ( CPACK_RPM_PACKAGE_REQUIRES m4 libtool gcc-c++ openssh-server openssh-clients expect rsync zip psmisc )

+ 1 - 1
cmake_modules/dependencies/precise.cmake

@@ -1 +1 @@
-SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync  zip python )
+SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync  zip python psmisc )

+ 1 - 1
cmake_modules/dependencies/quantal.cmake

@@ -1 +1 @@
-SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync  zip python )
+SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync  zip python psmisc )

+ 1 - 1
cmake_modules/dependencies/raring.cmake

@@ -1 +1 @@
-SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync  zip python )
+SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync  zip python psmisc )

+ 1 - 1
cmake_modules/dependencies/saucy.cmake

@@ -1 +1 @@
-SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync  zip python )
+SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync  zip python psmisc )

+ 1 - 1
cmake_modules/dependencies/suse11.4.cmake

@@ -1 +1 @@
-SET_DEPENDENCIES ( CPACK_RPM_PACKAGE_REQUIRES binutils gcc-c++ openssh libldap-2_4-2 libicu libboost_regex1_44_0 libxerces-c-3_0 libxalan-c110 expect libarchive2 rsync apr apr-util zip libnuma1 )
+SET_DEPENDENCIES ( CPACK_RPM_PACKAGE_REQUIRES binutils gcc-c++ openssh libldap-2_4-2 libicu libboost_regex1_44_0 libxerces-c-3_0 libxalan-c110 expect libarchive2 rsync apr apr-util zip libnuma1 psmisc )

+ 1 - 1
cmake_modules/dependencies/trusty.cmake

@@ -1 +1 @@
-SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync  zip python )
+SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync  zip python psmisc )

+ 1 - 1
cmake_modules/dependencies/utopic.cmake

@@ -1 +1 @@
-SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync  zip python )
+SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync  zip python psmisc )

+ 1 - 1
cmake_modules/dependencies/vivid.cmake

@@ -1 +1 @@
-SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync  zip python )
+SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync  zip python psmisc )

+ 1 - 1
cmake_modules/dependencies/wily.cmake

@@ -1 +1 @@
-SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync libapr1 python )
+SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync libapr1 python psmisc )

+ 1 - 1
cmake_modules/dependencies/xenial.cmake

@@ -1 +1 @@
-SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync libapr1 python )
+SET_DEPENDENCIES ( CPACK_DEBIAN_PACKAGE_DEPENDS g++ openssh-client openssh-server expect rsync libapr1 python psmisc )

+ 14 - 3
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -4392,19 +4392,30 @@ void CWsWorkunitsEx::createZAPECLQueryArchiveFiles(Owned<IConstWorkUnit>& cwu, c
     ForEach(*iter)
     {
         IConstWUAssociatedFile & cur = iter->query();
-        SCMStringBuffer ssb;
+        SCMStringBuffer ssb, ip;
         cur.getDescription(ssb);
         if (!strieq(ssb.str(), "archive"))
             continue;
 
         cur.getName(ssb);
-        if (!ssb.length())
+        cur.getIp(ip);
+        if (!ssb.length() || !ip.length())
             continue;
 
         StringBuffer fileName, archiveContents;
         try
         {
-            archiveContents.loadFile(ssb.str());
+            SocketEndpoint ep(ip.str());
+            RemoteFilename rfn;
+            rfn.setRemotePath(ssb.str());
+            rfn.setIp(ep);
+            Owned<IFile> rFile = createIFile(rfn);
+            if (!rFile)
+            {
+                DBGLOG("Cannot open %s on %s", ssb.str(), ip.str());
+                continue;
+            }
+            archiveContents.loadFile(rFile);
         }
         catch (IException *e)
         {

+ 11 - 2
plugins/cassandra/cassandrawu.cpp

@@ -2096,7 +2096,7 @@ static void lockWuid(Owned<IRemoteConnection> &connection, const char *wuid)
 {
     VStringBuffer wuRoot("/WorkUnitLocks/%s", wuid);
     if (connection)
-        connection->changeMode(RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT); // Would it ever be anything else?
+        connection->changeMode(RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT); // Would it ever be anything else?
     else
         connection.setown(querySDS().connect(wuRoot.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT));
     if (!connection)
@@ -2121,7 +2121,7 @@ public:
 
     virtual void forceReload()
     {
-        synchronized sync(locked); // protect locked workunits (uncommited writes) from reload
+        synchronized sync(locked); // protect locked workunits (uncommitted writes) from reload
         loadPTree(sessionCache->cassandraToWorkunitXML(queryWuid()));
         memset(childLoaded, 0, sizeof(childLoaded));
         allDirty = false;
@@ -2655,6 +2655,15 @@ public:
         CPersistedWorkUnit::clearExceptions();
     }
 
+    virtual IPropertyTree *getUnpackedTree(bool includeProgress) const
+    {
+        // If anyone wants the whole ptree, we'd better make sure we have fully loaded it...
+        CriticalBlock b(crit);
+        for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
+            checkChildLoaded(**table);
+        return CPersistedWorkUnit::getUnpackedTree(includeProgress);
+    }
+
     virtual IPropertyTree *queryPTree() const
     {
         // If anyone wants the whole ptree, we'd better make sure we have fully loaded it...

+ 42 - 4
roxie/roxiemem/roxiemem.cpp

@@ -1289,6 +1289,8 @@ public:
 
     inline void addToSpaceList();
     virtual void verifySpaceList();
+
+    inline bool isWithinHeap(CHeap * search) const { return heap == search; }
 };
 
 
@@ -3159,6 +3161,19 @@ class BufferedRowCallbackManager
             return numSuccess;
         }
 
+        void report(const IContextLogger &logctx) const
+        {
+            StringBuffer msg;
+            msg.appendf(" ac(%u) cost(%u):", activityId, cost);
+            ForEachItemIn(i, callbacks)
+            {
+                if (i == nextCallback)
+                    msg.append(" {").append(callbacks.item(i).first).append("}");
+                else
+                    msg.append(" ").append(callbacks.item(i).first);
+            }
+            logctx.CTXLOG("%s", msg.str());
+        }
         inline unsigned getSpillCost() const { return cost; }
         inline unsigned getActivityId() const { return activityId; }
 
@@ -3279,6 +3294,14 @@ public:
         return releaseBuffersThread->releaseBuffers(slaveId, maxSpillCost, critical);
     }
 
+    void reportActive(const IContextLogger &logctx) const
+    {
+        logctx.CTXLOG("--Active callbacks--");
+        CriticalBlock block(callbackCrit);
+        ForEachItemIn(i, rowBufferCallbacks)
+            rowBufferCallbacks.item(i).report(logctx);
+    }
+
     void runReleaseBufferThread()
     {
         loop
@@ -3408,7 +3431,7 @@ protected:
     }
 
 protected:
-    CriticalSection callbackCrit;
+    mutable CriticalSection callbackCrit;
     Semaphore releaseBuffersSem;
     CIArrayOf<CallbackItem> rowBufferCallbacks;
     PointerArrayOf<IBufferedRowCallback> activeCallbacks;
@@ -3497,7 +3520,6 @@ private:
     CHugeHeap hugeHeap;
     ITimeLimiter *timeLimit;
     DataBuffer *activeBuffs;
-    const IContextLogger &logctx;
     unsigned peakPages;
     unsigned dataBuffs;
     unsigned dataBuffPages;
@@ -3516,6 +3538,7 @@ private:
     bool minimizeFootprintCritical;
 
 protected:
+    const IContextLogger &logctx;
     unsigned maxPageLimit;
     unsigned spillPageLimit;
 
@@ -4072,7 +4095,7 @@ public:
 
             //Try and directly free up some buffers.  It is worth trying again if one of the release functions thinks it
             //freed up some memory.
-            //The following reduces the nubmer of times the callback is called, but I'm not sure how this affects
+            //The following reduces the number of times the callback is called, but I'm not sure how this affects
             //performance.  I think better if a single free is likely to free up some memory, and worse if not.
             const bool skipReleaseIfAnotherThreadReleases = true;
             if (!releaseCallbackMemory(maxSpillCost, true, skipReleaseIfAnotherThreadReleases, lastReleaseSeq))
@@ -4353,6 +4376,7 @@ public:
     virtual void setMinimizeFootprint(bool value, bool critical) { throwUnexpected(); }
     virtual void setReleaseWhenModifyCallback(bool value, bool critical) { throwUnexpected(); }
     virtual unsigned querySlaveId() const { return slaveId; }
+    virtual void reportMemoryUsage(bool peak) const;
 
 protected:
     virtual unsigned getPageLimit() const;
@@ -4432,6 +4456,12 @@ public:
 
     virtual unsigned querySlaveId() const { return 0; }
 
+    virtual void reportMemoryUsage(bool peak) const
+    {
+        CChunkingRowManager::reportMemoryUsage(peak);
+        callbacks.reportActive(logctx);
+    }
+
 protected:
     virtual void addRowBuffer(IBufferedRowCallback * callback)
     {
@@ -4565,6 +4595,11 @@ unsigned CSlaveRowManager::getPageLimit() const
     return globalManager->getSlavePageLimit(slaveId);
 }
 
+void CSlaveRowManager::reportMemoryUsage(bool peak) const
+{
+    CChunkingRowManager::reportMemoryUsage(peak);
+    globalManager->reportMemoryUsage(peak);
+}
 
 //================================================================================
 
@@ -4678,6 +4713,8 @@ void CHugeHeap::expandHeap(void * original, memsize_t copysize, memsize_t oldcap
     unsigned newPages = PAGES(newsize + HugeHeaplet::dataOffset(), HEAP_ALIGNMENT_SIZE);
     unsigned oldPages = PAGES(oldcapacity + HugeHeaplet::dataOffset(), HEAP_ALIGNMENT_SIZE);
     void *oldbase =  (void *) ((memsize_t) original & HEAP_ALIGNMENT_MASK);
+    HugeHeaplet * oldHeaplet = (HugeHeaplet *)oldbase;
+    assertex(oldHeaplet->isWithinHeap(this));
 
     //Check if we are shrinking the number of pages.
     if (newPages <= oldPages)
@@ -4893,7 +4930,8 @@ const void * CChunkedHeap::compactRow(const void * ptr, HeapCompactState & state
                 }
                 return ret;
             }
-            dbgassertex((chunkedFinger->numChunks() == maxChunksPerPage()) || (chunkedFinger->numChunks() == 0));
+
+            //heaplet was either empty or full (it may no longer be full if another thread has freed a row)
             finger = getNext(finger);
 
             //Check if we have looped all the way around

+ 22 - 18
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -1695,6 +1695,7 @@ protected:
     using PARENT::gatheredRHSNodeStreams;
     using PARENT::queryInput;
     using PARENT::rhsRowLock;
+    using PARENT::hasStarted;
 
     IHash *leftHash, *rightHash;
     ICompare *compareRight, *compareLeftRight;
@@ -2665,29 +2666,32 @@ public:
     }
     virtual void stop() override
     {
-        if (isGlobal())
+        if (hasStarted())
         {
-            if (gotRHS)
+            if (isGlobal())
             {
-                // Other channels sharing HT. So do not reset until all here
-                if (!hasFailedOverToLocal() && queryJob().queryJobChannels()>1)
-                    InterChannelBarrier();
+                if (gotRHS)
+                {
+                    // Other channels sharing HT. So do not reset until all here
+                    if (!hasFailedOverToLocal() && queryJob().queryJobChannels()>1)
+                        InterChannelBarrier();
+                }
+                else
+                    getRHS(true); // If global, need to handle RHS until all are slaves stop
             }
-            else
-                getRHS(true); // If global, need to handle RHS until all are slaves stop
-        }
 
-        if (rhsDistributor)
-        {
-            rhsDistributor->disconnect(true);
-            rhsDistributor->join();
-        }
-        if (lhsDistributor)
-        {
-            lhsDistributor->disconnect(true);
-            lhsDistributor->join();
+            if (rhsDistributor)
+            {
+                rhsDistributor->disconnect(true);
+                rhsDistributor->join();
+            }
+            if (lhsDistributor)
+            {
+                lhsDistributor->disconnect(true);
+                lhsDistributor->join();
+            }
+            joinHelper.clear();
         }
-        joinHelper.clear();
         PARENT::stop();
     }
     virtual bool isGrouped() const override

+ 21 - 1
thorlcr/master/thmastermain.cpp

@@ -550,6 +550,7 @@ int main( int argc, char *argv[]  )
     const char *thorname = NULL;
     StringBuffer nodeGroup, logUrl;
     Owned<IPerfMonHook> perfmonhook;
+    unsigned channelsPerSlave = 1;
 
     ILogMsgHandler *logHandler;
     try
@@ -606,7 +607,7 @@ int main( int argc, char *argv[]  )
             globals->setProp("@nodeGroup", thorname);
         }
         unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
-        unsigned channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
+        channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
         unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
         unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
         Owned<IGroup> rawGroup = getClusterNodeGroup(thorname, "ThorCluster");
@@ -780,6 +781,25 @@ int main( int argc, char *argv[]  )
 
         if (registry->connect())
         {
+            unsigned totSlaveProcs = queryNodeClusterWidth();
+            for (unsigned s=0; s<totSlaveProcs; s++)
+            {
+                StringBuffer slaveStr;
+                for (unsigned c=0; c<channelsPerSlave; c++)
+                {
+                    unsigned o = s + (c * totSlaveProcs);
+                    if (c)
+                        slaveStr.append(",");
+                    slaveStr.append(o+1);
+                }
+                StringBuffer virtStr;
+                if (channelsPerSlave>1)
+                    virtStr.append("virtual slaves:");
+                else
+                    virtStr.append("slave:");
+                PROGLOG("Slave log %u contains %s %s", s+1, virtStr.str(), slaveStr.str());
+            }
+
             PROGLOG("verifying mp connection to rest of cluster");
             if (!queryNodeComm().verifyAll())
                 ERRLOG("Failed to connect to all nodes");

+ 17 - 0
thorlcr/slave/slavmain.cpp

@@ -183,6 +183,23 @@ public:
     }
     virtual void main()
     {
+        rank_t slaveProc = queryNodeGroup().rank()-1;
+        unsigned totSlaveProcs = queryNodeClusterWidth();
+        StringBuffer slaveStr;
+        for (unsigned c=0; c<channelsPerSlave; c++)
+        {
+            unsigned o = slaveProc + (c * totSlaveProcs);
+            if (c)
+                slaveStr.append(",");
+            slaveStr.append(o+1);
+        }
+        StringBuffer virtStr;
+        if (channelsPerSlave>1)
+            virtStr.append("virtual slaves:");
+        else
+            virtStr.append("slave:");
+        PROGLOG("Slave log %u contains %s %s", slaveProc+1, virtStr.str(), slaveStr.str());
+
         if (channelsPerSlave>1)
         {
             class CVerifyThread : public CInterface, implements IThreaded

+ 2 - 0
thorlcr/thorutil/thmem.cpp

@@ -1236,6 +1236,8 @@ CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity)
     : CThorExpandingRowArray(activity)
 {
     initCommon();
+    commitDelta = CommitStep;
+    throwOnOom = false;
 }
 
 CThorSpillableRowArray::CThorSpillableRowArray(CActivityBase &activity, IThorRowInterfaces *rowIf, bool allowNulls, StableSortFlag stableSort, rowidx_t initialSize, size32_t _commitDelta)