Browse Source

HPCC-10709 Allow dafilesrv throttle limit to be configured

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 10 years ago
parent
commit
f01afe88b5

+ 2 - 0
common/remote/remoteerr.hpp

@@ -58,6 +58,7 @@
 #define RFSERR_NoConnectSlave                   8045
 #define RFSERR_NoConnectSlaveXY                 8046
 #define RFSERR_VersionMismatch                  8047
+#define RFSERR_SetThrottleFailed                8048
 
 //---- Text for all errors (make it easy to internationalise) ---------------------------
 
@@ -67,6 +68,7 @@
 #define RFSERR_TimeoutFileIOHandle_Text         "Remote fileio has been closed because of timeout"
 #define RFSERR_MasterSeemsToHaveDied_Text       "Master program seems to have died..."
 #define RFSERR_VersionMismatch_Text             "Slave version does not match, expected %d got %d"
+#define RFSERR_SetThrottleFailed_Text           "Failed to set throttle limit"
 
 #define RFSERR_TimeoutWaitSlave_Text            "Timeout waiting for slave %s to respond"
 #define RFSERR_TimeoutWaitConnect_Text          "Timeout waiting to connect to slave %s"

+ 18 - 0
common/remote/rmtfile.cpp

@@ -492,6 +492,24 @@ extern REMOTE_API int setDafileSvrTraceFlags(const SocketEndpoint &_ep,byte flag
     return -2;
 }
 
