Browse Source

Merge pull request #10846 from mckellyln/hpcc-18996-c

HPCC-18996 Epoll notify handler add/remove refactoring

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 years ago
parent
commit
f7843ed37d
1 changed files with 92 additions and 133 deletions
  1. 92 133
      system/jlib/jsocket.cpp

+ 92 - 133
system/jlib/jsocket.cpp

@@ -3756,8 +3756,7 @@ struct SelectItem
     T_SOCKET handle;
     ISocketSelectNotify *nfy;
     byte mode;
-    bool del;
-    bool add_epoll;
+    bool del; // only used in select handler method
     bool operator == (const SelectItem & other) const { return sock == other.sock; }
 };
 
@@ -3895,7 +3894,9 @@ public:
 
     bool sockOk(T_SOCKET sock)
     {
+#ifdef _DEBUG
         PROGLOG("CSocketBaseThread: sockOk testing %d",sock);
+#endif
         int t=0;
         socklen_t tl = sizeof(t);
         if (getsockopt(sock, SOL_SOCKET, SO_TYPE, (char *)&t, &tl)!=0) {
@@ -3920,8 +3921,10 @@ public:
             LOGERR2(ERRNO(),2,"CSocketBaseThread select handle");
             return false;
         }
+# ifdef _DEBUG
         else if (rc>0)
             PROGLOG("CSocketBaseThread: select handle %d selected(2) %d",sock,rc);
+# endif
         XFD_ZERO(&fds);
         FD_SET((unsigned)sock, &fds);
         tv.tv_sec = 0;
@@ -3933,8 +3936,10 @@ public:
             LOGERR2(ERRNO(),3,"CSocketBaseThread select handle");
             return false;
         }
+# ifdef _DEBUG
         else if (rc>0)
             PROGLOG("CSocketBaseThread: select handle %d selected(2) %d",sock,rc);
+# endif
         return true;
 #else
         struct pollfd fds[1];
@@ -3946,9 +3951,9 @@ public:
             return true;
         else if (rc>0)
         {
-            if ( !(fds[0].revents & POLLNVAL) )
+            if ( !(fds[0].revents & POLLNVAL) ) // TODO: MCK - also check POLLERR here ?
             {
-                PROGLOG("CSocketBaseThread: poll handle %d selected(2) %d",sock,rc);
+                // PROGLOG("CSocketBaseThread: poll handle %d selected(2) %d",sock,rc);
                 return true;
             }
         }
@@ -4234,7 +4239,6 @@ public:
         sn.handle = (T_SOCKET)sock->OShandle();
         CHECKSOCKRANGE(sn.handle);
         sn.del = false;
-        sn.add_epoll = false;
         items.append(sn);
         selectvarschange = true;        
         triggerselect();
@@ -4631,7 +4635,6 @@ class CSocketEpollThread: public CSocketBaseThread
             sidummy->sock = nullptr;
             sidummy->nfy = nullptr;
             sidummy->del = true;  // so its not added to tonotify ...
-            sidummy->add_epoll = false;
             sidummy->mode = 0;
 #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
             if(pipe(dummysock))
@@ -4691,6 +4694,33 @@ class CSocketEpollThread: public CSocketBaseThread
         }
     }
 
+    void delSelItem(SelectItem *si)
+    {
+        epoll_op(epfd, EPOLL_CTL_DEL, si, 0);
+        // Release/dtors should not throw but leaving try/catch here until all paths checked
+        try
+        {
+            si->nfy->Release();
+            si->sock->Release();
+            delete si;
+        }
+        catch (IException *e)
+        {
+            EXCLOG(e,"CSocketEpollThread::delSelItem()");
+            e->Release();
+        }
+    }
+
+    void delSelItemPos(SelectItem *si, unsigned pos)
+    {
+        unsigned last = items.ordinality();
+        delSelItem(si);
+        last--;
+        if (pos < last)
+            items.swap(pos, last);
+        items.remove(last);
+    }
+
 public:
     IMPLEMENT_IINTERFACE;
     CSocketEpollThread(const char *trc)
