瀏覽代碼

HPCC-14522 Implement support for affinity in Thor

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 9 年之前
父節點
當前提交
eef9324964

+ 1 - 1
cmake_modules/dependencies/suse11.3.cmake

@@ -1 +1 @@
-SET_DEPENDENCIES ( CPACK_RPM_PACKAGE_REQUIRES binutils gcc-c++ openssh libldap-2_4-2 libicu libboost_regex1_42_0 libxerces-c-3_0 libxalan-c110 expect libarchive2 rsync apr apr-util zip )
+SET_DEPENDENCIES ( CPACK_RPM_PACKAGE_REQUIRES binutils gcc-c++ openssh libldap-2_4-2 libicu libboost_regex1_42_0 libxerces-c-3_0 libxalan-c110 expect libarchive2 rsync apr apr-util zip libnuma1 )

+ 1 - 1
cmake_modules/dependencies/suse11.4.cmake

@@ -1 +1 @@
-SET_DEPENDENCIES ( CPACK_RPM_PACKAGE_REQUIRES binutils gcc-c++ openssh libldap-2_4-2 libicu libboost_regex1_44_0 libxerces-c-3_0 libxalan-c110 expect libarchive2 rsync apr apr-util zip )
+SET_DEPENDENCIES ( CPACK_RPM_PACKAGE_REQUIRES binutils gcc-c++ openssh libldap-2_4-2 libicu libboost_regex1_44_0 libxerces-c-3_0 libxalan-c110 expect libarchive2 rsync apr apr-util zip libnuma1 )

+ 1 - 1
initfiles/bin/init_thorslave

@@ -100,7 +100,7 @@ start_slaves()
             for (( slave=0; slave<${slavespernode}; slave++ )); do
                 slavenum=$(( ${clusternode} + (${slave} * ${clusternodes}) ))
                 log "$slavename master=$master:$masterport slave=.:$slaveport slavenum=$slavenum logDir=$logpth/$hpcc_compname"
-                ./$slavename master=$master:$masterport slave=.:$slaveport slavenum=$slavenum logDir=$logpth/$hpcc_compname 2>/dev/null 1>/dev/null &
+                ./$slavename master=$master:$masterport slave=.:$slaveport slavenum=$slavenum slaveprocessnum=$slave logDir=$logpth/$hpcc_compname 2>/dev/null 1>/dev/null &
                 slavepid=$!
                 if [[ "$slavepid" -eq "0" ]]; then
                     log "failed to start"

+ 21 - 0
initfiles/componentfiles/configxml/thor.xsd.in

@@ -633,6 +633,27 @@
           </xs:appinfo>
         </xs:annotation>
       </xs:attribute>
+      <xs:attribute name="affinity" type="xs:string" use="optional">
+        <xs:annotation>
+          <xs:appinfo>
+            <tooltip>A comma separated list of cpu ids (and ranges) to bind all thor slaves to</tooltip>
+          </xs:appinfo>
+        </xs:annotation>
+      </xs:attribute>
+      <xs:attribute name="autoAffinity" type="xs:boolean" default="true">
+        <xs:annotation>
+          <xs:appinfo>
+            <tooltip>Automatically bind slave processes to a single cpu socket, if multiple slaves are running on a multi socket machine</tooltip>
+          </xs:appinfo>
+        </xs:annotation>
+      </xs:attribute>
+      <xs:attribute name="numaBindLocal" type="xs:boolean" default="false">
+        <xs:annotation>
+          <xs:appinfo>
+            <tooltip>Restrict allocations to memory attached to the cpu sockets the slave process is bound to</tooltip>
+          </xs:appinfo>
+        </xs:annotation>
+      </xs:attribute>
     </xs:complexType>
     <xs:key name="thorProcessKey1">
       <xs:selector xpath="./ThorMasterProcess|./ThorSlaveProcess"/>

+ 1 - 1
system/jlib/CMakeLists.txt

@@ -201,7 +201,7 @@ if (WIN32)
  target_link_libraries ( jlib ws2_32 mpr.lib Winmm.lib psapi.lib )
 elseif (("${CMAKE_SYSTEM_NAME}" STREQUAL "Darwin" ))
 else ()
