Browse Source

HPCC-9415 Use epoll, fix critical section around init

Signed-off-by: Mark Kelly <mark.kelly@lexisnexis.com>
Mark Kelly 11 years ago
parent
commit
626080651a
3 changed files with 229 additions and 194 deletions
  1. 129 1
      dali/datest/datest.cpp
  2. 3 1
      initfiles/etc/DIR_NAME/environment.conf.in
  3. 97 192
      system/jlib/jsocket.cpp

+ 129 - 1
dali/datest/datest.cpp

@@ -45,6 +45,7 @@ static unsigned nIter = 1;
 
 //#define TEST_REMOTEFILE
 //#define TEST_REMOTEFILE2
+//#define TEST_REMOTEFILE3
 //#define TEST_DEADLOCK
 //#define TEST_THREADS
 #define MDELAY 100
@@ -704,8 +705,125 @@ struct RecordStruct
 };
 
 #define NRECS ((__int64)1024*1024*1024*20/sizeof(RecordStruct)) // i.e. ~20GB
+// #define NRECS ((__int64)1024*1024*1024*2/sizeof(RecordStruct)) // i.e. ~2GB
+// #define NRECS ((__int64)1024*1024*500/sizeof(RecordStruct)) // i.e. ~500MB
+// #define NRECS ((__int64)1024*500/sizeof(RecordStruct)) // i.e. ~500KB
 
+void TestRemoteFile3(int nfiles, int fsizemb)
+{
+    //SocketEndpoint ep("10.150.10.8:7100");
+    SocketEndpoint ep("127.0.1.1:7100");
+    //ISocket *sock = ISocket::connect(7100,"10.150.10.8");
+
+    // ------------------------------
+
+    printf("TestRemoteFile3: nfiles = %d fsizemb = %d\n", nfiles, fsizemb);
+
+    IFile *fileA[32769];
+    IFileIO *ioA[32769];
+    char filen[256] = { "" };
+
+    size_t nrecs = (fsizemb*1024*1024) / sizeof(RecordStruct);
+
+    unsigned t=msTick();
+
+    if(nfiles > 0){
+
+    for (int oi=0;oi<=nfiles;oi++) {
+        sprintf(filen, "testfile.%d", oi);
+        fileA[oi] = createRemoteFile(ep, filen);
+        ioA[oi] = fileA[oi]->open(IFOcreate);
+    }
 
+    for (int oi=0;oi<=nfiles;oi++) {
+        ioA[oi]->Release();
+        fileA[oi]->Release();
+    }
+
+    unsigned r = msTick()-t;
+    printf("elapsed time createRemoteFile (%d) = %lf (sec)\n", nfiles, (double)r/1000.0);
+
+    }
+
+    // ------------------------------
+
+    if(nrecs > 0){
+
+#if 1
+    for (int oi=0;oi<=0;oi++) {
+
+        IFile *file = createRemoteFile(ep, "testfile20.d00");
+        IFileIO *io = file->open(IFOcreate);
+        byte *buffer = (byte *)malloc(0x8000);
+        unsigned br = 0x8000/sizeof(RecordStruct);
+        size32_t buffsize = br*sizeof(RecordStruct);
+        unsigned curidx = 0;
+        unsigned nr = nrecs;
+        __int64 pos = 0;
+        t=msTick();
+#if 1
+        unsigned j;
+        RecordStruct *rs;
+        while (nr) {
+            if (nr<br)
+                br = nr;
+            rs = (RecordStruct *)buffer;
+            for (j=0;j<br;j++) {
+                rs->idx = curidx++;
+                itoa(rs->idx,rs->fill,16);
+                unsigned k;
+                for (k=strlen(rs->fill);k<sizeof(rs->fill);k++)
+                    rs->fill[k] = ' ';
+                rs->key = getRandom()%1000+1;
+                rs->check = rs->idx*rs->key;
+                rs->sum = 0;
+                rs++;
+            }
+            size32_t wr = io->write(pos, br*sizeof(RecordStruct),buffer);
+            assertex(wr==br*sizeof(RecordStruct));
+            pos += br*sizeof(RecordStruct);
+            nr -= br;
+        }
+        io->Release();
+        unsigned r = msTick()-t;
+        printf("Write (buffsize = %dk): elapsed time write = %lf (sec)\n",(buffsize+1023)/1024,(double)r/1000.0);
+        Sleep(10);
+#endif
+        br = 0x2000/sizeof(RecordStruct);
+        for (unsigned iter=1;iter<10;iter++) {
+            t=msTick();
+            buffsize = br*sizeof(RecordStruct);
+            buffer = (byte *)realloc(buffer,buffsize);
+            curidx = 0;
+            nr = nrecs;
+            pos = 0;
+            unsigned r = msTick();
+            IFileIO *io = file->open(IFOread);
+            while (nr) {
+                if (nr<br)
+                    br = nr;
+                size32_t rd = io->read(pos, br*sizeof(RecordStruct),buffer);
+                // fprintf(stderr, "nr = %u rd = %u br = %u sizeof(RecordStruct) = %lu\n", nr, rd, br, sizeof(RecordStruct));
+                assertex(rd==br*sizeof(RecordStruct));
+                pos += br*sizeof(RecordStruct);
+                nr -= br;
+            }
+            io->Release();
+            r = msTick()-t;
+            printf("Read (buffsize = %dk): elapsed time read = %lf (sec)\n",(buffsize+1023)/1024,(double)r/1000.0);
+            Sleep(10);
+            br += br;
+        }
+
+        file->Release();
+
+    }
+#endif
+
+    }
+
+    return;
+}
 
 void TestRemoteFile2()
 {
@@ -767,7 +885,7 @@ void TestRemoteFile2()
         }
         io->Release();
         r = msTick()-t;
