|
@@ -956,7 +956,7 @@ class CKeyedJoinSlave : public CSlaveActivity, public CThorDataLink, implements
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- void stop()
|
|
|
+ void stop(bool stopPending)
|
|
|
{
|
|
|
crit.enter();
|
|
|
if (stopped)
|
|
@@ -971,8 +971,11 @@ class CKeyedJoinSlave : public CSlaveActivity, public CThorDataLink, implements
|
|
|
}
|
|
|
crit.leave();
|
|
|
threaded.join();
|
|
|
- requestProcessor->stop();
|
|
|
- resultProcessor->stop();
|
|
|
+ if (stopPending) // stop groups in progress
|
|
|
+ {
|
|
|
+ requestProcessor->stop();
|
|
|
+ resultProcessor->stop();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
void abort()
|
|
@@ -980,7 +983,7 @@ class CKeyedJoinSlave : public CSlaveActivity, public CThorDataLink, implements
|
|
|
if (!aborted)
|
|
|
{
|
|
|
aborted = true;
|
|
|
- stop();
|
|
|
+ stop(true);
|
|
|
}
|
|
|
}
|
|
|
void sendStop()
|
|
@@ -1994,7 +1997,7 @@ public:
|
|
|
virtual void stop()
|
|
|
{
|
|
|
if (fetchHandler)
|
|
|
- fetchHandler->stop();
|
|
|
+ fetchHandler->stop(true);
|
|
|
if (!eos)
|
|
|
{
|
|
|
eos = true;
|
|
@@ -2232,7 +2235,7 @@ public:
|
|
|
continue;
|
|
|
else if (needsDiskRead)
|
|
|
{
|
|
|
- fetchHandler->stop();
|
|
|
+ fetchHandler->stop(false);
|
|
|
if (waitPendingGroups())
|
|
|
continue;
|
|
|
}
|