|
@@ -5321,6 +5321,7 @@ IRoxieServerActivityFactory *createRoxieServerInlineTableActivityFactory(unsigne
|
|
|
class CRoxieServerWorkUnitReadActivity : public CRoxieServerActivity
|
|
|
{
|
|
|
IHThorWorkunitReadArg &helper;
|
|
|
+ CriticalSection readerCrit;
|
|
|
Owned<IWorkUnitRowReader> wuReader; // MORE - can we use IRoxieInput instead?
|
|
|
public:
|
|
|
CRoxieServerWorkUnitReadActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
|
|
@@ -5348,8 +5349,10 @@ public:
|
|
|
|
|
|
virtual void reset()
|
|
|
{
|
|
|
- CriticalBlock b(statecrit);
|
|
|
- wuReader.clear();
|
|
|
+ {
|
|
|
+ CriticalBlock b(readerCrit);
|
|
|
+ wuReader.clear();
|
|
|
+ }
|
|
|
CRoxieServerActivity::reset();
|
|
|
};
|
|
|
|
|
@@ -5357,11 +5360,15 @@ public:
|
|
|
|
|
|
virtual const void *nextInGroup()
|
|
|
{
|
|
|
- CriticalBlock b(statecrit);
|
|
|
ActivityTimer t(totalCycles, timeActivities);
|
|
|
- if (!wuReader)
|
|
|
- return NULL;
|
|
|
- const void *ret = wuReader->nextInGroup();
|
|
|
+ Linked<IWorkUnitRowReader> useReader;
|
|
|
+ {
|
|
|
+ CriticalBlock b(readerCrit);
|
|
|
+ if (!wuReader)
|
|
|
+ return NULL;
|
|
|
+ useReader.set(wuReader);
|
|
|
+ }
|
|
|
+ const void *ret = useReader->nextInGroup();
|
|
|
if (ret)
|
|
|
processed++;
|
|
|
return ret;
|
|
@@ -5747,6 +5754,7 @@ protected:
|
|
|
class CRoxieServerLocalResultReadActivity : public CRoxieServerActivity
|
|
|
{
|
|
|
IHThorLocalResultReadArg &helper;
|
|
|
+ CriticalSection iterCrit;
|
|
|
Owned<IRoxieInput> iter;
|
|
|
ILocalGraphEx * graph;
|
|
|
unsigned graphId;
|
|
@@ -5775,18 +5783,24 @@ public:
|
|
|
|
|
|
virtual void reset()
|
|
|
{
|
|
|
- CriticalBlock b(statecrit);
|
|
|
- iter.clear();
|
|
|
+ {
|
|
|
+ CriticalBlock b(iterCrit);
|
|
|
+ iter.clear();
|
|
|
+ }
|
|
|
CRoxieServerActivity::reset();
|
|
|
};
|
|
|
|
|
|
virtual const void *nextInGroup()
|
|
|
{
|
|
|
- CriticalBlock b(statecrit);
|
|
|
ActivityTimer t(totalCycles, timeActivities);
|
|
|
- if (!iter)
|
|
|
- return NULL;
|
|
|
- const void * next = iter->nextInGroup();
|
|
|
+ Linked<IRoxieInput> useIter;
|
|
|
+ {
|
|
|
+ CriticalBlock b(iterCrit);
|
|
|
+ if (!iter)
|
|
|
+ return NULL;
|
|
|
+ useIter.set(iter);
|
|
|
+ }
|
|
|
+ const void * next = useIter->nextInGroup();
|
|
|
if (next)
|
|
|
{
|
|
|
processed++;
|
|
@@ -6076,6 +6090,7 @@ class CRoxieServerGraphLoopResultReadActivity : public CRoxieServerActivity
|
|
|
{
|
|
|
protected:
|
|
|
IHThorGraphLoopResultReadArg &helper;
|
|
|
+ CriticalSection iterCrit;
|
|
|
Owned<IRoxieInput> iter;
|
|
|
ILocalGraphEx * graph;
|
|
|
unsigned graphId;
|
|
@@ -6126,18 +6141,26 @@ public:
|
|
|
|
|
|
virtual void reset()
|
|
|
{
|
|
|
- CriticalBlock b(statecrit);
|
|
|
- if (iter)
|
|
|
- iter->reset();
|
|
|
- iter.clear();
|
|
|
+ {
|
|
|
+ CriticalBlock b(iterCrit);
|
|
|
+ if (iter)
|
|
|
+ iter->reset();
|
|
|
+ iter.clear();
|
|
|
+ }
|
|
|
CRoxieServerActivity::reset();
|
|
|
};
|
|
|
|
|
|
virtual const void *nextInGroup()
|
|
|
{
|
|
|
- CriticalBlock b(statecrit);
|
|
|
ActivityTimer t(totalCycles, timeActivities);
|
|
|
- const void * next = iter ? iter->nextInGroup() : NULL;
|
|
|
+ Linked<IRoxieInput> useIter;
|
|
|
+ {
|
|
|
+ CriticalBlock b(iterCrit);
|
|
|
+ if (!iter)
|
|
|
+ return NULL;
|
|
|
+ useIter.set(iter);
|
|
|
+ }
|
|
|
+ const void * next = useIter->nextInGroup();
|
|
|
if (next)
|
|
|
{
|
|
|
processed++;
|