-        printf("Read (buffsize = %dk): elapsed time write = %d\n",(buffsize+1023)/1024,t/1000); 
+        printf("Read (buffsize = %dk): elapsed time write = %d\n",(buffsize+1023)/1024,r/1000);
         Sleep(10);
         br += br;
     }
@@ -2828,6 +2946,16 @@ int main(int argc, char* argv[])
         TestRemoteFile2();
         return 0;
 #endif
+#if defined(TEST_REMOTEFILE3)
+        int nfiles = 1000;
+        int fsizemb = 512;
+        if(argc >= 2)
+            nfiles = atoi(argv[1]);
+        if(argc >= 3)
+            fsizemb = atoi(argv[2]);
+        TestRemoteFile3(nfiles, fsizemb);
+        return 0;
+#endif
         if (argc<2) {
             usage();
             return 1;

+ 3 - 1
initfiles/etc/DIR_NAME/environment.conf.in

@@ -16,4 +16,6 @@ home=${HOME_DIR}
 environment=${ENV_XML_FILE}
 sourcedir=${CONFIG_SOURCE_PATH}
 blockname=${DIR_NAME}
-interface=*
+interface=*
+# enable epoll method for notification events (true/false)
+use_epoll=true

+ 97 - 192
system/jlib/jsocket.cpp

@@ -96,11 +96,7 @@
 
 #ifdef _DEBUG
 //#define SOCKTRACE
-# ifdef _HAS_EPOLL_SUPPORT
-#  if defined(SOCKTRACE) || !defined(EPOLLTRACE)
-#   define EPOLLTRACE
-#  endif
-# endif
+//#define EPOLLTRACE
 #endif
 
 #ifdef _TESTING
@@ -3508,7 +3504,7 @@ inline bool findfds(T_FD_SET &s,T_SOCKET h,bool &c)
 }
 #endif
 