- target_link_libraries ( jlib rt)
+ target_link_libraries ( jlib rt numa)
 endif ()
 
 if (NOT PLUGIN)

+ 111 - 0
system/jlib/jthread.cpp

@@ -33,6 +33,9 @@
 #include <sys/syscall.h>
 #include <sys/types.h>
 #include <sys/resource.h>
+#ifndef __APPLE__
+#include <numa.h>
+#endif
 #endif
 
 #if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
@@ -2334,3 +2337,111 @@ unsigned threadLogID()  // for use in logging
 #endif
     return (unsigned)(memsize_t) GetCurrentThreadId(); // truncated in 64bit
 }
+
+//---------------------------------------------------------------------------------------------------------------------
+
+//MORE: Not currently implemented for windows.
+#ifdef CPU_SETSIZE
+static unsigned getCpuId(const char * text, char * * next)
+{
+    unsigned cpu = (unsigned)strtoul(text, next, 10);
+    if (*next == text)
+        throw makeStringExceptionV(1, "Invalid CPU: %s", text);
+    else if (cpu >= CPU_SETSIZE)
+        throw makeStringExceptionV(1, "CPU %u is out of range 0..%u", cpu, CPU_SETSIZE);
+    return cpu;
+}
+#endif
+
+void setProcessAffinity(const char * cpuList)
+{
+    assertex(cpuList);
+#ifdef CPU_ZERO
+    cpu_set_t cpus;
+    CPU_ZERO(&cpus);
+
+    const char * cur = cpuList;
+    loop
+    {
+        char * next;
+        unsigned cpu1 = getCpuId(cur, &next);
+        if (*next == '-')
+        {
+            const char * range = next+1;
+            unsigned cpu2 = getCpuId(range, &next);
+            for (unsigned cpu= cpu1; cpu <= cpu2; cpu++)
+                CPU_SET(cpu, &cpus);
+        }
+        else
+            CPU_SET(cpu1, &cpus);
+
+        if (*next == '\0')
+            break;
+
+        if (*next != ',')
+            throw makeStringExceptionV(1, "Invalid cpu affinity list %s", cur);
+
+        cur = next+1;
+    }
+
+    if (sched_setaffinity(0, sizeof(cpu_set_t), &cpus))
+        throw makeStringException(errno, "Failed to set affinity");
+    DBGLOG("Process affinity set to %s", cpuList);
+#endif
+}
+
+void setAutoAffinity(unsigned curProcess, unsigned processPerMachine, const char * optNodes)
+{
+#if defined(CPU_ZERO) && !defined(__APPLE__)
+    if (processPerMachine <= 1)
+        return;
+
+    if (numa_available() == -1)
+    {
+        DBGLOG("Numa functions not available");
+        return;
+    }
+
+    if (optNodes)
+        throw makeStringException(1, "Numa node list not yet supported");
+
+    unsigned numNumaNodes = numa_max_node()+1;
+    if (numNumaNodes <= 1)
+        return;
+
+    //MORE: If processPerMachine < numNumaNodes we may want to associate with > 1 node.
+    unsigned curNode = curProcess % numNumaNodes;
+
+#if defined(LIBNUMA_API_VERSION) && (LIBNUMA_API_VERSION>=2)
+    struct bitmask * cpus = numa_allocate_cpumask();
+    numa_node_to_cpus(curNode, cpus);
+    bool ok = (numa_sched_setaffinity(0, cpus) == 0);
+    numa_bitmask_free(cpus);
+#else
+    cpu_set_t cpus;
+    CPU_ZERO(&cpus);
+    numa_node_to_cpus(curNode, (unsigned long *) &cpus, sizeof (cpus));
+    bool ok = sched_setaffinity (0, sizeof(cpus), &cpus) != 0;
+#endif
+
+    if (!ok)
+        throw makeStringExceptionV(1, "Failed to set affinity for node %u", curNode);
+
+    DBGLOG("Process bound to numa node %u of %u", curNode, numNumaNodes);
+#endif
+}
+
+void bindMemoryToLocalNodes()
+{
+#if defined(LIBNUMA_API_VERSION) && (LIBNUMA_API_VERSION>=2)
+    numa_set_bind_policy(1);
+
+    unsigned numNumaNodes = numa_max_node() + 1;
+    if (numNumaNodes <= 1)
+        return;
+    struct bitmask *nodes = numa_get_run_node_mask();
+    numa_set_membind(nodes);
+    DBGLOG("Process memory bound to numa nodemask 0x%x (of %u nodes total)", (unsigned)(*(nodes->maskp)), numNumaNodes);
+    numa_bitmask_free(nodes);
+#endif
+}