@@ -4738,22 +4768,10 @@ public:
     ~CSocketEpollThread()
     {
         closedummy();
-        ForEachItemIn(i,items)
+        ForEachItemIn(i, items)
         {
             SelectItem *si = items.element(i);
-            epoll_op(epfd, EPOLL_CTL_DEL, si, 0);
-            // Release/dtors should not throw but leaving try/catch here until all paths checked
-            try
-            {
-                si->nfy->Release();
-                si->sock->Release();
-            }
-            catch (IException *e)
-            {
-                EXCLOG(e,"~CSocketEpollThread");
-                e->Release();
-            }
-            delete si;
+            delSelItem(si);
         }
         if (epfd >= 0)
         {
@@ -4768,75 +4786,43 @@ public:
 
     Owned<IException> termexcept;
 
-    void updateItems()
+    bool checkSocks()
     {
-        // must be in CriticalBlock block(sect);
+        bool ret = false;
+        // must be holding CriticalBlock (sect)
         unsigned n = items.ordinality();
         for (unsigned i=0;i<n;)
         {
             SelectItem *si = items.element(i);
-            if (si->del)
+            if (!sockOk(si->handle))
             {
-                epoll_op(epfd, EPOLL_CTL_DEL, si, 0);
-                // Release/dtors should not throw but leaving try/catch here until all paths checked
-                try
-                {
-#ifdef SOCKTRACE
-                    PROGLOG("CSocketEpollThread::updateItems release %d",si->handle);
-#endif
-                    si->nfy->Release();
-                    si->sock->Release();
-                }
-                catch (IException *e)
-                {
-                    EXCLOG(e,"CSocketEpollThread::updateItems");
-                    e->Release();
-                }
-                delete si;
-                si = nullptr;
+                delSelItemPos(si, i);
                 n--;
-                if (i<n)
-                    items.swap(i,n);
-                items.remove(n);
+                ret = true;
             }
             else
-            {
-                if (si->add_epoll)
-                {
-                    si->add_epoll = false;
-                    if (si->mode != 0)
-                    {
-                        unsigned int ep_mode = 0;
-                        if (si->mode & SELECTMODE_READ)
-                            ep_mode |= EPOLLIN;
-                        if (si->mode & SELECTMODE_WRITE)
-                            ep_mode |= EPOLLOUT;
-                        if (si->mode & SELECTMODE_EXCEPT)
-                            ep_mode |= EPOLLPRI;
-                        if (ep_mode != 0)
-                            epoll_op(epfd, EPOLL_CTL_ADD, si, ep_mode);
-                    }
-                }
                 i++;
-            }
         }
+        return ret;
     }
 
-    bool checkSocks()
+    bool removeSock(ISocket *sock)
     {
-        bool ret = false;
-        ForEachItemIn(i,items)
+        // must be holding CriticalBlock (sect)
+        unsigned n = items.ordinality();
+        for (unsigned i=0;i<n;)
         {
             SelectItem *si = items.element(i);
-            if (si->del)
-                ret = true; // maybe that bad one
-            else if (!sockOk(si->handle))
+            if (si->sock==sock)
             {
-                si->del = true;
-                ret = true;
+                delSelItemPos(si, i);
+                n--;
+                return true;
             }
+            else
+                i++;
         }
-        return ret;
+        return false;
     }
 
     bool remove(ISocket *sock)
@@ -4854,44 +4840,41 @@ public:
             }
             return true;
         }
-        ForEachItemIn(i,items)
+        if (removeSock(sock))
         {
-            SelectItem *si = items.element(i);
-            if ( (!si->del) && (si->sock==sock) )
-            {
-                si->del = true;
-                selectvarschange = true;
-                triggerselect();
-                return true;
-            }
+            selectvarschange = true;
+            triggerselect();
+            return true;
         }
         return false;
     }
 
     bool add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
     {
-        if ( !sock || !nfy )
+        if ( !sock || !nfy ||
+             !(mode & (SELECTMODE_READ|SELECTMODE_WRITE|SELECTMODE_EXCEPT)) )
         {
-            WARNLOG("EPOLL: adding fd but sock or nfy is NULL");
+            WARNLOG("EPOLL: adding fd but sock or nfy is NULL or mode is empty");
             dbgassertex(false);
             return false;
         }
-        // maybe check once to prevent 1st delay? TBD
         CriticalBlock block(sect);
-        ForEachItemIn(i,items)
-        {
-            SelectItem *si = items.element(i);
-            if ( !si->del && (si->sock==sock) )
-                si->del = true;
-        }
+        removeSock(sock);
         SelectItem *sn = new SelectItem;
         sn->nfy = LINK(nfy);
         sn->sock = LINK(sock);
         sn->mode = (byte)mode;
         sn->handle = (T_SOCKET)sock->OShandle();
         sn->del = false;
-        sn->add_epoll = true;
         items.append(sn);
+        unsigned int ep_mode = 0;
+        if (mode & SELECTMODE_READ)
+            ep_mode |= EPOLLIN;
+        if (mode & SELECTMODE_WRITE)
+            ep_mode |= EPOLLOUT;
+        if (mode & SELECTMODE_EXCEPT)
+            ep_mode |= EPOLLPRI;
+        epoll_op(epfd, EPOLL_CTL_ADD, sn, ep_mode);
         selectvarschange = true;
         triggerselect();
         return true;
@@ -4912,14 +4895,13 @@ public:
             if (!checkSocks())
             {
                 // bad socket not found
-                PROGLOG("CSocketEpollThread::updateEpollVars cannot find socket error");
+                WARNLOG("CSocketEpollThread::updateEpollVars cannot find socket error");
                 if (validateerrcount>10)
                     throw MakeStringException(-1,"CSocketEpollThread:Socket epoll error %d",validateselecterror);
             }
         }
         else
             validateerrcount = 0;
