Browse Source

HPCC-12599 Option in Roxie to bind queries to cores

Set coresPerQuery in RoxieTopology.xml or bindCores in workunit
debug values or in query XML to indicate that execution of this
query should be bound to at most N cores out of the ones that
the Roxie process itself is bound to.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 years ago
parent
commit
e68ab5310b
5 changed files with 88 additions and 0 deletions
  1. 1 0
      roxie/ccd/ccd.hpp
  2. 80 0
      roxie/ccd/ccdlistener.cpp
  3. 2 0
      roxie/ccd/ccdmain.cpp
  4. 4 0
      roxie/ccd/ccdquery.cpp
  5. 1 0
      roxie/ccd/ccdquery.hpp

+ 1 - 0
roxie/ccd/ccd.hpp

@@ -435,6 +435,7 @@ extern unsigned defaultPrefetchProjectPreload;
 extern bool defaultCheckingHeap;
 
 extern unsigned slaveQueryReleaseDelaySeconds;
+extern unsigned coresPerQuery;
 
 extern StringBuffer logDirectory;
 extern StringBuffer pluginDirectory;

+ 80 - 0
roxie/ccd/ccdlistener.cpp

@@ -716,6 +716,10 @@ public:
 
 class RoxieListener : public Thread, implements IRoxieListener, implements IThreadFactory
 {
+#ifndef _WIN32
+    cpu_set_t cpuMask;
+#endif
+    unsigned lastCore;
 public:
     IMPLEMENT_IINTERFACE;
     RoxieListener(unsigned _poolSize, bool _suspended) : Thread("RoxieListener")
@@ -725,6 +729,28 @@ public:
         poolSize = _poolSize;
         threadsActive = 0;
         maxThreadsActive = 0;
+#ifndef _WIN32
+        if (coresPerQuery)
+        {
+            if (sched_getaffinity(0, sizeof(cpu_set_t), &cpuMask))
+            {
+                if (traceLevel)
+                    DBGLOG("Unable to get CPU affinity - ignoring coresPerQuery");
+                coresPerQuery = 0;
+            }
+            else if (traceLevel)
+            {
+                StringBuffer trace;
+                for (unsigned core = 0; core < CPU_SETSIZE; core++)
+                {
+                    if (CPU_ISSET(core, &cpuMask))
+                        trace.appendf(",%d", core);
+                }
+                if (trace.length())
+                    DBGLOG("Process affinity is set to use  core(s) %s", trace.str()+1);
+            }
+        }
+#endif
     }
 
     virtual void start()
@@ -814,6 +840,41 @@ public:
         info.append("</ACCESSINFO>\n");
     }
 
+    /**
+     * Bind the calling thread to a subset of the cores in cpuMask.
+     *
+     * When numCores > 0, up to numCores cores that are set in cpuMask are
+     * selected, scanning from lastCore and wrapping at CPU_SETSIZE, and the
+     * calling thread's affinity is set to that selection. lastCore is then
+     * advanced past the last core chosen so the next call starts its scan
+     * from there. If cpuMask holds fewer than numCores cores, every core in
+     * cpuMask is selected and lastCore is left unchanged.
+     *
+     * When numCores <= 0 the calling thread's affinity is set to cpuMask.
+     *
+     * Compiles to a no-op on Windows.
+     *
+     * NOTE(review): lastCore is read and updated here without any locking;
+     * confirm whether callers can invoke this concurrently.
+     *
+     * @param numCores  maximum number of cores to bind the calling thread to
+     */
+    void setAffinity(int numCores)
+    {
+#ifndef _WIN32
+        if (numCores > 0)
+        {
+            cpu_set_t threadMask;
+            CPU_ZERO(&threadMask);
+            unsigned cores = 0;
+            unsigned offset = lastCore;
+            for (unsigned core = 0; core < CPU_SETSIZE; core++)
+            {
+                // Rotate the scan start so successive calls pick different cores
+                unsigned useCore = (core + offset) % CPU_SETSIZE;
+                if (CPU_ISSET(useCore, &cpuMask))
+                {
+                    CPU_SET(useCore, &threadMask);
+                    cores++;
+                    if (cores == (unsigned) numCores)
+                    {
+                        lastCore = useCore+1;
+                        break;
+                    }
+                }
+            }
+            if (traceLevel > 3)
+                traceAffinity(&threadMask);
+            pthread_setaffinity_np(GetCurrentThreadId(), sizeof(cpu_set_t), &threadMask);
+        }
+        else
+        {
+            // Braces are required here: without them the reset to cpuMask ran
+            // unconditionally and undid the restricted mask applied above.
+            if (traceLevel > 3)
+                traceAffinity(&cpuMask);
+            pthread_setaffinity_np(GetCurrentThreadId(), sizeof(cpu_set_t), &cpuMask);
+        }
+#endif
+    }
+
 protected:
     unsigned poolSize;
     bool running;
@@ -827,6 +888,18 @@ protected:
     friend class ActiveQueryLimiter;
 
 private:
