|
@@ -87,11 +87,16 @@ class ThorLookaheadCache: public IThorDataLink, public CSimpleInterface
|
|
|
} thread;
|
|
|
|
|
|
public:
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
|
|
|
|
+ void doNotify()
|
|
|
+ {
|
|
|
+ if (notify)
|
|
|
+ notify->onInputFinished(count);
|
|
|
+ if (smartbuf)
|
|
|
+ smartbuf->queryWriter()->flush();
|
|
|
+ }
|
|
|
+
|
|
|
int run()
|
|
|
{
|
|
|
if (!started) {
|
|
@@ -161,13 +166,42 @@ public:
|
|
|
{
|
|
|
ActPrintLog(&activity, e, "ThorLookaheadCache get exception");
|
|
|
getexception.setown(e);
|
|
|
- }
|
|
|
- if (notify)
|
|
|
- notify->onInputFinished(count);
|
|
|
- if (smartbuf)
|
|
|
- smartbuf->queryWriter()->flush();
|
|
|
+ }
|
|
|
+
|
|
|
+ // notify and flush async, as these can block, but we do not want to block in->stop()
|
|
|
+ // especially if this is a spilling read ahead, where use case scenarios include not wanting to
|
|
|
+ // block the upstream input.
|
|
|
+ // An example is a firstn which if stop() it not called, it may block
|
|
|
+ // other nodes from pulling because it is blocked upstream on full buffers (which can be discarded
|
|
|
+ // on stop()), and those in turn are blocking other arms of the graph.
|
|
|
+ class CNotifyThread : implements IThreaded
|
|
|
+ {
|
|
|
+ CThreaded threaded;
|
|
|
+ ThorLookaheadCache &owner;
|
|
|
+ public:
|
|
|
+ CNotifyThread(ThorLookaheadCache &_owner) : threaded("Lookahead-CNotifyThread"), owner(_owner)
|
|
|
+ {
|
|
|
+ threaded.init(this);
|
|
|
+ }
|
|
|
+ ~CNotifyThread()
|
|
|
+ {
|
|
|
+ loop
|
|
|
+ {
|
|
|
+ if (threaded.join(60000))
|
|
|
+ break;
|
|
|
+ PROGLOG("Still waiting on lookahead CNotifyThread thread to complete");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // IThreaded impl.
|
|
|
+ virtual void main()
|
|
|
+ {
|
|
|
+ owner.doNotify();
|
|
|
+ }
|
|
|
+ } notifyThread(*this);
|
|
|
+
|
|
|
running = false;
|
|
|
- try {
|
|
|
+ try
|
|
|
+ {
|
|
|
if (in)
|
|
|
in->stop();
|
|
|
}
|
|
@@ -176,7 +210,8 @@ public:
|
|
|
ActPrintLog(&activity, e, "ThorLookaheadCache stop exception");
|
|
|
if (!getexception.get())
|
|
|
getexception.setown(e);
|
|
|
- }
|
|
|
+ }
|
|
|
+ // NB: Will wait on CNotifyThread to finish before returning
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
@@ -249,8 +284,7 @@ public:
|
|
|
}
|
|
|
return row.getClear();
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
bool isGrouped() { return false; }
|
|
|
|
|
|
void getMetaInfo(ThorDataLinkMetaInfo &info)
|