浏览代码

Merge pull request #11723 from jakesmith/hpcc-20554

HPCC-20554 Split remote row service into [optional] separate port

Reviewed-By: Mark Kelly <mark.kelly@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 6 年之前
父节点
当前提交
4433fd632d

+ 44 - 0
common/environment/environment.cpp

@@ -273,6 +273,7 @@ public:
         synchronized procedure(safeCache);
         return fileAccessUrl.length() ? fileAccessUrl.str() : nullptr;
     }
+    virtual IConstDaFileSrvInfo *getDaFileSrvGroupInfo(const char *name) const override;
 };
 
 class CLockedEnvironment : implements IEnvironment, public CInterface
@@ -393,6 +394,8 @@ public:
             { return c->getPrivateKeyPath(keyPairName); }
     virtual const char *getFileAccessUrl() const
             { return c->getFileAccessUrl(); }
+    virtual IConstDaFileSrvInfo *getDaFileSrvGroupInfo(const char *name) const override
+            { return c->getDaFileSrvGroupInfo(name); }
 };
 
 void CLockedEnvironment::commit()
@@ -1128,6 +1131,29 @@ public:
 };
 #endif
 
+class CConstDaFileSrvInfo : public CConstEnvBase, implements IConstDaFileSrvInfo
+{
+public:
+    IMPLEMENT_IINTERFACE;
+    IMPLEMENT_ICONSTENVBASE;
+    CConstDaFileSrvInfo(const CLocalEnvironment *env, IPropertyTree *root) : CConstEnvBase(env, root)
+    {
+    }
+    virtual const char *getName() const override
+    {
+        return root->queryProp("@name");
+    }
+    virtual unsigned getPort() const override
+    {
+        return root->getPropInt("@rowServicePort");
+    }
+    virtual bool getSecure() const override
+    {
+        return root->getPropBool("@rowServiceSSL");
+    }
+};
+
+
 //==========================================================================================
 
 CLocalEnvironment::CLocalEnvironment(const char* environmentFile)
@@ -1777,6 +1803,24 @@ bool CLocalEnvironment::isDropZoneRestrictionEnabled() const
     return dropZoneRestrictionEnabled;
 }
 
+IConstDaFileSrvInfo *CLocalEnvironment::getDaFileSrvGroupInfo(const char *name) const
+{
+    if (!name)
+        return nullptr;
+    VStringBuffer xpath("Software/DafilesrvGroup[@name=\"%s\"]", name);
+    synchronized procedure(safeCache);
+    IConstEnvBase *cached = getCache(xpath.str());
+    if (!cached)
+    {
+        IPropertyTree *d = p->queryPropTree(xpath.str());
+        if (!d)
+            return nullptr;
+        cached = new CConstDaFileSrvInfo(this, d);
+        setCache(xpath.str(), cached);
+    }
+    return (IConstDaFileSrvInfo *) cached;
+}
+
 //==========================================================================================
 // Iterators implementation
 

+ 8 - 0
common/environment/environment.hpp

@@ -127,6 +127,13 @@ interface  IConstDropZoneInfoIterator : extends IIteratorOf<IConstDropZoneInfo>
     virtual unsigned count() const = 0;
 };
 
+interface IConstDaFileSrvInfo : extends IConstEnvBase
+{
+    virtual const char *getName() const = 0;
+    virtual unsigned getPort() const = 0;
+    virtual bool getSecure() const = 0;
+};
+
 interface IConstEnvironment : extends IConstEnvBase
 {
     virtual IConstDomainInfo * getDomain(const char * name) const = 0;
@@ -151,6 +158,7 @@ interface IConstEnvironment : extends IConstEnvBase
     virtual const char *getPublicKeyPath(const char *keyPairName) const = 0;
     virtual const char *getPrivateKeyPath(const char *keyPairName) const = 0;
     virtual const char *getFileAccessUrl() const = 0;
+    virtual IConstDaFileSrvInfo *getDaFileSrvGroupInfo(const char *name) const = 0;
 };
 
 

+ 185 - 69
common/remote/sockfile.cpp

@@ -242,6 +242,11 @@ static ISecureSocket *createSecureSocket(ISocket *sock, SecureSocketType type)
     else
         return secureContextClient->createSecureSocket(sock, loglevel);
 }
+#else
+static ISecureSocket *createSecureSocket(ISocket *sock, SecureSocketType type)
+{
+    throwUnexpected();
+}
 #endif
 
 void clientSetRemoteFileTimeouts(unsigned maxconnecttime,unsigned maxreadtime)
@@ -4673,6 +4678,7 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
     class CThrottler;
     class CRemoteClientHandler : implements ISocketSelectNotify, public CInterface
     {
+        bool calledByRowService;
     public:
         CRemoteFileServer *parent;
         Owned<ISocket> socket;
@@ -4690,8 +4696,8 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
 
         IMPLEMENT_IINTERFACE;
 
-        CRemoteClientHandler(CRemoteFileServer *_parent,ISocket *_socket,IAuthenticatedUser *_user,atomic_t &_globallasttick)
-            : socket(_socket), user(_user), globallasttick(_globallasttick)
+        CRemoteClientHandler(CRemoteFileServer *_parent,ISocket *_socket,IAuthenticatedUser *_user,atomic_t &_globallasttick, bool _calledByRowService)
+            : socket(_socket), user(_user), globallasttick(_globallasttick), calledByRowService(_calledByRowService)
         {
             previdx = (unsigned)-1;
             StringBuffer peerBuf;
@@ -4750,6 +4756,7 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
                 e->Release();
             }
         }
+        bool isRowServiceClient() const { return calledByRowService; }
         bool notifySelected(ISocket *sock,unsigned selected)
         {
             if (TF_TRACE_FULL)
@@ -5295,6 +5302,11 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
     CriticalSection     sect;
     Owned<ISocket>      acceptsock;
     Owned<ISocket>      securesock;
+    Owned<ISocket>      rowServiceSock;
+
+    bool rowServiceOnStdPort = true; // should row service commands be processed on std. service port
+    bool rowServiceSSL = false;
+
     Owned<ISocketSelectHandler> selecthandler;
     Owned<IThreadPool>  threads;    // for commands
     bool stopping;
@@ -5305,7 +5317,6 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
     CClientStatsTable clientStatsTable;
     atomic_t globallasttick;
     unsigned targetActiveThreads;
-    bool authorizedOnly;
     Owned<IPropertyTree> keyPairInfo;
 
     int getNextHandle()
@@ -5470,12 +5481,24 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
         inMb.setLength(inPos);
         return compressMb.capacity() > replyLimit;
     }
