Преглед изворни кода

Merge pull request #2581 from jakesmith/trace-querysend

Compress query init and trace timing

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman пре 13 година
родитељ
комит
0ce6b7c3ca
2 измењених фајлова са 25 додато и 12 уклоњено
  1. 21 12
      thorlcr/graph/thgraphmaster.cpp
  2. 4 0
      thorlcr/slave/slavmain.cpp

+ 21 - 12
thorlcr/graph/thgraphmaster.cpp

@@ -21,6 +21,7 @@
 #include "jprop.hpp"
 #include "jexcept.hpp"
 #include "jiter.ipp"
+#include "jlzw.hpp"
 #include "jsocket.hpp"
 #include "jset.hpp"
 #include "jsort.hpp"
@@ -1373,23 +1374,24 @@ void CJobMaster::sendQuery()
 {
     CriticalBlock b(sendQueryCrit);
     if (querySent) return;
-    CMessageBuffer msg;
-    msg.append(QueryInit);
-    msg.append(mpJobTag);
-    msg.append(slavemptag);
-    msg.append(queryWuid());
-    msg.append(graphName);
+    CMessageBuffer tmp;
+    tmp.append(mpJobTag);
+    tmp.append(slavemptag);
+    tmp.append(queryWuid());
+    tmp.append(graphName);
     const char *soName = queryDllEntry().queryName();
     PROGLOG("Query dll: %s", soName);
-    msg.append(soName);
-    msg.append(sendSo);
+    tmp.append(soName);
+    tmp.append(sendSo);
     if (sendSo)
     {
+        CTimeMon atimer;
         OwnedIFile iFile = createIFile(soName);
         OwnedIFileIO iFileIO = iFile->open(IFOread);
         size32_t sz = (size32_t)iFileIO->size();
-        msg.append(sz);
-        read(iFileIO, 0, sz, msg);
+        tmp.append(sz);
+        read(iFileIO, 0, sz, tmp);
+        PROGLOG("Loading query for serialization to slaves took %d ms", atimer.elapsed());
     }
     Owned<IPropertyTree> deps = createPTree(queryXGMML()->queryName());
     Owned<IPropertyTreeIterator> edgeIter = queryXGMML()->getElements("edge"); // JCSMORE trim to those actually needed
@@ -1399,9 +1401,16 @@ void CJobMaster::sendQuery()
         deps->addPropTree("edge", LINK(&edge));
     }
     Owned<IPropertyTree> workUnitInfo = prepareWorkUnitInfo();
-    workUnitInfo->serialize(msg);
-    deps->serialize(msg);
+    workUnitInfo->serialize(tmp);
+    deps->serialize(tmp);
+
+    CMessageBuffer msg;
+    msg.append(QueryInit);
+    compressToBuffer(msg, tmp.length(), tmp.toByteArray());
+
+    CTimeMon queryToSlavesTimer;
     broadcastToSlaves(msg, masterSlaveMpTag, LONGTIMEOUT, "sendQuery");
+    PROGLOG("Serialization of query init info (%d bytes) to slaves took %d ms", msg.length(), queryToSlavesTimer.elapsed());
     queryJobManager().addCachedSo(soName);
     querySent = true;
 }

+ 4 - 0
thorlcr/slave/slavmain.cpp

@@ -23,6 +23,7 @@
 #include "jthread.hpp"
 #include "jprop.hpp"
 #include "jiter.ipp"
+#include "jlzw.hpp"
 
 #include "jhtree.hpp"
 #include "mpcomm.hpp"
@@ -153,6 +154,9 @@ public:
                 {
                     case QueryInit:
                     {
+                        MemoryBuffer mb;
+                        decompressToBuffer(mb, msg);
+                        msg.swapWith(mb);
                         mptag_t mptag, slaveMsgTag;
                         deserializeMPtag(msg, mptag);
                         queryClusterComm().flush(mptag);