+// Connects to the dafilesrv addressed by '_ep' (after setDafsEndpointPort() has been
+// applied to a copy of it) and asks it to apply the given throttle settings.
+// Returns the value of setDafsThrottleLimit() on success, -3 if the endpoint is null,
+// or -2 if an IException was raised (it is logged and released here).
+extern REMOTE_API int setDafileSvrThrottleLimit(const SocketEndpoint &_ep, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit)
+{
+    SocketEndpoint target(_ep);
+    setDafsEndpointPort(target);
+    if (!target.isNull())
+    {
+        try
+        {
+            // 5000 is the connect_wait timeout argument (presumably milliseconds - confirm)
+            Owned<ISocket> conn = ISocket::connect_wait(target,5000);
+            return setDafsThrottleLimit(conn, throttleLimit, throttleDelayMs, throttleCPULimit);
+        }
+        catch (IException *err)
+        {
+            EXCLOG(err,"setDafileSvrThrottleLimit");
+            err->Release();
+        }
+        return -2;
+    }
+    return -3;
+}
+
 extern REMOTE_API int getDafileSvrInfo(const SocketEndpoint &_ep,StringBuffer &retstr)
 {
     SocketEndpoint ep(_ep);

+ 1 - 0
common/remote/rmtfile.hpp

@@ -68,6 +68,7 @@ extern REMOTE_API int remoteExec(const SocketEndpoint &ep,const char *cmdline, c
 extern REMOTE_API void remoteExtractBlobElements(const char * prefix, const RemoteFilename &file, ExtractedBlobArray & extracted);
 
 extern REMOTE_API int setDafileSvrTraceFlags(const SocketEndpoint &ep,byte flags);
+extern REMOTE_API int setDafileSvrThrottleLimit(const SocketEndpoint &_ep, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit);
 extern REMOTE_API int getDafileSvrInfo(const SocketEndpoint &ep,StringBuffer &retstr);
 
 extern REMOTE_API void disconnectRemoteFile(IFile *file);

+ 188 - 48
common/remote/sockfile.cpp

@@ -160,7 +160,7 @@ struct dummyReadWrite
 // backward compatible modes
 typedef enum { compatIFSHnone, compatIFSHread, compatIFSHwrite, compatIFSHexec, compatIFSHall} compatIFSHmode;
 
-static const char *VERSTRING= "DS V1.7e - 7 "       // dont forget FILESRV_VERSION in header
+static const char *VERSTRING= "DS V1.8"       // dont forget FILESRV_VERSION in header
 #ifdef _WIN32
 "Windows ";
 #else
@@ -269,6 +269,8 @@ enum {
     RFCtreecopy,
 // 1.7e - 1
     RFCtreecopytmp,
+// 1.8
+    RFCsetthrottle,
     RFCmax,
     };
 
@@ -297,45 +299,16 @@ static void mergeOnce(OnceKey &key,size32_t sz,const void *data)
 
 //---------------------------------------------------------------------------
 
+class CRemoteFileServer;
 class CThrottler // scoped holder of one throttle slot; the destructor calls release()
 {
-    Semaphore &sem;
+    CRemoteFileServer &owner; // server whose takeThrottleSem()/releaseThrottleSem() are used
     bool got; // true while this object holds a slot
 public:
-    CThrottler(Semaphore &_sem) : sem(_sem), got(false)
-    {
-        take();
-    }
-    ~CThrottler()
-    {
-        release();
-    }
-    bool take()
-    {
-        assertex(!got);
-        got = false;
-        loop {
-            if (sem.wait(5000)) {
-                got = true;
-                break;
-            }
-            unsigned cpu = getLatestCPUUsage();
-            PROGLOG("Throttler stalled (%d%% cpu)",cpu);
-            if (getLatestCPUUsage()<75) 
-                break;
-        }
-        return got;
-    }
-    bool release()
-    {
-        if (got)
-        {
-            got = false;
-            sem.signal();
-            return true;
-        }
-        return false;
-    }
+    CThrottler(CRemoteFileServer &_owner); // defined out of line, after CRemoteFileServer; calls take()
+    ~CThrottler() { release(); }
+    void take(); // must not already hold a slot (asserted in the definition)
+    bool release(); // returns true only if a slot was actually held and given back
 };
 
 // temporarily release a throttler slot
@@ -2678,6 +2651,26 @@ int setDafsTrace(ISocket * socket,byte flags)
     return -1;
 }
 
+// Sends an RFCsetthrottle request carrying throttleLimit, throttleDelayMs and
+// throttleCPULimit (in that order) over 'socket' and returns the integer code read
+// from the reply. If an IException is raised while sending, receiving or decoding,
+// it is logged and released and -1 is returned.
+int setDafsThrottleLimit(ISocket * socket, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit)
+{
+    assertex(socket);
+    MemoryBuffer request;
+    initSendBuffer(request);
+    request.append((RemoteFileCommandType)RFCsetthrottle);
+    request.append(throttleLimit);
+    request.append(throttleDelayMs);
+    request.append(throttleCPULimit);
+    MemoryBuffer response;
+    try
+    {
+        sendBuffer(socket, request);
+        receiveBuffer(socket, response, NORMAL_RETRIES, 1024);
+        int serverCode;
+        response.read(serverCode);
+        return serverCode;
+    }
+    catch (IException *err)
+    {
+        EXCLOG(err);
+        ::Release(err);
+    }
+    return -1;
+}
 
 int getDafsInfo(ISocket * socket,StringBuffer &retstr)
 {
@@ -2954,6 +2947,7 @@ static unsigned ClientCount = 0;
 static unsigned MaxClientCount = 0;
 static CriticalSection ClientCountSect;
 
+#define TOTAL_THROTTLE_TIME_SECS 60 // log total throttled delay period
 
 class CRemoteFileServer : public CInterface, implements IRemoteFileServer, implements IThreadFactory
 {
@@ -2967,6 +2961,10 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer, imple
     unsigned closedclients;
     CAsyncCommandManager asyncCommandManager;
     Semaphore throttlesem;
+    unsigned throttleLimit, throttleDelayMs, throttleCPULimit, disabledThrottleLimit;
+    unsigned __int64 totalThrottleDelay;
+    CCycleTimer totalThrottleDelayTimer;
+    CriticalSection setThrottleCrit;
     atomic_t globallasttick;
 
     int getNextHandle()
@@ -3137,14 +3135,18 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer, imple
                 PROGLOG("Previous handle(%d): %s",handles.item(previdx),opennames.item(previdx).text.get());
         }
 
-
         void processCommand() // reads one command from 'buf', dispatches it and sends the reply
         {
-            CThrottler throttler(parent->throttleSem());
             MemoryBuffer reply;
             RemoteFileCommandType cmd;
             buf.read(cmd);
-            parent->dispatchCommand(cmd, buf, initSendBuffer(reply), this, &throttler);
+            if (parent->throttleLimit && (RFCsetthrottle != cmd)) // throttle only when enabled; RFCsetthrottle itself is never throttled
+            {
+                CThrottler throttler(*parent); // slot (if obtained) is given back when this scope ends
+                parent->dispatchCommand(cmd, buf, initSendBuffer(reply), this, &throttler);
+            }
+            else
+                parent->dispatchCommand(cmd, buf, initSendBuffer(reply), this, NULL); // unthrottled path: no throttler is passed
             buf.clear();
             sendBuffer(socket, reply);
         }
@@ -3324,9 +3326,13 @@ public:
 
     IMPLEMENT_IINTERFACE
 
-    CRemoteFileServer()
+    CRemoteFileServer(unsigned _throttleLimit, unsigned _throttleDelayMs, unsigned _throttleCPULimit)
+        : throttleLimit(_throttleLimit), throttleDelayMs(_throttleDelayMs), throttleCPULimit(_throttleCPULimit)
     {
-        throttlesem.signal(10);
+        if (throttleLimit) // if 0, throttling not used
+            throttlesem.signal(throttleLimit);
+        totalThrottleDelay = 0;
+        disabledThrottleLimit = 0;
         lasthandle = 0;
         selecthandler.setown(createSocketSelectHandler(NULL));
         threads.setown(createThreadPool("CRemoteFileServerPool",this,NULL,MAX_THREADS,60*1000,
@@ -3355,6 +3361,51 @@ public:
 #endif
     }
 
+    bool takeThrottleSem() // waits for a throttle slot; returns true if one was obtained, false if it stopped waiting without one
+    {
+        bool got = false;
+        CCycleTimer timer; // measures the time spent in this call
+        loop {
+            if (throttlesem.wait(throttleDelayMs)) { // wait up to throttleDelayMs for a slot
+                got = true;
+                break;
+            }
+            unsigned cpu = getLatestCPUUsage();
+            PROGLOG("Throttler: transaction delayed (%d%% cpu)", cpu);
+
+            // NB: getLatestCPUUsage() is based on interval monitoring, typically 60 secs
+            if (cpu<throttleCPULimit) // CPU usage below the limit: stop waiting and return without a slot
+                break;
+        }
+        unsigned ms = timer.elapsedMs();
+        if (ms >= 1000) { // only report delays of at least one second ...
+            if (ms>throttleDelayMs) // ... that also exceed a single wait period
+                PROGLOG("Throttle: transaction delayed for : %d seconds", ms/1000);
+        }
+        totalThrottleDelay += ms; // NOTE(review): no lock is taken in this method; if several pool threads call it concurrently this update (and the reset below) is unsynchronized - confirm
+        if (totalThrottleDelay && (totalThrottleDelayTimer.elapsedCycles() >= (queryOneSecCycles() * TOTAL_THROTTLE_TIME_SECS))) // summary log once TOTAL_THROTTLE_TIME_SECS have elapsed and some delay was recorded
+        {
+            unsigned elapsedSecs = totalThrottleDelayTimer.elapsedMs()/1000;
+            time_t simple;
+            time(&simple);
+            simple -= elapsedSecs; // wall-clock time at which the current accumulation period began
+
+            CDateTime dt;
+            dt.set(simple);
+            StringBuffer dateStr;
+            dt.getTimeString(dateStr, true);
+            PROGLOG("Throttler: total delay of %0.2f seconds, since: %s", ((double)totalThrottleDelay)/1000, dateStr.str());
+
+            totalThrottleDelayTimer.reset(); // start a new accumulation period
+            totalThrottleDelay = 0;
+        }
+        return got;
+    }
+
+    void releaseThrottleSem() // gives one slot back to the throttle semaphore
+    {
+        throttlesem.signal();
+    }
 
     //MORE: The file handles should timeout after a while, and accessing an old (invalid handle)
     // should throw a different exception
@@ -4280,6 +4331,77 @@ public:
         return false;
     }
 
+    /* Handles RFCsetthrottle: reads the requested throttleLimit, throttleDelayMs and throttleCPULimit
+     * (in that order) from 'msg', adjusts the throttle semaphore's signal count to match the new limit
+     * and stores the new settings.
+     * On success appends RFEnoerror to 'reply' and returns true. If an IException is caught it is
+     * reported as RFSERR_SetThrottleFailed via throwErr3 and the method returns false.
+     * Requests are serialized by setThrottleCrit. 'client' is not used by this handler.
+     */
+    bool cmdSetThrottle(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
+    {
+        CriticalBlock b(setThrottleCrit);
+        try
+        {
+            unsigned _throttleLimit, _throttleDelayMs, _throttleCPULimit;
+            msg.read(_throttleLimit);
+            msg.read(_throttleDelayMs);
+            msg.read(_throttleCPULimit);
+            int delta = 0; // semaphore signals to add (>0) or to consume (<0)
+            if (_throttleLimit)
+            {
+                if (disabledThrottleLimit) // if transitioning from disabled to some throttling
+                {
+                    assertex(0 == throttleLimit);
+                    delta = _throttleLimit - disabledThrottleLimit; // + or -
+                    disabledThrottleLimit = 0;
+                }
+                else
+                    delta = _throttleLimit - throttleLimit; // + or -
+            }
+            else if (0 == disabledThrottleLimit)
+            {
+                PROGLOG("Throttling disabled, previous limit: %d", throttleLimit);
+                /* disabling - set limit immediately to let all new transaction through.
+                 * NB: the semaphore signals are not consumed in this case, because transactions could be waiting on it.
+                 * Instead the existing 'throttleLimit' is kept in 'disabledThrottleLimit', so that if/when throttling is
+                 * re-enabled, it is used as a basis for increasing or consuming the semaphore signal count.
+                 */
+                disabledThrottleLimit = throttleLimit;
+                throttleLimit = 0;
+            }
+            if (delta > 0)
+            {
+                PROGLOG("Increasing throttleLimit from %d to %d", throttleLimit, _throttleLimit);
+                throttlesem.signal(delta);
+                throttleLimit = _throttleLimit;
+                // NB: If throttling was off, this doesn't affect transactions in progress, i.e. will only throttle new transactions coming in.
+            }
+            else if (delta < 0)
+            {
+                PROGLOG("Reducing throttleLimit from %d to %d", throttleLimit, _throttleLimit);
+                // NB: This is not expected to take long
+                CCycleTimer timer;
+                const int toConsume = -delta; // total number of signals that must be reclaimed
+                while (delta < 0)
+                {
+                    if (throttlesem.wait(1000))
+                        ++delta;
+                    else // report the real target and how many have been reclaimed so far
+                        PROGLOG("Waited %0.2f seconds so far for a total of %d transactions to complete, %d completed", ((double)timer.elapsedMs())/1000, toConsume, toConsume+delta);
+                }
+                throttleLimit = _throttleLimit;
+                // NB: doesn't include transactions in progress, i.e. will only throttle new transactions coming in.
+            }
+            else if (_throttleLimit && (0 == throttleLimit))
+            {
+                /* delta==0 while throttling is off: re-enabling with the same limit that was in force when
+                 * throttling was disabled. The semaphore count is already right, but the limit must still be
+                 * restored, otherwise throttling would silently stay off and a later request would over-signal.
+                 */
+                PROGLOG("Re-enabling throttling, throttleLimit=%d", _throttleLimit);
+                throttleLimit = _throttleLimit;
+            }
+            PROGLOG("New throttleDelayMs=%d, previous: %d", _throttleDelayMs, throttleDelayMs);
+            PROGLOG("New throttleCPULimit=%d, previous: %d", _throttleCPULimit, throttleCPULimit);
+            throttleDelayMs = _throttleDelayMs;
+            throttleCPULimit = _throttleCPULimit;
+            reply.append((unsigned)RFEnoerror);
+            return true;
+        }
+        catch (IException *e)
+        {
+            StringBuffer s;
+            e->errorMessage(s);
+            // NOTE(review): throwErr3 is defined outside this view; if it ever leaves this scope the Release() below is skipped - confirm
+            throwErr3(RFSERR_SetThrottleFailed,e->errorCode(),s.str());
+            e->Release();
+        }
+        return false;
+    }
+
     bool dispatchCommand(RemoteFileCommandType cmd, MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler *client, CThrottler *throttler)
     {
         bool ret = true;
@@ -4319,6 +4441,7 @@ public:
             MAPCOMMANDCLIENT(RFCcopysection, cmdCopySection, *client);
             MAPCOMMANDCLIENTTHROTTLER(RFCtreecopy, cmdTreeCopy, *client, throttler);
             MAPCOMMANDCLIENTTHROTTLER(RFCtreecopytmp, cmdTreeCopyTmp, *client, throttler);
+            MAPCOMMANDCLIENT(RFCsetthrottle, cmdSetThrottle, *client);
 
         default:
             ret = cmdUnknown(msg,reply,cmd);
@@ -4610,11 +4733,6 @@ public:
         return threads->runningCount();
     }
 
-    Semaphore &throttleSem()
-    {
-        return throttlesem;
-    }
-
     unsigned idleTime()
     {
         unsigned t = (unsigned)atomic_read(&globallasttick);
@@ -4624,12 +4742,34 @@ public:
 };
 
 
+CThrottler::CThrottler(CRemoteFileServer &_owner) : owner(_owner), got(false) // attempts to take a slot from 'owner' immediately
+{
+    take();
+}
+
+void CThrottler::take() // requests a slot from the owning server; 'got' records whether one was obtained
+{
+   assertex(!got); // must not already hold a slot
+   got = owner.takeThrottleSem();
+}
+
+// Gives the slot back to the owning server if this object currently holds one.
+// Returns true if a slot was released, false if none was held.
+bool CThrottler::release()
+{
+    if (!got)
+        return false;
+    got = false;
+    owner.releaseThrottleSem();
+    return true;
+}
+
 
-IRemoteFileServer * createRemoteFileServer()
+IRemoteFileServer * createRemoteFileServer(unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit) // factory; a throttleLimit of 0 means throttling is not used by the server
 {
 #if SIMULATE_PACKETLOSS
     errorSimulationOn = false;
 #endif
-    return new CRemoteFileServer();
+    return new CRemoteFileServer(throttleLimit, throttleDelayMs, throttleCPULimit); // settings are passed through unchanged
 }
 

+ 7 - 2
common/remote/sockfile.hpp

@@ -37,15 +37,20 @@ public:
     virtual unsigned idleTime() = 0; // in ms
 };
 
-#define FILESRV_VERSION 17 // don't forget VERSTRING in sockfile.cpp
+#define FILESRV_VERSION 18 // don't forget VERSTRING in sockfile.cpp
 
+// RemoteFileServer throttling defaults
+#define DEFAULT_PARALLELREQUESTLIMIT 20
+#define DEFAULT_THROTTLEDELAYMS 5000
+#define DEFAULT_THROTTLECPULIMIT 75
 
 extern REMOTE_API IFile * createRemoteFile(SocketEndpoint &ep,const char * _filename); // takes ownership of socket
 extern REMOTE_API unsigned getRemoteVersion(ISocket * _socket, StringBuffer &ver);
 extern REMOTE_API unsigned stopRemoteServer(ISocket * _socket);
 extern REMOTE_API const char *remoteServerVersionString();
-extern REMOTE_API IRemoteFileServer * createRemoteFileServer();
+extern REMOTE_API IRemoteFileServer * createRemoteFileServer(unsigned throttleLimit=DEFAULT_PARALLELREQUESTLIMIT, unsigned throttleDelayMs=DEFAULT_THROTTLEDELAYMS, unsigned throttleCPULimit=DEFAULT_THROTTLECPULIMIT);
 extern REMOTE_API int setDafsTrace(ISocket * socket,byte flags);
+extern REMOTE_API int setDafsThrottleLimit(ISocket * socket, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit);
 extern REMOTE_API bool enableDafsAuthentication(bool on);
 extern int remoteExec(ISocket * socket, const char *cmdline, const char *workdir,bool sync,
                 size32_t insize, void *inbuf, MemoryBuffer *outbuf);

+ 39 - 2
dali/dafilesrv/dafilesrv.cpp

@@ -345,6 +345,40 @@ int main(int argc,char **argv)
     bool requireauthenticate = false;
     StringBuffer logDir;
     StringBuffer instanceName;
+
+    unsigned parallelRequestLimit = DEFAULT_PARALLELREQUESTLIMIT;
+    unsigned throttleDelayMs = DEFAULT_THROTTLEDELAYMS;
+    unsigned throttleCPULimit = DEFAULT_THROTTLECPULIMIT;
+
+    Owned<IPropertyTree> env = getHPCCEnvironment();
+    if (env)
+    {
+        StringBuffer dafilesrvPath("Software/DafilesrvProcess");
+        if (instanceName.length())
+            dafilesrvPath.appendf("[@name=\"%s\"]", instanceName.str());
+        IPropertyTree *daFileSrv = env->queryPropTree(dafilesrvPath);
+        if (daFileSrv)
+        {
+            // global DaFileSrv settings:
+            parallelRequestLimit = daFileSrv->getPropInt("@parallelRequestLimit", DEFAULT_PARALLELREQUESTLIMIT);
+            throttleDelayMs = daFileSrv->getPropInt("@throttleDelayMs", DEFAULT_THROTTLEDELAYMS);
+            throttleCPULimit = daFileSrv->getPropInt("@throttleCPULimit", DEFAULT_THROTTLECPULIMIT);
+
+            // any overrides by Instance definitions?
+            // NB: This won't work if netAddress is "." or if we start supporting hostnames there
+            StringBuffer ipStr;
+            queryHostIP().getIpText(ipStr);
+            VStringBuffer daFileSrvPath("Instance[@netAddress=\"%s\"]", ipStr.str());
+            IPropertyTree *dafileSrvInstance = daFileSrv->queryPropTree(daFileSrvPath);
+            if (dafileSrvInstance)
+            {
+                parallelRequestLimit = dafileSrvInstance->getPropInt("@parallelRequestLimit", parallelRequestLimit);
+                throttleDelayMs = dafileSrvInstance->getPropInt("@throttleDelayMs", throttleDelayMs);
+                throttleCPULimit = dafileSrvInstance->getPropInt("@throttleCPULimit", throttleCPULimit);
+            }
+        }
+    }
+
     while (argc>i) {
         if (stricmp(argv[i],"-D")==0) {
             i++;
@@ -523,7 +557,7 @@ int main(int argc,char **argv)
                 PROGLOG("Version: %s", verstring);
                 PROGLOG("Authentication:%s required",requireauthenticate?"":" not");
                 PROGLOG(DAFS_SERVICE_DISPLAY_NAME " Running");
-                server.setown(createRemoteFileServer());
+                server.setown(createRemoteFileServer(parallelRequestLimit, throttleDelayMs, throttleCPULimit));
                 try {
                     server->run(listenep);
                 }
@@ -549,6 +583,9 @@ int main(int argc,char **argv)
         lf->setMaxDetail(TopDetail);
         lf->beginLogging();
     }
+
+    PROGLOG("Parallel request limit = %d, throttleDelayMs = %d, throttleCPULimit = %d", parallelRequestLimit, throttleDelayMs, throttleCPULimit);
+
     const char * verstring = remoteServerVersionString();
     StringBuffer eps;
     if (listenep.isNull())
@@ -560,7 +597,7 @@ int main(int argc,char **argv)
     PROGLOG("Version: %s", verstring);
     PROGLOG("Authentication:%s required",requireauthenticate?"":" not");
     startPerformanceMonitor(10*60*1000, PerfMonStandard);
-    server.setown(createRemoteFileServer());
+    server.setown(createRemoteFileServer(parallelRequestLimit, throttleDelayMs, throttleCPULimit));
     writeSentinelFile(sentinelFile);
     try {
         server->run(listenep);

+ 27 - 1
dali/dafilesrv/dafscontrol.cpp

@@ -47,6 +47,7 @@ void usage()
     printf("  dafscontrol [<dali-ip>] CHECKVERMAJOR <ip-or-cluster>\n");
     printf("  dafscontrol [<dali-ip>] TRACE <ip> <num>\n");
     printf("  dafscontrol [<dali-ip>] CHKDSK <ip> <num>\n");
+    printf("  dafscontrol [<dali-ip>] THROTTLE <ip> <limit> <ms-delay> <cpu-limit>\n");
     printf("  dafscontrol MYVER\n");
     exit(1);
 }
@@ -363,7 +364,7 @@ int main(int argc, char* argv[])
                 break;
             }
             if (stricmp(argv[ai],"trace")==0) {
-                if (ai+2>=ac) 
+                if (ai+2>=ac)
                     usage(); 
                 else {
                     SocketEndpointArray eps;
@@ -387,6 +388,31 @@ int main(int argc, char* argv[])
                 }
                 break;
             }
+            if (stricmp(argv[ai],"throttle")==0) {
+                if (ai+4>=ac)
+                    usage();
+                else {
+                    SocketEndpointArray eps;
+                    if (!isdali||!getCluster(argv[ai+1],eps)) {
+                        SocketEndpoint ep(argv[ai+1]);
+                        int ret = setDafileSvrThrottleLimit(ep, atoi(argv[ai+2]), atoi(argv[ai+3]), atoi(argv[ai+4]));
+                        if (ret!=0)
+                            ERRLOG("setDafileSvrThrottleLimit returned %d", ret);
+                    }
+                    else {
+                        ForEachItemIn(ni,eps) {
+                            SocketEndpoint ep = eps.item(ni);
+                            int ret = setDafileSvrThrottleLimit(ep, atoi(argv[ai+2]), atoi(argv[ai+3]), atoi(argv[ai+4]));
+                            if (ret!=0)
+                                ERRLOG("setDafileSvrThrottleLimit returned %d", ret);
+                            StringBuffer s("done ");
+                            ep.getUrlStr(s);
+                            PROGLOG("%s",s.str());
+                        }
+                    }
+                }
+                break;
+            }
             SocketEndpoint ep;
             SocketEndpointArray epa;
             ep.set(argv[ai],DALI_SERVER_PORT);

+ 1 - 1
dali/dfuplus/dfuplus.cpp

@@ -50,7 +50,7 @@ public:
         else
             listenep.getUrlStr(eps);
         enableDafsAuthentication(requireauthenticate);
-        server.setown(createRemoteFileServer());
+        server.setown(createRemoteFileServer(0)); // no throttle limiting
     }
 
     int run()