-class CSocketMultiThread: public Thread
+class CSocketBaseThread: public Thread
 {
 protected:
     bool terminating;
@@ -3531,14 +3527,15 @@ protected:
 #endif
     bool dummysockopen;
 
-    CSocketMultiThread(const char *trc) : Thread("CSocketMultiThread")
+    CSocketBaseThread(const char *trc) : Thread("CSocketBaseThread")
     {
     }
 
-    ~CSocketMultiThread()
+    ~CSocketBaseThread()
     {
     }
 
+public:
     void triggerselect()
     {
         if (atomic_read(&tickwait))
@@ -3600,14 +3597,14 @@ protected:
 
     bool sockOk(T_SOCKET sock)
     {
-        PROGLOG("CSocketMultiThread: sockOk testing %d",sock);
+        PROGLOG("CSocketBaseThread: sockOk testing %d",sock);
         int err = 0;
         int t=0;
         socklen_t tl = sizeof(t);
         if (getsockopt(sock, SOL_SOCKET, SO_TYPE, (char *)&t, &tl)!=0) {
             StringBuffer sockstr;
             const char *tracename = sockstr.append((unsigned)sock).str();
-            LOGERR2(ERRNO(),1,"CSocketMultiThread select handle");
+            LOGERR2(ERRNO(),1,"CSocketBaseThread select handle");
             return false;
         }
         T_FD_SET fds;
@@ -3622,11 +3619,11 @@ protected:
         if (rc<0) {
             StringBuffer sockstr;
             const char *tracename = sockstr.append((unsigned)sock).str();
-            LOGERR2(ERRNO(),2,"CSocketMultiThread select handle");
+            LOGERR2(ERRNO(),2,"CSocketBaseThread select handle");
             return false;
         }
         else if (rc>0)
-            PROGLOG("CSocketMultiThread: select handle %d selected(2) %d",sock,rc);
+            PROGLOG("CSocketBaseThread: select handle %d selected(2) %d",sock,rc);
         XFD_ZERO(&fds);
         FD_SET((unsigned)sock, &fds);
         tv.tv_sec = 0;
@@ -3635,11 +3632,11 @@ protected:
         if (rc<0) {
             StringBuffer sockstr;
             const char *tracename = sockstr.append((unsigned)sock).str();
-            LOGERR2(ERRNO(),3,"CSocketMultiThread select handle");
+            LOGERR2(ERRNO(),3,"CSocketBaseThread select handle");
             return false;
         }
         else if (rc>0)
-            PROGLOG("CSocketMultiThread: select handle %d selected(2) %d",sock,rc);
+            PROGLOG("CSocketBaseThread: select handle %d selected(2) %d",sock,rc);
         return true;
     }
 
@@ -3660,7 +3657,7 @@ protected:
 
 };
 
-class CSocketSelectThread: public CSocketMultiThread
+class CSocketSelectThread: public CSocketBaseThread
 {
     void opendummy()
     {
@@ -3718,16 +3715,6 @@ class CSocketSelectThread: public CSocketMultiThread
         }
     }
 
-    void triggerselect()
-    {
-        CSocketMultiThread::triggerselect();
-    }
-
-    void resettrigger()
-    {
-        CSocketMultiThread::resettrigger();
-    }
-
 
 #ifdef _WIN32
 #define HASHTABSIZE 256
@@ -3793,7 +3780,7 @@ class CSocketSelectThread: public CSocketMultiThread
 public:
     IMPLEMENT_IINTERFACE;
     CSocketSelectThread(const char *trc)
-        : CSocketMultiThread("CSocketSelectThread")
+        : CSocketBaseThread("CSocketSelectThread")
     {
         dummysockopen = false;
         opendummy();
@@ -3897,26 +3884,6 @@ public:
         return true;
     }
 
-    bool remove(ISocket *sock)
-    {
-        return CSocketMultiThread::remove(sock);
-    }
-
-    void stop(bool wait)
-    {
-        CSocketMultiThread::stop(wait);
-    }
-
-    bool sockOk(T_SOCKET sock)
-    {
-        return CSocketMultiThread::sockOk(sock);
-    }
-
-    bool checkSocks()
-    {
-        return CSocketMultiThread::checkSocks();
-    }
-
     void updateSelectVars(T_FD_SET &rdfds,T_FD_SET &wrfds,T_FD_SET &exfds,bool &isrd,bool &iswr,bool &isex,unsigned &ni,T_SOCKET &max_sockid)
     {
         CriticalBlock block(sect);
@@ -4233,10 +4200,29 @@ public:
 };
 
 #ifdef _HAS_EPOLL_SUPPORT
-class CSocketEpollThread: public CSocketMultiThread
+class CSocketEpollThread: public CSocketBaseThread
 {
     int epfd;
-    int *epfdtbl;
+    int *epfdtbl; // table of fd<->item index for lookups
+    struct epoll_event *epevents;
+
+    void epoll_op(int efd, int op, int fd, unsigned int event_mask)
+    {
+        int srtn;
+        struct epoll_event event;
+        event.events = event_mask;
+        event.data.fd = fd;
+# ifdef EPOLLTRACE
+        DBGLOG("EPOLL: op(%d) fd %d to epfd %d", op, fd, efd);
+# endif
+        srtn = ::epoll_ctl(efd, op, fd, &event);
+        // if another thread closed fd before here don't fail
+        if ( (srtn < 0) && (op != EPOLL_CTL_DEL) ){
+            int err = ERRNO();
+            LOGERR(err,1,"epoll_ctl");
+            THROWJSOCKEXCEPTION2(err);
+        }
+    }
 
     void opendummy()
     {
@@ -4260,38 +4246,14 @@ class CSocketEpollThread: public CSocketMultiThread
                 }
             }
             CHECKSOCKRANGE(dummysock[0]);
-            int srtn;
-            struct epoll_event event;
-            event.events = EPOLLIN; // TODO - add other bits ? (such as RDHUP)
-            event.data.fd = dummysock[0];
-            srtn = ::epoll_ctl(epfd, EPOLL_CTL_ADD, dummysock[0], &event);
-            if (srtn < 0) {
-                int err = ERRNO();
-                LOGERR(err,1,"epoll_ctl(ADD)");
-                THROWJSOCKEXCEPTION2(err);
-            }
-#  ifdef EPOLLTRACE
-            DBGLOG("EPOLL: added dummy fd %d to epfd %d", dummysock[0], epfd);
-#  endif
+            epoll_op(epfd, EPOLL_CTL_ADD, dummysock[0], EPOLLIN);
 #else
             if (IP6preferred)
                 dummysock = ::socket(AF_INET6, SOCK_STREAM, PF_INET6);
             else
                 dummysock = ::socket(AF_INET, SOCK_STREAM, 0);
             CHECKSOCKRANGE(dummysock);
-            int srtn;
-            struct epoll_event event;
-            event.events = EPOLLIN | EPOLLERR; // TODO - add other bits ? (such as RDHUP)
-            event.data.fd = dummysock;
-            srtn = ::epoll_ctl(epfd, EPOLL_CTL_ADD, dummysock, &event);
-            if (srtn < 0) {
-                int err = ERRNO();
-                LOGERR(err,1,"epoll_ctl(ADD)");
-                THROWJSOCKEXCEPTION2(err);
-            }
-#  ifdef EPOLLTRACE
-            DBGLOG("EPOLL: added dummy fd %d to epfd %d", dummysock, epfd);
-#  endif
+            epoll_op(epfd, EPOLL_CTL_ADD, dummysock, (EPOLLIN | EPOLLERR));
 #endif
             dummysockopen = true;
         }