+
+    void validateSSLSetup()
+    {
+        if (!securitySettings.certificate)
+            throw createDafsException(DAFSERR_serverinit_failed, "SSL Certificate information not found in environment.conf");
+        if (!checkFileExists(securitySettings.certificate))
+            throw createDafsException(DAFSERR_serverinit_failed, "SSL Certificate File not found in environment.conf");
+        if (!securitySettings.privateKey)
+            throw createDafsException(DAFSERR_serverinit_failed, "SSL Key information not found in environment.conf");
+        if (!checkFileExists(securitySettings.privateKey))
+            throw createDafsException(DAFSERR_serverinit_failed, "SSL Key File not found in environment.conf");
+    }
 public:
 
     IMPLEMENT_IINTERFACE
 
-    CRemoteFileServer(unsigned maxThreads, unsigned maxThreadsDelayMs, unsigned maxAsyncCopy, bool _authorizedOnly, IPropertyTree *_keyPairInfo)
-        : asyncCommandManager(maxAsyncCopy), stdCmdThrottler("stdCmdThrotlter"), slowCmdThrottler("slowCmdThrotlter"), authorizedOnly(_authorizedOnly), keyPairInfo(_keyPairInfo)
+    CRemoteFileServer(unsigned maxThreads, unsigned maxThreadsDelayMs, unsigned maxAsyncCopy, IPropertyTree *_keyPairInfo)
+        : asyncCommandManager(maxAsyncCopy), stdCmdThrottler("stdCmdThrotlter"), slowCmdThrottler("slowCmdThrotlter"), keyPairInfo(_keyPairInfo)
     {
         lasthandle = 0;
         selecthandler.setown(createSocketSelectHandler(NULL));
@@ -6289,6 +6312,8 @@ public:
             acceptsock->cancel_accept();
         if (securesock)
             securesock->cancel_accept();
+        if (rowServiceSock)
+            rowServiceSock->cancel_accept();
         reply.append((unsigned)RFEnoerror);
     }
 
@@ -6484,6 +6509,10 @@ public:
                     WARNLOG("Output compression not supported for format: %s", outputFmtStr);
             }
 
+            /* NB: unless client call is on dedicated service, allow non-authorized requests through, e.g. from engines talking to unsecured port
+             * In a secure setup, this service will be configured on a dedicated port, and the std. insecure dafilesrv will be unreachable.
+             */
+            bool authorizedOnly = rowServiceSock && client.isRowServiceClient();
 
             // In future this may be passed the request and build a chain of activities and return sink.
             outputActivity.setown(createOutputActivity(*requestTree, authorizedOnly, keyPairInfo));
@@ -6869,6 +6898,12 @@ public:
         }
     }
 
+    void checkAuthorizedStreamCommand(CRemoteClientHandler &client)
+    {
+        if (!rowServiceOnStdPort && !client.isRowServiceClient())
+            throw createDafsException(DAFSERR_cmdstream_unauthorized, "Unauthorized command");
+    }
+
     bool processCommand(RemoteFileCommandType cmd, MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler *client, CThrottler *throttler)
     {
         Owned<CClientStats> stats = clientStatsTable.getClientReference(cmd, client->queryPeerName());
@@ -6909,14 +6944,31 @@ public:
                 MAPCOMMAND(RFCgetinfo, cmdGetInfo);
                 MAPCOMMAND(RFCfirewall, cmdFirewall);
                 MAPCOMMANDCLIENT(RFCunlock, cmdUnlock, *client);
-                MAPCOMMANDCLIENT(RFCStreamRead, cmdStreamReadStd, *client);
-                MAPCOMMANDCLIENT(RFCStreamReadJSON, cmdStreamReadJSON, *client);
-                MAPCOMMANDCLIENTTESTSOCKET(RFCStreamReadTestSocket, cmdStreamReadTestSocket, *client);
                 MAPCOMMANDCLIENT(RFCcopysection, cmdCopySection, *client);
                 MAPCOMMANDCLIENTTHROTTLE(RFCtreecopy, cmdTreeCopy, *client, &slowCmdThrottler);
                 MAPCOMMANDCLIENTTHROTTLE(RFCtreecopytmp, cmdTreeCopyTmp, *client, &slowCmdThrottler);
                 MAPCOMMAND(RFCsetthrottle, cmdSetThrottle); // legacy version
                 MAPCOMMAND(RFCsetthrottle2, cmdSetThrottle2);
+                // row service commands
+                case RFCStreamRead:
+                {
+                    checkAuthorizedStreamCommand(*client);
+                    cmdStreamReadStd(msg, reply, *client);
+                    break;
+                }
+                case RFCStreamReadJSON:
+                {
+                    checkAuthorizedStreamCommand(*client);
+                    cmdStreamReadJSON(msg, reply, *client);
+                    break;
+                }
+                case RFCStreamReadTestSocket:
+                {
+                    testSocketFlag = true;
+                    checkAuthorizedStreamCommand(*client);
+                    cmdStreamReadTestSocket(msg, reply, *client);
+                    break;
+                }
             default:
                 formatException(reply, nullptr, cmd, false, RFSERR_InvalidCommand, client);
                 break;
@@ -6935,27 +6987,27 @@ public:
         return new cCommandProcessor();
     }
 
