|
@@ -4617,13 +4617,18 @@ class CSocketSelectHandler: implements ISocketSelectHandler, public CInterface
|
|
CIArrayOf<CSocketSelectThread> threads;
|
|
CIArrayOf<CSocketSelectThread> threads;
|
|
CriticalSection sect;
|
|
CriticalSection sect;
|
|
bool started;
|
|
bool started;
|
|
|
|
+ std::atomic<bool> stopped;
|
|
StringAttr selecttrace;
|
|
StringAttr selecttrace;
|
|
public:
|
|
public:
|
|
IMPLEMENT_IINTERFACE;
|
|
IMPLEMENT_IINTERFACE;
|
|
CSocketSelectHandler(const char *trc)
|
|
CSocketSelectHandler(const char *trc)
|
|
- : selecttrace(trc)
|
|
|
|
|
|
+ : started(false), stopped(false), selecttrace(trc)
|
|
{
|
|
{
|
|
- started = false;
|
|
|
|
|
|
+ }
|
|
|
|
+ ~CSocketSelectHandler()
|
|
|
|
+ {
|
|
|
|
+ stop(true);
|
|
|
|
+ threads.kill();
|
|
}
|
|
}
|
|
void start()
|
|
void start()
|
|
{
|
|
{
|
|
@@ -4639,6 +4644,8 @@ public:
|
|
void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
|
|
void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
|
|
{
|
|
{
|
|
CriticalBlock block(sect);
|
|
CriticalBlock block(sect);
|
|
|
|
+ if (stopped)
|
|
|
|
+ return;
|
|
for (;;) {
|
|
for (;;) {
|
|
bool added=false;
|
|
bool added=false;
|
|
ForEachItemIn(i,threads) {
|
|
ForEachItemIn(i,threads) {
|
|
@@ -4667,6 +4674,7 @@ public:
|
|
{
|
|
{
|
|
IException *e=NULL;
|
|
IException *e=NULL;
|
|
CriticalBlock block(sect);
|
|
CriticalBlock block(sect);
|
|
|
|
+ stopped = true;
|
|
unsigned i = 0;
|
|
unsigned i = 0;
|
|
while (i<threads.ordinality()) {
|
|
while (i<threads.ordinality()) {
|
|
CSocketSelectThread &t=threads.item(i);
|
|
CSocketSelectThread &t=threads.item(i);
|
|
@@ -5195,12 +5203,13 @@ class CSocketEpollHandler: implements ISocketSelectHandler, public CInterface
|
|
CIArrayOf<CSocketEpollThread> threads;
|
|
CIArrayOf<CSocketEpollThread> threads;
|
|
CriticalSection sect;
|
|
CriticalSection sect;
|
|
bool started;
|
|
bool started;
|
|
|
|
+ std::atomic<bool> stopped;
|
|
StringAttr epolltrace;
|
|
StringAttr epolltrace;
|
|
unsigned hdlPerThrd;
|
|
unsigned hdlPerThrd;
|
|
public:
|
|
public:
|
|
IMPLEMENT_IINTERFACE;
|
|
IMPLEMENT_IINTERFACE;
|
|
CSocketEpollHandler(const char *trc, unsigned _hdlPerThrd)
|
|
CSocketEpollHandler(const char *trc, unsigned _hdlPerThrd)
|
|
- : started(false), epolltrace(trc), hdlPerThrd(_hdlPerThrd)
|
|
|
|
|
|
+ : started(false), stopped(false), epolltrace(trc), hdlPerThrd(_hdlPerThrd)
|
|
{
|
|
{
|
|
}
|
|
}
|
|
|
|
|
|
@@ -5230,6 +5239,9 @@ public:
|
|
throw MakeStringException(-1,"CSocketEpollHandler::add() invalid sock or nfy or mode");
|
|
throw MakeStringException(-1,"CSocketEpollHandler::add() invalid sock or nfy or mode");
|
|
|
|
|
|
CriticalBlock block(sect);
|
|
CriticalBlock block(sect);
|
|
|
|
+ if (stopped)
|
|
|
|
+ return;
|
|
|
|
+
|
|
// Create new handler thread if current one has hdlPerThrd fds.
|
|
// Create new handler thread if current one has hdlPerThrd fds.
|
|
// epoll() handles many fds faster than select so this would
|
|
// epoll() handles many fds faster than select so this would
|
|
// seem not as important, but we are still serializing on
|
|
// seem not as important, but we are still serializing on
|
|
@@ -5273,6 +5285,7 @@ public:
|
|
void stop(bool wait)
|
|
void stop(bool wait)
|
|
{
|
|
{
|
|
CriticalBlock block(sect);
|
|
CriticalBlock block(sect);
|
|
|
|
+ stopped = true;
|
|
ForEachItemIn(i,threads)
|
|
ForEachItemIn(i,threads)
|
|
{
|
|
{
|
|
CSocketEpollThread &t=threads.item(i);
|
|
CSocketEpollThread &t=threads.item(i);
|