@@ -4304,42 +4266,25 @@ class CSocketEpollThread: public CSocketMultiThread
         CriticalBlock block(sect);
         if (dummysockopen) {
 #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
-            struct epoll_event event;
-            ::epoll_ctl(epfd, EPOLL_CTL_DEL, dummysock[0], &event);
-#  ifdef EPOLLTRACE
-            DBGLOG("EPOLL: removed dummy fd %d from epfd %d", dummysock[0], epfd);
-#  endif
+            epoll_op(epfd, EPOLL_CTL_DEL, dummysock[0], 0);
 #ifdef SOCKTRACE
             PROGLOG("SOCKTRACE: Closing dummy sockets %x %d %x %d (%x)", dummysock[0], dummysock[0], dummysock[1], dummysock[1], this);
 #endif
             ::close(dummysock[0]);
             ::close(dummysock[1]);
 #else
-            struct epoll_event event;
-            ::epoll_ctl(epfd, EPOLL_CTL_DEL, dummysock, &event);
-#  ifdef EPOLLTRACE
-            DBGLOG("EPOLL: removed dummy fd %d from epfd %d", dummysock, epfd);
-#  endif
+            epoll_op(epfd, EPOLL_CTL_DEL, dummysock, 0);
             ::close(dummysock);
 #endif
             dummysockopen = false;
         }
     }
 
