Переглянути джерело

HPCC-10024 MP lib (and Thor sort) TLS

Signed-off-by: Mark Kelly <mark.kelly@lexisnexisrisk.com>
Mark Kelly 4 роки тому
батько
коміт
b9d287c0e7

+ 4 - 0
common/workunit/CMakeLists.txt

@@ -55,6 +55,7 @@ include_directories (
          ./../../system/jlib
          ./../../system/security/shared 
          ./../../system/security/cryptohelper
+         ./../../system/security/securesocket
          ./../../common/thorhelper 
          ./../../rtl/eclrtl 
          ./../../rtl/nbcd
@@ -82,6 +83,9 @@ if(NOT PLUGIN)
         eclrtl 
         deftype
         )
+    if (USE_OPENSSL)
+        target_link_libraries(workunit securesocket)
+    endif()
     if (NOT CONTAINERIZED)
         target_link_libraries(workunit environment)
     endif()

+ 23 - 3
common/workunit/wujobq.cpp

@@ -21,6 +21,7 @@
 #include "limits.h"
 #include "jlib.hpp"
 #include "jbuff.hpp"
+#include "jsecrets.hpp"
 #include "dasess.hpp"
 #include "dautils.hpp"
 #include "portlist.h"
@@ -34,6 +35,8 @@
 #include "workunit.hpp"
 #include "wujobq.hpp"
 
+#include "securesocket.hpp"
+
 #ifndef _CONTAINERIZED
 #include "environment.hpp"
 #endif
@@ -1756,7 +1759,12 @@ public:
         assertex(!initiateconv.get());
         SocketEndpoint ep = item->queryEndpoint();
         unsigned short port = (unsigned short)item->getPort();
-        initiateconv.setown(createSingletonSocketConnection(port));
+#if defined(_USE_OPENSSL)
+        if (queryMtls())
+            initiateconv.setown(createSingletonSecureSocketConnection(port));
+        else
+#endif
+            initiateconv.setown(createSingletonSocketConnection(port));
         if (!port)
             item->setPort(initiateconv->setRandomPort(WUJOBQ_BASE_PORT,WUJOBQ_PORT_NUM));
         initiatewu.set(item->queryWUID());