-    void run(DAFSConnectCfg _connectMethod, SocketEndpoint &listenep, unsigned sslPort)
+    virtual void run(DAFSConnectCfg _connectMethod, const SocketEndpoint &listenep, unsigned sslPort, const SocketEndpoint *rowServiceEp, bool _rowServiceSSL, bool _rowServiceOnStdPort) override
     {
         SocketEndpoint sslep(listenep);
         if (sslPort)
             sslep.port = sslPort;
         else
             sslep.port = securitySettings.daFileSrvSSLPort;
-        Owned<ISocket> acceptSocket, acceptSSLSocket;
 
+        Owned<ISocket> acceptSock, secureSock, rowServiceSock;
         if (_connectMethod != SSLOnly)
         {
             if (listenep.port == 0)
                 throw createDafsException(DAFSERR_serverinit_failed, "dafilesrv port not specified");
 
             if (listenep.isNull())
-                acceptSocket.setown(ISocket::create(listenep.port));
+                acceptSock.setown(ISocket::create(listenep.port));
             else
             {
                 StringBuffer ips;
                 listenep.getIpText(ips);
-                acceptSocket.setown(ISocket::create_ip(listenep.port,ips.str()));
+                acceptSock.setown(ISocket::create_ip(listenep.port,ips.str()));
             }
         }
 
@@ -6983,45 +7035,57 @@ public:
                 }
             }
             else
-            {
-                if (!securitySettings.certificate)
-                    throw createDafsException(DAFSERR_serverinit_failed, "SSL Certificate information not found in environment.conf");
-                if (!checkFileExists(securitySettings.certificate))
-                    throw createDafsException(DAFSERR_serverinit_failed, "SSL Certificate File not found in environment.conf");
-                if (!securitySettings.privateKey)
-                    throw createDafsException(DAFSERR_serverinit_failed, "SSL Key information not found in environment.conf");
-                if (!checkFileExists(securitySettings.privateKey))
-                    throw createDafsException(DAFSERR_serverinit_failed, "SSL Key File not found in environment.conf");
-            }
+                validateSSLSetup();
 
             if (sslep.isNull())
-                acceptSSLSocket.setown(ISocket::create(sslep.port));
+                secureSock.setown(ISocket::create(sslep.port));
             else
             {
                 StringBuffer ips;
                 sslep.getIpText(ips);
-                acceptSSLSocket.setown(ISocket::create_ip(sslep.port,ips.str()));
+                secureSock.setown(ISocket::create_ip(sslep.port,ips.str()));
+            }
+        }
+
+        if (rowServiceEp)
+        {
+            rowServiceSSL = _rowServiceSSL;
+            rowServiceOnStdPort = _rowServiceOnStdPort;
+
+            if (rowServiceEp->isNull())
+                rowServiceSock.setown(ISocket::create(rowServiceEp->port));
+            else
+            {
+                StringBuffer ips;
+                rowServiceEp->getIpText(ips);
+                rowServiceSock.setown(ISocket::create_ip(rowServiceEp->port, ips.str()));
             }
+
+#ifdef _USE_OPENSSL
+            if (rowServiceSSL)
+                validateSSLSetup();
+#else
+            rowServiceSSL = false;
+#endif
         }
 
-        run(_connectMethod, acceptSocket.getClear(), acceptSSLSocket.getClear());
+        run(_connectMethod, acceptSock.getClear(), secureSock.getClear(), rowServiceSock.getClear());
     }
 
