Ver código fonte

HPCC-13573 Improve throttling and diagnostic statistics

+ Split throttling into 2 categories (slow/fast).
+ Allow throttle items to queue rather than block
+ Improve error reporting/feedback to client
+ Improve configurability over limits
+ Improve statistics, # clients, # reqeusts, throttling and allow
to be requested via dafscontrol

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 10 anos atrás
pai
commit
6814704df8

+ 1 - 0
common/remote/remoteerr.hpp

@@ -59,6 +59,7 @@
 #define RFSERR_NoConnectSlaveXY                 8046
 #define RFSERR_VersionMismatch                  8047
 #define RFSERR_SetThrottleFailed                8048
+#define RFSERR_MaxQueueRequests                 8049
 
 //---- Text for all errors (make it easy to internationalise) ---------------------------
 

+ 4 - 4
common/remote/rmtfile.cpp

@@ -504,7 +504,7 @@ extern REMOTE_API int setDafileSvrTraceFlags(const SocketEndpoint &_ep,byte flag
     return -2;
 }
 
-extern REMOTE_API int setDafileSvrThrottleLimit(const SocketEndpoint &_ep, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit)
+extern REMOTE_API int setDafileSvrThrottleLimit(const SocketEndpoint &_ep, ThrottleClass throttleClass, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit, unsigned queueLimit, StringBuffer *errMsg)
 {
     SocketEndpoint ep(_ep);
     setDafsEndpointPort(ep);
@@ -512,7 +512,7 @@ extern REMOTE_API int setDafileSvrThrottleLimit(const SocketEndpoint &_ep, unsig
         return -3;
     try {
         Owned<ISocket> socket = ISocket::connect_wait(ep,5000);
-        return setDafsThrottleLimit(socket, throttleLimit, throttleDelayMs, throttleCPULimit);
+        return setDafsThrottleLimit(socket, throttleClass, throttleLimit, throttleDelayMs, throttleCPULimit, queueLimit, errMsg);
     }
     catch (IException *e)
     {
@@ -522,7 +522,7 @@ extern REMOTE_API int setDafileSvrThrottleLimit(const SocketEndpoint &_ep, unsig
     return -2;
 }
 
-extern REMOTE_API int getDafileSvrInfo(const SocketEndpoint &_ep,StringBuffer &retstr)
+extern REMOTE_API int getDafileSvrInfo(const SocketEndpoint &_ep, unsigned level, StringBuffer &retstr)
 {
     SocketEndpoint ep(_ep);
     setDafsEndpointPort(ep);
@@ -530,7 +530,7 @@ extern REMOTE_API int getDafileSvrInfo(const SocketEndpoint &_ep,StringBuffer &r
         return false;
     try {
         Owned<ISocket> socket = ISocket::connect_wait(ep,5000);
-        return getDafsInfo(socket, retstr);
+        return getDafsInfo(socket, level, retstr);
     }
     catch (IException *e)
     {

+ 4 - 2
common/remote/rmtfile.hpp

@@ -21,6 +21,8 @@
 #include "jsocket.hpp"
 #include "jfile.hpp"
 
+#include "sockfile.hpp"
+
 #ifdef REMOTE_EXPORTS
 #define REMOTE_API __declspec(dllexport)
 #else
@@ -68,8 +70,8 @@ extern REMOTE_API int remoteExec(const SocketEndpoint &ep,const char *cmdline, c
 extern REMOTE_API void remoteExtractBlobElements(const char * prefix, const RemoteFilename &file, ExtractedBlobArray & extracted);
 
 extern REMOTE_API int setDafileSvrTraceFlags(const SocketEndpoint &ep,byte flags);
-extern REMOTE_API int setDafileSvrThrottleLimit(const SocketEndpoint &_ep, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit);
-extern REMOTE_API int getDafileSvrInfo(const SocketEndpoint &ep,StringBuffer &retstr);
+extern REMOTE_API int setDafileSvrThrottleLimit(const SocketEndpoint &_ep, ThrottleClass throttleClass, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit, unsigned queueLimit, StringBuffer *errMsg=NULL);
+extern REMOTE_API int getDafileSvrInfo(const SocketEndpoint &ep, unsigned level, StringBuffer &retstr);
 
 extern REMOTE_API void disconnectRemoteFile(IFile *file);
 extern REMOTE_API void disconnectRemoteIoOnExit(IFileIO *fileio,bool set=true);

Diferenças do arquivo suprimidas por serem muito extensas
+ 1310 - 949
common/remote/sockfile.cpp


+ 29 - 11
common/remote/sockfile.hpp

@@ -29,33 +29,51 @@
 
 #define RFEnoerror      0
 
-interface IRemoteFileServer : public IInterface
+enum ThrottleClass
+{
+    ThrottleStd,
+    ThrottleSlow,
+    ThrottleClassMax
+};
+
+// RemoteFileServer throttling defaults
+#define DEFAULT_THREADLIMIT 100
+#define DEFAULT_THREADLIMITDELAYMS (60*1000)
+#define DEFAULT_ASYNCCOPYMAX 10
+
+#define DEFAULT_STDCMD_PARALLELREQUESTLIMIT 80
+#define DEFAULT_STDCMD_THROTTLEDELAYMS 1000
+#define DEFAULT_STDCMD_THROTTLECPULIMIT 85
+#define DEFAULT_STDCMD_THROTTLEQUEUELIMIT 1000
+
+#define DEFAULT_SLOWCMD_PARALLELREQUESTLIMIT 20
+#define DEFAULT_SLOWCMD_THROTTLEDELAYMS 5000
+#define DEFAULT_SLOWCMD_THROTTLECPULIMIT 75
+#define DEFAULT_SLOWCMD_THROTTLEQUEUELIMIT 1000
+
+interface IRemoteFileServer : extends IInterface
 {
-public:
     virtual void run(SocketEndpoint &listenep, bool useSSL = false) = 0;
     virtual void stop() = 0;
     virtual unsigned idleTime() = 0; // in ms
+    virtual void setThrottle(ThrottleClass throttleClass, unsigned limit, unsigned delayMs=DEFAULT_STDCMD_THROTTLEDELAYMS, unsigned cpuThreshold=DEFAULT_STDCMD_THROTTLECPULIMIT, unsigned queueLimit=DEFAULT_STDCMD_THROTTLEQUEUELIMIT) = 0;
+    virtual StringBuffer &getStats(StringBuffer &stats, bool reset) = 0;
 };
 
-#define FILESRV_VERSION 18 // don't forget VERSTRING in sockfile.cpp
-
-// RemoteFileServer throttling defaults
-#define DEFAULT_PARALLELREQUESTLIMIT 20
-#define DEFAULT_THROTTLEDELAYMS 5000
-#define DEFAULT_THROTTLECPULIMIT 75
+#define FILESRV_VERSION 19 // don't forget VERSTRING in sockfile.cpp
 
 extern REMOTE_API IFile * createRemoteFile(SocketEndpoint &ep,const char * _filename); // takes ownershop of socket
 extern REMOTE_API unsigned getRemoteVersion(ISocket * _socket, StringBuffer &ver);
 extern REMOTE_API unsigned stopRemoteServer(ISocket * _socket);
 extern REMOTE_API const char *remoteServerVersionString();
-extern REMOTE_API IRemoteFileServer * createRemoteFileServer(unsigned throttleLimit=DEFAULT_PARALLELREQUESTLIMIT, unsigned throttleDelayMs=DEFAULT_THROTTLEDELAYMS, unsigned throttleCPULimit=DEFAULT_THROTTLECPULIMIT);
+extern REMOTE_API IRemoteFileServer * createRemoteFileServer(unsigned maxThreads=DEFAULT_THREADLIMIT, unsigned maxThreadsDelayMs=DEFAULT_THREADLIMITDELAYMS, unsigned maxAsyncCopy=DEFAULT_ASYNCCOPYMAX);
 extern REMOTE_API int setDafsTrace(ISocket * socket,byte flags);
-extern REMOTE_API int setDafsThrottleLimit(ISocket * socket, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit);
+extern REMOTE_API int setDafsThrottleLimit(ISocket * socket, ThrottleClass throttleClass, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit, unsigned queueLimit, StringBuffer *errMsg=NULL);
 extern REMOTE_API bool enableDafsAuthentication(bool on);
 extern int remoteExec(ISocket * socket, const char *cmdline, const char *workdir,bool sync,
                 size32_t insize, void *inbuf, MemoryBuffer *outbuf);
 extern void remoteExtractBlobElements(const SocketEndpoint &ep, const char * prefix, const char * filename, ExtractedBlobArray & extracted);
-extern int getDafsInfo(ISocket * socket,StringBuffer &retstr);
+extern int getDafsInfo(ISocket * socket, unsigned level, StringBuffer &retstr);
 extern void setDafsEndpointPort(SocketEndpoint &ep);
 extern void setDafsLocalMountRedirect(const IpAddress &ip,const char *dir,const char *mountdir);
 

+ 62 - 11
dali/dafilesrv/dafilesrv.cpp

@@ -354,9 +354,17 @@ int main(int argc,char **argv)
     unsigned short  dafsPort;//DAFILESRV_PORT or SECURE_DAFILESRV_PORT
     querySecuritySettings(&useSSL, &dafsPort, &sslCertFile, NULL);
 
-    unsigned parallelRequestLimit = DEFAULT_PARALLELREQUESTLIMIT;
-    unsigned throttleDelayMs = DEFAULT_THROTTLEDELAYMS;
-    unsigned throttleCPULimit = DEFAULT_THROTTLECPULIMIT;
+    unsigned maxThreads = DEFAULT_THREADLIMIT;
+    unsigned maxThreadsDelayMs = DEFAULT_THREADLIMITDELAYMS;
+    unsigned maxAsyncCopy = DEFAULT_ASYNCCOPYMAX;
+    unsigned parallelRequestLimit = DEFAULT_STDCMD_PARALLELREQUESTLIMIT;
+    unsigned throttleDelayMs = DEFAULT_STDCMD_THROTTLEDELAYMS;
+    unsigned throttleCPULimit = DEFAULT_STDCMD_THROTTLECPULIMIT;
+    unsigned throttleQueueLimit = DEFAULT_STDCMD_THROTTLEQUEUELIMIT;
+    unsigned parallelSlowRequestLimit = DEFAULT_SLOWCMD_PARALLELREQUESTLIMIT;
+    unsigned throttleSlowDelayMs = DEFAULT_SLOWCMD_THROTTLEDELAYMS;
+    unsigned throttleSlowCPULimit = DEFAULT_SLOWCMD_THROTTLECPULIMIT;
+    unsigned throttleSlowQueueLimit = DEFAULT_SLOWCMD_THROTTLEQUEUELIMIT;
 
     Owned<IPropertyTree> env = getHPCCEnvironment();
     if (env)
@@ -368,9 +376,20 @@ int main(int argc,char **argv)
         if (daFileSrv)
         {
             // global DaFileSrv settings:
-            parallelRequestLimit = daFileSrv->getPropInt("@parallelRequestLimit", DEFAULT_PARALLELREQUESTLIMIT);
-            throttleDelayMs = daFileSrv->getPropInt("@throttleDelayMs", DEFAULT_THROTTLEDELAYMS);
-            throttleCPULimit = daFileSrv->getPropInt("@throttleCPULimit", DEFAULT_THROTTLECPULIMIT);
+
+            maxThreads = daFileSrv->getPropInt("@maxThreads", DEFAULT_THREADLIMIT);
+            maxThreadsDelayMs = daFileSrv->getPropInt("@maxThreadsDelayMs", DEFAULT_THREADLIMITDELAYMS);
+            maxAsyncCopy = daFileSrv->getPropInt("@maxAsyncCopy", DEFAULT_ASYNCCOPYMAX);
+
+            parallelRequestLimit = daFileSrv->getPropInt("@parallelRequestLimit", DEFAULT_STDCMD_PARALLELREQUESTLIMIT);
+            throttleDelayMs = daFileSrv->getPropInt("@throttleDelayMs", DEFAULT_STDCMD_THROTTLEDELAYMS);
+            throttleCPULimit = daFileSrv->getPropInt("@throttleCPULimit", DEFAULT_STDCMD_THROTTLECPULIMIT);
+            throttleQueueLimit = daFileSrv->getPropInt("@throttleQueueLimit", DEFAULT_STDCMD_THROTTLEQUEUELIMIT);
+
+            parallelSlowRequestLimit = daFileSrv->getPropInt("@parallelSlowRequestLimit", DEFAULT_SLOWCMD_PARALLELREQUESTLIMIT);
+            throttleSlowDelayMs = daFileSrv->getPropInt("@throttleSlowDelayMs", DEFAULT_SLOWCMD_THROTTLEDELAYMS);
+            throttleSlowCPULimit = daFileSrv->getPropInt("@throttleSlowCPULimit", DEFAULT_SLOWCMD_THROTTLECPULIMIT);
+            throttleSlowQueueLimit = daFileSrv->getPropInt("@throttleSlowQueueLimit", DEFAULT_SLOWCMD_THROTTLEQUEUELIMIT);
 
             // any overrides by Instance definitions?
             // NB: This won't work if netAddress is "." or if we start supporting hostnames there
@@ -380,9 +399,19 @@ int main(int argc,char **argv)
             IPropertyTree *dafileSrvInstance = daFileSrv->queryPropTree(daFileSrvPath);
             if (dafileSrvInstance)
             {
+                maxThreads = dafileSrvInstance->getPropInt("@maxThreads", maxThreads);
+                maxThreadsDelayMs = dafileSrvInstance->getPropInt("@maxThreadsDelayMs", maxThreadsDelayMs);
+                maxAsyncCopy = dafileSrvInstance->getPropInt("@maxAsyncCopy", maxAsyncCopy);
+
                 parallelRequestLimit = dafileSrvInstance->getPropInt("@parallelRequestLimit", parallelRequestLimit);
                 throttleDelayMs = dafileSrvInstance->getPropInt("@throttleDelayMs", throttleDelayMs);
                 throttleCPULimit = dafileSrvInstance->getPropInt("@throttleCPULimit", throttleCPULimit);
+                throttleQueueLimit = dafileSrvInstance->getPropInt("@throttleQueueLimit", throttleQueueLimit);
+
+                parallelSlowRequestLimit = dafileSrvInstance->getPropInt("@parallelSlowRequestLimit", parallelSlowRequestLimit);
+                throttleSlowDelayMs = dafileSrvInstance->getPropInt("@throttleSlowDelayMs", throttleSlowDelayMs);
+                throttleSlowCPULimit = dafileSrvInstance->getPropInt("@throttleSlowCPULimit", throttleSlowCPULimit);
+                throttleSlowQueueLimit = dafileSrvInstance->getPropInt("@throttleSlowQueueLimit", throttleSlowQueueLimit);
             }
         }
     }
@@ -585,7 +614,9 @@ int main(int argc,char **argv)
                 PROGLOG("Version: %s", verstring);
                 PROGLOG("Authentication:%s required",requireauthenticate?"":" not");
                 PROGLOG(DAFS_SERVICE_DISPLAY_NAME " Running");
-                server.setown(createRemoteFileServer(parallelRequestLimit, throttleDelayMs, throttleCPULimit));
+                server.setown(createRemoteFileServer(maxThreads, maxThreadsDelayMs, maxAsyncCopy));
+                server->setThrottle(ThrottleStd, parallelRequestLimit, throttleDelayMs, throttleCPULimit);
+                server->setThrottle(ThrottleSlow, parallelSlowRequestLimit, throttleSlowDelayMs, throttleSlowCPULimit);
                 try {
                     server->run(listenep, useSSL);
                 }
@@ -624,16 +655,36 @@ int main(int argc,char **argv)
     PROGLOG("Opening Dali File Server on %s%s", useSSL?"SECURE ":"",eps.str());
     PROGLOG("Version: %s", verstring);
     PROGLOG("Authentication:%s required",requireauthenticate?"":" not");
-    startPerformanceMonitor(10*60*1000, PerfMonStandard);
-    server.setown(createRemoteFileServer(parallelRequestLimit, throttleDelayMs, throttleCPULimit));
+    server.setown(createRemoteFileServer(maxThreads, maxThreadsDelayMs, maxAsyncCopy));
+    server->setThrottle(ThrottleStd, parallelRequestLimit, throttleDelayMs, throttleCPULimit);
+    server->setThrottle(ThrottleSlow, parallelSlowRequestLimit, throttleSlowDelayMs, throttleSlowCPULimit);
+    class CPerfHook : public CSimpleInterfaceOf<IPerfMonHook>
+    {
+    public:
+        virtual void processPerfStats(unsigned processorUsage, unsigned memoryUsage, unsigned memoryTotal, unsigned __int64 fistDiskUsage, unsigned __int64 firstDiskTotal, unsigned __int64 secondDiskUsage, unsigned __int64 secondDiskTotal, unsigned threadCount)
+        {
+        }
+        virtual StringBuffer &extraLogging(StringBuffer &extra)
+        {
+            return server->getStats(extra.newline(), true);
+        }
+        virtual void log(int level, const char *msg)
+        {
+            PROGLOG("%s", msg);
+        }
+    } perfHook;
+    startPerformanceMonitor(10*60*1000, PerfMonStandard, &perfHook);
     writeSentinelFile(sentinelFile);
-    try {
+    try
+    {
         server->run(listenep, useSSL);
     }
-    catch (IException *e) {
+    catch (IException *e)
+    {
         EXCLOG(e,"DAFILESRV");
         e->Release();
     }
+    stopPerformanceMonitor();
     if (server)
         server->stop();
     server.clear();

+ 46 - 7
dali/dafilesrv/dafscontrol.cpp

@@ -47,7 +47,8 @@ void usage()
     printf("  dafscontrol [<dali-ip>] CHECKVERMAJOR <ip-or-cluster>\n");
     printf("  dafscontrol [<dali-ip>] TRACE <ip> <num>\n");
     printf("  dafscontrol [<dali-ip>] CHKDSK <ip> <num>\n");
-    printf("  dafscontrol [<dali-ip>] THROTTLE <ip> <limit> <ms-delay> <cpu-limit>\n");
+    printf("  dafscontrol [<dali-ip>] INFO <ip-or-clsuter> [level]\n");
+    printf("  dafscontrol [<dali-ip>] THROTTLE <ip-or-cluster> <class> <limit> <ms-delay> <cpu-limit> <queue-limit>\n");
     printf("  dafscontrol MYVER\n");
     exit(1);
 }
@@ -81,7 +82,7 @@ bool getCluster(const char *clustername,SocketEndpointArray &eps)
     unsigned p = getDaliServixPort();
     for (unsigned i=0;i<n;i++) {
         SocketEndpoint ep(p,grp->queryNode(i).endpoint());
-        eps.append(ep);
+        eps.appendUniq(ep);
     }
     return eps.ordinality()!=0;
 }
@@ -388,23 +389,61 @@ int main(int argc, char* argv[])
                 }
                 break;
             }
+            if (stricmp(argv[ai], "info")==0) {
+                if (ai+1>=ac)
+                    usage();
+                else {
+                    SocketEndpointArray eps;
+                    StringBuffer errMsg;
+                    unsigned level=1;
+                    if (ac-(ai+1)>1)
+                        level = atoi(argv[ai+2]);
+                    PROGLOG("Info level = %u", level);
+                    if (!isdali||!getCluster(argv[ai+1],eps)) {
+                        SocketEndpoint ep(argv[ai+1]);
+                        StringBuffer epStr;
+                        ep.getUrlStr(epStr);
+                        VStringBuffer result("Info for %s", epStr.str());
+                        int ret = getDafileSvrInfo(ep, level, result);
+                        if (ret!=0)
+                            ERRLOG("getDafileSvrInfo for %s returned %d", epStr.str(), ret);
+                        else
+                            PROGLOG("%s", result.str());
+                    }
+                    else {
+                        ForEachItemIn(ni,eps) {
+                            SocketEndpoint ep = eps.item(ni);
+                            StringBuffer epStr;
+                            ep.getUrlStr(epStr);
+                            VStringBuffer result("Info for %s: ", epStr.str());
+                            int ret = getDafileSvrInfo(ep, level, result);
+                            if (ret!=0)
+                                ERRLOG("getDafileSvrInfo for %s returned %d", epStr.str(), ret);
+                            else
+                                PROGLOG("%s", result.str());
+                        }
+                    }
+                }
+                break;
+            }
             if (stricmp(argv[ai],"throttle")==0) {
-                if (ai+4>=ac)
+                if (ai+6>=ac)
                     usage();
                 else {
                     SocketEndpointArray eps;
+                    StringBuffer errMsg;
                     if (!isdali||!getCluster(argv[ai+1],eps)) {
                         SocketEndpoint ep(argv[ai+1]);
-                        int ret = setDafileSvrThrottleLimit(ep, atoi(argv[ai+2]), atoi(argv[ai+3]), atoi(argv[ai+4]));
+                        int ret = setDafileSvrThrottleLimit(ep, (ThrottleClass)atoi(argv[ai+2]), atoi(argv[ai+3]), atoi(argv[ai+4]), atoi(argv[ai+5]), atoi(argv[ai+6]), &errMsg);
                         if (ret!=0)
-                            ERRLOG("setDafileSvrThrottleLimit returned %d", ret);
+                            ERRLOG("setDafileSvrThrottleLimit returned %d, error = %s", ret, errMsg.str());
                     }
                     else {
                         ForEachItemIn(ni,eps) {
                             SocketEndpoint ep = eps.item(ni);
-                            int ret = setDafileSvrThrottleLimit(ep, atoi(argv[ai+2]), atoi(argv[ai+3]), atoi(argv[ai+4]));
+                            int ret = setDafileSvrThrottleLimit(ep, (ThrottleClass)atoi(argv[ai+2]), atoi(argv[ai+3]), atoi(argv[ai+4]), atoi(argv[ai+5]), atoi(argv[ai+6]), &errMsg.clear());
                             if (ret!=0)
-                                ERRLOG("setDafileSvrThrottleLimit returned %d", ret);
+                                ERRLOG("setDafileSvrThrottleLimit returned %d, error = %s", ret, errMsg.str());
                             StringBuffer s("done ");
                             ep.getUrlStr(s);
                             PROGLOG("%s",s.str());

+ 3 - 1
dali/dfuplus/dfuplus.cpp

@@ -65,7 +65,9 @@ public:
         else
             listenep.getUrlStr(eps);
         enableDafsAuthentication(requireauthenticate);
-        server.setown(createRemoteFileServer(0)); // no throttle limiting
+        server.setown(createRemoteFileServer());
+        server->setThrottle(ThrottleStd, 0); // disable throttling
+        server->setThrottle(ThrottleSlow, 0); // disable throttling
     }
 
     int run()