浏览代码

Merge pull request #3866 from jakesmith/bcast-parallel

HPCC-8552 - speed up MP broadcast on congested network

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Richard Chapman 12 年之前
父节点
当前提交
5fd37f9687
共有 1 个文件被更改,包括 63 次插入6 次删除
  1. 63 6
      thorlcr/graph/thgraphmaster.cpp

+ 63 - 6
thorlcr/graph/thgraphmaster.cpp

@@ -1256,16 +1256,57 @@ CJobMaster::~CJobMaster()
     tmpHandler.clear();
 }
 
+static IException *createBCastException(unsigned slave, const char *errorMsg)
+{
+    // think this should always be fatal, could check link down here, or in general and flag as _shutdown.
+    StringBuffer msg("General failure communicating to slave");
+    if (slave)
+        msg.append("(").append(slave).append(") ");
+    else
+        msg.append("s ");
+    Owned<IThorException> e = MakeThorException(0, "%s", msg.append(" [").append(errorMsg).append("]").str());
+    e->setAction(tea_shutdown);
+    return e.getClear();
+}
+
 void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned timeout, const char *errorMsg, mptag_t *_replyTag, bool sendOnly)
 {
     mptag_t replyTag = createReplyTag();
     msg.setReplyTag(replyTag);
-    if (!queryJobComm().send(msg, RANK_ALL_OTHER, mptag, timeout))
+    if (globals->getPropBool("@broadcastSendAsync", true)) // only here in case of problems/debugging.
+    {
+        class CSendAsyncfor : public CAsyncFor
+        {
+            CJobMaster &job;
+            CMessageBuffer &msg;
+            mptag_t mptag;
+            unsigned timeout;
+            StringAttr errorMsg;
+        public:
+            CSendAsyncfor(CJobMaster &_job, CMessageBuffer &_msg, mptag_t _mptag, unsigned _timeout, const char *_errorMsg)
+                : job(_job), msg(_msg), mptag(_mptag), timeout(_timeout), errorMsg(_errorMsg)
+            {
+            }
+            void Do(unsigned i)
+            {
+                if (!job.queryJobComm().send(msg, i+1, mptag, timeout))
+                    throw createBCastException(i+1, errorMsg);
+            }
+        } afor(*this, msg, mptag, timeout, errorMsg);
+        try
+        {
+            afor.For(querySlaves(), querySlaves());
+        }
+        catch (IException *e)
+        {
+            EXCLOG(e, "broadcastSendAsync");
+            abort(e);
+            throw;
+        }
+    }
+    else if (!queryJobComm().send(msg, RANK_ALL_OTHER, mptag, timeout))
     {
-        // think this should always be fatal, could check link down here, or in general and flag as _shutdown.
-        StringBuffer msg("General failure communicating to slaves [");
-        Owned<IThorException> e = MakeThorException(0, "%s", msg.append(errorMsg).append("]").str());
-        e->setAction(tea_shutdown);
+        Owned<IException> e = createBCastException(0, errorMsg);
         EXCLOG(e, NULL);
         abort(e);
         throw e.getClear();
@@ -1274,6 +1315,7 @@ void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned
     if (_replyTag)
         *_replyTag = replyTag;
     unsigned respondents = 0;
+    Owned<IBitSet> bitSet = createBitSet();
     loop
     {
         rank_t sender;
@@ -1282,7 +1324,21 @@ void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned
         {
             if (_replyTag) _replyTag = NULL;
             StringBuffer tmpStr;
-            Owned<IException> e = MakeThorFatal(NULL, 0, "%s - Timeout receiving from slaves", errorMsg?tmpStr.append(": ").append(errorMsg).str():"");
+            if (errorMsg)
+                tmpStr.append(": ").append(errorMsg).append(" - ");
+            tmpStr.append("Timeout receiving from slaves - no reply from: [");
+            unsigned s = bitSet->scan(0, false);
+            assertex(s<querySlaves()); // must be at least one
+            tmpStr.append(s+1);
+            loop
+            {
+                s = bitSet->scan(s+1, false);
+                if (s>=querySlaves())
+                    break;
+                tmpStr.append(",").append(s+1);
+            }
+            tmpStr.append("]");
+            Owned<IException> e = MakeThorFatal(NULL, 0, " %s", tmpStr.str());
             EXCLOG(e, NULL);
             throw e.getClear();
         }
@@ -1296,6 +1352,7 @@ void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned
             throw e.getClear();
         }
         ++respondents;
+        bitSet->set((unsigned)sender-1);
         if (respondents == querySlaveGroup().ordinality())
             break;
     }