-    void run(DAFSConnectCfg _connectMethod, ISocket *regSocket, ISocket *secureSocket)
+    void run(DAFSConnectCfg _connectMethod, ISocket *_acceptSock, ISocket *_secureSock, ISocket *_rowServiceSock)
     {
+        acceptsock.setown(_acceptSock);
+        securesock.setown(_secureSock);
+        rowServiceSock.setown(_rowServiceSock);
         if (_connectMethod != SSLOnly)
         {
-            if (regSocket)
-                acceptsock.setown(regSocket);
-            else
+            if (!acceptsock)
                 throw createDafsException(DAFSERR_serverinit_failed, "Invalid non-secure socket");
         }
 
         if (_connectMethod == SSLOnly || _connectMethod == SSLFirst || _connectMethod == UnsecureFirst)
         {
-            if (secureSocket)
-                securesock.setown(secureSocket);
-            else
+            if (!securesock)
                 throw createDafsException(DAFSERR_serverinit_failed, "Invalid secure socket");
         }
 
@@ -7031,32 +7095,41 @@ public:
         {
             Owned<ISocket> sock;
             Owned<ISocket> sockSSL;
+            Owned<ISocket> acceptedRSSock;
             bool sockavail = false;
             bool securesockavail = false;
-            if (_connectMethod == SSLNone)
+            bool rowServiceSockAvail = false;
+            if (_connectMethod == SSLNone && (nullptr == rowServiceSock.get()))
                 sockavail = acceptsock->wait_read(1000*60*1)!=0;
-            else if (_connectMethod == SSLOnly)
+            else if (_connectMethod == SSLOnly && (nullptr == rowServiceSock.get()))
                 securesockavail = securesock->wait_read(1000*60*1)!=0;
             else
             {
                 UnsignedArray readSocks;
                 UnsignedArray waitingSocks;
-                readSocks.append(acceptsock->OShandle());
-                readSocks.append(securesock->OShandle());
+                if (acceptsock)
+                    readSocks.append(acceptsock->OShandle());
+                if (securesock)
+                    readSocks.append(securesock->OShandle());
+                if (rowServiceSock)
+                    readSocks.append(rowServiceSock->OShandle());
                 int numReady = wait_read_multiple(readSocks, 1000*60*1, waitingSocks);
                 if (numReady > 0)
                 {
                     for (int idx = 0; idx < numReady; idx++)
                     {
-                        if (waitingSocks.item(idx) == acceptsock->OShandle())
+                        unsigned waitingSock = waitingSocks.item(idx);
+                        if (acceptsock && (waitingSock == acceptsock->OShandle()))
                             sockavail = true;
-                        else if (waitingSocks.item(idx) == securesock->OShandle())
+                        else if (securesock && (waitingSock == securesock->OShandle()))
                             securesockavail = true;
+                        else if (rowServiceSock && (waitingSock == rowServiceSock->OShandle()))
+                            rowServiceSockAvail = true;
                     }
                 }
             }
 #if 0
-            if (!sockavail && !securesockavail)
+            if (!sockavail && !securesockavail && !rowServiceSockAvail)
             {
                 JSocketStatistics stats;
                 getSocketStatistics(stats);
@@ -7069,7 +7142,7 @@ public:
             if (stopping)
                 break;
 
-            if (sockavail || securesockavail)
+            if (sockavail || securesockavail || rowServiceSockAvail)
             {
                 if (sockavail)
                 {
@@ -7078,13 +7151,6 @@ public:
                         sock.setown(acceptsock->accept(true));
                         if (!sock||stopping)
                             break;
-#ifdef _DEBUG
-                        SocketEndpoint eps;
-                        sock->getPeerEndpoint(eps);
-                        StringBuffer sb;
-                        eps.getUrlStr(sb);
-                        PROGLOG("Server accepting from %s", sb.str());
-#endif
                     }
                     catch (IException *e)
                     {
@@ -7112,22 +7178,11 @@ public:
                         }
                         else
                         {
-#ifdef _USE_OPENSSL
                             ssock.setown(createSecureSocket(sockSSL.getClear(), ServerSocket));
                             int status = ssock->secure_accept();
                             if (status < 0)
                                 throw createDafsException(DAFSERR_serveraccept_failed,"Failure to establish secure connection");
                             sockSSL.setown(ssock.getLink());
-#else
-                            throw createDafsException(DAFSERR_serveraccept_failed,"Failure to establish secure connection: OpenSSL disabled in build");
-#endif
-#ifdef _DEBUG
-                            SocketEndpoint eps;
-                            sockSSL->getPeerEndpoint(eps);
-                            StringBuffer sb;
-                            eps.getUrlStr(sb);
-                            PROGLOG("Server accepting SECURE from %s", sb.str());
-#endif
                         }
                     }
                     catch (IJSOCK_Exception *e)
@@ -7149,11 +7204,76 @@ public:
                     }
                 }
 
+                if (rowServiceSockAvail)
+                {
+                    Owned<ISecureSocket> ssock;
+                    try
+                    {
+                        acceptedRSSock.setown(rowServiceSock->accept(true));
+                        if (!acceptedRSSock||stopping)
+                            break;
+
+                        if (rowServiceSSL) // NB: will be disabled if !_USE_OPENSLL
+                        {
+                            ssock.setown(createSecureSocket(acceptedRSSock.getClear(), ServerSocket));
+                            int status = ssock->secure_accept();
+                            if (status < 0)
+                                throw createDafsException(DAFSERR_serveraccept_failed,"Failure to establish SSL row service connection");
+                            acceptedRSSock.setown(ssock.getLink());
+                        }
+                    }
+                    catch (IJSOCK_Exception *e)
+                    {
+                        // accept failed ...
+                        EXCLOG(e,"CRemoteFileServer (row service)");
+                        e->Release();
+                        break;
+                    }
+                    catch (IException *e) // IDAFS_Exception also ...
+                    {
+                        EXCLOG(e,"CRemoteFileServer1 (row service)");
+                        e->Release();
+                        cleanupSocket(acceptedRSSock);
+                        sockSSL.clear();
+                        cleanupSocket(ssock);
+                        ssock.clear();
+                        rowServiceSockAvail = false;
+                    }
+                }
+
+#ifdef _DEBUG
+                SocketEndpoint eps;
+                StringBuffer peerURL;
+#endif
                 if (sockavail)
-                    runClient(sock.getClear());
+                {
+#ifdef _DEBUG
+                    sock->getPeerEndpoint(eps);
+                    eps.getUrlStr(peerURL);
+                    PROGLOG("Server accepting from %s", peerURL.str());
+#endif
+                    runClient(sock.getClear(), false);
+                }
 
                 if (securesockavail)
-                    runClient(sockSSL.getClear());
+                {
+#ifdef _DEBUG
+                    sockSSL->getPeerEndpoint(eps);
+                    eps.getUrlStr(peerURL.clear());
+                    PROGLOG("Server accepting SECURE from %s", peerURL.str());
+#endif
+                    runClient(sockSSL.getClear(), false);
+                }
+
+                if (rowServiceSockAvail)
+                {
+#ifdef _DEBUG
+                    acceptedRSSock->getPeerEndpoint(eps);
+                    eps.getUrlStr(peerURL.clear());
+                    PROGLOG("Server accepting row service socket from %s", peerURL.str());
+#endif
+                    runClient(acceptedRSSock.getClear(), true);
+                }
             }
             else
                 checkTimeout();
@@ -7246,7 +7366,7 @@ public:
         return false;
     }
 
-    void runClient(ISocket *sock)
+    void runClient(ISocket *sock, bool rowService) // rowService used to distinguish client calls
     {
         cCommandProcessor::cCommandProcessorParams params;
         IAuthenticatedUser *user=NULL;
@@ -7272,7 +7392,7 @@ public:
             }
             return;
         }
-        params.client = new CRemoteClientHandler(this,sock,user,globallasttick);
+        params.client = new CRemoteClientHandler(this, sock, user, globallasttick, rowService);
         {
             CriticalBlock block(sect);
             clients.append(*LINK(params.client));
@@ -7439,17 +7559,13 @@ public:
 };
 
 
-IRemoteFileServer * createRemoteFileServer(unsigned maxThreads, unsigned maxThreadsDelayMs, unsigned maxAsyncCopy, bool authorizedOnly, IPropertyTree *keyPairInfo)
+IRemoteFileServer * createRemoteFileServer(unsigned maxThreads, unsigned maxThreadsDelayMs, unsigned maxAsyncCopy, IPropertyTree *keyPairInfo)
 {
 #if SIMULATE_PACKETLOSS
     errorSimulationOn = false;
 #endif
 
-// NB: if no OOPENSSL, no authorization checks
-#ifndef _USE_OPENSSL
-    authorizedOnly = false;
-#endif
-    return new CRemoteFileServer(maxThreads, maxThreadsDelayMs, maxAsyncCopy, authorizedOnly, keyPairInfo);
+    return new CRemoteFileServer(maxThreads, maxThreadsDelayMs, maxAsyncCopy, keyPairInfo);
 }
 
 
@@ -7571,7 +7687,7 @@ protected:
             virtual void threadmain() override
             {
                 DAFSConnectCfg sslCfg = SSLNone;
-                server->run(sslCfg, socket, nullptr);
+                server->run(sslCfg, socket, nullptr, nullptr);
             }
         };
         enableDafsAuthentication(false);

+ 20 - 3
common/remote/sockfile.hpp

