Переглянути джерело

gh-1099 - Detect process affinity and limit threads

Change the default number of threads used by e.g. sort to number of bound
cores, rather than total number of processes

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 13 роки тому
батько
коміт
41b386b024

+ 40 - 0
system/jlib/jdebug.cpp

@@ -816,6 +816,30 @@ void getCpuInfo(unsigned &numCPUs, unsigned &CPUSpeed)
 
 }
 
+unsigned getAffinityCpus()
+{
+    unsigned numCpus = 0;
+    DWORD ProcessAffinityMask, SystemAffinityMask;
+    if (GetProcessAffinityMask(GetCurrentProcess(), &ProcessAffinityMask, &SystemAffinityMask))
+    {
+        unsigned i = 0;
+        unsigned s = sizeof(ProcessAffinityMask)*8;
+        for(i=0; i < s; i++)
+        {
+            if (ProcessAffinityMask & (1 << i))
+                ++numCpus;
+        }
+    }
+    else // fall back to legacy num system cpus
+    {
+        Owned<IException> e = MakeOsException(GetLastError(), "Failed to get affinity");
+        EXCLOG(e, NULL);
+        unsigned cpuSpeed;
+        getCpuInfo(numCpus, cpuSpeed);
+        return numCpus;
+    }
+    return numCpus;
+}
 
 #else // linux
 
@@ -852,6 +876,22 @@ void getCpuInfo(unsigned &numCPUs, unsigned &CPUSpeed)
     close(cpufd);
 }
 
+unsigned getAffinityCpus()
+{
+    cpu_set_t cpuset;
+    int err = pthread_getaffinity_np(GetCurrentThreadId(), sizeof(cpu_set_t), &cpuset);
+    if (0 == err)
+        return CPU_COUNT(&cpuset);
+    else
+    {
+        Owned<IException> e = MakeErrnoException(errno, "Failed to get affinity");
+        EXCLOG(e, NULL);
+        unsigned numCPUs, CPUSpeed;
+        getCpuInfo(numCPUs, CPUSpeed);
+        return numCPUs;
+    }
+}
+
 static void getMemUsage(unsigned &inuse,unsigned &active,unsigned &total,unsigned &swaptotal,unsigned &swapinuse) 
 {
     unsigned free=0;

+ 1 - 0
system/jlib/jdebug.hpp

@@ -255,6 +255,7 @@ unsigned jlib_decl setAllocHook(bool on);  // bwd compat returns unsigned
 
 extern jlib_decl void getHardwareInfo(HardwareInfo &hdwInfo, const char *primDiskPath, const char *secDiskPath = NULL);
 extern jlib_decl void getCpuInfo(unsigned &numCPUs, unsigned &CPUSpeed);
+extern jlib_decl unsigned getAffinityCpus();
 extern jlib_decl void printProcMap(const char *fn, bool printbody, bool printsummary, StringBuffer *lnout, MemoryBuffer *mb, bool useprintf);
 extern jlib_decl void PrintMemoryReport(bool full=true);
 extern jlib_decl void printAllocationSummary();

+ 1 - 2
system/jlib/jsort.cpp

@@ -41,8 +41,7 @@ static bool sortParallel(unsigned &numcpus)
 {
     static unsigned numCPUs = 0;
     if (numCPUs==0) {
-        unsigned CPUSpeed;
-        getCpuInfo(numCPUs, CPUSpeed);
+        numCPUs = getAffinityCpus();
     }
     if ((numcpus==0)||(numcpus>numCPUs))
         numcpus = numCPUs;

+ 2 - 6
thorlcr/activities/msort/thsortu.cpp

@@ -1940,9 +1940,7 @@ IJoinHelper *createJoinHelper(IHThorJoinArg *helper, const char *activityName, a
     IJoinHelper *jhelper = new CJoinHelper(helper,activityName,TAKjoin,activityId,allocator);
     if (!parallelmatch||helper->getKeepLimit()||((helper->getJoinFlags()&JFslidingmatch)!=0)) // currently don't support betweenjoin or keep and multicore
         return jhelper;
-    unsigned numthreads;
-    unsigned CPUSpeed;
-    getCpuInfo(numthreads, CPUSpeed);
+    unsigned numthreads = getAffinityCpus();
     if (unsortedoutput)
         return new CMultiCoreUnorderedJoinHelper(numthreads,jhelper,helper,allocator,TAKjoin);
     return new CMultiCoreJoinHelper(numthreads,jhelper,helper,allocator,TAKjoin);
@@ -1960,9 +1958,7 @@ IJoinHelper *createSelfJoinHelper(IHThorJoinArg *helper, const char *activityNam
     IJoinHelper *jhelper = new SelfJoinHelper(helper,activityName,activityId,allocator);
     if (!parallelmatch||helper->getKeepLimit()||((helper->getJoinFlags()&JFslidingmatch)!=0)) // currently don't support betweenjoin or keep and multicore
         return jhelper;
-    unsigned numthreads;
-    unsigned CPUSpeed;
-    getCpuInfo(numthreads, CPUSpeed);
+    unsigned numthreads = getAffinityCpus();
     if (unsortedoutput)
         return new CMultiCoreUnorderedJoinHelper(numthreads,jhelper,helper,allocator,TAKselfjoin);
     return new CMultiCoreJoinHelper(numthreads,jhelper,helper,allocator,TAKselfjoin);

+ 1 - 2
thorlcr/thorutil/mcorecache.cpp

@@ -327,8 +327,7 @@ IMultiCoreCache *createMultiCoreCache(IMultiCoreRowIntercept &wrapped, IRecordSi
 {
     static unsigned numCPUs = 0;
     if (numCPUs==0) {
-        unsigned CPUSpeed;
-        getCpuInfo(numCPUs, CPUSpeed);
+        numCPUs = getAffinityCpus();
     }
     if (numCPUs<=1)
         return new CPassThroughMultiCoreCache(wrapped,recsize);