+ 4 - 0
system/jlib/jthread.hpp

@@ -294,5 +294,9 @@ interface IWorkQueueThread: extends IInterface
 // internally thread persists for specified time waiting before self destroying
 extern jlib_decl IWorkQueueThread *createWorkQueueThread(unsigned persisttime=1000*60);
 
+extern jlib_decl void setProcessAffinity(const char * cpus);
+extern jlib_decl void setAutoAffinity(unsigned curProcess, unsigned processPerNode, const char * optNodes);
+extern jlib_decl void bindMemoryToLocalNodes();
+
 
 #endif

+ 6 - 0
system/mp/mpcomm.cpp

@@ -3051,6 +3051,12 @@ void stopMPServer()
     initMyNode(0);
 }
 
+bool hasMPServerStarted()
+{
+    CriticalBlock block(CGlobalMPServer::sect);
+    return globalMPServer != NULL;
+}
+
 IInterCommunicator &queryWorldCommunicator()
 {
     CriticalBlock block(CGlobalMPServer::sect);

+ 1 - 0
system/mp/mpcomm.hpp

@@ -89,6 +89,7 @@ extern mp_decl mptag_t createReplyTag(); // creates (short-lived) reply-tag;
 
 extern mp_decl ICommunicator *createCommunicator(IGroup *group,bool outer=false); // outer allows nodes outside group to send
 extern mp_decl IInterCommunicator &queryWorldCommunicator();
+extern mp_decl bool hasMPServerStarted();
 
 interface IMPServer : extends IInterface
 {

+ 29 - 0
thorlcr/slave/thslavemain.cpp

@@ -32,6 +32,8 @@
 #include "jfile.hpp"
 #include "jmisc.hpp"
 #include "jprop.hpp"
+#include "jthread.hpp"
+
 #include "thormisc.hpp"
 #include "slavmain.hpp"
 #include "thorport.hpp"
@@ -166,6 +168,9 @@ static bool jobListenerStopped = true;
 
 void UnregisterSelf(IException *e)
 {
+    if (!hasMPServerStarted())
+        return;
+
     StringBuffer slfStr;
     slfEp.getUrlStr(slfStr);
     LOG(MCdebugProgress, thorJob, "Unregistering slave : %s", slfStr.str());
@@ -235,6 +240,27 @@ void startSlaveLog()
     globals->setProp("@logURL", url.str());
 }
 
+void setSlaveAffinity(unsigned processOnNode)
+{
+    const char * affinity = globals->queryProp("@affinity");
+    if (affinity)
+        setProcessAffinity(affinity);
+    else if (globals->getPropBool("@autoAffinity", true))
+    {
+        const char * nodes = globals->queryProp("@autoNodeAffinityNodes");
+        unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
+        setAutoAffinity(processOnNode, slavesPerNode, nodes);
+    }
+
+    //The default policy is to allocate from the local node, so restricting allocations to the current sockets
+    //may not buy much once the affinity is set up.  It also means it will fail if there is no memory left on
+    //this socket - even if there is on others.
+    //Therefore it is not recommended unless you have maybe several independent thors running on the same machines
+    //with exclusive access to memory.
+    if (globals->getPropBool("@numaBindLocal", false))
+        bindMemoryToLocalNodes();
+}
+
 
 int main( int argc, char *argv[]  )
 {
@@ -292,12 +318,15 @@ int main( int argc, char *argv[]  )
         }
         else 
             slfEp.setLocalHost(0);
+
         mySlaveNum = globals->getPropInt("@SLAVENUM");
 
         setMachinePortBase(slfEp.port);
         slfEp.port = getMachinePortBase();
         startSlaveLog();
 
+        setSlaveAffinity(globals->getPropInt("@SLAVEPROCESSNUM"));
+
         startMPServer(getFixedPort(TPORT_mp));
 #ifdef USE_MP_LOG
         startLogMsgParentReceiver();