@@ -50,11 +50,28 @@ enum ThrottleClass
 #define DEFAULT_SLOWCMD_THROTTLECPULIMIT 75
 #define DEFAULT_SLOWCMD_THROTTLEQUEUELIMIT 1000
 
-#define DEFAULT_AUTHORIZED_ONLY false
+
+enum RowServiceCfg
+{
+    rs_off,     // No dedicated row service, allows row service commands on std. dafilesrv port.
+    rs_on,      // Dedicated row service on own port accepting authorized signed connections only. Row service commands on std. dafilersv port will be refused.
+    rs_both,    // Dedicated row service on own port accepting authorized signed connections only. Still accepts unsigned connection on std. dafilesrv port
+    rs_onssl,   // Same as rs_on, but SSL
+    rs_bothssl  // Same as rs_only, but SSL
+};
 
 interface IRemoteFileServer : extends IInterface
 {
-    virtual void run(DAFSConnectCfg connectMethod, SocketEndpoint &listenep, unsigned sslPort=0) = 0;
+    virtual void run(DAFSConnectCfg connectMethod, const SocketEndpoint &listenep, unsigned sslPort=0, const SocketEndpoint *rowServiceEp=nullptr, bool rowServiceSSL=false, bool rowServiceOnStdPort=true) = 0;
+    virtual void stop() = 0;
+    virtual unsigned idleTime() = 0; // in ms
+    virtual void setThrottle(ThrottleClass throttleClass, unsigned limit, unsigned delayMs=DEFAULT_STDCMD_THROTTLEDELAYMS, unsigned cpuThreshold=DEFAULT_STDCMD_THROTTLECPULIMIT, unsigned queueLimit=DEFAULT_STDCMD_THROTTLEQUEUELIMIT) = 0;
+    virtual StringBuffer &getStats(StringBuffer &stats, bool reset) = 0;
+};
+
+interface IRemoteRowServer : extends IInterface
+{
+    virtual void run(unsigned port=0) = 0;
     virtual void stop() = 0;
     virtual unsigned idleTime() = 0; // in ms
     virtual void setThrottle(ThrottleClass throttleClass, unsigned limit, unsigned delayMs=DEFAULT_STDCMD_THROTTLEDELAYMS, unsigned cpuThreshold=DEFAULT_STDCMD_THROTTLECPULIMIT, unsigned queueLimit=DEFAULT_STDCMD_THROTTLEQUEUELIMIT) = 0;
@@ -70,7 +87,7 @@ extern REMOTE_API IFile * createRemoteFile(SocketEndpoint &ep,const char * _file
 extern REMOTE_API unsigned getRemoteVersion(ISocket * _socket, StringBuffer &ver);
 extern REMOTE_API unsigned stopRemoteServer(ISocket * _socket);
 extern REMOTE_API const char *remoteServerVersionString();
-extern REMOTE_API IRemoteFileServer * createRemoteFileServer(unsigned maxThreads=DEFAULT_THREADLIMIT, unsigned maxThreadsDelayMs=DEFAULT_THREADLIMITDELAYMS, unsigned maxAsyncCopy=DEFAULT_ASYNCCOPYMAX, bool authorizedOnly=DEFAULT_AUTHORIZED_ONLY, IPropertyTree *keyPairInfo=nullptr);
+extern REMOTE_API IRemoteFileServer * createRemoteFileServer(unsigned maxThreads=DEFAULT_THREADLIMIT, unsigned maxThreadsDelayMs=DEFAULT_THREADLIMITDELAYMS, unsigned maxAsyncCopy=DEFAULT_ASYNCCOPYMAX, IPropertyTree *keyPairInfo=nullptr);
 extern REMOTE_API int setDafsTrace(ISocket * socket,byte flags);
 extern REMOTE_API int setDafsThrottleLimit(ISocket * socket, ThrottleClass throttleClass, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit, unsigned queueLimit, StringBuffer *errMsg=NULL);
 extern REMOTE_API bool enableDafsAuthentication(bool on);

+ 84 - 13
dali/dafilesrv/dafilesrv.cpp

@@ -32,6 +32,10 @@
 #pragma warning (disable : 4355)
 #endif
 
+static const bool defaultRowServiceOnStdPort = true;
+static const bool defaultDedicatedRowServiceSSL = false;
+
+
 
 #include "remoteerr.hpp"
 #include "sockfile.hpp"
@@ -380,7 +384,10 @@ int main(int argc,char **argv)
     unsigned throttleSlowDelayMs = DEFAULT_SLOWCMD_THROTTLEDELAYMS;
     unsigned throttleSlowCPULimit = DEFAULT_SLOWCMD_THROTTLECPULIMIT;
     unsigned throttleSlowQueueLimit = DEFAULT_SLOWCMD_THROTTLEQUEUELIMIT;
-    bool authorizedOnly = DEFAULT_AUTHORIZED_ONLY;
+
+    unsigned dedicatedRowServicePort = DEFAULT_ROWSERVICE_PORT;
+    bool dedicatedRowServiceSSL = defaultDedicatedRowServiceSSL;
+    bool rowServiceOnStdPort = defaultRowServiceOnStdPort;
 
     Owned<IPropertyTree> env = getHPCCEnvironment();
     IPropertyTree *keyPairInfo = nullptr;
@@ -408,16 +415,41 @@ int main(int argc,char **argv)
             throttleSlowCPULimit = daFileSrv->getPropInt("@throttleSlowCPULimit", DEFAULT_SLOWCMD_THROTTLECPULIMIT);
             throttleSlowQueueLimit = daFileSrv->getPropInt("@throttleSlowQueueLimit", DEFAULT_SLOWCMD_THROTTLEQUEUELIMIT);
 
-            authorizedOnly = daFileSrv->getPropBool("@authorizedOnly", DEFAULT_AUTHORIZED_ONLY);
+            dedicatedRowServicePort = daFileSrv->getPropInt("@rowServicePort", DEFAULT_ROWSERVICE_PORT);
+            dedicatedRowServiceSSL = daFileSrv->getPropBool("@rowServiceSSL", defaultDedicatedRowServiceSSL);
+            rowServiceOnStdPort = daFileSrv->getPropBool("@rowServiceOnStdPort", defaultRowServiceOnStdPort);
+
+            const char *groupName = daFileSrv->queryProp("@group");
 
             // any overrides by Instance definitions?
             // NB: This won't work if netAddress is "." or if we start supporting hostnames there
             StringBuffer ipStr;
             queryHostIP().getIpText(ipStr);
             VStringBuffer daFileSrvPath("Instance[@netAddress=\"%s\"]", ipStr.str());
-            IPropertyTree *dafileSrvInstance = daFileSrv->queryPropTree(daFileSrvPath);
-            if (dafileSrvInstance)
+            IPropertyTree *_dafileSrvInstance = daFileSrv->queryPropTree(daFileSrvPath);
+            if (_dafileSrvInstance)
             {
+                Owned<IPropertyTree> dafileSrvInstance;
+
+                // check if there's a DaFileSrvGroup
+                if (isEmptyString(groupName))
+                    groupName = _dafileSrvInstance->queryProp("@group");
+
+                if (!isEmptyString(groupName))
+                {
+                    VStringBuffer dafilesrvGroupPath("Software/DafilesrvGroup[@name=\"%s\"]", groupName);
+                    IPropertyTree *daFileSrvGroup = env->queryPropTree(dafilesrvGroupPath);
+                    if (daFileSrvGroup)
+                    {
+                        // create a copy of the instance settings and merge in any from group info.
+                        dafileSrvInstance.setown(createPTreeFromIPT(_dafileSrvInstance));
+
+                        // any group settings override defaults
+                        synchronizePTree(dafileSrvInstance, daFileSrvGroup, false);
+
+                        _dafileSrvInstance = dafileSrvInstance;
+                    }
+                }
                 maxThreads = dafileSrvInstance->getPropInt("@maxThreads", maxThreads);
                 maxThreadsDelayMs = dafileSrvInstance->getPropInt("@maxThreadsDelayMs", maxThreadsDelayMs);
                 maxAsyncCopy = dafileSrvInstance->getPropInt("@maxAsyncCopy", maxAsyncCopy);
@@ -432,11 +464,19 @@ int main(int argc,char **argv)
                 throttleSlowCPULimit = dafileSrvInstance->getPropInt("@throttleSlowCPULimit", throttleSlowCPULimit);
                 throttleSlowQueueLimit = dafileSrvInstance->getPropInt("@throttleSlowQueueLimit", throttleSlowQueueLimit);
 
-                authorizedOnly = dafileSrvInstance->getPropBool("@authorizedOnly", authorizedOnly);
+                dedicatedRowServicePort = dafileSrvInstance->getPropInt("@rowServicePort", dedicatedRowServicePort);
+                dedicatedRowServiceSSL = daFileSrv->getPropBool("@rowServiceSSL", dedicatedRowServiceSSL);
+                rowServiceOnStdPort = daFileSrv->getPropBool("@rowServiceOnStdPort", rowServiceOnStdPort);
             }
         }
         keyPairInfo = env->queryPropTree("EnvSettings/Keys");
     }
