Browse Source

HPCC-14337 Protect against a potential corruption + invalid access

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 9 years ago
parent
commit
91ae6c14ae
1 changed files with 73 additions and 74 deletions
  1. 73 74
      common/remote/sockfile.cpp

+ 73 - 74
common/remote/sockfile.cpp

@@ -2927,7 +2927,7 @@ inline void appendErr3(MemoryBuffer &reply, RemoteFileCommandType e, int code, c
 
 
 
 
 
 
-#define MAPCOMMAND(c,p) case c: { ret =  this->p(msg, reply) ; break; }
+#define MAPCOMMAND(c,p) case c: { ret = this->p(msg, reply) ; break; }
 #define MAPCOMMANDCLIENT(c,p,client) case c: { ret = this->p(msg, reply, client); break; }
 #define MAPCOMMANDCLIENT(c,p,client) case c: { ret = this->p(msg, reply, client); break; }
 #define MAPCOMMANDCLIENTTHROTTLE(c,p,client,throttler) case c: { ret = this->p(msg, reply, client, throttler); break; }
 #define MAPCOMMANDCLIENTTHROTTLE(c,p,client,throttler) case c: { ret = this->p(msg, reply, client, throttler); break; }
 #define MAPCOMMANDSTATS(c,p,stats) case c: { ret = this->p(msg, reply, stats); break; }
 #define MAPCOMMANDSTATS(c,p,stats) case c: { ret = this->p(msg, reply, stats); break; }
@@ -2940,39 +2940,38 @@ static CriticalSection ClientCountSect;
 #define DEFAULT_THROTTLOG_LOG_INTERVAL_SECS 60 // log total throttled delay period
 #define DEFAULT_THROTTLOG_LOG_INTERVAL_SECS 60 // log total throttled delay period
 
 
 
 
-struct ClientStats
+class CClientStats : public CInterface
 {
 {
-    ClientStats(const char *_client) : client(_client) { count = 0; bRead = 0; bWritten = 0; }
+    SpinLock spin;
+public:
+    CClientStats(const char *_client) : client(_client) { count = 0; bRead = 0; bWritten = 0; }
     const char *queryFindString() const { return client; }
     const char *queryFindString() const { return client; }
+    inline void addRead(unsigned len)
+    {
+        SpinBlock b(spin); // rare event, but we should change to a atomic<__int64> for >= c++11
+        bRead += len;
+    }
+    inline void addWrite(unsigned len)
+    {
+        SpinBlock b(spin); // rare event, but we should change to a atomic<__int64> for >= c++11
+        bWritten += len;
+    }
 
 
     StringAttr client;
     StringAttr client;
     unsigned __int64 count;
     unsigned __int64 count;
     unsigned __int64 bRead;
     unsigned __int64 bRead;
     unsigned __int64 bWritten;
     unsigned __int64 bWritten;
 };
 };
-class CClientStatsTable : public StringSuperHashTableOf<ClientStats>
+class CClientStatsTable : public OwningStringSuperHashTableOf<CClientStats>
 {
 {
-    typedef StringSuperHashTableOf<ClientStats> PARENT;
+    typedef OwningStringSuperHashTableOf<CClientStats> PARENT;
     CriticalSection crit;
     CriticalSection crit;
     unsigned cmdStats[RFCmax];
     unsigned cmdStats[RFCmax];
 
 
-    inline ClientStats *addClientCommon(RemoteFileCommandType cmd, const char *client)
-    {
-        ClientStats *stats = PARENT::find(client);
-        if (!stats)
-        {
-            stats = new ClientStats(client);
-            PARENT::replace(*stats);
-        }
-        cmdStats[cmd]++;
-        ++stats->count;
-        return stats;
-    }
-
     static int compareElement(void* const *ll, void* const *rr)
     static int compareElement(void* const *ll, void* const *rr)
     {
     {
-        const ClientStats *l = (const ClientStats *) *ll;
-        const ClientStats *r = (const ClientStats *) *rr;
+        const CClientStats *l = (const CClientStats *) *ll;
+        const CClientStats *r = (const CClientStats *) *rr;
         if (l->count == r->count)
         if (l->count == r->count)
             return 0;
             return 0;
         else if (l->count<r->count)
         else if (l->count<r->count)
@@ -2989,30 +2988,19 @@ public:
     {
     {
         kill();
         kill();
     }
     }
-    void kill()
+    CClientStats *getClientReference(RemoteFileCommandType cmd, const char *client)
     {
     {
-        SuperHashIteratorOf<ClientStats> iter(*this);
-        ForEach(iter)
+        CriticalBlock b(crit);
+        CClientStats *stats = PARENT::find(client);
+        if (!stats)
         {
         {
-            ClientStats *elem = &iter.query();
-            delete elem;
+            stats = new CClientStats(client);
+            PARENT::replace(*stats);
         }
         }
-        PARENT::kill();
-    }
-    ClientStats *addClientReference(RemoteFileCommandType cmd, const char *client)
-    {
-        CriticalBlock b(crit);
-        return addClientCommon(cmd, client);
-    }
-    void addRead(ClientStats &stats, unsigned len)
-    {
-        CriticalBlock b(crit);
-        stats.bRead += len;
-    }
-    void addWrite(ClientStats &stats, unsigned len)
-    {
-        CriticalBlock b(crit);
-        stats.bWritten += len;
+        if (cmd<RFCmax) // i.e. ignore duff command (which will be traced), but still record client connected
+            cmdStats[cmd]++;
+        ++stats->count;
+        return LINK(stats);
     }
     }
     StringBuffer &getInfo(StringBuffer &info, unsigned level=1)
     StringBuffer &getInfo(StringBuffer &info, unsigned level=1)
     {
     {
@@ -3034,11 +3022,11 @@ public:
         }
         }
         if (totalClients)
         if (totalClients)
         {
         {
-            SuperHashIteratorOf<ClientStats> iter(*this);
-            PointerArrayOf<ClientStats> elements;
+            SuperHashIteratorOf<CClientStats> iter(*this);
+            PointerArrayOf<CClientStats> elements;
             ForEach(iter)
             ForEach(iter)
             {
             {
-                ClientStats &elem = iter.query();
+                CClientStats &elem = iter.query();
                 elements.append(&elem);
                 elements.append(&elem);
             }
             }
             elements.sort(&compareElement);
             elements.sort(&compareElement);
@@ -3051,7 +3039,7 @@ public:
                 info.append("Top 10 clients:").newline();
                 info.append("Top 10 clients:").newline();
                 for (unsigned e=0; e<max; e++)
                 for (unsigned e=0; e<max; e++)
                 {
                 {
-                    const ClientStats &element = *elements.item(e);
+                    const CClientStats &element = *elements.item(e);
                     info.appendf("Client %s - %" I64F "d requests handled, bytes read = %" I64F "d, bytes written = % " I64F "d",
                     info.appendf("Client %s - %" I64F "d requests handled, bytes read = %" I64F "d, bytes written = % " I64F "d",
                             element.client.get(), element.count, element.bRead, element.bWritten).newline();
                             element.client.get(), element.count, element.bRead, element.bWritten).newline();
                 }
                 }
@@ -3061,7 +3049,7 @@ public:
                 info.append("All clients:").newline();
                 info.append("All clients:").newline();
                 ForEachItemIn(e, elements)
                 ForEachItemIn(e, elements)
                 {
                 {
-                    const ClientStats &element = *elements.item(e);
+                    const CClientStats &element = *elements.item(e);
                     info.appendf("Client %s - %" I64F "d requests handled, bytes read = %" I64F "d, bytes written = % " I64F "d",
                     info.appendf("Client %s - %" I64F "d requests handled, bytes read = %" I64F "d, bytes written = % " I64F "d",
                             element.client.get(), element.count, element.bRead, element.bWritten).newline();
                             element.client.get(), element.count, element.bRead, element.bWritten).newline();
                 }
                 }
@@ -3105,25 +3093,32 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer
             : socket(_socket), user(_user), globallasttick(_globallasttick)
             : socket(_socket), user(_user), globallasttick(_globallasttick)
         {
         {
             previdx = (unsigned)-1;
             previdx = (unsigned)-1;
-            if (socket)
+            StringBuffer peerBuf;
+            char name[256];
+            name[0] = 0;
+            int port = socket->peer_name(name,sizeof(name)-1);
+            if (port>=0)
             {
             {
-                StringBuffer peerBuf;
-                char name[256];
-                name[0] = 0;
-                int port = socket->peer_name(name,sizeof(name)-1);
-                if (port>=0)
-                {
-                    peerBuf.append(name);
-                    if (port)
-                        peerBuf.append(':').append(port);
-                    peerName.set(peerBuf);
-                }
+                peerBuf.append(name);
+                if (port)
+                    peerBuf.append(':').append(port);
+                peerName.set(peerBuf);
+            }
+            else
+            {
+                /* There's a possibility the socket closed before got here, in which case, peer name is unavailable
+                 * May potentially be unavailable for other reasons also.
+                 * Must be set, as used in client stats HT.
+                 * If socket closed, the handler will start up but notice closed and quit
+                 */
+                peerName.set("UNKNOWN PEER NAME");
             }
             }
             {
             {
                 CriticalBlock block(ClientCountSect);
                 CriticalBlock block(ClientCountSect);
                 if (++ClientCount>MaxClientCount)
                 if (++ClientCount>MaxClientCount)
                     MaxClientCount = ClientCount;
                     MaxClientCount = ClientCount;
-                if (TF_TRACE_CLIENT_CONN) {
+                if (TF_TRACE_CLIENT_CONN)
+                {
                     StringBuffer s;
                     StringBuffer s;
                     s.appendf("Connecting(%p) [%d,%d] to ",this,ClientCount,MaxClientCount);
                     s.appendf("Connecting(%p) [%d,%d] to ",this,ClientCount,MaxClientCount);
                     s.append(peerName);
                     s.append(peerName);
@@ -3346,6 +3341,7 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer
                  */
                  */
                 selecthandled = true;
                 selecthandled = true;
                 parent->addClient(this);    // add to select handler
                 parent->addClient(this);    // add to select handler
+                // NB: this (CRemoteClientHandler) is now linked by the selecthandler and owned by the 'clients' list
             }
             }
         }
         }
 
 
@@ -3450,7 +3446,7 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer
         unsigned queryQueueLimit() const { return queueLimit; }
         unsigned queryQueueLimit() const { return queueLimit; }
         StringBuffer &getInfoSummary(StringBuffer &info)
         StringBuffer &getInfoSummary(StringBuffer &info)
         {
         {
-            info.appendf("Thottler(%s) - limit=%u, delayMs=%u, cpuThreshold=%u, queueLimit=%u", title.get(), limit, delayMs, cpuThreshold, queueLimit).newline();
+            info.appendf("Throttler(%s) - limit=%u, delayMs=%u, cpuThreshold=%u, queueLimit=%u", title.get(), limit, delayMs, cpuThreshold, queueLimit).newline();
             unsigned elapsedSecs = totalThrottleDelayTimer.elapsedMs()/1000;
             unsigned elapsedSecs = totalThrottleDelayTimer.elapsedMs()/1000;
             time_t simple;
             time_t simple;
             time(&simple);
             time(&simple);
@@ -3587,10 +3583,7 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer
                          * NB: The overall number of threads is still capped by the thread pool.
                          * NB: The overall number of threads is still capped by the thread pool.
                          */
                          */
                         unsigned ms = timer.elapsedMs();
                         unsigned ms = timer.elapsedMs();
-                        {
-                            CriticalBlock b(crit);
-                            totalThrottleDelay += ms;
-                        }
+                        totalThrottleDelay += ms;
                         PROGLOG("Throttler(%s): transaction delayed [cmd=%s] for : %u milliseconds, proceeding as cpu(%u)<throttleCPULimit(%u)", title.get(), getRFCText(cmd), cpu, ms, cpuThreshold);
                         PROGLOG("Throttler(%s): transaction delayed [cmd=%s] for : %u milliseconds, proceeding as cpu(%u)<throttleCPULimit(%u)", title.get(), getRFCText(cmd), cpu, ms, cpuThreshold);
                         hadSem = false;
                         hadSem = false;
                     }
                     }
@@ -3617,7 +3610,7 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer
 
 
             /* Whilst holding on this throttle slot (i.e. before signalling semaphore back), process
             /* Whilst holding on this throttle slot (i.e. before signalling semaphore back), process
              * queued items. NB: other threads that are finishing will do also.
              * queued items. NB: other threads that are finishing will do also.
-             * Queued items are processed 1st, the current request, then anything that was queued when handling current request
+             * Queued items are processed 1st, then the current request, then anything that was queued when handling current request
              * Throttle slot (semaphore) is only given back when no more to do.
              * Throttle slot (semaphore) is only given back when no more to do.
              */
              */
             Linked<CRemoteClientHandler> currentClient;
             Linked<CRemoteClientHandler> currentClient;
@@ -3923,8 +3916,14 @@ public:
         StringBuffer s(client->queryPeerName());
         StringBuffer s(client->queryPeerName());
         PROGLOG("onCloseSocket(%d) %s",which,s.str());
         PROGLOG("onCloseSocket(%d) %s",which,s.str());
 #endif
 #endif
-        if (client->socket) {
-            try {
+        if (client->socket)
+        {
+            try
+            {
+                /* JCSMORE - shouldn't this really be dependent on whether selecthandled=true
+                 * It has not been added to the selecthandler
+                 * Harmless, but wasteful if so.
+                 */
                 selecthandler->remove(client->socket);
                 selecthandler->remove(client->socket);
             }
             }
             catch (IException *e) {
             catch (IException *e) {
@@ -4012,7 +4011,7 @@ public:
         return true;
         return true;
     }
     }
 
 
-    bool cmdRead(MemoryBuffer & msg, MemoryBuffer & reply, ClientStats &stats)
+    bool cmdRead(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
     {
     {
         int handle;
         int handle;
         __int64 pos;
         __int64 pos;
@@ -4046,7 +4045,7 @@ public:
             e->Release();
             e->Release();
             return false;
             return false;
         }
         }
-        clientStatsTable.addRead(stats, len);
+        stats.addRead(len);
         if (TF_TRACE)
         if (TF_TRACE)
             PROGLOG("read file,  handle = %d, pos = %" I64F "d, toread = %d, read = %d",handle,pos,len,numRead);
             PROGLOG("read file,  handle = %d, pos = %" I64F "d, toread = %d, read = %d",handle,pos,len,numRead);
         {
         {
@@ -4086,7 +4085,7 @@ public:
     }
     }
 
 
 
 
-    bool cmdWrite(MemoryBuffer & msg, MemoryBuffer & reply, ClientStats &stats)
+    bool cmdWrite(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
     {
     {
         int handle;
         int handle;
         __int64 pos;
         __int64 pos;
@@ -4100,7 +4099,7 @@ public:
         if (TF_TRACE_PRE_IO)
         if (TF_TRACE_PRE_IO)
             PROGLOG("before write file,  handle = %d, towrite = %d",handle,len);
             PROGLOG("before write file,  handle = %d, towrite = %d",handle,len);
         size32_t numWritten = fileio->write(pos,len,data);
         size32_t numWritten = fileio->write(pos,len,data);
-        clientStatsTable.addWrite(stats, numWritten);
+        stats.addWrite(numWritten);
         if (TF_TRACE)
         if (TF_TRACE)
             PROGLOG("write file,  handle = %d, towrite = %d, written = %d",handle,len,numWritten);
             PROGLOG("write file,  handle = %d, towrite = %d, written = %d",handle,len,numWritten);
         reply.append((unsigned)RFEnoerror).append(numWritten);
         reply.append((unsigned)RFEnoerror).append(numWritten);
@@ -4192,7 +4191,7 @@ public:
         return true;
         return true;
     }
     }
 
 
-    bool cmdAppend(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client, ClientStats &stats)
+    bool cmdAppend(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client, CClientStats &stats)
     {
     {
         IMPERSONATE_USER(client);
         IMPERSONATE_USER(client);
         int handle;
         int handle;
@@ -4206,7 +4205,7 @@ public:
 
 
         Owned<IFile> file = createIFile(srcname.get());
         Owned<IFile> file = createIFile(srcname.get());
         __int64 written = fileio->appendFile(file,pos,len);
         __int64 written = fileio->appendFile(file,pos,len);
-        clientStatsTable.addWrite(stats, written);
+        stats.addWrite(written);
         if (TF_TRACE)
         if (TF_TRACE)
             PROGLOG("append file,  handle = %d, file=%s, pos = %" I64F "d len = %" I64F "d written = %" I64F "d",handle,srcname.get(),pos,len,written);
             PROGLOG("append file,  handle = %d, file=%s, pos = %" I64F "d len = %" I64F "d written = %" I64F "d",handle,srcname.get(),pos,len,written);
         reply.append((unsigned)RFEnoerror).append(written);
         reply.append((unsigned)RFEnoerror).append(written);
@@ -4848,7 +4847,7 @@ public:
 
 
     bool processCommand(RemoteFileCommandType cmd, MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler *client, CThrottler *throttler)
     bool processCommand(RemoteFileCommandType cmd, MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler *client, CThrottler *throttler)
     {
     {
-        ClientStats *stats = clientStatsTable.addClientReference(cmd, client->queryPeerName());
+        Owned<CClientStats> stats = clientStatsTable.getClientReference(cmd, client->queryPeerName());
         bool ret = true;
         bool ret = true;
         try
         try
         {
         {