-    void triggerselect()
-    {
-        CSocketMultiThread::triggerselect();
-    }
-
-    void resettrigger()
-    {
-        CSocketMultiThread::resettrigger();
-    }
 
 public:
     IMPLEMENT_IINTERFACE;
     CSocketEpollThread(const char *trc)
-        : CSocketMultiThread("CSocketEpollThread")
+        : CSocketBaseThread("CSocketEpollThread")
     {
         dummysockopen = false;
         terminating = false;
@@ -4357,7 +4302,7 @@ public:
           THROWJSOCKEXCEPTION2(err);
         }
 # if defined(_DEBUG) || defined(EPOLLTRACE)
-        DBGLOG("EPOLL: creating epfd %d", epfd );
+        DBGLOG("CSocketEpollThread: creating epoll fd %d", epfd );
 # endif
         try {
             epfdtbl = new int[XFD_SETSIZE];
@@ -4369,6 +4314,13 @@ public:
         for (int i=0; i<XFD_SETSIZE; i++) {
             epfdtbl[i] = -1;
         }
+        try {
+            epevents = new struct epoll_event[XFD_SETSIZE];
+        } catch (const std::bad_alloc &e) {
+            int err = ERRNO();
+            LOGERR(err,1,"epevents alloc()");
+            THROWJSOCKEXCEPTION2(err);
+        }
         opendummy();
     }
 
@@ -4378,11 +4330,7 @@ public:
         ForEachItemIn(i,items) {
             try {
                 SelectItem &si = items.item(i);
-                struct epoll_event event;
-                ::epoll_ctl(epfd, EPOLL_CTL_DEL, si.handle, &event);
-# ifdef EPOLLTRACE
-                DBGLOG("EPOLL: removed fd %d from epfd %d", si.handle, epfd);
-# endif
+                epoll_op(epfd, EPOLL_CTL_DEL, si.handle, 0);
                 si.sock->Release();
                 si.nfy->Release();
             }
@@ -4398,6 +4346,7 @@ public:
             ::close(epfd);
             epfd = -1;
             delete [] epfdtbl;
+            delete [] epevents;
         }
     }
 
@@ -4414,11 +4363,7 @@ public:
                 reindex = true;
             }
             if (si.del) {
-                struct epoll_event event;
-                ::epoll_ctl(epfd, EPOLL_CTL_DEL, si.handle, &event);
-# ifdef EPOLLTRACE
-                DBGLOG("EPOLL: removed fd %d from epfd %d", si.handle, epfd);
-# endif
+                epoll_op(epfd, EPOLL_CTL_DEL, si.handle, 0);
                 epfdtbl[si.handle] = -1;
                 reindex = true;
                 si.nfy->Release();
@@ -4465,17 +4410,7 @@ public:
                         }
                         if (ep_mode != 0) {
                             ep_mode |= EPOLLRDHUP;
-                            event.events = ep_mode;
-                            event.data.fd = si.handle;
-                            srtn = ::epoll_ctl(epfd, EPOLL_CTL_ADD, si.handle, &event);
-                            if (srtn < 0) {
-                                int err = ERRNO();
-                                LOGERR(err,1,"epoll_ctl(ADD)");
-                                THROWJSOCKEXCEPTION2(err);
-                            }
-# ifdef EPOLLTRACE
-                            DBGLOG("EPOLL: added fd %d to epfd %d", si.handle, epfd);
-# endif
+                            epoll_op(epfd, EPOLL_CTL_ADD, si.handle, ep_mode);
                         }
                     }
 # ifdef EPOLLTRACE