-        updateItems();
 #ifndef _USE_PIPE_FOR_SELECT_TRIGGER
         opendummy();
 #endif
@@ -4941,7 +4923,7 @@ public:
             while (!terminating)
             {
                 if (selectvarschange)
-                        updateEpollVars(ni);
+                    updateEpollVars(ni);
 
                 if (ni==0)
                 {
@@ -5010,38 +4992,28 @@ public:
                                     continue;
                                 }
 # endif
-                                if (!epsi->del)
+                                unsigned int ep_mode = 0;
+                                if (epevents[j].events & (EPOLLIN | EPOLLHUP | EPOLLERR))
+                                    ep_mode |= SELECTMODE_READ;
+                                if (epevents[j].events & EPOLLOUT)
+                                    ep_mode |= SELECTMODE_WRITE;
+                                if (epevents[j].events & EPOLLPRI)
+                                    ep_mode |= SELECTMODE_EXCEPT;
+                                if (ep_mode != 0)
                                 {
-                                    if (!epsi->sock || !epsi->nfy)
-                                    {
-                                        WARNLOG("EPOLL: epevents[%d].data.fd = %d, emask = %u, del = false but sock or nfy is NULL", j, tfd, epevents[j].events);
-                                    }
-                                    else
-                                    {
-                                        unsigned int ep_mode = 0;
-                                        if (epevents[j].events & (EPOLLIN | EPOLLHUP | EPOLLERR))
-                                            ep_mode |= SELECTMODE_READ;
-                                        if (epevents[j].events & EPOLLOUT)
-                                            ep_mode |= SELECTMODE_WRITE;
-                                        if (epevents[j].events & EPOLLPRI)
-                                            ep_mode |= SELECTMODE_EXCEPT;
-                                        if (ep_mode != 0)
-                                        {
-                                            tonotify.append(*epsi);
-                                            SelectItem &itm = tonotify.element(tonotify.length()-1);
-                                            itm.nfy->Link();
-                                            itm.sock->Link();
+                                    tonotify.append(*epsi);
+                                    SelectItem &itm = tonotify.element(tonotify.length()-1);
+                                    itm.nfy->Link();
+                                    itm.sock->Link();
 #ifdef _TRACELINKCLOSED
-                                            // temporary, to help diagnose spurious socket closes (hpcc-15043)
-                                            // currently no implementation of notifySelected() uses the mode
-                                            // argument so we can pass in the epoll events mask and log that
-                                            // if there is no data and the socket gets closed
-                                            itm.mode = epevents[j].events;
+                                    // temporary, to help diagnose spurious socket closes (hpcc-15043)
+                                    // currently no implementation of notifySelected() uses the mode
+                                    // argument so we can pass in the epoll events mask and log that
+                                    // if there is no data and the socket gets closed
+                                    itm.mode = epevents[j].events;
 #else
-                                            itm.mode = ep_mode;
+                                    itm.mode = ep_mode;
 #endif
-                                        }
-                                    }
                                 }
                             }
                         }
@@ -5103,19 +5075,6 @@ public:
             EXCLOG(e,"CSocketEpollThread");
             termexcept.setown(e);
         }
-        CriticalBlock block(sect);
-        try
-        {
-            updateItems();
-        }
-        catch (IException *e)
-        {
-            EXCLOG(e,"CSocketEpollThread(2)");
-            if (!termexcept)
-                termexcept.setown(e);
-            else
-                e->Release();
-        }
         return 0;
     }
 };