Browse Source

Merge remote-tracking branch 'origin/closedown-4.2.x'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 năm trước cách đây
mục cha
commit
ebe7cd9d75

+ 1 - 1
dali/base/dadfs.cpp

@@ -9061,7 +9061,7 @@ class CInitGroups
             messages.append(msg).newline();
             matchExisting = matchOldEnv = false;
         }
-        if (!matchExisting && !matchOldEnv)
+        if (!existingClusterGroup || (!matchExisting && !matchOldEnv))
         {
             VStringBuffer msg("New cluster layout for cluster %s", gname.str());
             WARNLOG("%s", msg.str());

+ 8 - 5
dali/ft/filecopy.cpp

@@ -1171,11 +1171,14 @@ void FileSprayer::calculateSprayPartition()
     ForEachItemIn(idx2, partitioners)
         partitioners.item(idx2).getResults(partition);
 
-    // Store discovered CSV record structure into target logical file.
-    StringBuffer recStru;
-    partitioners.item(0).getRecordStructure(recStru);
-    IDistributedFile * target = distributedTarget.get();
-    target->setECL(recStru.str());
+    if (partitioners.ordinality() > 0)
+    {
+        // Store discovered CSV record structure into target logical file.
+        StringBuffer recStru;
+        partitioners.item(0).getRecordStructure(recStru);
+        IDistributedFile * target = distributedTarget.get();
+        target->setECL(recStru.str());
+    }
 
 }
 

+ 41 - 10
roxie/ccd/ccdfile.cpp

@@ -502,14 +502,14 @@ static bool isCopyFromCluster(IPartDescriptor *pdesc, unsigned clusterNo, const
     return strieq(name, pdesc->queryOwner().getClusterGroupName(clusterNo, s));
 }
 
