|
@@ -5322,7 +5322,6 @@ class CRoxieServerWorkUnitReadActivity : public CRoxieServerActivity
|
|
|
{
|
|
|
IHThorWorkunitReadArg &helper;
|
|
|
Owned<IWorkUnitRowReader> wuReader; // MORE - can we use IRoxieInput instead?
|
|
|
-
|
|
|
public:
|
|
|
CRoxieServerWorkUnitReadActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
|
|
|
: CRoxieServerActivity(_factory, _probeManager), helper((IHThorWorkunitReadArg &)basehelper)
|
|
@@ -5349,6 +5348,7 @@ public:
|
|
|
|
|
|
virtual void reset()
|
|
|
{
|
|
|
+ CriticalBlock b(statecrit);
|
|
|
wuReader.clear();
|
|
|
CRoxieServerActivity::reset();
|
|
|
};
|
|
@@ -5357,7 +5357,10 @@ public:
|
|
|
|
|
|
virtual const void *nextInGroup()
|
|
|
{
|
|
|
+ CriticalBlock b(statecrit);
|
|
|
ActivityTimer t(totalCycles, timeActivities);
|
|
|
+ if (!wuReader)
|
|
|
+ return NULL;
|
|
|
const void *ret = wuReader->nextInGroup();
|
|
|
if (ret)
|
|
|
processed++;
|
|
@@ -5772,13 +5775,17 @@ public:
|
|
|
|
|
|
virtual void reset()
|
|
|
{
|
|
|
+ CriticalBlock b(statecrit);
|
|
|
iter.clear();
|
|
|
CRoxieServerActivity::reset();
|
|
|
};
|
|
|
|
|
|
virtual const void *nextInGroup()
|
|
|
{
|
|
|
+ CriticalBlock b(statecrit);
|
|
|
ActivityTimer t(totalCycles, timeActivities);
|
|
|
+ if (!iter)
|
|
|
+ return NULL;
|
|
|
const void * next = iter->nextInGroup();
|
|
|
if (next)
|
|
|
{
|
|
@@ -6119,6 +6126,7 @@ public:
|
|
|
|
|
|
virtual void reset()
|
|
|
{
|
|
|
+ CriticalBlock b(statecrit);
|
|
|
if (iter)
|
|
|
iter->reset();
|
|
|
iter.clear();
|
|
@@ -6127,6 +6135,7 @@ public:
|
|
|
|
|
|
virtual const void *nextInGroup()
|
|
|
{
|
|
|
+ CriticalBlock b(statecrit);
|
|
|
ActivityTimer t(totalCycles, timeActivities);
|
|
|
const void * next = iter ? iter->nextInGroup() : NULL;
|
|
|
if (next)
|