+    void traceAffinity(cpu_set_t *mask)
+    {
+        StringBuffer trace;
+        for (unsigned core = 0; core < CPU_SETSIZE; core++)
+        {
+            if (CPU_ISSET(core, mask))
+                trace.appendf(",%d", core);
+        }
+        if (trace.length())
+            DBGLOG("Process affinity is set to use  core(s) %s", trace.str()+1);
+    }
+
     CIArrayOf<AccessTableEntry> accessTable;
 };
 
@@ -1190,6 +1263,9 @@ public:
                         logctx.logOperatorException(e, __FILE__, __LINE__, NULL);
                     throw e;
                 }
+                int bindCores = wu->getDebugValueInt("bindCores", coresPerQuery);
+                if (bindCores > 0)
+                    pool->setAffinity(bindCores);
             }
             isBlind = isBlind || blindLogging;
             logctx.setBlind(isBlind);
@@ -1657,6 +1733,10 @@ readAnother:
                         if (queryFactory)
                         {
                             queryFactory->checkSuspended();
+                            int bindCores = queryFactory->queryOptions().bindCores;
+                            bindCores = queryXml->getPropInt("@bindCores", bindCores);
+                            if (bindCores > 0)
+                                pool->setAffinity(bindCores);
                             bool stripWhitespace = queryFactory->queryOptions().stripWhitespaceFromStoredDataset;
                             stripWhitespace = queryXml->getPropBool("_stripWhitespaceFromStoredDataset", stripWhitespace);
                             PTreeReaderOptions xmlReadFlags = (PTreeReaderOptions)((defaultXmlReadFlags & ~ptr_ignoreWhiteSpace) |

+ 2 - 0
roxie/ccd/ccdmain.cpp

@@ -133,6 +133,7 @@ unsigned dafilesrvLookupTimeout = 10000;
 bool defaultCheckingHeap = false;
 
 unsigned slaveQueryReleaseDelaySeconds = 60;
+unsigned coresPerQuery = 0;
 
 unsigned logQueueLen;
 unsigned logQueueDrop;
@@ -747,6 +748,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         defaultCheckingHeap = topology->getPropBool("@checkingHeap", false);  // NOTE - not in configmgr - too dangerous!
 
         slaveQueryReleaseDelaySeconds = topology->getPropInt("@slaveQueryReleaseDelaySeconds", 60);
+        coresPerQuery = topology->getPropInt("@coresPerQuery", 0);
 
         diskReadBufferSize = topology->getPropInt("@diskReadBufferSize", 0x10000);
         fieldTranslationEnabled = topology->getPropBool("@fieldTranslationEnabled", false);

+ 4 - 0
roxie/ccd/ccdquery.cpp

@@ -288,6 +288,7 @@ QueryOptions::QueryOptions()
     concatPreload = defaultConcatPreload;
     fetchPreload = defaultFetchPreload;
     prefetchProjectPreload = defaultPrefetchProjectPreload;
+    bindCores = coresPerQuery;
 
     checkingHeap = defaultCheckingHeap;
     disableLocalOptimizations = false;  // No global default for this
@@ -311,6 +312,7 @@ QueryOptions::QueryOptions(const QueryOptions &other)
     concatPreload = other.concatPreload;
     fetchPreload = other.fetchPreload;
     prefetchProjectPreload = other.prefetchProjectPreload;
+    bindCores = other.bindCores;
 
     checkingHeap = other.checkingHeap;
     disableLocalOptimizations = other.disableLocalOptimizations;
@@ -344,6 +346,7 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat
     updateFromWorkUnit(concatPreload, wu, "concatPreload");
     updateFromWorkUnit(fetchPreload, wu, "fetchPreload");
     updateFromWorkUnit(prefetchProjectPreload, wu, "prefetchProjectPreload");
+    updateFromWorkUnit(bindCores, wu, "bindCores");
 
     updateFromWorkUnit(checkingHeap, wu, "checkingHeap");
     updateFromWorkUnit(disableLocalOptimizations, wu, "disableLocalOptimizations");
@@ -387,6 +390,7 @@ void QueryOptions::setFromContext(const IPropertyTree *ctx)
         updateFromContext(concatPreload, ctx, "@concatPreload", "_ConcatPreload");
         updateFromContext(fetchPreload, ctx, "@fetchPreload", "_FetchPreload");
         updateFromContext(prefetchProjectPreload, ctx, "@prefetchProjectPreload", "_PrefetchProjectPreload");
+        updateFromContext(bindCores, ctx, "@bindCores", "_bindCores");
 
         updateFromContext(checkingHeap, ctx, "@checkingHeap", "_CheckingHeap");
         // Note: disableLocalOptimizations is not permitted at context level (too late)

+ 1 - 0
roxie/ccd/ccdquery.hpp

@@ -107,6 +107,7 @@ public:
     int concatPreload;
     int fetchPreload;
     int prefetchProjectPreload;
+    int bindCores;
 
     bool checkingHeap;
     bool disableLocalOptimizations;