+    if (dedicatedRowServicePort)
+    {
+#ifdef _USE_OPENSSL
+        dedicatedRowServiceSSL = false;
+#endif
+    }
 
     // these should really be in env, but currently they are not ...
     listenep.port = port;
@@ -602,6 +642,11 @@ int main(int argc,char **argv)
             unsigned throttleSlowCPULimit;
             unsigned sslport;
             StringBuffer secMethod;
+            Linked<IPropertyTree> keyPairInfo;
+            StringAttr rowServiceConfiguration;
+            unsigned dedicatedRowServicePort;
+            bool dedicatedRowServiceSSL;
+            bool rowServiceOnStdPort;
             
             class cpollthread: public Thread
             {
@@ -626,12 +671,17 @@ int main(int argc,char **argv)
                         unsigned _maxThreads, unsigned _maxThreadsDelayMs, unsigned _maxAsyncCopy,
                         unsigned _parallelRequestLimit, unsigned _throttleDelayMs, unsigned _throttleCPULimit,
                         unsigned _parallelSlowRequestLimit, unsigned _throttleSlowDelayMs, unsigned _throttleSlowCPULimit,
-                        unsigned _sslport, const char * _secMethod)
+                        unsigned _sslport, const char * _secMethod,
+                        IPropertyTree *_keyPairInfo,
+                        const char *_rowServiceConfiguration,
+                        unsigned _dedicatedRowServicePort, bool _dedicatedRowServiceSSL, bool _rowServiceOnStdPort)
             : connectMethod(_connectMethod), listenep(_listenep), pollthread(this),
                   maxThreads(_maxThreads), maxThreadsDelayMs(_maxThreadsDelayMs), maxAsyncCopy(_maxAsyncCopy),
                   parallelRequestLimit(_parallelRequestLimit), throttleDelayMs(_throttleDelayMs), throttleCPULimit(_throttleCPULimit),
                   parallelSlowRequestLimit(_parallelSlowRequestLimit), throttleSlowDelayMs(_throttleSlowDelayMs), throttleSlowCPULimit(_throttleSlowCPULimit),