-static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName, const char *fromCluster)
+static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, const char *localFileName, const char *fromCluster, bool includeFromCluster)
 {
     UnsignedArray clusterCounts;
     unsigned numCopies = pdesc->numCopies();
     for (unsigned copy = 0; copy < numCopies; copy++)
     {
         unsigned clusterNo = pdesc->copyClusterNum(copy);
-        if (fromCluster && *fromCluster && !isCopyFromCluster(pdesc, clusterNo, fromCluster))
+        if (fromCluster && *fromCluster && isCopyFromCluster(pdesc, clusterNo, fromCluster)!=includeFromCluster)
             continue;
         RemoteFilename r;
         pdesc->getFilename(copy,r);
@@ -538,7 +538,7 @@ static void appendPeerLocations(IPartDescriptor *pdesc, StringArray &locations,
         if (streq(peerCluster, roxieName))
             peerCluster=NULL;
     }
-    appendRemoteLocations(pdesc, locations, localFileName, peerCluster);
+    appendRemoteLocations(pdesc, locations, localFileName, peerCluster, true);
 }
 
 
@@ -566,6 +566,7 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
 
     RoxieFileStatus fileUpToDate(IFile *f, offset_t size, const CDateTime &modified, unsigned crc, bool isCompressed)
     {
+        cacheFileConnect(f, dafilesrvLookupTimeout);  // set timeout to 10 seconds
         if (f->exists())
         {
             // only check size if specified
@@ -611,20 +612,23 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
 
             // put the peerRoxieLocations next in the list
             StringArray localLocations;
-            appendPeerLocations(pdesc, localLocations, localLocation);
+            appendRemoteLocations(pdesc, localLocations, localLocation, roxieName, true);  // Adds all locations on the same cluster
             ForEachItemIn(roxie_idx, localLocations)
             {
                 try
                 {
                     const char *remoteName = localLocations.item(roxie_idx);
                     Owned<IFile> remote = createIFile(remoteName);
-                    if (fileUpToDate(remote, size, modified, crc, isCompressed)==FileIsValid)
+                    RoxieFileStatus status = fileUpToDate(remote, size, modified, crc, isCompressed);
+                    if (status==FileIsValid)
                     {
-                        if (miscDebugTraceLevel > 10)
-                            DBGLOG("adding peer roxie location %s", remoteName);
+                        if (miscDebugTraceLevel > 5)
+                            DBGLOG("adding peer location %s", remoteName);
                         ret->addSource(remote.getClear());
                         addedOne = true;
                     }
+                    else if (miscDebugTraceLevel > 10)
+                        DBGLOG("Checked peer roxie location %s, status=%d", remoteName, (int) status);
                 }
                 catch (IException *E)
                 {
@@ -643,13 +647,16 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
                         Owned<IFile> remote = createIFile(remoteName);
                         if (traceLevel > 5)
                             DBGLOG("checking remote location %s", remoteName);
-                        if (fileUpToDate(remote, size, modified, crc, isCompressed)==FileIsValid)
+                        RoxieFileStatus status = fileUpToDate(remote, size, modified, crc, isCompressed);
+                        if (status==FileIsValid)
                         {
-                            if (miscDebugTraceLevel > 10)
+                            if (miscDebugTraceLevel > 5)
                                 DBGLOG("adding remote location %s", remoteName);
                             ret->addSource(remote.getClear());
                             addedOne = true;
                         }
+                        else if (miscDebugTraceLevel > 10)
+                            DBGLOG("Checked remote file location %s, status=%d", remoteName, (int) status);
                     }
                     catch (IException *E)
                     {
@@ -664,7 +671,22 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
                 if (local->exists())  // Implies local dali and local file out of sync
                     throw MakeStringException(ROXIE_FILE_ERROR, "Local file %s does not match DFS information", localLocation);
                 else
+                {
+                    if (traceLevel > 2)
+                    {
+                        DBGLOG("Failed to open file at any of the following %d local locations:", localLocations.length());
+                        ForEachItemIn(local_idx, localLocations)
+                        {
+                            DBGLOG("%d: %s", local_idx+1, localLocations.item(local_idx));
+                        }
+                        DBGLOG("Or at any of the following %d remote locations:", remoteLocationInfo.length());
+                        ForEachItemIn(remote_idx, remoteLocationInfo)
+                        {
+                            DBGLOG("%d: %s", remote_idx+1, remoteLocationInfo.item(remote_idx));
+                        }
+                    }
                     throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Could not open file %s", localLocation);
+                }
             }
             ret->setRemote(true);
         }
@@ -1066,6 +1088,7 @@ public:
             if (file == &todo.item(idx))
             {
                 todo.remove(idx);
+                atomic_dec(&numFilesToProcess);    // must decrement counter for SNMP accuracy
             }
         }
     }
@@ -1316,8 +1339,16 @@ public:
 ILazyFileIO *createPhysicalFile(const char *id, IPartDescriptor *pdesc, IPartDescriptor *remotePDesc, RoxieFileType fileType, int numParts, bool startCopy, unsigned channel)
 {
     StringArray remoteLocations;
+    const char *peerCluster = pdesc->queryOwner().queryProperties().queryProp("@cloneFromPeerCluster");
+    if (peerCluster)
+    {
+        if (*peerCluster!='-') // a remote cluster was specified explicitly
+            appendRemoteLocations(pdesc, remoteLocations, NULL, peerCluster, true);  // Add only from specified cluster
+    }
+    else
+        appendRemoteLocations(pdesc, remoteLocations, NULL, roxieName, false);      // Add from any cluster on same dali, other than mine
     if (remotePDesc)
-        appendRemoteLocations(remotePDesc, remoteLocations, NULL, NULL);
+        appendRemoteLocations(remotePDesc, remoteLocations, NULL, NULL, false);    // Then any remote on remote dali
 
     return queryFileCache().lookupFile(id, fileType, pdesc, numParts, replicationLevel[channel], remoteLocations, startCopy);
 }

+ 5 - 3
roxie/ccd/ccdmain.cpp

@@ -867,13 +867,15 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
             unsigned cyclicOffset = topology->getPropInt("@cyclicOffset", 1);
             for (int i=0; i<numNodes; i++)
             {
+                // Note this code is a little confusing - easy to get the cyclic offset backwards
+                // cyclic offset means node n+offset has copy 2 for channel n, so node n has copy 2 for channel n-offset
                 int channel = i+1;
                 for (int copy=0; copy<numDataCopies; copy++)
                 {
-                    if (channel > numNodes)
-                        channel = channel - numNodes;
+                    if (channel < 1)
+                        channel = channel + numNodes;
                     addChannel(i, channel, copy);
-                    channel = channel + cyclicOffset;
+                    channel = channel - cyclicOffset;
                 }
             }
         }

+ 1 - 1
system/jlib/jsocket.cpp

