Jelajahi Sumber

Merge remote-tracking branch 'origin/candidate-6.2.0'

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 8 tahun lalu
induk
melakukan
def51a3ebc

+ 1 - 1
dali/ft/daftformat.cpp

@@ -443,8 +443,8 @@ unsigned CInputBasePartitioner::transformBlock(offset_t endOffset, TransformCurs
 
 CFixedPartitioner::CFixedPartitioner(size32_t _recordSize) : CInputBasePartitioner(0, _recordSize)
 {
-    LOG(MCdebugProgressDetail, unknownJob, "CFixedPartitioner::CFixedPartitioner( recordSize:%d)", recordSize);
     recordSize = _recordSize;
+    LOG(MCdebugProgressDetail, unknownJob, "CFixedPartitioner::CFixedPartitioner( recordSize:%d)", recordSize);
 }
 
 size32_t CFixedPartitioner::getSplitRecordSize(const byte * record, unsigned maxToRead, bool processFullBuffer)

+ 1 - 1
system/jhtree/jhtree.cpp

@@ -984,7 +984,7 @@ class CNodeMRUCache : public CMRUCacheOf<CKeyIdAndPos, CJHTreeNode, CNodeMapping
 {
     size32_t sizeInMem, memLimit;
 public:
-    CNodeMRUCache(size32_t _memLimit)
+    CNodeMRUCache(size32_t _memLimit) : memLimit(0)
     {
         sizeInMem = 0;
         setMemLimit(_memLimit);

+ 16 - 8
system/jlib/jsocket.cpp

@@ -141,14 +141,6 @@ static RelaxedAtomic<unsigned> pre_conn_unreach_cnt{0};    // global count of pr
 
 #define IPV6_SERIALIZE_PREFIX (0x00ff00ff)
 
-inline void LogErr(unsigned err,unsigned ref,const char *info,unsigned lineno,const char *tracename)
-{
-    if (err) 
-        PROGLOG("jsocket(%d,%d)%s%s err = %d%s%s",ref,lineno,
-           (info&&*info)?" ":"",(info&&*info)?info:"",err,
-           (tracename&&*tracename)?" : ":"",(tracename&&*tracename)?tracename:"");
-}
-    
 
 class jlib_thrown_decl SocketException: public IJSOCK_Exception, public CInterface
 {
@@ -679,6 +671,22 @@ typedef union {
 #define INET6_ADDRSTRLEN 65
 #endif
 
+inline void LogErr(unsigned err,unsigned ref,const char *info,unsigned lineno,const char *tracename)
+{
+    if (err)
+    {
+        PROGLOG("jsocket(%d,%d)%s%s err = %d%s%s",ref,lineno,
+           (info&&*info)?" ":"",(info&&*info)?info:"",err,
+           (tracename&&*tracename)?" : ":"",(tracename&&*tracename)?tracename:"");
+        if ((JSE_NOTCONN == err) || (JSE_CONNRESET == err) || (JSE_CONNABORTED == err))
+        {
+            PROGLOG("Socket not connected, stack:");
+            PrintStackReport();
+        }
+    }
+}
+
+
 
 inline socklen_t setSockAddr(J_SOCKADDR &u, const IpAddress &ip,unsigned short port)
 {

+ 46 - 10
system/mp/mpcomm.cpp

@@ -48,6 +48,11 @@
 
 //#define _TRACE
 //#define _FULLTRACE
+
+#if 1 // #ifdef _FULLTRACE
+#define _TRACELINKCLOSED
+#endif
+#define _TRACEMPSERVERNOTIFYCLOSED
 #define _TRACEORPHANS
 
 
@@ -475,6 +480,7 @@ protected:
     unsigned short              port;
 public:
     bool checkclosed;
+    bool tryReopenChannel = false;
 
 // packet handlers
     PingPacketHandler           *pingpackethandler;         // TAG_SYS_PING
@@ -548,6 +554,22 @@ public:
     {
         return myNode;
     }
+    virtual void setOpt(MPServerOpts opt, const char *value)
+    {
+        switch (opt)
+        {
+            case mpsopt_channelreopen:
+            {
+                bool tf = (nullptr != value) ? strToBool(value) : false;
+                PROGLOG("Setting ChannelReopen = %s", tf ? "true" : "false");
+                tryReopenChannel = tf;
+                break;
+            }
+            default:
+                // ignore
+                break;
+        }
+    }
 };
 
 //===========================================================================
@@ -718,6 +740,17 @@ protected: friend class CMPPacketReader;
 #endif
 
 
+    bool checkReconnect(CTimeMon &tm)
+    {
+        if (!parent->tryReopenChannel)
+            return false;
+        ::Release(channelsock);
+        channelsock = nullptr;
+        if (connect(tm))
+            return true;
+        WARNLOG("Failed to reconnect");
+        return false;
+    }
     bool connect(CTimeMon &tm)
     {
         // must be called from connectsect
@@ -979,12 +1012,12 @@ public:
         {
             CriticalBlock block(connectsect);
             if (closed) {
-#ifdef _FULLTRACE
+#ifdef _TRACELINKCLOSED
                 LOG(MCdebugInfo(100), unknownJob, "WritePacket closed on entry");
                 PrintStackReport();
 #endif
-                IMP_Exception *e=new CMPException(MPERR_link_closed,remoteep);
-                throw e;
+                if (!checkReconnect(tm))
+                    throw new CMPException(MPERR_link_closed,remoteep);
             }
             if (!channelsock) {
                 if (!connect(tm)) {
@@ -1127,8 +1160,10 @@ public:
                 try {
                     s->shutdown();
                 }
-                catch (IException *) { 
+                catch (IException *e) {
                     socketfailed = true; // ignore if the socket has been closed
+                    WARNLOG("closeSocket() : Ignoring shutdown error");
+                    e->Release();
                 }
             }
             parent->querySelectHandler().remove(s);
@@ -1701,12 +1736,12 @@ bool CMPChannel::send(MemoryBuffer &mb, mptag_t tag, mptag_t replytag, CTimeMon
     size32_t msgsize = mb.length();
     PacketHeader hdr(msgsize+sizeof(PacketHeader),localep,remoteep,tag,replytag);
     if (closed||(reply&&!isConnected())) {  // flag error if has been disconnected
-#ifdef _FULLTRACE
+#ifdef _TRACELINKCLOSED
         LOG(MCdebugInfo(100), unknownJob, "CMPChannel::send closed on entry %d",(int)closed);
         PrintStackReport();
 #endif
-        IMP_Exception *e=new CMPException(MPERR_link_closed,remoteep);
-        throw e;
+        if (!checkReconnect(tm))
+            throw new CMPException(MPERR_link_closed,remoteep);
     }
 
     bool ismulti = (msgsize>MAXDATAPERPACKET);
@@ -2236,7 +2271,7 @@ bool CMPServer::recv(CMessageBuffer &mbuf, const SocketEndpoint *ep, mptag_t tag
         return true;
     }
     if (nfy.aborted) {
-#ifdef _FULLTRACE
+#ifdef _TRACELINKCLOSED
         LOG(MCdebugInfo(100), unknownJob, "CMPserver::recv closed on notify");
         PrintStackReport();
 #endif
@@ -2322,7 +2357,7 @@ unsigned CMPServer::probe(const SocketEndpoint *ep, mptag_t tag,CTimeMon &tm,Soc
         return nfy.cancel?0:nfy.count;
     }
     if (nfy.aborted) {
-#ifdef _FULLTRACE
+#ifdef _TRACELINKCLOSED
         LOG(MCdebugInfo(100), unknownJob, "CMPserver::probe closed on notify");
         PrintStackReport();
 #endif
@@ -2407,9 +2442,10 @@ bool CMPServer::nextChannel(CMPChannel *&cur)
 
 void CMPServer::notifyClosed(SocketEndpoint &ep)
 {
-#ifdef _TRACE
+#ifdef _TRACEMPSERVERNOTIFYCLOSED
     StringBuffer url;
     LOG(MCdebugInfo(100), unknownJob, "MP: CMPServer::notifyClosed %s",ep.getUrlStr(url).str());
+    PrintStackReport();
 #endif
     notifyclosedthread->notify(ep);
 }

+ 2 - 0
system/mp/mpcomm.hpp

@@ -91,12 +91,14 @@ extern mp_decl ICommunicator *createCommunicator(IGroup *group,bool outer=false)
 extern mp_decl IInterCommunicator &queryWorldCommunicator();
 extern mp_decl bool hasMPServerStarted();
 
+enum MPServerOpts { mpsopt_null, mpsopt_channelreopen };
 interface IMPServer : extends IInterface
 {
     virtual mptag_t createReplyTag() = 0;
     virtual ICommunicator *createCommunicator(IGroup *group, bool outer=false) = 0;
     virtual void stop() = 0;
     virtual INode *queryMyNode() = 0;
+    virtual void setOpt(MPServerOpts opt, const char *value) = 0;
 };
 
 extern mp_decl void startMPServer(unsigned port,bool paused=false);

+ 1 - 1
thorlcr/activities/thactivityutil.cpp

@@ -890,10 +890,10 @@ IRowStream *createSequentialPartHandler(CPartHandler *partHandler, IArrayOf<IPar
                     someInGroup = false;
                     return NULL;
                 }
-                partHandler->stop();
                 ++part;
                 if (part >= parts)
                 {
+                    partHandler->stop();
                     partHandler.clear();
                     eof = true;
                     return NULL;

+ 4 - 0
thorlcr/activities/thdiskbaseslave.cpp

@@ -76,6 +76,7 @@ CDiskPartHandlerBase::CDiskPartHandlerBase(CDiskReadSlaveActivityBase &_activity
 
 void CDiskPartHandlerBase::setPart(IPartDescriptor *_partDesc)
 {
+    stop(); // close previous if open
     partDesc.set(_partDesc);
     compressed = partDesc->queryOwner().isCompressed(&blockCompressed);
     if (NULL != activity.eexp.get())
@@ -165,6 +166,8 @@ void CDiskPartHandlerBase::open()
 
 void CDiskPartHandlerBase::stop()
 {
+    if (!iFile)
+        return;
     if (!eoi)
         checkFileCrc = false; // cannot perform file CRC if diskread has not read whole file.
     CRC32 fileCRC;
@@ -176,6 +179,7 @@ void CDiskPartHandlerBase::stop()
             throw MakeThorOperatorException(TE_FileCrc, "CRC Failure having read file: %s", filename.get());
         checkFileCrc = false;
     }
+    iFile.clear();
 }
 
 

+ 3 - 0
thorlcr/activities/xmlread/thxmlreadslave.cpp

@@ -100,7 +100,10 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase
             xmlParser.clear();
             inputIOstream.clear();
             if (checkFileCrc)
+            {
                 fileCRC.reset(~crcStream->queryCrc()); // MORE should prob. change stream to use CRC32
+                crcStream.clear();
+            }
             mergeStats(fileStats, iFileIO);
             iFileIO.clear();
         }

+ 3 - 0
thorlcr/master/thmastermain.cpp

@@ -588,6 +588,9 @@ int main( int argc, char *argv[]  )
             }
         }
 
+        if (globals->getPropBool("@MPChannelReconnect"))
+            getMPServer()->setOpt(mpsopt_channelreopen, "true");
+
         setPasswordsFromSDS();
 
         if (globals->getPropBool("@enableSysLog",true))

+ 5 - 1
thorlcr/slave/slavmain.cpp

@@ -164,10 +164,14 @@ public:
         channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
         unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", 200);
         mpServers.append(* getMPServer());
+        bool reconnect = globals->getPropBool("@MPChannelReconnect");
         for (unsigned sc=1; sc<channelsPerSlave; sc++)
         {
             unsigned port = getMachinePortBase() + (sc * localThorPortInc);
-            mpServers.append(*startNewMPServer(port));
+            IMPServer *mpServer = startNewMPServer(port);
+            if (reconnect)
+                mpServer->setOpt(mpsopt_channelreopen, "true");
+            mpServers.append(*mpServer);
         }
     }
     ~CJobListener()

+ 3 - 0
thorlcr/slave/thslavemain.cpp

@@ -360,6 +360,9 @@ int main( int argc, char *argv[]  )
         setSlaveAffinity(globals->getPropInt("@SLAVEPROCESSNUM"));
 
         startMPServer(getFixedPort(TPORT_mp));
+
+        if (globals->getPropBool("@MPChannelReconnect"))
+            getMPServer()->setOpt(mpsopt_channelreopen, "true");
 #ifdef USE_MP_LOG
         startLogMsgParentReceiver();
         LOG(MCdebugProgress, thorJob, "MPServer started on port %d", getFixedPort(TPORT_mp));

+ 3 - 1
tools/addScopes/CMakeLists.txt

@@ -34,7 +34,9 @@ include_directories (
          ./../../system/security/LdapSecurity
          ./../../system/security/shared 
          ./../../system/jlib 
-         ./../../system/include 
+         ./../../system/include
+         ./../../dali/base
+         ./../../system/mp
     )
 
 ADD_DEFINITIONS( -D_CONSOLE )

+ 38 - 3
tools/addScopes/addScopes.cpp

@@ -18,6 +18,7 @@
 #include "seclib.hpp"
 #include "ldapsecurity.hpp"
 #include "jliball.hpp"
+#include "dasess.hpp"
 
 #ifndef _WIN32
 #include <unistd.h>
@@ -25,10 +26,10 @@
 
 int main(int argc, char* argv[])
 {
-    if(argc != 2)
+    if(argc < 2  || argc > 3)
     {
-        printf("usage: addScopes daliconf.xml\n");
-        printf("\n\tCreates all user-specific LDAP private file scopes 'hpccinternal::<user>'\n\tand grants users access to their scope. The configuration file\n\tdaliconf.xml is the dali configuration file, typically\n\tfound in /var/lib/HPCCSystems/mydali\n\n");
+        printf("usage: addScopes daliconf.xml [-c]\n");
+        printf("\n\tCreates all user-specific LDAP private file scopes 'hpccinternal::<user>'\n\tand grants users access to their scope. The configuration file\n\tdaliconf.xml is the dali configuration file, typically\n\tfound in /var/lib/HPCCSystems/mydali\n\tSpecify -c to make changes immediately visible by clearing permission caches\n\n");
         return -1;
     }
 
@@ -51,10 +52,44 @@ int main(int argc, char* argv[])
         if(secmgr == NULL)
         {
             printf("Security manager can't be created\n");
+            releaseAtoms();
             return -1;
         }
         bool ok = secmgr->createUserScopes();
         printf(ok ? "User scopes added\n" : "Some scopes not added\n");
+
+        //Clear permission caches?
+        if (argc == 3 && 0==stricmp(argv[2], "-c"))
+        {
+            //Clear ESP Cache
+            StringBuffer sysuser;
+            StringBuffer passbuf;
+            seccfg->getProp(".//@systemUser", sysuser);
+            seccfg->getProp(".//@systemPassword", passbuf);
+
+            if (0 == sysuser.length())
+            {
+                printf("Error in configuration file %s - systemUser not specified", argv[1]);
+                releaseAtoms();
+                return -1;
+            }
+
+            if (0 == passbuf.length())
+            {
+                printf("Error in configuration file %s - systemPassword not specified", argv[1]);
+                releaseAtoms();
+                return -1;
+            }
+
+            StringBuffer decPwd;
+            decrypt(decPwd, passbuf.str());
+
+            //Clear Dali cache
+            Owned<IUserDescriptor> userdesc(createUserDescriptor());
+            userdesc->set(sysuser, decPwd);
+            ok = querySessionManager().clearPermissionsCache(userdesc);
+            printf(ok ? "Dali Cache cleared\n" : "Error clearing Dali Cache\n");
+        }
 #endif
     }
     catch(IException* e)