-                  sslport(_sslport), secMethod(_secMethod)
+                  sslport(_sslport), secMethod(_secMethod),
+                  keyPairInfo(_keyPairInfo),
+                  rowServiceConfiguration(_rowServiceConfiguration), dedicatedRowServicePort(_dedicatedRowServicePort), dedicatedRowServiceSSL(_dedicatedRowServiceSSL), rowServiceOnStdPort(_rowServiceOnStdPort)
             {
                 stopped = false;
                 started = false;
@@ -714,12 +764,22 @@ int main(int argc,char **argv)
                 const char * verstring = remoteServerVersionString();
                 PROGLOG("Version: %s", verstring);
                 PROGLOG("Authentication:%s required",requireauthenticate?"":" not");
+                if (dedicatedRowServicePort)
+                    PROGLOG("Row service(%s) port = %u", rowServiceConfiguration, dedicatedRowServicePort);
                 PROGLOG(DAFS_SERVICE_DISPLAY_NAME " Running");
-                server.setown(createRemoteFileServer(maxThreads, maxThreadsDelayMs, maxAsyncCopy));
+                server.setown(createRemoteFileServer(maxThreads, maxThreadsDelayMs, maxAsyncCopy, keyPairInfo));
                 server->setThrottle(ThrottleStd, parallelRequestLimit, throttleDelayMs, throttleCPULimit);
                 server->setThrottle(ThrottleSlow, parallelSlowRequestLimit, throttleSlowDelayMs, throttleSlowCPULimit);
-                try {
-                    server->run(connectMethod, listenep, sslport);
+                try
+                {
+                    if (dedicatedRowServicePort)
+                    {
+                        SocketEndpoint rowServiceEp(listenep); // copy listenep, incase bound by -addr
+                        rowServiceEp.port = dedicatedRowServicePort;
+                        server->run(connectMethod, listenep, sslport, &rowServiceEp, dedicatedRowServiceSSL, rowServiceOnStdPort);
+                    }
+                    else
+                        server->run(connectMethod, listenep, sslport);
                 }
                 catch (IException *e) {
                     EXCLOG(e,DAFS_SERVICE_NAME);
@@ -731,7 +791,8 @@ int main(int argc,char **argv)
         } service(connectMethod, listenep,
                 maxThreads, maxThreadsDelayMs, maxAsyncCopy,
                 parallelRequestLimit, throttleDelayMs, throttleCPULimit,
-                parallelSlowRequestLimit, throttleSlowDelayMs, throttleSlowCPULimit, sslport, secMethod);
+                parallelSlowRequestLimit, throttleSlowDelayMs, throttleSlowCPULimit, sslport, secMethod,
+                keyPairInfo, rowServiceConfiguration, dedicatedRowServicePort, dedicatedRowServiceSSL, rowServiceOnStdPort);
         service.start();
         return 0;
 #else
@@ -778,7 +839,10 @@ int main(int argc,char **argv)
 
     PROGLOG("Version: %s", verstring);
     PROGLOG("Authentication:%s required",requireauthenticate?"":" not");
-    server.setown(createRemoteFileServer(maxThreads, maxThreadsDelayMs, maxAsyncCopy, authorizedOnly, keyPairInfo));
+    if (dedicatedRowServicePort)
+        PROGLOG("Row service port = %u", dedicatedRowServicePort);
+
+    server.setown(createRemoteFileServer(maxThreads, maxThreadsDelayMs, maxAsyncCopy, keyPairInfo));
     server->setThrottle(ThrottleStd, parallelRequestLimit, throttleDelayMs, throttleCPULimit);
     server->setThrottle(ThrottleSlow, parallelSlowRequestLimit, throttleSlowDelayMs, throttleSlowCPULimit);
     class CPerfHook : public CSimpleInterfaceOf<IPerfMonHook>
@@ -800,7 +864,14 @@ int main(int argc,char **argv)
     writeSentinelFile(sentinelFile);
     try
     {
-        server->run(connectMethod, listenep, sslport);
+        if (dedicatedRowServicePort)
+        {
+            SocketEndpoint rowServiceEp(listenep); // copy listenep, incase bound by -addr
+            rowServiceEp.port = dedicatedRowServicePort;
+            server->run(connectMethod, listenep, sslport, &rowServiceEp, dedicatedRowServiceSSL, rowServiceOnStdPort);
+        }
+        else
+            server->run(connectMethod, listenep, sslport);
     }
     catch (IException *e)
     {

+ 3 - 0
esp/scm/ws_dfu.ecm

@@ -868,6 +868,9 @@ ESPresponse [exceptions_inline] DFUFileAccessResponse
     
     binary RecordTypeInfoBin;   // optional
     string RecordTypeInfoJson;  // optional
+    
+    int fileAccessPort;
+    bool fileAccessSSL;
 };
 
 //  ===========================================================================

+ 29 - 6
esp/services/ws_dfu/ws_dfuService.cpp

@@ -5970,21 +5970,40 @@ void CWsDfuEx::getFileMeta(StringBuffer &metaInfoStr, IDistributedFile &file, IU
     JBASE64_Encode(compressedMetaInfoMb.bytes(), compressedMetaInfoMb.length(), metaInfoStr, false);
 }
 
-StringBuffer &CWsDfuEx::getFileDafilesrvKeyName(StringBuffer &keyPairName, IDistributedFile &file)
+void CWsDfuEx::getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned &retPort, bool &retSecure, IDistributedFile &file)
 {
+    retPort = DEFAULT_ROWSERVICE_PORT;
+    retSecure = false;
     unsigned numClusters = file.numClusters();
     for (unsigned c=0; c<numClusters; c++)
     {
         StringBuffer clusterName;
         const char *cluster = file.getClusterName(c, clusterName.clear()).str();
         const char *_keyPairName = env->getClusterKeyPairName(cluster);
+        Owned<IConstDaFileSrvInfo> daFileSrvInfo = env->getDaFileSrvGroupInfo(cluster);
+        unsigned port = DEFAULT_ROWSERVICE_PORT;
+        bool secure = false;
+        if (daFileSrvInfo)
+        {
+            port = daFileSrvInfo->getPort();
+            secure = daFileSrvInfo->getSecure();
+        }
         if (0 == c)
+        {
             keyPairName.set(_keyPairName);
-        else if (!strsame(keyPairName, _keyPairName))
-            throwStringExceptionV(0, "Configuration issue - file '%s' is on multiple clusters, keys for file access must match", file.queryLogicalName());
+            retPort = port;
+            retSecure = secure;
+        }
+        else
+        {
+            if (!strsame(keyPairName, _keyPairName))
+                throwStringExceptionV(0, "Configuration issue - file '%s' is on multiple clusters, keys for file access must match", file.queryLogicalName());
+            if (retPort != port)
+                throwStringExceptionV(0, "Configuration issue - file '%s' is on multiple clusters, dafilesrv's port for file access must match", file.queryLogicalName());
+            if (retSecure != secure)
+                throwStringExceptionV(0, "Configuration issue - file '%s' is on multiple clusters, dafilesrv's security setting for file access must match", file.queryLogicalName());
+        }
     }
-
-    return keyPairName;
 }
 
 void CWsDfuEx::getFileAccess(IEspContext &context, IUserDescriptor *udesc, SecAccessFlags accessType, IEspDFUFileAccessRequest &req, IEspDFUFileAccessResponse &resp)