@@ -4483,8 +4418,7 @@ public:
 # endif
                 }
 # ifdef EPOLLTRACE
-                max_sockid++;
-                for(int ix=0; ix<max_sockid; ix++) {
+                for(int ix=0; ix<=max_sockid; ix++) {
                     DBGLOG("EPOLL: epfdtbl[%d] = %d", ix, epfdtbl[ix]);
                 }
 # endif
@@ -4526,26 +4460,6 @@ public:
         return true;
     }
 
-    bool remove(ISocket *sock)
-    {
-        return CSocketMultiThread::remove(sock);
-    }
-
-    void stop(bool wait)
-    {
-        CSocketMultiThread::stop(wait);
-    }
-
-    bool sockOk(T_SOCKET sock)
-    {
-        return CSocketMultiThread::sockOk(sock);
-    }
-
-    bool checkSocks()
-    {
-        return CSocketMultiThread::checkSocks();
-    }
-
     void updateEpollVars(unsigned &ni)
     {
         CriticalBlock block(sect);
@@ -4581,7 +4495,6 @@ public:
             unsigned lastnumto = 0;
             unsigned totnum = 0;
             unsigned total = 0;
-            struct epoll_event events[XFD_SETSIZE];
             selectvarschange = true;
 
             while (!terminating) {
@@ -4598,7 +4511,7 @@ public:
                     continue;
                 }
 
-                int n = ::epoll_wait(epfd, events, XFD_SETSIZE, 1000);
+                int n = ::epoll_wait(epfd, epevents, XFD_SETSIZE, 1000);
 
 # ifdef EPOLLTRACE
                 if(n > 0)
@@ -4635,30 +4548,30 @@ public:
 
                         for (int j=0;j<n;j++) {
 # ifdef EPOLLTRACE
-                            DBGLOG("EPOLL: events[%d].data.fd = %d, epfdtbl = %d, events mask = %d", j, events[j].data.fd, epfdtbl[events[j].data.fd], events[j].events);
+                            DBGLOG("EPOLL: epevents[%d].data.fd = %d, epfdtbl = %d, emask = %d", j, epevents[j].data.fd, epfdtbl[epevents[j].data.fd], epevents[j].events);
 # endif
 # ifdef _USE_PIPE_FOR_SELECT_TRIGGER
-                            if ((dummysockopen) && (events[j].data.fd == dummysock[0])) {
+                            if ((dummysockopen) && (epevents[j].data.fd == dummysock[0])) {
                                 resettrigger();
                                 continue;
                             }
 # endif
-                            if (events[j].data.fd >= 0) {
-                                assertex(epfdtbl[events[j].data.fd] >= 0);
-                                SelectItem *epsi = items.getArray(epfdtbl[events[j].data.fd]);
+                            if (epevents[j].data.fd >= 0) {
+                                assertex(epfdtbl[epevents[j].data.fd] >= 0);
+                                SelectItem *epsi = items.getArray(epfdtbl[epevents[j].data.fd]);
                                 if (!epsi->del) {
                                     unsigned int ep_mode = 0;
-                                    if (events[j].events & (EPOLLIN | EPOLLPRI)) {
+                                    if (epevents[j].events & (EPOLLIN | EPOLLPRI)) {
                                         ep_mode |= SELECTMODE_READ;
                                     }
-                                    if (events[j].events & (EPOLLERR | EPOLLHUP)) {
+                                    if (epevents[j].events & (EPOLLERR | EPOLLHUP)) {
                                         ep_mode |= SELECTMODE_READ;
                                     }
-                                    if (events[j].events & EPOLLRDHUP) {
+                                    if (epevents[j].events & EPOLLRDHUP) {
                                         // TODO - or should we set EXCEPT ?
                                         ep_mode |= SELECTMODE_READ;
                                     }
-                                    if (events[j].events & EPOLLOUT) {
+                                    if (epevents[j].events & EPOLLOUT) {
                                         ep_mode |= SELECTMODE_WRITE;
                                     }
                                     if (ep_mode != 0) {
@@ -4719,7 +4632,7 @@ public:
 
 class CSocketEpollHandler: public CInterface, implements ISocketSelectHandler
 {
-    CIArrayOf<CSocketEpollThread> threads;
+    CSocketEpollThread *epollthread;
     CriticalSection sect;
     bool started;
     StringAttr epolltrace;
@@ -4728,61 +4641,38 @@ public:
     CSocketEpollHandler(const char *trc)
         : epolltrace(trc)
     {
-        started = false;
+        epollthread = new CSocketEpollThread(epolltrace);
+    }
+
+    ~CSocketEpollHandler()
+    {
+        delete epollthread;
     }
+
     void start()
     {
         CriticalBlock block(sect);
-        if (!started) {
-            started = true;
-            ForEachItemIn(i,threads) {
-                threads.item(i).start();
-            }
-        }
-
+        epollthread->start();
     }
+
     void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
     {
         CriticalBlock block(sect);
-        loop {
-            bool added=false;
-            ForEachItemIn(i,threads) {
-                if (added)
-                    threads.item(i).remove(sock);
-                else
-                    added = threads.item(i).add(sock,mode,nfy);
-            }
-            if (added)
-                return;
-            CSocketEpollThread *thread = new CSocketEpollThread(epolltrace);
-            threads.append(*thread);
-            if (started)
-                thread->start();
-        }
+        epollthread->add(sock,mode,nfy);
     }
+
     void remove(ISocket *sock)
     {
         CriticalBlock block(sect);
-        ForEachItemIn(i,threads) {
-            if (threads.item(i).remove(sock)&&sock)
-                break;
-        }
+        epollthread->remove(sock);
     }
+
     void stop(bool wait)
     {
         IException *e=NULL;
-        CriticalBlock block(sect);
-        unsigned i = 0;
-        while (i<threads.ordinality()) {
-            CSocketEpollThread &t=threads.item(i);
-            {
-                CriticalUnblock unblock(sect);
-                t.stop(wait);           // not quite as quick as could be if wait true
-            }
-            if (wait && !e && t.termexcept)
-                e = t.termexcept.getClear();
-            i++;
-        }
+        epollthread->stop(wait);           // not quite as quick as could be if wait true
+        if (wait && !e && epollthread->termexcept)
+            e = epollthread->termexcept.getClear();
 #if 0 // don't throw error as too late
         if (e)
             throw e;
@@ -4793,14 +4683,29 @@ public:
 };
 #endif // _HAS_EPOLL_SUPPORT
 
+enum EpollMethod { EPOLL_INIT = 0, EPOLL_DISABLED, EPOLL_ENABLED };
+static EpollMethod epoll_method = EPOLL_INIT;
+static CriticalSection epollsect;
+
 ISocketSelectHandler *createSocketSelectHandler(const char *trc)
 {
 #ifdef _HAS_EPOLL_SUPPORT
-# if 0 // enable once we know method to get env file settings ...
-    if (env_file.use_epoll)
+    {
+        CriticalBlock block(epollsect);
+        // DBGLOG("createSocketSelectHandler(): epoll_method = %d",epoll_method);
+        if (epoll_method == EPOLL_INIT) {
+            Owned<IProperties> conf = createProperties(CONFIG_DIR PATHSEPSTR "environment.conf", true);
+            if (conf->getPropBool("use_epoll", true)) {
+                epoll_method = EPOLL_ENABLED;
+            } else {
+                epoll_method = EPOLL_DISABLED;
+            }
+            // DBGLOG("createSocketSelectHandler(): after reading conf file, epoll_method = %d",epoll_method);
+        }
+    }
+    if (epoll_method == EPOLL_ENABLED)
       return new CSocketEpollHandler(trc);
     else
-# endif
       return new CSocketSelectHandler(trc);
 #else
     return new CSocketSelectHandler(trc);