@@ -3079,8 +3079,8 @@ void IpAddress::setNetAddress(size32_t sz,const void *src)
     if (sz==sizeof(unsigned)) { // IPv4
         netaddr[0] = 0;
         netaddr[1] = 0;
-        netaddr[2]=0xffff0000;
         netaddr[3] = *(const unsigned *)src;
+        netaddr[2] = netaddr[3] ? 0xffff0000 : 0; // leave as null if Ipv4 address is null
     }
     else if (!IP4only&&(sz==sizeof(netaddr))) { // IPv6
         memcpy(&netaddr,src,sz);

+ 1 - 0
system/mp/mpbase.hpp

@@ -144,6 +144,7 @@ extern mp_decl void initMyNode(unsigned short port);
 
 interface IMP_Exception: extends IException
 {
+    virtual const SocketEndpoint &queryEndpoint() const = 0;
 };
 
 enum MessagePassingError

+ 66 - 16
system/mp/mpcomm.cpp

@@ -195,10 +195,6 @@ public:
     {
     }
 
-    CMPException(MessagePassingError err) : error(err) 
-    {
-    }
-
     StringBuffer &  errorMessage(StringBuffer &str) const
     { 
         StringBuffer tmp;
@@ -216,6 +212,7 @@ public:
     { 
         return MSGAUD_user; 
     }
+    virtual const SocketEndpoint &queryEndpoint() const { return endpoint; }
 private:
     MessagePassingError error;
     SocketEndpoint endpoint;
@@ -1668,8 +1665,21 @@ int CMPConnectThread::run()
                 SocketEndpoint hostep;
                 SocketEndpointV4 id[2];
                 sock->readtms(&id[0],sizeof(id),sizeof(id),rd,CONFIRM_TIMEOUT); 
+                if (rd != sizeof(id))
+                {
+                    FLLOG(MCoperatorWarning, unknownJob, "MP Connect Thread: invalid number of connection bytes serialized");
+                    sock->close();
+                    continue;
+                }
                 id[0].get(remoteep);
                 id[1].get(hostep);
+                if (remoteep.isNull() || hostep.isNull())
+                {
+                    // JCSMORE, I think remoteep really must/should match a IP of this local host
+                    FLLOG(MCoperatorWarning, unknownJob, "MP Connect Thread: invalid remote and/or host ep serialized");
+                    sock->close();
+                    continue;
+                }
 #ifdef _FULLTRACE       
                 StringBuffer tmp1;
                 remoteep.getUrlStr(tmp1);
@@ -2191,12 +2201,32 @@ public:
         if (sender)
             *sender = NULL;
         CTimeMon tm(timeout);
-        if (parent->recv(mbuf,src?&src->endpoint():NULL,tag,tm)) {
-            if (sender) 
-                *sender = createINode(mbuf.getSender());
-            return true;
+        loop
+        {
+            try
+            {
+                if (parent->recv(mbuf,src?&src->endpoint():NULL,tag,tm))
+                {
+                    if (sender)
+                        *sender = createINode(mbuf.getSender());
+                    return true;
+                }
+                return false;
+            }
+            catch (IMP_Exception *e)
+            {
+                if (MPERR_link_closed != e->errorCode())
+                    throw;
+                const SocketEndpoint &ep = e->queryEndpoint();
+                if (src && (ep == src->endpoint()))
+                    throw;
+                StringBuffer epStr;
+                ep.getUrlStr(epStr);
+                FLLOG(MCoperatorWarning, unknownJob, "CInterCommunicator: ignoring closed endpoint: %s", epStr.str());
+                e->Release();
+                // loop around and recv again
+            }
         }
-        return false;
     }
 
 
@@ -2408,14 +2438,34 @@ public:
         else
             srcep = &queryEndpoint(srcrank);
         CTimeMon tm(timeout);
-        if (parent->recv(mbuf,(srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag,tm)) {
-            if (sender)
-                *sender = group->rank(mbuf.getSender());
-            return true;
+        loop
+        {
+            try
+            {
+                if (parent->recv(mbuf,(srcrank==RANK_ALL)?NULL:&queryEndpoint(srcrank),tag,tm))
+                {
+                    if (sender)
+                        *sender = group->rank(mbuf.getSender());
+                    return true;
+                }
+                if (sender)
+                    *sender = RANK_NULL;
+                return false;
+            }
+            catch (IMP_Exception *e)
+            {
+                if (MPERR_link_closed != e->errorCode())
+                    throw;
+                const SocketEndpoint &ep = e->queryEndpoint();
+                if (RANK_NULL != group->rank(ep))
+                    throw;
+                StringBuffer epStr;
+                ep.getUrlStr(epStr);
+                FLLOG(MCoperatorWarning, unknownJob, "CCommunicator: ignoring closed endpoint from outside the communicator group: %s", epStr.str());
+                e->Release();
+                // loop around and recv again
+            }
         }
-        if (sender)
-            *sender = RANK_NULL;
-        return false;
     }
     
     void flush(mptag_t tag)

+ 14 - 1
thorlcr/master/mawatchdog.cpp

@@ -196,7 +196,20 @@ void CMasterWatchdogBase::main()
         {
             HeartBeatPacketHeader hb;
             MemoryBuffer progressData;
-            unsigned sz = readPacket(hb, progressData);
+            unsigned sz;
+            try
+            {
+                sz = readPacket(hb, progressData);
+            }
+            catch (IMP_Exception *e)
+            {
+                if (MPERR_link_closed != e->errorCode())
+                    throw;
+                const SocketEndpoint &ep = e->queryEndpoint();
+                StringBuffer epStr;
+                ep.getUrlStr(epStr);
+                abortThor(MakeThorOperatorException(TE_AbortException, "Watchdog has lost connectivity with Thor slave: %s (Process terminated or node down?)", epStr.str()));
+            }
             if (stopped)
                 break;
             else if (sz)