@@ -1800,7 +1808,13 @@ public:
                 if (item->isValidSession()) {
                     SocketEndpoint ep = item->queryEndpoint();
                     ep.port = item->getPort();
-                    Owned<IConversation> acceptconv = createSingletonSocketConnection(ep.port,&ep);
+                    Owned<IConversation> acceptconv;
+#if defined(_USE_OPENSSL)
+                    if (queryMtls())
+                        acceptconv.setown(createSingletonSecureSocketConnection(ep.port,&ep));
+                    else
+#endif
+                        acceptconv.setown(createSingletonSocketConnection(ep.port,&ep));
                     if (acceptconv->connect(3*60*1000)) { // shouldn't need that long
                         retitem = item.getClear();
                         return acceptconv.getClear();
@@ -1841,7 +1855,13 @@ public:
             if (item->isValidSession()) {
                 SocketEndpoint ep = item->queryEndpoint();
                 ep.port = item->getPort();
-                Owned<IConversation> acceptconv = createSingletonSocketConnection(ep.port,&ep);
+                Owned<IConversation> acceptconv;
+#if defined(_USE_OPENSSL)
+                if (queryMtls())
+                    acceptconv.setown(createSingletonSecureSocketConnection(ep.port,&ep));
+                else
+#endif
+                    acceptconv.setown(createSingletonSocketConnection(ep.port,&ep));
                 acceptconv->connect(3*60*1000); // connect then close should close other end
                 return true;
             }

+ 1 - 0
ecl/eclagent/eclagent.cpp

@@ -231,6 +231,7 @@ public:
         while (running)
         {
             ISocket *client = socket->accept(true);
+            // TLS TODO: secure_accept() on hThor debug socket if globally configured for mtls ...
             if (client)
             {
                 client->set_linger(-1);

+ 1 - 0
fs/dafsserver/dafsserver.cpp

@@ -2886,6 +2886,7 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
                 }
                 if (left)
                 {
+                    // TLS TODO: avail_read() may not return accurate amount of pending bytes
                     avail = (size32_t)socket->avail_read();
                     try
                     {

+ 2 - 2
roxie/ccd/ccdprotocol.cpp

@@ -314,9 +314,9 @@ public:
                         if (!secureContext)
                             secureContext.setown(createSecureSocketContextEx(certFile.get(), keyFile.get(), passPhrase.get(), ServerSocket));
                         ssock.setown(secureContext->createSecureSocket(client.getClear()));
-                        int loglevel = 0;
+                        int loglevel = SSLogMin;
                         if (traceLevel > 1)
-                            loglevel = traceLevel;
+                            loglevel = SSLogMax;
                         int status = ssock->secure_accept(loglevel);
                         if (status < 0)
                         {

+ 2 - 0
roxie/topo/toposerver.cpp

@@ -206,6 +206,7 @@ void doServer(ISocket *socket)
             try
             {
                 Owned<ISocket> p = ISocket::connect(me);
+                // TLS TODO: secure_connect() here if globally configured for mtls ...
                 p->write("\0\0\0\0", 4);
                 p->close();
             }
@@ -220,6 +221,7 @@ void doServer(ISocket *socket)
         try
         {
             Owned<ISocket> client = socket->accept();
+            // TLS TODO: secure_accept() here if globally configured for mtls ...
             timeoutTopology();
             unsigned packetLen;
             client->read(&packetLen, 4);

+ 1 - 0
system/jlib/jliball.hpp

@@ -56,6 +56,7 @@
 #include "jthread.hpp"
 #include "jtime.hpp"
 #include "jutil.hpp"
+#include "jsecrets.hpp"
 
 #endif
 

+ 62 - 2
system/jlib/jsecrets.cpp

@@ -768,8 +768,8 @@ IPropertyTree *queryMtlsSecretInfo(const char *name)
 {
     if (isEmptyString(name))
         return nullptr;
-   CriticalBlock block(mtlsInfoCacheCS);
-   IPropertyTree *info = mtlsInfoCache->queryPropTree(name);
+    CriticalBlock block(mtlsInfoCacheCS);
+    IPropertyTree *info = mtlsInfoCache->queryPropTree(name);
     if (info)
         return info;
 
@@ -797,6 +797,7 @@ IPropertyTree *queryMtlsSecretInfo(const char *name)
             if (ca)
                 ca->setProp("@path", filepath.str());
         }
+        // TLS TODO: do we want to always require verify, even if no ca ?
         verify->setPropBool("@enable", true);
         verify->setPropBool("@address_match", false);
         verify->setPropBool("@accept_selfsigned", false);
@@ -804,3 +805,62 @@ IPropertyTree *queryMtlsSecretInfo(const char *name)
     }
     return info;
 }
+
+enum UseMTLS { UNINIT, DISABLED, ENABLED };
+static UseMTLS useMtls = UNINIT;
+
+static CriticalSection queryMtlsCS;
+
+jlib_decl bool queryMtls()
+{
+    CriticalBlock block(queryMtlsCS);
+    if (useMtls == UNINIT)
+    {
+        useMtls = DISABLED;
+#if defined(_USE_OPENSSL)
+# ifdef _CONTAINERIZED
+        // check component setting first, but default to global
+        if (getComponentConfigSP()->getPropBool("@mtls", getGlobalConfigSP()->getPropBool("@mtls")))
+            useMtls = ENABLED;
+# else
+        if (queryMtlsBareMetalConfig())
+        {
+            useMtls = ENABLED;
+            const char *cert = nullptr;
+            const char *pubKey = nullptr;
+            const char *privKey = nullptr;
+            const char *passPhrase = nullptr;
+            if (queryHPCCPKIKeyFiles(&cert, &pubKey, &privKey, &passPhrase))
+            {
+                if ( (!isEmptyString(cert)) && (!isEmptyString(privKey)) )
+                {
+                    if (checkFileExists(cert) && checkFileExists(privKey))
+                    {
+                        CriticalBlock block(mtlsInfoCacheCS);
+                        if (mtlsInfoCache)
+                        {
+                            IPropertyTree *info = mtlsInfoCache->queryPropTree("local");
+                            if (!info)
+                                info = mtlsInfoCache->setPropTree("local");
+                            if (info)
+                            {   // always update
+                                info->setProp("certificate", cert);
+                                info->setProp("privatekey", privKey);
+                                if ( (!isEmptyString(pubKey)) && (checkFileExists(pubKey)) )
+                                    info->setProp("publickey", pubKey);
+                                if (!isEmptyString(passPhrase))
+                                    info->setProp("passphrase", passPhrase); // encrypted
+                            }
+                        }
+                    }
+                }
+            }
+        }
+# endif
+#endif
+    }
+    if (useMtls == ENABLED)
+        return true;
+    else
+        return false;
+}

+ 2 - 0
system/jlib/jsecrets.hpp

@@ -41,4 +41,6 @@ extern jlib_decl IPropertyTree *queryMtlsSecretInfo(const char *name);
 extern jlib_decl  void splitFullUrl(const char *url, bool &https, StringBuffer &user, StringBuffer &password, StringBuffer &host, StringBuffer &port, StringBuffer &fullpath);
 extern jlib_decl void splitUrlSchemeHostPort(const char *url, StringBuffer &user, StringBuffer &password, StringBuffer &schemeHostPort, StringBuffer &path);
 
+extern jlib_decl bool queryMtls();
+
 #endif

+ 201 - 250
system/jlib/jsocket.cpp

@@ -103,13 +103,6 @@
   static int dropCounter = 0;
 #endif
 
-#ifndef _WIN32 
-#define BLOCK_POLLED_SINGLE_CONNECTS  // NB this is much slower in windows
-#define CENTRAL_NODE_RANDOM_DELAY
-#else
-#define USERECVSEM      // to singlethread BF_SYNC_TRANSFER_PUSH 
-#endif
-
 #ifdef _DEBUG
 //#define SOCKTRACE
 //#define EPOLLTRACE
@@ -2404,37 +2397,6 @@ bool CSocket::send_block(const void *blk,size32_t sz)
     return true;
 }
 
-#ifdef USERECVSEM
-
-class CSemProtect
-{
-    Semaphore *sem;
-    bool *owned;
-public:
-    CSemProtect() { clear(); }
-    ~CSemProtect() 
-    { 
-        if (sem&&*owned) {
-            *owned = false;
-            sem->signal(); 
-        }
-    }
-    void set(Semaphore *_sem,bool *_owned)
-    {
-        sem = _sem;
-        owned = _owned;
-    }
-    bool wait(Semaphore *_sem,bool *_owned,unsigned timeout) {
-        if (!*_owned&&!_sem->wait(timeout))
-            return false;
-        *_owned = true;
-        set(_sem,_owned);
-        return true;
-    }
-    void clear() { sem = NULL; owned = NULL; }
-};
-#endif
-
 size32_t CSocket::receive_block_size()
 {
     // assumed always paired with receive_block
@@ -2512,9 +2474,8 @@ size32_t CSocket::receive_block(void *blk,size32_t maxsize)
         else { // truncate
             readtms(blk,maxsize,maxsize,rd,blocktimeoutms);
             sz -= maxsize;
-            void *tmp=malloc(sz);
+            OwnedMalloc<void> tmp = malloc(sz);
             readtms(tmp,sz,sz,rd,blocktimeoutms);
-            free(tmp);
             sz = maxsize;
         }
         if (blockflags&BF_RELIABLE_TRANSFER) {
@@ -3891,6 +3852,11 @@ unsigned SocketListParser::getSockets(SocketEndpointArray &array,unsigned defpor
     return array.ordinality();
 }
 
+jlib_decl JSocketStatistics *getSocketStatPtr()
+{
+    return &STATS;
+}
+
 void getSocketStatistics(JSocketStatistics &stats)
 {
     // should put in simple lock
@@ -5548,82 +5514,168 @@ bool catchWriteBuffer(ISocket * socket, MemoryBuffer & buffer)
     return false;
 }
 
-// utility interface for simple conversations 
-// conversation is always between two ends, 
-// at any given time one end must be receiving and other sending (though these may swap during the conversation)
-class CSingletonSocketConnection: implements IConversation, public CInterface
+CSingletonSocketConnection::CSingletonSocketConnection(SocketEndpoint &_ep)
 {
-    Owned<ISocket> sock;
-    Owned<ISocket> listensock;
-    enum { Snone, Saccept, Sconnect, Srecv, Ssend, Scancelled } state;
-    bool cancelling;
-    SocketEndpoint ep;
-    CriticalSection crit;
-public:
-    IMPLEMENT_IINTERFACE;
+    ep = _ep;
+    state = Snone;
+    cancelling = false;
+}
 
-    CSingletonSocketConnection(SocketEndpoint &_ep)
-    {
-        ep = _ep;
-        state = Snone;
-        cancelling = false;
+CSingletonSocketConnection::~CSingletonSocketConnection()
+{
+    try {
+        if (sock)
+            sock->close();
+    }
+    catch (IException *e) {
+        if (e->errorCode()!=JSOCKERR_graceful_close)
+            EXCLOG(e,"CSingletonSocketConnection close");
+        e->Release();
     }
+}
 
-    ~CSingletonSocketConnection()
-    {
+void CSingletonSocketConnection::set_keep_alive(bool keepalive)
+{
+    if (sock)
+        sock->set_keep_alive(keepalive);
+}
+
+bool CSingletonSocketConnection::connect(unsigned timeoutms)
+{
+    CriticalBlock block(crit);
+    if (cancelling)
+        state = Scancelled;
+    if (state==Scancelled)
+        return false;
+    assertex(!sock);
+    ISocket *newsock=NULL;
+    state = Sconnect;
+    unsigned start = 0;
+    if (timeoutms!=(unsigned)INFINITE)
+        start = msTick();
+    while (state==Sconnect) {
+        try {
+            CriticalUnblock unblock(crit);
+            newsock = ISocket::connect_wait(ep,1000*60*4);
+            break;
+        }
+        catch (IException * e) {
+            if ((e->errorCode()==JSOCKERR_timeout_expired)||(e->errorCode()==JSOCKERR_connection_failed)) {
+                e->Release();
+                if ((state==Sconnect)&&(timeoutms!=(unsigned)INFINITE)&&(msTick()-start>timeoutms)) {
+                    state = Snone;
+                    return false;
+                }
+            }
+            else {
+                state = Scancelled;
+                EXCLOG(e,"CSingletonSocketConnection::connect");
+                e->Release();
+                return false;
+            }
+        }
+    }
+    if (state!=Sconnect) {
+        ::Release(newsock);
+        newsock = NULL;
+    }
+    if (!newsock) {
+        state = Scancelled;
+        return false;
+    }
+    sock.setown(newsock);
+    return true;
+}
+
+bool CSingletonSocketConnection::send(MemoryBuffer &mb)
+{
+    CriticalBlock block(crit);
+    if (cancelling)
+        state = Scancelled;
+    if (state==Scancelled)
+        return false;
+    assertex(sock);
+    state = Srecv;
+    try {
+        CriticalUnblock unblock(crit);
+        writeBuffer(sock,mb);
+    }
+    catch (IException * e) {
+        state = Scancelled;
+        EXCLOG(e,"CSingletonSocketConnection::send");
+        e->Release();
+        return false;
+    }
+    state = Snone;
+    return true;
+}
+
+unsigned short CSingletonSocketConnection::setRandomPort(unsigned short base, unsigned num)
+{
+    for (;;) {
         try {
-            if (sock)
-                sock->close();
+            ep.port = base+(unsigned short)(getRandom()%num);
+            listensock.setown(ISocket::create(ep.port));
+            return ep.port;
         }
         catch (IException *e) {
-            if (e->errorCode()!=JSOCKERR_graceful_close)
-                EXCLOG(e,"CSingletonSocketConnection close");
+            if (e->errorCode()!=JSOCKERR_port_in_use) {
+                state = Scancelled;
+                EXCLOG(e,"CSingletonSocketConnection::setRandomPort");
+                e->Release();
+                break;
+            }
             e->Release();
         }
     }
+    return 0;
+}
 
-    void set_keep_alive(bool keepalive)
-    {
-        if (sock)
-            sock->set_keep_alive(keepalive);
-    }
 
-    bool connect(unsigned timeoutms)
-    {
-        CriticalBlock block(crit);
-        if (cancelling) 
-            state = Scancelled;
-        if (state==Scancelled)
-            return false;
-        assertex(!sock);
+bool CSingletonSocketConnection::accept(unsigned timeoutms)
+{
+    CriticalBlock block(crit);
+    if (cancelling)
+        state = Scancelled;
+    if (state==Scancelled)
+        return false;
+    if (!sock) {
         ISocket *newsock=NULL;
-        state = Sconnect;
-        unsigned start = 0;
-        if (timeoutms!=(unsigned)INFINITE)
-            start = msTick();
-        while (state==Sconnect) {
+        state = Saccept;
+        for (;;) {
             try {
-                CriticalUnblock unblock(crit);
-                newsock = ISocket::connect_wait(ep,1000*60*4);
-                break;
-            }
-            catch (IException * e) {
-                if ((e->errorCode()==JSOCKERR_timeout_expired)||(e->errorCode()==JSOCKERR_connection_failed)) {
-                    e->Release();
-                    if ((state==Sconnect)&&(timeoutms!=(unsigned)INFINITE)&&(msTick()-start>timeoutms)) {
+                {
+                    CriticalUnblock unblock(crit);
+                    if (!listensock)
+                        listensock.setown(ISocket::create(ep.port));
+                    if ((timeoutms!=(unsigned)INFINITE)&&(!listensock->wait_read(timeoutms))) {
                         state = Snone;
                         return false;
                     }
                 }
+                if (cancelling)
+                    state = Scancelled;
+                if (state==Scancelled)
+                    return false;
+                {
+                    CriticalUnblock unblock(crit);
+                    newsock=listensock->accept(true);
+                    break;
+                }
+            }
+            catch (IException *e) {
+                if (e->errorCode()==JSOCKERR_graceful_close)
+                    PROGLOG("CSingletonSocketConnection: Closed socket on accept - retrying...");
                 else {
                     state = Scancelled;
-                    EXCLOG(e,"CSingletonSocketConnection::connect");
+                    EXCLOG(e,"CSingletonSocketConnection::accept");
                     e->Release();
-                    return false;
+                    break;
                 }
+                e->Release();
             }
-        }   
-        if (state!=Sconnect) {
+        }
+        if (state!=Saccept) {
             ::Release(newsock);
             newsock = NULL;
         }
@@ -5632,179 +5684,78 @@ public:
             return false;
         }
         sock.setown(newsock);
-        return true;
     }
+    return true;
+}
 
-    bool send(MemoryBuffer &mb)
-    {
-        CriticalBlock block(crit);
-        if (cancelling) 
-            state = Scancelled;
-        if (state==Scancelled)
-            return false;
-        assertex(sock);
-        state = Srecv;
-        try {
-            CriticalUnblock unblock(crit);
-            writeBuffer(sock,mb);
-        }
-        catch (IException * e) {
-            state = Scancelled;
-            EXCLOG(e,"CSingletonSocketConnection::send");
-            e->Release();
-            return false;
-        }
-        state = Snone;
-        return true;
+bool CSingletonSocketConnection::recv(MemoryBuffer &mb, unsigned timeoutms)
+{
+    CriticalBlock block(crit);
+    if (cancelling)
+        state = Scancelled;
+    if (state==Scancelled)
+        return false;
+    assertex(sock);
+    state = Srecv;
+    try {
+        CriticalUnblock unblock(crit);
+        readBuffer(sock,mb,timeoutms);
     }
-
-    unsigned short setRandomPort(unsigned short base, unsigned num)
-    {
-        for (;;) {
-            try {
-                ep.port = base+(unsigned short)(getRandom()%num);
-                listensock.setown(ISocket::create(ep.port));
-                return ep.port;
-            }
-            catch (IException *e) {
-                if (e->errorCode()!=JSOCKERR_port_in_use) {
-                    state = Scancelled;
-                    EXCLOG(e,"CSingletonSocketConnection::setRandomPort");
-                    e->Release();
-                    break;
-                }
-                e->Release();
-            }
+    catch (IException *e) {
+        if (e->errorCode()==JSOCKERR_timeout_expired)
+            state = Snone;
+        else {
+            state = Scancelled;
+            if (e->errorCode()!=JSOCKERR_graceful_close)
+                EXCLOG(e,"CSingletonSocketConnection::recv");
         }
-        return 0;
+        e->Release();
+        return false;
     }
+    state = Snone;
+    return true;
+}
 
-
-    bool accept(unsigned timeoutms)
-    {
-        CriticalBlock block(crit);
-        if (cancelling) 
-            state = Scancelled;
-        if (state==Scancelled)
-            return false;
-        if (!sock) {
-            ISocket *newsock=NULL;
-            state = Saccept;
-            for (;;) {
-                try {
-                    { 
-                        CriticalUnblock unblock(crit);
-                        if (!listensock)
-                            listensock.setown(ISocket::create(ep.port));
-                        if ((timeoutms!=(unsigned)INFINITE)&&(!listensock->wait_read(timeoutms))) {
-                            state = Snone;
-                            return false;
-                        }
-                    }
-                    if (cancelling) 
-                        state = Scancelled;
-                    if (state==Scancelled)
-                        return false;
-                    {
-                        CriticalUnblock unblock(crit);
-                        newsock=listensock->accept(true);
-                        break;
-                    }
+void CSingletonSocketConnection::cancel()
+{
+    CriticalBlock block(crit);
+    while (state!=Scancelled) {
+        cancelling = true;
+        try {
+            switch (state) {
+            case Saccept:
+                {
+                    if (listensock)
+                        listensock->cancel_accept();
                 }
-                catch (IException *e) {
-                    if (e->errorCode()==JSOCKERR_graceful_close) 
-                        PROGLOG("CSingletonSocketConnection: Closed socket on accept - retrying...");
-                    else {
-                        state = Scancelled;
-                        EXCLOG(e,"CSingletonSocketConnection::accept");
-                        e->Release();
-                        break;
-                    }
-                    e->Release();
+                break;
+            case Sconnect:
+                // wait for timeout
+                break;
+            case Srecv:
+                {
+                    if (sock)
+                        sock->close();
                 }
-            }
-            if (state!=Saccept) {
-                ::Release(newsock);
-                newsock = NULL;
-            }
-            if (!newsock) {
+                break;
+            case Ssend:
+                // wait for finished
+                break;
+            default:
                 state = Scancelled;
-                return false;
+                break;
             }
-            sock.setown(newsock);
-        }
-        return true;
-    }
-
-    bool recv(MemoryBuffer &mb, unsigned timeoutms)
-    {
-        CriticalBlock block(crit);
-        if (cancelling) 
-            state = Scancelled;
-        if (state==Scancelled)
-            return false;
-        assertex(sock);
-        state = Srecv;
-        try {
-            CriticalUnblock unblock(crit);
-            readBuffer(sock,mb,timeoutms);
         }
         catch (IException *e) {
-            if (e->errorCode()==JSOCKERR_timeout_expired)
-                state = Snone;
-            else {
-                state = Scancelled;
-                if (e->errorCode()!=JSOCKERR_graceful_close)
-                    EXCLOG(e,"CSingletonSocketConnection::recv");
-            }
+            EXCLOG(e,"CSingletonSocketConnection::cancel");
             e->Release();
-            return false;
         }
-        state = Snone;
-        return true;
-    }
-
-    virtual void cancel()
-    {
-        CriticalBlock block(crit);
-        while (state!=Scancelled) {
-            cancelling = true;
-            try {
-                switch (state) {
-                case Saccept:
-                    {
-                        if (listensock)
-                            listensock->cancel_accept();
-                    }
-                    break;
-                case Sconnect:
-                    // wait for timeout
-                    break;
-                case Srecv:
-                    {
-                        if (sock)
-                            sock->close();
-                    }
-                    break;
-                case Ssend:
-                    // wait for finished
-                    break;
-                default:
-                    state = Scancelled;
-                    break;
-                }
-            }
-            catch (IException *e) {
-                EXCLOG(e,"CSingletonSocketConnection::cancel");
-                e->Release();
-            }
-            {
-                CriticalUnblock unblock(crit);
-                Sleep(1000);
-            }
+        {
+            CriticalUnblock unblock(crit);
+            Sleep(1000);
         }
     }
-};
+}
 
     
 IConversation *createSingletonSocketConnection(unsigned short port,SocketEndpoint *_ep)

+ 62 - 1
system/jlib/jsocket.hpp

@@ -70,6 +70,12 @@ enum JSOCKET_ERROR_CODES {
 #define SHUTDOWN_WRITE      1
 #define SHUTDOWN_READWRITE  2
 
+#ifndef _WIN32
+#define BLOCK_POLLED_SINGLE_CONNECTS  // NB this is much slower in windows
+#define CENTRAL_NODE_RANDOM_DELAY
+#else
+#define USERECVSEM      // to singlethread BF_SYNC_TRANSFER_PUSH
+#endif
 
 //
 // Abstract socket interface
@@ -448,7 +454,35 @@ extern jlib_decl IJSOCK_Exception *IPv6NotImplementedException(const char *filen
 #define IPV6_NOT_IMPLEMENTED() throw IPv6NotImplementedException(sanitizeSourceFile(__FILE__), __LINE__)
 
 
-
+#ifdef USERECVSEM
+class CSemProtect
+{
+    Semaphore *sem;
+    bool *owned;
+public:
+    CSemProtect() { clear(); }
+    ~CSemProtect()
+    {
+        if (sem&&*owned) {
+            *owned = false;
+            sem->signal();
+        }
+    }
+    void set(Semaphore *_sem,bool *_owned)
+    {
+        sem = _sem;
+        owned = _owned;
+    }
+    bool wait(Semaphore *_sem,bool *_owned,unsigned timeout) {
+        if (!*_owned&&!_sem->wait(timeout))
+            return false;
+        *_owned = true;
+        set(_sem,_owned);
+        return true;
+    }
+    void clear() { sem = NULL; owned = NULL; }
+};
+#endif
 
 
 //---------------------------------------------------------------------------
@@ -514,6 +548,7 @@ struct JSocketStatistics
     unsigned longestblocksize; 
 };
 
+extern jlib_decl JSocketStatistics *getSocketStatPtr();
 extern jlib_decl void getSocketStatistics(JSocketStatistics &stats);
 extern jlib_decl void resetSocketStatistics();
 extern jlib_decl StringBuffer &getSocketStatisticsString(JSocketStatistics &stats,StringBuffer &buf);
@@ -666,5 +701,31 @@ typedef std::function<bool(IAllowListWriter &)> AllowListPopulateFunction;
 typedef std::function<StringBuffer &(StringBuffer &, unsigned __int64)> AllowListFormatFunction;
 extern jlib_decl IAllowListHandler *createAllowListHandler(AllowListPopulateFunction populateFunc, AllowListFormatFunction roleFormatFunc = {}); // format function optional
 
+// utility interface for simple conversations
+// conversation is always between two ends,
+// at any given time one end must be receiving and other sending (though these may swap during the conversation)
+class jlib_decl CSingletonSocketConnection: implements IConversation, public CInterface
+{
+public:
+    Owned<ISocket> sock;
+    Owned<ISocket> listensock;
+    enum { Snone, Saccept, Sconnect, Srecv, Ssend, Scancelled } state;
+    bool cancelling;
+    SocketEndpoint ep;
+    CriticalSection crit;
+    IMPLEMENT_IINTERFACE;
+
+    CSingletonSocketConnection() {}
+    CSingletonSocketConnection(SocketEndpoint &_ep);
+    virtual ~CSingletonSocketConnection();
+    void set_keep_alive(bool keepalive);
+    virtual bool connect(unsigned timeoutms);
+    bool send(MemoryBuffer &mb);
+    unsigned short setRandomPort(unsigned short base, unsigned num);
+    virtual bool accept(unsigned timeoutms);
+    bool recv(MemoryBuffer &mb, unsigned timeoutms);
+    virtual void cancel();
+};
+
 #endif
 

+ 18 - 0
system/jlib/jutil.cpp

@@ -2466,6 +2466,9 @@ jlib_decl bool querySecuritySettings(DAFSConnectCfg *_connectMethod,
     if (_port)
         *_port = DAFILESRV_PORT;//default
 
+    // TLS TODO: could share mtls setting and cert/config for secure dafilesrv
+    //           but note remote cluster configs should then match this one
+
     const IProperties & conf = queryEnvironmentConf();
     StringAttr sslMethod;
     sslMethod.set(conf.queryProp("dfsUseSSL"));
@@ -2582,6 +2585,21 @@ jlib_decl bool queryHPCCPKIKeyFiles(const char * *  _certificate,//HPCCCertifica
     return true;
 }
 
+#ifndef _CONTAINERIZED
+jlib_decl bool queryMtlsBareMetalConfig()
+{
+    const IProperties &conf = queryEnvironmentConf();
+    if (conf.queryProp("mtls"))
+        return conf.getPropBool("mtls", false);
+    // not in conf, check xml, since all other mp settings are checked there
+    Owned<IPropertyTree> env = getHPCCEnvironment();
+    if (env)
+        return env->getPropBool("EnvSettings/mtls", false);
+
+    return false;
+}
+#endif
+
 static IPropertyTree *getOSSdirTree()
 {
     Owned<IPropertyTree> envtree = getHPCCEnvironment();

+ 4 - 0
system/jlib/jutil.hpp

@@ -447,6 +447,10 @@ extern jlib_decl bool queryHPCCPKIKeyFiles(const char * *  _certificate,//HPCCCe
                                            const char * *  _privateKey, //HPCCPrivateKeyFile
                                            const char * *  _passPhrase);//HPCCPassPhrase, encrypted
 
+#ifndef _CONTAINERIZED
+extern jlib_decl bool queryMtlsBareMetalConfig();
+#endif
+
 extern jlib_decl const char * matchConfigurationDirectoryEntry(const char *path,const char *mask,StringBuffer &name, StringBuffer &component, StringBuffer &instance);
 extern jlib_decl bool replaceConfigurationDirectoryEntry(const char *path,const char *frommask,const char *tomask,StringBuffer &out);
 

+ 4 - 0
system/mp/CMakeLists.txt

@@ -36,6 +36,7 @@ include_directories (
          ./../include 
          .
          ./../jlib
+         ./../security/securesocket
     )
 
 ADD_DEFINITIONS( -DLOGMSGCOMPONENT=2 -D_USRDLL -DMP_EXPORTS )
@@ -45,5 +46,8 @@ install ( TARGETS mp RUNTIME DESTINATION ${EXEC_DIR} LIBRARY DESTINATION ${LIB_D
 target_link_libraries ( mp 
          jlib
     )
+    if (USE_OPENSSL)
+        target_link_libraries(mp securesocket)
+    endif()
 
 HPCC_ADD_SUBDIRECTORY(test "PLATFORM")

+ 97 - 39
system/mp/mpcomm.cpp

@@ -39,12 +39,15 @@
 #include "jqueue.tpp"
 #include "jsuperhash.hpp"
 #include "jmisc.hpp"
+#include "jsecrets.hpp"
 
 #include "mpcomm.hpp"
 #include "mpbuff.hpp"
 #include "mputil.hpp"
 #include "mplog.hpp"
 
+#include "securesocket.hpp"
+
 #ifdef _MSC_VER
 #pragma warning (disable : 4355)
 #endif
@@ -444,10 +447,11 @@ class CMPConnectThread: public Thread
     ISocket *listensock;
     CMPServer *parent;
     int mpSoMaxConn;
-    unsigned mpTraceLevel;
     Owned<IAllowListHandler> allowListCallback;
     void checkSelfDestruct(void *p,size32_t sz);
 
+    Owned<ISecureSocketContext> secureContextServer;
+
 public:
     CMPConnectThread(CMPServer *_parent, unsigned port, bool _listen);
     ~CMPConnectThread()
@@ -498,6 +502,8 @@ protected:
 public:
     bool checkclosed;
     bool tryReopenChannel = false;
+    bool useTLS = false;
+    unsigned mpTraceLevel = 0;
 
 // packet handlers
     PingPacketHandler           *pingpackethandler;         // TAG_SYS_PING
@@ -800,7 +806,7 @@ protected: friend class CMPPacketReader;
     unsigned startxfer; 
     unsigned numiter;
 #endif
-
+    Owned<ISecureSocketContext> secureContextClient;
 
     bool checkReconnect(CTimeMon &tm)
     {
@@ -843,7 +849,27 @@ protected: friend class CMPPacketReader;
                 if (remaining<10000)
                     remaining = 10000; // 10s min granularity for MP
                 newsock.setown(ISocket::connect_timeout(remoteep,remaining));
+
+#if defined(_USE_OPENSSL)
+                if (parent->useTLS)
+                {
+                    Owned<ISecureSocket> ssock = secureContextClient->createSecureSocket(newsock.getClear());
+                    int tlsTraceLevel = SSLogMin;
+                    if (parent->mpTraceLevel >= MPVerboseMsgThreshold)
+                        tlsTraceLevel = SSLogMax;
+                    int status = ssock->secure_connect(tlsTraceLevel);
+                    if (status < 0)
+                    {
+                        ssock->close();
+                        exitException.setown(new CMPException(MPERR_connection_failed, remoteep));
+                        throw exitException.getLink();
+                    }
+                    newsock.setown(ssock.getClear());
+                }
+#endif // OPENSSL
+
                 newsock->set_keep_alive(true);
+
 #ifdef _FULLTRACE
                 LOG(MCdebugInfo, unknownJob, "MP: connect after socket connect, retrycount = %d", retrycount);
 #endif
@@ -1550,7 +1576,7 @@ public:
             return false;
         try {
             // try and mop up all data on socket 
-            
+            // TLS TODO: avail_read() may not return accurate amount of pending bytes
             size32_t sizeavail = sock->avail_read(); 
             if (sizeavail==0) {
                 // graceful close
@@ -1686,6 +1712,11 @@ CMPChannel::CMPChannel(CMPServer *_parent,SocketEndpoint &_remoteep) : parent(_p
     attachep.set(nullptr);
     atomic_set(&attachchk, 0);
     lastxfer = msTick();
+
+#if defined(_USE_OPENSSL)
+    if (parent->useTLS)
+        secureContextClient.setown(createSecureSocketContextSecret("local", ClientSocket));
+#endif
 }
 
 void CMPChannel::reset()
@@ -1946,7 +1977,7 @@ CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port, bool _list
     parent = _parent;
     listen = _listen;
     mpSoMaxConn = 0;
-    mpTraceLevel = 0;
+#ifndef _CONTAINERIZED
     Owned<IPropertyTree> env = getHPCCEnvironment();
     if (env)
     {
@@ -1956,8 +1987,27 @@ CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port, bool _list
             if (!mpSoMaxConn)
                 mpSoMaxConn = env->getPropInt("EnvSettings/ports/mpSoMaxConn", 0);
         }
-        mpTraceLevel = env->getPropInt("EnvSettings/mpTraceLevel", 0);
+        unsigned mpTraceLevel = env->getPropInt("EnvSettings/mpTraceLevel", 0);
+        switch (mpTraceLevel)
+        {
+            case 0:
+                parent->mpTraceLevel = InfoMsgThreshold;
+                break;
+            case 1:
+                parent->mpTraceLevel = DebugMsgThreshold;
+                break;
+            case 2:
+                parent->mpTraceLevel = ExtraneousMsgThreshold;
+                break;
+            default:
+                parent->mpTraceLevel = MPVerboseMsgThreshold;
+                break;
+        }
     }
+#else
+    parent->mpTraceLevel = getComponentConfigSP()->getPropInt("logging/@detail", InfoMsgThreshold);
+#endif
+
     if (mpSoMaxConn)
     {
         int kernSoMaxConn = 0;
@@ -1971,6 +2021,9 @@ CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port, bool _list
     {
         // need to connect early to resolve clash
         unsigned minPort, maxPort;
+        minPort = MP_START_PORT;
+        maxPort = MP_END_PORT;
+#ifndef _CONTAINERIZED
         if (env)
         {
             minPort = env->getPropInt("EnvSettings/mpStart", 0);
@@ -1980,11 +2033,7 @@ CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port, bool _list
             if (!maxPort)
                 maxPort = env->getPropInt("EnvSettings/ports/mpEnd", MP_END_PORT);
         }
-        else
-        {
-            minPort = MP_START_PORT;
-            maxPort = MP_END_PORT;
-        }
+#endif
         assertex(maxPort >= minPort);
         Owned<IJSOCK_Exception> lastErr;
         // mck - if not listening then could ignore port range and
@@ -2015,6 +2064,11 @@ CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port, bool _list
     LOG(MCdebugInfo, unknownJob, "MP Connect Thread Init Port = %d", port);
 #endif
     running = false;
+
+#if defined(_USE_OPENSSL)
+    if (parent->useTLS)
+        secureContextServer.setown(createSecureSocketContextSecretSrv("local"));
+#endif
 }
 
 void CMPConnectThread::checkSelfDestruct(void *p,size32_t sz)
@@ -2069,20 +2123,11 @@ int CMPConnectThread::run()
 #endif
     while (running)
     {
-        ISocket *sock=NULL;
+        Owned<ISocket> sock;
         SocketEndpoint peerEp;
         try
         {
-            sock=listensock->accept(true, &peerEp);
-#ifdef _FULLTRACE       
-            StringBuffer s;
-            SocketEndpoint ep1;
-            if (sock)
-            {
-                sock->getPeerEndpoint(ep1);
-                PROGLOG("MP: Connect Thread: socket accepted from %s",ep1.getUrlStr(s).str());
-            }
-#endif
+            sock.setown(listensock->accept(true, &peerEp));
         }
         catch (IException *e)
         {
@@ -2093,7 +2138,33 @@ int CMPConnectThread::run()
         {
             try
             {
+#if defined(_USE_OPENSSL)
+                if (parent->useTLS)
+                {
+                    Owned<ISecureSocket> ssock = secureContextServer->createSecureSocket(sock.getClear());
+                    int tlsTraceLevel = SSLogMin;
+                    if (parent->mpTraceLevel >= MPVerboseMsgThreshold)
+                        tlsTraceLevel = SSLogMax;
+                    int status = ssock->secure_accept(tlsTraceLevel);
+                    if (status < 0)
+                    {
+                        ssock->close();
+                        PROGLOG("MP Connect Thread: failed to accept secure connection");
+                        continue;
+                    }
+                    sock.setown(ssock.getClear());
+                }
+#endif // OPENSSL
+
+#ifdef _FULLTRACE
+                StringBuffer s;
+                SocketEndpoint ep1;
+                sock->getPeerEndpoint(ep1);
+                PROGLOG("MP: Connect Thread: socket accepted from %s",ep1.getUrlStr(s).str());
+#endif
+
                 sock->set_keep_alive(true);
+
                 size32_t rd;
                 SocketEndpoint _remoteep;
                 SocketEndpoint hostep;
@@ -2104,14 +2175,12 @@ int CMPConnectThread::run()
                 traceSlowReadTms("MP: initial accept packet from", sock, &connectHdr, sizeof(connectHdr.id), sizeof(connectHdr), rd, CONFIRM_TIMEOUT, CONFIRM_TIMEOUT_INTERVAL);
                 if (0 == rd)
                 {
-                    if (mpTraceLevel > 1)
+                    if (parent->mpTraceLevel >= MPVerboseMsgThreshold)
                     {
                         // cannot get peer addresss as socket state is now ss_shutdown (unless we want to allow this in getPeerEndpoint())
-                        StringBuffer errMsg("MP Connect Thread: connect with no msg received, assumed port monitor check");
-                        PROGLOG("%s", errMsg.str());
+                        PROGLOG("MP Connect Thread: connect with no msg received, assumed port monitor check");
                     }
                     sock->close();
-                    sock->Release();
                     continue;
                 }
                 else
@@ -2128,7 +2197,6 @@ int CMPConnectThread::run()
                         peerEp.getUrlStr(errMsg);
                         FLLOG(MCoperatorWarning, unknownJob, "%s", errMsg.str());
                         sock->close();
-                        sock->Release();
                         continue;
                     }
                 }
@@ -2162,7 +2230,6 @@ int CMPConnectThread::run()
                         }
 
                         sock->close();
-                        sock->Release();
                         continue;
                     }
                 }
@@ -2187,7 +2254,7 @@ int CMPConnectThread::run()
                         peerEp.getUrlStr(errMsg);
                         FLLOG(MCoperatorWarning, unknownJob, "%s", errMsg.str());
                     }
-                    else if (mpTraceLevel > 1)
+                    else if (parent->mpTraceLevel >= MPVerboseMsgThreshold)
                     {
                         // all zeros msg received
                         errMsg.append("MP Connect Thread: connect with empty msg received, assumed port monitor check from ");
@@ -2195,7 +2262,6 @@ int CMPConnectThread::run()
                         PROGLOG("%s", errMsg.str());
                     }
                     sock->close();
-                    sock->Release();
                     continue;
                 }
 #ifdef _FULLTRACE       
@@ -2207,7 +2273,7 @@ int CMPConnectThread::run()
 #endif
                 checkSelfDestruct(&connectHdr.id[0],sizeof(connectHdr.id));
                 Owned<CMPChannel> channel = parent->lookup(_remoteep);
-                if (!channel->attachSocket(sock,_remoteep,hostep,false,&rd,addrval))
+                if (!channel->attachSocket(sock.getClear(),_remoteep,hostep,false,&rd,addrval))
                 {
 #ifdef _FULLTRACE       
                     PROGLOG("MP Connect Thread: lookup failed");
@@ -2231,15 +2297,6 @@ int CMPConnectThread::run()
                 sock->close();
                 e->Release();
             }
-            try
-            {
-                sock->Release();
-            }
-            catch (IException *e)
-            {
-                FLLOG(MCoperatorWarning, unknownJob, e,"MP sock release failed");
-                e->Release();
-            }
         }
         else
         {
@@ -2331,6 +2388,7 @@ CMPServer::CMPServer(unsigned __int64 _role, unsigned _port, bool _listen)
     role = _role;
     port = 0;   // connectthread tells me what port it actually connected on
     checkclosed = false;
+    useTLS = queryMtls();
 
     // If !_listen, CMPConnectThread binds a port but does not actually start
     // running, it is used as a unique IP:port required in MP INode/IGroup internals

+ 2 - 1
system/mp/mpcomm.hpp

@@ -26,11 +26,12 @@
 #include "mpbuff.hpp"
 #include "mptag.hpp"
 
+const unsigned MPVerboseMsgThreshold = 110; // greater than default logging detail
+
 // timeout values
 #define MP_WAIT_FOREVER ((unsigned)-1)
 #define MP_ASYNC_SEND   ((unsigned)-2)
 
-
 interface ICommunicator: extends IInterface
 {
     virtual bool send (CMessageBuffer &mbuf, rank_t dstrank, mptag_t tag, unsigned timeout=MP_WAIT_FOREVER) = 0;  

+ 388 - 43
system/security/securesocket/securesocket.cpp

@@ -23,13 +23,11 @@
 //jlib
 #include "jliball.hpp"
 #include "string.h"
-#include "jsecrets.hpp"
 
 #ifdef _WIN32
 #include <windows.h>
 #include <winsock2.h>
 #include <ws2tcpip.h>
-#include <signal.h>  
 #else
 #include <sys/types.h>
 #include <sys/socket.h>
@@ -39,6 +37,7 @@
 #include <stddef.h>
 #include <errno.h>
 #endif
+#include <signal.h>
 
 //openssl
 #include <openssl/rsa.h>
@@ -62,6 +61,8 @@
 #include "jsmartsock.ipp"
 #include "securesocket.hpp"
 
+static JSocketStatistics *SSTATS;
+
 Owned<ISecureSocketContext> server_securesocket_context;
 bool accept_selfsigned = false;
 
@@ -137,6 +138,13 @@ private:
     CStringSet* m_peers;
     int         m_loglevel;
     bool        m_isSecure;
+    size32_t    nextblocksize;
+    unsigned    blockflags;
+    unsigned    blocktimeoutms;
+#ifdef USERECVSEM
+    static Semaphore receiveblocksem;
+    bool             receiveblocksemowned; // owned by this socket
+#endif
 private:
     StringBuffer& get_cn(X509* cert, StringBuffer& cn);
     bool verify_cert(X509* cert);
@@ -197,7 +205,7 @@ public:
     // 
     virtual int wait_write(unsigned timeout)
     {
-        throw MakeStringException(-1, "CSecureSocket::wait_write: not implemented");
+        return m_socket->wait_write(timeout);
     }
 
     //
@@ -216,7 +224,6 @@ public:
         throw MakeStringException(-1, "CSecureSocket::set_nagle: not implemented");
     }
 
-
     // set 'linger' time - time close will linger so that outstanding unsent data will be transmited
     //
     virtual void set_linger(int lingersecs)  
@@ -224,13 +231,12 @@ public:
         m_socket->set_linger(lingersecs);
     }
 
-
     //
     // Cancel accept operation and close socket
     //
     virtual void  cancel_accept() // not needed for UDP
     {
-        throw MakeStringException(-1, "CSecureSocket::cancel_accept: not implemented");
+        m_socket->cancel_accept();
     }
 
     //
@@ -285,39 +291,25 @@ public:
     }
 
     // Block functions 
+    // TLS TODO: can we move these block* into a base class for both CSocket and CSecureSocket ?
 
-    virtual void  set_block_mode (             // must be called before block operations
-                            unsigned flags,    // BF_* flags (must match receive_block)
-                          size32_t recsize=0,  // record size (required for rec compression)
-                            unsigned timeout=0 // timeout in msecs (0 for no timeout)
-                  ) 
-    {
-        throw MakeStringException(-1, "CSecureSocket::set_block_mode: not implemented");
-    }
-
-
+    virtual void  set_block_mode(                // must be called before block operations
+                            unsigned flags,      // BF_* flags (must match receive_block)
+                            size32_t recsize=0,  // record size (required for rec compression)
+                            unsigned timeout=0   // timeout in msecs (0 for no timeout)
+                  );
 
     virtual bool  send_block( 
-                            const void *blk,   // data to send 
+                            const void *blk,     // data to send
                             size32_t sz          // size to send (0 for eof)
-                  )
-    {
-        throw MakeStringException(-1, "CSecureSocket::send_block: not implemented");
-    }
+                  );
 
-    virtual size32_t receive_block_size ()     // get size of next block (always must call receive_block after) 
-    {
-        throw MakeStringException(-1, "CSecureSocket::receive_block_size: not implemented");
-    }
+    virtual size32_t receive_block_size();       // get size of next block (always must call receive_block after)
 
     virtual size32_t receive_block(
-                            void *blk,         // receive pointer 
+                            void *blk,           // receive pointer
                             size32_t sz          // max size to read (0 for sync eof) 
-                                               // if less than block size truncates block
-                  )
-    {
-        throw MakeStringException(-1, "CSecureSocket::receive_block: not implemented");
-    }
+                     );                          // if less than block size truncates block
 
     virtual void  close()
     {
@@ -344,6 +336,17 @@ public:
         size32_t avr = m_socket->avail_read();
         if (avr > 0)
         {
+            // TLS TODO: MCK - needs to be thought out and refactored
+            // Bytes here may be part of encrypted SSL record.
+            // If SSL_MODE_AUTO_RETRY set (and it is) then bytes
+            // here may also be from auto-renegotiation.
+            // If return 0 here, caller may think socket was closed
+            // If return >0 here, caller may block on next SSL_read()
+            // Only two locations where this value other than !=0 is used:
+            //     CRemoteFileServer->notifySelected() (dafsserver)
+            //     CMPPacketReader->notifySelected() (mpcomm)
+            return 1;
+            /* --------------------------------
             // bytes may be SSL/TLS protocol and not part of msg
             byte c[2];
             // TODO this may block ...
@@ -362,13 +365,24 @@ public:
                 errbuf[511] = '\0';
                 DBGLOG("SSL_peek (avail_read) returns error %d - %s", ret, errbuf);
             }
+            -------------------------------- */
         }
         return 0;
     }
 
-    virtual size32_t write_multiple(unsigned num,const void **buf, size32_t *size)
+    virtual size32_t write_multiple(unsigned num, const void **buf, size32_t *size)
     {
-        throw MakeStringException(-1, "CSecureSocket::write_multiple: not implemented");
+        size32_t res = 0;
+        for (int i=0; i<num; i++)
+        {
+            if (size[i] > 0)
+            {
+                // non-tls version tries to write equal 64k chunks, but unless we set
+                // SSL_MODE_ENABLE_PARTIAL_WRITE, we should write each buf[i] entirely ...
+                res += write(buf[i], size[i]);
+            }
+        }
+        return res;
     }
 
     virtual size32_t get_send_buffer_size() // get OS send buffer
@@ -410,7 +424,7 @@ public:
 
     virtual void set_keep_alive(bool set) // set option SO_KEEPALIVE
     {
-        throw MakeStringException(-1, "CSecureSocket::set_keep_alive: not implemented");
+        m_socket->set_keep_alive(set);
     }
 
     virtual size32_t udp_write_to(const SocketEndpoint &ep, void const* buf, size32_t size)
@@ -429,6 +443,9 @@ public:
     }
 };
 
+#ifdef USERECVSEM
+Semaphore CSecureSocket::receiveblocksem(2);
+#endif
 
 /**************************************************************************
  *  CSecureSocket -- secure socket layer implementation using openssl     *
@@ -448,6 +465,12 @@ CSecureSocket::CSecureSocket(ISocket* sock, SSL_CTX* ctx, bool verify, bool addr
     {
         throw MakeStringException(-1, "Can't create ssl");
     }
+
+    // there is no MSG_NOSIGNAL or SO_NOSIGPIPE for SSL_write() ...
+#ifndef _WIN32
+    signal(SIGPIPE, SIG_IGN);
+#endif
+
     SSL_set_fd(m_ssl, sock->OShandle());
 }
 
@@ -467,6 +490,12 @@ CSecureSocket::CSecureSocket(int sockfd, SSL_CTX* ctx, bool verify, bool address
     {
         throw MakeStringException(-1, "Can't create ssl");
     }
+
+    // there is no MSG_NOSIGNAL or SO_NOSIGPIPE for SSL_write() ...
+#ifndef _WIN32
+    signal(SIGPIPE, SIG_IGN);
+#endif
+
     SSL_set_fd(m_ssl, sockfd);
 }
 
@@ -623,7 +652,7 @@ int CSecureSocket::secure_accept(int logLevel)
         // which can happen with port scan / VIP ...
         // NOTE: ret could also be SSL_ERROR_ZERO_RETURN if client closed
         // gracefully after ssl neg initiated ...
-        if ( (logLevel >= 5) || (ret != SSL_ERROR_SYSCALL) )
+        if ( (logLevel >= SSLogNormal) || (ret != SSL_ERROR_SYSCALL) )
         {
             char errbuf[512];
             ERR_error_string_n(ERR_get_error(), errbuf, 512);
@@ -646,8 +675,8 @@ int CSecureSocket::secure_accept(int logLevel)
         return err;
     }
 
-    if (logLevel)
-        DBGLOG("SSL connection using %s", SSL_get_cipher(m_ssl));
+    if (logLevel > SSLogNormal)
+        DBGLOG("SSL accept ok, using %s", SSL_get_cipher(m_ssl));
 
     if(m_verify)
     {
@@ -683,6 +712,8 @@ int CSecureSocket::secure_connect(int logLevel)
         throw MakeStringException(-1, "SSL_connect failed: %s", errbuf);
     }
     
+    if (logLevel > SSLogNormal)
+        DBGLOG("SSL connect ok, using %s", SSL_get_cipher (m_ssl));
 
     // Currently only do fake verify - simply logging the subject and issuer
     // The verify parameter makes it possible for the application to verify only
@@ -692,10 +723,6 @@ int CSecureSocket::secure_connect(int logLevel)
         // Following two steps are optional and not required for
         // data exchange to be successful.
         
-        // Get the cipher - opt
-        if (logLevel)
-            DBGLOG("SSL connection using %s\n", SSL_get_cipher (m_ssl));
-
         // Get server's certificate (note: beware of dynamic allocation) - opt
         X509* server_cert = SSL_get_peer_certificate (m_ssl);
         bool verified = false;
@@ -819,17 +846,233 @@ void CSecureSocket::read(void* buf, size32_t min_size, size32_t max_size, size32
 
 size32_t CSecureSocket::write(void const* buf, size32_t size)
 {
+    if (size == 0)
+        return 0;
     int numwritten = SSL_write(m_ssl, buf, size);
+    // 0 is an error
+    if (numwritten <= 0)
+    {
+        int err = SSL_get_error(m_ssl, numwritten);
+        // SSL_ERROR_WANT_READ/WRITE errors and retry should not be required
+        // b/c of blocking bio and SSL_MODE_ENABLE_PARTIAL_WRITE is not set
+        char errbuf[512];
+        ERR_error_string_n(err, errbuf, 512);
+        ERR_clear_error();
+        VStringBuffer errmsg("SSL_write error %d - %s", err, errbuf);
+        if (err == SSL_ERROR_ZERO_RETURN)
+            throw createJSocketException(JSOCKERR_graceful_close, errmsg);
+        else
+            throw createJSocketException(JSOCKERR_broken_pipe, errmsg);
+    }
     return numwritten;
 }
 
 size32_t CSecureSocket::writetms(void const* buf, size32_t size, unsigned timeoutms)
 {
     // timeoutms not implemented yet ...
+    if (size == 0)
+        return 0;
     int numwritten = SSL_write(m_ssl, buf, size);
+    // 0 is an error
+    if (numwritten <= 0)
+    {
+        int err = SSL_get_error(m_ssl, numwritten);
+        // SSL_ERROR_WANT_READ/WRITE errors and retry should not be required
+        // b/c of blocking bio and SSL_MODE_ENABLE_PARTIAL_WRITE is not set
+        char errbuf[512];
+        ERR_error_string_n(err, errbuf, 512);
+        ERR_clear_error();
+        VStringBuffer errmsg("SSL_write (tms) error %d - %s", err, errbuf);
+        if (err == SSL_ERROR_ZERO_RETURN)
+            throw createJSocketException(JSOCKERR_graceful_close, errmsg);
+        else
+            throw createJSocketException(JSOCKERR_broken_pipe, errmsg);
+    }
     return numwritten;
 }
 
+// ----------------------------
+
+void CSecureSocket::set_block_mode(unsigned flags, size32_t recsize, unsigned _timeoutms)
+{
+    blockflags = flags;
+    nextblocksize = UINT_MAX;
+    blocktimeoutms = _timeoutms?_timeoutms:WAIT_FOREVER;
+}
+
+size32_t CSecureSocket::receive_block_size()
+{
+    // assumed always paired with receive_block
+    if (nextblocksize) {
+        if (blockflags&BF_SYNC_TRANSFER_PULL) {
+            bool eof=false;
+            write(&eof,sizeof(eof));
+        }
+        size32_t rd;
+        readtms(&nextblocksize,sizeof(nextblocksize),sizeof(nextblocksize),rd,blocktimeoutms);
+        _WINREV(nextblocksize);
+        if (nextblocksize==0) { // confirm eof
+            try {
+                bool confirm=true;
+                write(&confirm,sizeof(confirm));
+            }
+            catch (IJSOCK_Exception *e) {
+                if ((e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close))
+                    EXCLOG(e,"receive_block_size");
+                e->Release();
+            }
+        }
+        else if (blockflags&BF_SYNC_TRANSFER_PUSH) {  // leaves receiveblocksem clear
+#ifdef USERECVSEM
+            CSemProtect semprot; // this will catch exception in write
+            while (!semprot.wait(&receiveblocksem,&receiveblocksemowned,60*1000*5))
+                IWARNLOG("Receive block stalled");
+#endif
+            bool eof=false;
+            write(&eof,sizeof(eof));
+#ifdef USERECVSEM
+            semprot.clear();
+#endif
+        }
+    }
+    return nextblocksize;
+}
+
+size32_t CSecureSocket::receive_block(void *blk, size32_t maxsize)
+{
+#ifdef USERECVSEM
+    CSemProtect semprot; // this will catch exceptions
+#endif
+    size32_t sz = nextblocksize;
+    if (sz) {
+        if (sz==UINT_MAX) { // need to get size
+            if (!blk||!maxsize) {
+                if (blockflags&BF_SYNC_TRANSFER_PUSH) { // ignore block size
+                    size32_t rd;
+                    readtms(&nextblocksize,sizeof(nextblocksize),sizeof(nextblocksize),rd,blocktimeoutms);
+                }
+                if (blockflags&(BF_SYNC_TRANSFER_PULL|BF_SYNC_TRANSFER_PUSH)) { // signal eof
+                    bool eof=true;
+                    write(&eof,sizeof(eof));
+                    nextblocksize = 0;
+                    return 0;
+                }
+            }
+            sz = receive_block_size();
+            if (!sz)
+                return 0;
+        }
+        unsigned startt=usTick();   // include sem block but not initial handshake
+#ifdef USERECVSEM
+        if (blockflags&BF_SYNC_TRANSFER_PUSH)  // read_block_size sets semaphore
+            semprot.set(&receiveblocksem,&receiveblocksemowned);  // this will reset semaphore on exit
+#endif
+        nextblocksize = UINT_MAX;
+        size32_t rd;
+        if (sz<=maxsize) {
+            readtms(blk,sz,sz,rd,blocktimeoutms);
+        }
+        else { // truncate
+            readtms(blk,maxsize,maxsize,rd,blocktimeoutms);
+            sz -= maxsize;
+            OwnedMalloc<void> tmp = malloc(sz);
+            readtms(tmp,sz,sz,rd,blocktimeoutms);
+            sz = maxsize;
+        }
+        if (blockflags&BF_RELIABLE_TRANSFER) {
+            bool isok=true;
+            write(&isok,sizeof(isok));
+        }
+        unsigned elapsed = usTick()-startt;
+        SSTATS = getSocketStatPtr();
+        if (SSTATS)
+        {
+            SSTATS->blockrecvtime+=elapsed;
+            SSTATS->numblockrecvs++;
+            SSTATS->blockrecvsize+=sz;
+        }
+    }
+    return sz;
+}
+
+bool CSecureSocket::send_block(const void *blk, size32_t sz)
+{
+    unsigned startt=usTick();
+#ifdef TRACE_SLOW_BLOCK_TRANSFER
+    unsigned startt2 = startt;
+    unsigned startt3 = startt;
+#endif
+    if (blockflags&BF_SYNC_TRANSFER_PULL) {
+        size32_t rd;
+        bool eof = true;
+        readtms(&eof,sizeof(eof),sizeof(eof),rd,blocktimeoutms);
+        if (eof)
+            return false;
+#ifdef TRACE_SLOW_BLOCK_TRANSFER
+        startt2=usTick();
+#endif
+    }
+    if (!blk||!sz) {
+        sz = 0;
+        write(&sz,sizeof(sz));
+        try {
+            bool reply;
+            size32_t rd;
+            readtms(&reply,sizeof(reply),sizeof(reply),rd,blocktimeoutms);
+        }
+        catch (IJSOCK_Exception *e) {
+            if ((e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close))
+                EXCLOG(e,"CSocket::send_block");
+            e->Release();
+        }
+        return false;
+    }
+    size32_t rsz=sz;
+    _WINREV(rsz);
+    write(&rsz,sizeof(rsz));
+    if (blockflags&BF_SYNC_TRANSFER_PUSH) {
+#ifdef TRACE_SLOW_BLOCK_TRANSFER
+        startt2=usTick();
+#endif
+        size32_t rd;
+        bool eof = true;
+        readtms(&eof,sizeof(eof),sizeof(eof),rd,blocktimeoutms);
+        if (eof)
+            return false;
+#ifdef TRACE_SLOW_BLOCK_TRANSFER
+        startt3=usTick();
+#endif
+    }
+    write(blk,sz);
+    if (blockflags&BF_RELIABLE_TRANSFER) {
+        bool isok=false;
+        size32_t rd;
+        readtms(&isok,sizeof(isok),sizeof(isok),rd,blocktimeoutms);
+        if (!isok)
+            return false;
+    }
+    unsigned nowt = usTick();
+    unsigned elapsed = nowt-startt;
+    SSTATS = getSocketStatPtr();
+    if (SSTATS)
+    {
+        SSTATS->blocksendtime+=elapsed;
+        SSTATS->numblocksends++;
+        SSTATS->blocksendsize+=sz;
+        if (elapsed>SSTATS->longestblocksend) {
+            SSTATS->longestblocksend = elapsed;
+            SSTATS->longestblocksize = sz;
+        }
+    }
+#ifdef TRACE_SLOW_BLOCK_TRANSFER
+    if (elapsed>1000000*60)  // over 1min
+        IWARNLOG("send_block took %ds to %s  (%d,%d,%d)",elapsed/1000000,tracename,startt2-startt,startt3-startt2,nowt-startt3);
+#endif
+    return true;
+}
+
+// ----------------------------
+
 int verify_callback(int ok, X509_STORE_CTX *store)
 {
     if(!ok)
@@ -1567,7 +1810,7 @@ public:
     }
 };
 
-}
+} // namespace securesocket
 
 extern "C" {
 CriticalSection factoryCrit;
@@ -1631,6 +1874,18 @@ SECURESOCKET_API ISecureSocketContext* createSecureSocketContextSecret(const cha
         return createSecureSocketContext(sockettype);
 }
 
+SECURESOCKET_API ISecureSocketContext* createSecureSocketContextSecretSrv(const char *mtlsSecretName)
+{
+    if (!queryMtls())
+        throw makeStringException(-100, "TLS secure communication requested but not configured");
+
+    IPropertyTree *info = queryMtlsSecretInfo(mtlsSecretName);
+    if (info)
+        return createSecureSocketContextEx2(info, ServerSocket);
+    else
+        throw makeStringException(-101, "TLS secure communication requested but not configured");
+}
+
 SECURESOCKET_API ICertificate *createCertificate()
 {
     return new securesocket::CRsaCertificate();
@@ -1726,7 +1981,7 @@ SECURESOCKET_API int signCertificate(const char* csr, const char* ca_certificate
     return 0;
 }
 
-}
+} // extern C
 
 class CSecureSmartSocketFactory : public CSmartSocketFactory
 {
@@ -1765,3 +2020,93 @@ ISmartSocketFactory *createSecureSmartSocketFactory(const char *_socklist, bool
 {
     return new CSecureSmartSocketFactory(_socklist, _retry, _retryInterval, _dnsInterval);
 }
+
+class CSingletonSecureSocketConnection: public CSingletonSocketConnection
+{
+public:
+    Owned<ISecureSocketContext> secureContextClient;
+    Owned<ISecureSocketContext> secureContextServer;
+    int tlsLogLevel;
+
+    CSingletonSecureSocketConnection(SocketEndpoint &_ep)
+    {
+        ep = _ep;
+        state = Snone;
+        cancelling = false;
+        secureContextClient.setown(createSecureSocketContextSecret("local", ClientSocket));
+        secureContextServer.setown(createSecureSocketContextSecretSrv("local"));
+#ifdef _CONTAINERIZED
+        tlsLogLevel = getComponentConfigSP()->getPropInt("logging/@detail", SSLogMin);
+        if (tlsLogLevel >= ExtraneousMsgThreshold) // or InfoMsgThreshold ?
+            tlsLogLevel = SSLogMax;
+#else
+        tlsLogLevel = SSLogMin;
+#endif
+    }
+
+    virtual ~CSingletonSecureSocketConnection()
+    {
+        try {
+            if (sock)
+                sock->close();
+        }
+        catch (IException *e) {
+            if (e->errorCode()!=JSOCKERR_graceful_close)
+                EXCLOG(e,"CSingletonSocketConnection close");
+            e->Release();
+        }
+    }
+
+    bool connect(unsigned timeoutms) override
+    {
+        bool srtn = CSingletonSocketConnection::connect(timeoutms);
+        if (srtn)
+        {
+            Owned<ISecureSocket> ssock = secureContextClient->createSecureSocket(sock.getClear(), tlsLogLevel);
+            int status = ssock->secure_connect(tlsLogLevel);
+            if (status < 0)
+            {
+                ssock->close();
+                return false;
+            }
+            else
+            {
+                sock.setown(ssock.getClear());
+                return true;
+            }
+        }
+        return srtn;
+    }
+
+    bool accept(unsigned timeoutms) override
+    {
+        bool srtn = CSingletonSocketConnection::accept(timeoutms);
+        if (srtn)
+        {
+            Owned<ISecureSocket> ssock = secureContextServer->createSecureSocket(sock.getClear(), tlsLogLevel);
+            int status = ssock->secure_accept(tlsLogLevel);
+            if (status < 0)
+            {
+                ssock->close();
+                return false;
+            }
+            else
+            {
+                sock.setown(ssock.getClear());
+                return true;
+            }
+        }
+        return srtn;
+    }
+
+};
+
+IConversation *createSingletonSecureSocketConnection(unsigned short port,SocketEndpoint *_ep)
+{
+    SocketEndpoint ep;
+    if (_ep)
+        ep = *_ep;
+    if (port)
+        ep.port = port;
+    return new CSingletonSecureSocketConnection(ep);
+}

+ 3 - 0
system/security/securesocket/securesocket.hpp

@@ -89,11 +89,14 @@ SECURESOCKET_API ISecureSocketContext* createSecureSocketContext(SecureSocketTyp
 SECURESOCKET_API ISecureSocketContext* createSecureSocketContextEx(const char* certfile, const char* privkeyfile, const char* passphrase, SecureSocketType);
 SECURESOCKET_API ISecureSocketContext* createSecureSocketContextEx2(IPropertyTree* config, SecureSocketType);
 SECURESOCKET_API ISecureSocketContext* createSecureSocketContextSecret(const char *mtlsSecretName, SecureSocketType);
+SECURESOCKET_API ISecureSocketContext* createSecureSocketContextSecretSrv(const char *mtlsSecretName);
 SECURESOCKET_API ICertificate *createCertificate();
 SECURESOCKET_API int signCertificate(const char* csr, const char* ca_certificate, const char* ca_privkey, const char* ca_passphrase, int days, StringBuffer& certificate);
 };
 
 SECURESOCKET_API ISmartSocketFactory *createSecureSmartSocketFactory(const char *_socklist, bool _retry = false, unsigned _retryInterval = 60, unsigned _dnsInterval = (unsigned) -1);
 
+SECURESOCKET_API IConversation *createSingletonSecureSocketConnection(unsigned short port,SocketEndpoint *_ep=nullptr);
+
 #endif
 

+ 3 - 0
thorlcr/activities/CMakeLists.txt

@@ -28,3 +28,6 @@ project (AllProjects)
 include ( ${CMAKE_CURRENT_SOURCE_DIR}/activitymasters_lcr.cmake)
 include ( ${CMAKE_CURRENT_SOURCE_DIR}/activityslaves_lcr.cmake)
 
+include_directories (
+    ./../../system/security/securesocket
+    )

+ 4 - 3
thorlcr/master/CMakeLists.txt

@@ -57,6 +57,7 @@ include_directories (
          ${CMAKE_BINARY_DIR}
          ${CMAKE_BINARY_DIR}/oss
          ./../../system/security/shared
+         ./../../system/security/securesocket
     )
 
 ADD_DEFINITIONS( -D_CONSOLE )
@@ -89,6 +90,6 @@ target_link_libraries (  thormaster_lcr
          activitymasters_lcr 
          swapnodelib
     )
-
-
-
+if (USE_OPENSSL)
+    target_link_libraries(thormaster_lcr securesocket)
+endif()

+ 10 - 1
thorlcr/master/thgraphmanager.cpp

@@ -20,6 +20,7 @@
 #include "jfile.hpp"
 #include "jmutex.hpp"
 #include "jlog.hpp"
+#include "jsecrets.hpp"
 #include "rmtfile.hpp"
 
 #include "portlist.h"
@@ -45,6 +46,7 @@
 #include "thdemonserver.hpp"
 #include "thgraphmanager.hpp"
 #include "roxiehelper.hpp"
+#include "securesocket.hpp"
 #include "environment.hpp"
 
 class CJobManager : public CSimpleInterface, implements IJobManager, implements IExceptionHandler
@@ -175,6 +177,7 @@ class CJobManager : public CSimpleInterface, implements IJobManager, implements
                 try
                 {
                     Owned<ISocket> client = sock->accept(true);
+                    // TLS TODO: secure_accept() on Thor debug socket if globally configured for mtls ...
                     if (client)
                     {
                         client->set_linger(-1);
@@ -651,7 +654,13 @@ void CJobManager::run()
                                 {
                                     SocketEndpoint ep = _item->queryEndpoint();
                                     ep.port = _item->getPort();
-                                    Owned<IConversation> acceptconv = createSingletonSocketConnection(ep.port,&ep);
+                                    Owned<IConversation> acceptconv;
+#if defined(_USE_OPENSSL)
+                                    if (queryMtls())
+                                        acceptconv.setown(createSingletonSecureSocketConnection(ep.port,&ep));
+                                    else
+#endif
+                                        acceptconv.setown(createSingletonSocketConnection(ep.port,&ep));
                                     if (acceptconv->connect(60*1000)) // shouldn't need that long
                                     {
                                         acceptconv->set_keep_alive(true);

+ 4 - 1
thorlcr/msort/CMakeLists.txt

@@ -38,6 +38,7 @@ include_directories (
          ./..
          ./../slave 
          ./../../system/mp 
+         ./../../system/security/securesocket
          ./../graph 
          ./../../rtl/include 
          ./../../rtl/eclrtl 
@@ -68,4 +69,6 @@ target_link_libraries ( thorsort_lcr
         graph_lcr
         roxiemem
 )
-
+if (USE_OPENSSL)
+    target_link_libraries(thorsort_lcr securesocket)
+endif()

+ 4 - 11
thorlcr/msort/tsortl.cpp

@@ -25,6 +25,7 @@
 #include <process.h>
 #endif
 
+#include "mpcomm.hpp"
 #include "jfile.hpp"
 #include "jio.hpp"
 #include "jsocket.hpp"
@@ -33,18 +34,16 @@
 #include "thbuf.hpp"
 #include "thmem.hpp"
 
+#include "securesocket.hpp"
+
 #ifdef _DEBUG
 //#define _FULL_TRACE
 #endif
 
-#define DEFAULTTIMEOUT 3600 // 60 minutes 
-#define CONNECTTIMEOUT 300  // seconds
-
 #ifdef _MSC_VER
 #pragma warning( disable : 4355 )
 #endif
 
-
 class CREcheck { 
     bool &busy;
 public:
@@ -84,11 +83,6 @@ struct TransferStreamHeader
 };
 
 
-static ISocket *DoConnect(SocketEndpoint &nodeaddr)
-{
-    return ISocket::connect_wait(nodeaddr,CONNECTTIMEOUT*1000);
-}
-
 class CSocketRowStream: public CSimpleInterface, implements IRowStream
 {
     MemoryBuffer inbuf;
@@ -286,9 +280,8 @@ public:
 };
 
 
-IRowStream *ConnectMergeRead(unsigned id, IThorRowInterfaces *rowif,SocketEndpoint &nodeaddr,rowcount_t startrec,rowcount_t numrecs)
+IRowStream *ConnectMergeRead(unsigned id, IThorRowInterfaces *rowif,SocketEndpoint &nodeaddr,rowcount_t startrec,rowcount_t numrecs, ISocket *socket)
 {
-    Owned<ISocket> socket = DoConnect(nodeaddr);
     TransferStreamHeader hdr(startrec, numrecs, id);
 #ifdef _FULL_TRACE
     StringBuffer s;

+ 35 - 0
thorlcr/msort/tsorts.cpp

@@ -23,6 +23,7 @@
 #include <mpcomm.hpp>
 #include "thorport.hpp"
 #include "jsocket.hpp"
+#include "jsecrets.hpp"
 #include "jthread.hpp"
 #include "thormisc.hpp"
 #include "jisem.hpp"
@@ -600,6 +601,8 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
     size32_t transferblocksize, midkeybufsize;
     CRuntimeStatisticCollection spillStats;
     rowcount_t globalCount = 0;
+    bool useTLS = false;
+    unsigned traceLevel = 0;
 
     class CRowToKeySerializer : public CSimpleInterfaceOf<IOutputRowSerializer>
     {
@@ -787,6 +790,30 @@ public:
         transferblocksize = TRANSFERBLOCKSIZE;
         isstable = true;
         stopping = false;
+        useTLS = queryMtls();
+#ifdef _CONTAINERIZED
+        traceLevel = getComponentConfigSP()->getPropInt("logging/@detail", InfoMsgThreshold);
+#else
+        if (globals)
+        {
+            traceLevel = globals->getPropInt("@traceLevel", 0);
+            switch (traceLevel)
+            {
+                case 0:
+                    traceLevel = InfoMsgThreshold;
+                    break;
+                case 1:
+                    traceLevel = DebugMsgThreshold;
+                    break;
+                case 2:
+                    traceLevel = ExtraneousMsgThreshold;
+                    break;
+                default:
+                    traceLevel = ExtraneousMsgThreshold + 10;
+                    break;
+            }
+        }
+#endif
         threaded.start();
     }
     ~CThorSorter()
@@ -1321,6 +1348,14 @@ public:
         return spillStats.getStatisticValue(kind);
     }
     virtual rowcount_t getGlobalCount() const { return globalCount; }
+    virtual bool queryTLS() const override
+    {
+        return useTLS;
+    }
+    virtual bool queryTraceLevel() const override
+    {
+        return traceLevel;
+    }
 };
 
 

+ 7 - 1
thorlcr/msort/tsorts.hpp

@@ -32,6 +32,10 @@
 #include "jsort.hpp"
 #include "mptag.hpp"
 #include "mpbase.hpp"
+#include "securesocket.hpp"
+
+#define DEFAULTTIMEOUT 3600 // 60 minutes
+#define CONNECTTIMEOUT 300  // seconds
 
 interface ISortKeySerializer;
 interface IThorRowInterfaces;
@@ -73,7 +77,7 @@ interface ISocketRowWriter: extends IRowWriter
 
 class CActivityBase;
 THORSORT_API IThorSorter *CreateThorSorter(CActivityBase *activity, SocketEndpoint &ep,IDiskUsage *iDiskUsage,ICommunicator *clusterComm, mptag_t _mpTagRPC);
-IRowStream *ConnectMergeRead(unsigned id,IThorRowInterfaces *rowif,SocketEndpoint &nodeaddr,rowcount_t startrec,rowcount_t numrecs);
+IRowStream *ConnectMergeRead(unsigned id,IThorRowInterfaces *rowif,SocketEndpoint &nodeaddr,rowcount_t startrec,rowcount_t numrecs, ISocket *socket);
 ISocketRowWriter *ConnectMergeWrite(IThorRowInterfaces *rowif,ISocket *socket,size32_t bufsize,rowcount_t &startrec,rowcount_t &numrecs);
 #define SOCKETSERVERINC                    1
 #define NUMSLAVESOCKETS                    2
@@ -94,6 +98,8 @@ interface ISortSlaveBase  // for global merging
     virtual size32_t getTransferBlockSize() = 0;
     virtual unsigned getTransferPort() = 0;
     virtual void startMerging(IArrayOf<IRowStream> &readers, rowcount_t _totalrows) = 0;
+    virtual bool queryTLS() const = 0;
+    virtual bool queryTraceLevel() const = 0;
 };
 
 

+ 54 - 4
thorlcr/msort/tsorts1.cpp

@@ -26,6 +26,7 @@
 #include "tsorts.hpp"
 #include "thmem.hpp"
 
+#include "securesocket.hpp"
 
 #ifdef _DEBUG
 //#define TRACE_UNIQUE
@@ -35,7 +36,6 @@
 //#define TRACE_PARTITION_OVERFLOW
 #endif
 
-
 // This contains the original global merge method
 
 class CSortMerge;
@@ -55,7 +55,7 @@ protected:
     }
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-    CMergeReadStream(IThorRowInterfaces *rowif, unsigned streamno,SocketEndpoint &targetep, rowcount_t startrec, rowcount_t numrecs)
+    CMergeReadStream(IThorRowInterfaces *rowif, unsigned streamno,SocketEndpoint &targetep, rowcount_t startrec, rowcount_t numrecs, unsigned sortTraceLevel=0, ISecureSocketContext *secureContextClient=nullptr)
     {
         endpoint = targetep;
         char url[100];
@@ -63,7 +63,29 @@ public:
         LOG(MCthorDetailedDebugInfo, thorJob, "SORT Merge READ: Stream(%u) %s, pos=%" RCPF "d len=%" RCPF "u",streamno,url,startrec,numrecs);
         SocketEndpoint mergeep = targetep;
         mergeep.port+=SOCKETSERVERINC; 
-        stream = ConnectMergeRead(streamno,rowif,mergeep,startrec,numrecs);
+
+        Owned<ISocket> socket = ISocket::connect_wait(mergeep,CONNECTTIMEOUT*1000);
+
+#if defined(_USE_OPENSSL)
+        if (secureContextClient)
+        {
+            Owned<ISecureSocket> ssock = secureContextClient->createSecureSocket(socket.getClear());
+            int tlsTraceLevel = SSLogMin;
+            if (sortTraceLevel >= MPVerboseMsgThreshold)
+                tlsTraceLevel = SSLogMax;
+            int status = ssock->secure_connect(tlsTraceLevel);
+            if (status < 0)
+            {
+                ssock->close();
+                VStringBuffer errmsg("Secure connect failed: %d", status);
+                throw createJSocketException(JSOCKERR_connection_failed, errmsg);
+            }
+            socket.setown(ssock.getClear());
+        }
+#endif // OPENSSL
+
+        stream = ConnectMergeRead(streamno,rowif,mergeep,startrec,numrecs,socket.getClear());
+
         LOG(MCthorDetailedDebugInfo, thorJob, "SORT Merge READ: Stream(%u) connected to %s",streamno,url);
     }
     virtual ~CMergeReadStream()
@@ -274,6 +296,8 @@ protected: friend class CSortMerge;
     Linked<IThorRowInterfaces> rowif;
     CriticalSection rowifsect;
     Semaphore rowifsem;
+    Owned<ISecureSocketContext> secureContextServer;
+    Owned<ISecureSocketContext> secureContextClients;
 public:
     IMPLEMENT_IINTERFACE_USING(Thread)
 
@@ -288,6 +312,13 @@ public:
         unsigned port = in.getTransferPort();
         server.setown(ISocket::create(port));
         term = false;
+#if defined(_USE_OPENSSL)
+        if (slave.queryTLS())
+        {
+            secureContextServer.setown(createSecureSocketContextSecretSrv("local"));
+            secureContextClients.setown(createSecureSocketContextSecret("local", ClientSocket));
+        }
+#endif
     }
 
     void setRowIF(IThorRowInterfaces *_rowif)
@@ -334,6 +365,25 @@ public:
                 if (!socket)
                     break;
 
+#if defined(_USE_OPENSSL)
+                if (slave.queryTLS())
+                {
+                    Owned<ISecureSocket> ssock = secureContextServer->createSecureSocket(socket.getClear());
+                    int tlsTraceLevel = SSLogMin;
+                    unsigned sortTraceLevel = slave.queryTraceLevel();
+                    if (sortTraceLevel >= MPVerboseMsgThreshold)
+                        tlsTraceLevel = SSLogMax;
+                    int status = ssock->secure_accept(tlsTraceLevel);
+                    if (status < 0)
+                    {
+                        ssock->close();
+                        VStringBuffer errmsg("Secure accept failed: %d", status);
+                        throw createJSocketException(JSOCKERR_connection_failed, errmsg);
+                    }
+                    socket.setown(ssock.getClear());
+                }
+#endif // OPENSSL
+
                 rowcount_t poscount=0;
                 rowcount_t numrecs=0;
                 ISocketRowWriter *strm=NULL;
@@ -489,7 +539,7 @@ public:
                         readers.append(*slave.createMergeInputStream(sstart,snum));
                     }
                     else
-                        readers.append(*new CMergeReadStream(rowif,i,endpoints[i], sstart, snum));
+                        readers.append(*new CMergeReadStream(rowif,i,endpoints[i], sstart, snum, slave.queryTraceLevel(), secureContextClients));
                 }
             }
         }