@@ -6069,12 +6088,16 @@ void CWsDfuEx::getFileAccess(IEspContext &context, IUserDescriptor *udesc, SecAc
     expiryDt.getString(expiryTimeStr);
 
     StringBuffer keyPairName;
-    getFileDafilesrvKeyName(keyPairName, *df);
+    unsigned port;
+    bool secure;
+    getFileDafilesrvConfiguration(keyPairName, port, secure, *df);
 
     StringBuffer metaInfo;
     getFileMeta(metaInfo, *df, udesc, role, expiryTimeStr, keyPairName, req);
     resp.setMetaInfoBlob(metaInfo);
     resp.setExpiryTime(expiryTimeStr);
+    resp.setFileAccessPort(port);
+    resp.setFileAccessSSL(secure);
 }
 
 

+ 1 - 1
esp/services/ws_dfu/ws_dfuService.hpp

@@ -238,7 +238,7 @@ private:
     void parseFieldMask(unsigned __int64 fieldMask, unsigned &fieldCount, IntArray &fieldIndexArray);
     unsigned getFilePartsInfo(IEspContext &context, IDistributedFile *df, const char *clusterName,
         IArrayOf<IEspDFUPartLocations> &dfuPartLocations, IArrayOf<IEspDFUPartCopies> &dfuPartCopies);
-    StringBuffer &getFileDafilesrvKeyName(StringBuffer &keyPairName, IDistributedFile &file);
+    void getFileDafilesrvConfiguration(StringBuffer &keyPairName, unsigned &port, bool &secure, IDistributedFile &file);
     void getFileMeta(StringBuffer &metaInfo, IDistributedFile &file, IUserDescriptor *user, CFileAccessRole role, const char *expiryTime, const char *keyPairName, IConstDFUFileAccessRequest &req);
     void getFileAccess(IEspContext &context, IUserDescriptor *udesc, SecAccessFlags accessType, IEspDFUFileAccessRequest &req, IEspDFUFileAccessResponse &resp);
 

+ 1 - 0
system/include/portlist.h

@@ -58,6 +58,7 @@
 #define MP_END_PORT                     7500
 
 #define SECURE_DAFILESRV_PORT           7600 // aka daliservix
+#define DEFAULT_ROWSERVICE_PORT         7601
 
 //ESP SERVICES
 //INSECURE

+ 22 - 11
system/jlib/jptree.cpp

@@ -3666,7 +3666,7 @@ void mergePTree(IPropertyTree *target, IPropertyTree *toMerge)
     }
 }
 
-void _synchronizePTree(IPropertyTree *target, IPropertyTree *source)
+void _synchronizePTree(IPropertyTree *target, IPropertyTree *source, bool removeTargetsNotInSource)
 {
     Owned<IAttributeIterator> aiter = target->getAttributes();
     StringArray targetAttrs;
@@ -3694,9 +3694,13 @@ void _synchronizePTree(IPropertyTree *target, IPropertyTree *source)
             targetAttrs.zap(attr);
         }
     }
-    // remaining
-    ForEachItemIn (a, targetAttrs)
-        target->removeProp(targetAttrs.item(a));
+
+    if (removeTargetsNotInSource)
+    {
+        // remaining
+        ForEachItemIn (a, targetAttrs)
+            target->removeProp(targetAttrs.item(a));
+    }
     
     bool equal = true;
     MemoryBuffer srcMb;
@@ -3773,14 +3777,18 @@ void _synchronizePTree(IPropertyTree *target, IPropertyTree *source)
             if (sourceCompare)
             {
                 toProcess.zap(*sourceCompare);
-                _synchronizePTree(&e, sourceCompare);
+                _synchronizePTree(&e, sourceCompare, removeTargetsNotInSource);
             }
             else
                 removeTreeList.append(e);
         }
     }
-    ForEachItemIn (rt, removeTreeList)
-        target->removeTree(&removeTreeList.item(rt));
+
+    if (removeTargetsNotInSource)
+    {
+        ForEachItemIn (rt, removeTreeList)
+            target->removeTree(&removeTreeList.item(rt));
+    }
 
     // add unprocessed source elements, not reference by name in target
     ForEachItemIn (s, toProcess)
@@ -3790,17 +3798,20 @@ void _synchronizePTree(IPropertyTree *target, IPropertyTree *source)
     }
 }
 
-// ensure target is equivalent to source whilst retaining elements already present in target.
-// presevers ordering of matching elements.
-void synchronizePTree(IPropertyTree *target, IPropertyTree *source)
+/* ensure target is equivalent to source whilst retaining elements already present in target.
+ * presevers ordering of matching elements.
+ * If removeTargetsNotInSource = true (default) elements in the target not present in the source will be removed
+ */
+void synchronizePTree(IPropertyTree *target, IPropertyTree *source, bool removeTargetsNotInSource)
 {
     const char *srcName = source->queryName();
     const char *tgtName = target->queryName();
     if (0 != strcmp(srcName, tgtName))
         throw MakeIPTException(PTreeExcpt_Unsupported, "Cannot synchronize if root nodes mismatch");
-    _synchronizePTree(target, source);
+    _synchronizePTree(target, source, removeTargetsNotInSource);
 }
 
+
 IPropertyTree *ensurePTree(IPropertyTree *root, const char *xpath)
 {
     return createPropBranch(root, xpath, true);

+ 1 - 1
system/jlib/jptree.hpp

@@ -207,7 +207,7 @@ jlib_decl IPullPTreeReader *createPullJSONStringReader(const char *json, IPTreeN
 jlib_decl IPullPTreeReader *createPullJSONBufferReader(const void *buf, size32_t bufLength, IPTreeNotifyEvent &iEvent, PTreeReaderOptions readerOptions=ptr_ignoreWhiteSpace);
 
 jlib_decl void mergePTree(IPropertyTree *target, IPropertyTree *toMerge);
-jlib_decl void synchronizePTree(IPropertyTree *target, IPropertyTree *source);
+jlib_decl void synchronizePTree(IPropertyTree *target, IPropertyTree *source, bool removeTargetsNotInSource=true);
 jlib_decl IPropertyTree *ensurePTree(IPropertyTree *root, const char *xpath);
 jlib_decl bool areMatchingPTrees(IPropertyTree * left, IPropertyTree * right);