|
@@ -27,11 +27,10 @@ protected:
|
|
|
rowcount_t counter, localRecCount;
|
|
|
rowcount_t denominator, numerator;
|
|
|
Owned<IThorDataLink> input;
|
|
|
- bool localCountReq;
|
|
|
|
|
|
bool haveLocalCount() { return RCUNBOUND != localRecCount; }
|
|
|
inline bool wanted()
|
|
|
- {
|
|
|
+ {
|
|
|
counter += numerator;
|
|
|
if(counter >= denominator)
|
|
|
{
|
|
@@ -66,13 +65,31 @@ protected:
|
|
|
ActPrintLog("%s: Initial value of counter %" RCPF "d", actStr.str(), counter);
|
|
|
#endif
|
|
|
}
|
|
|
-
|
|
|
+ void setLocalCountReq()
|
|
|
+ {
|
|
|
+ ThorDataLinkMetaInfo info;
|
|
|
+ input->getMetaInfo(info);
|
|
|
+ // Need lookahead _unless_ row count pre-known.
|
|
|
+ if (0 == numerator)
|
|
|
+ localRecCount = 0;
|
|
|
+ else if (info.totalRowsMin == info.totalRowsMax)
|
|
|
+ {
|
|
|
+ localRecCount = (rowcount_t)info.totalRowsMax;
|
|
|
+ ActPrintLog("%s: row count pre-known to be %" RCPF "d", actStr.str(), localRecCount);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ localRecCount = RCUNBOUND;
|
|
|
+ input.setown(createDataLinkSmartBuffer(this, input,ENTH_SMART_BUFFER_SIZE,true,false,RCUNBOUND,this,true,&container.queryJob().queryIDiskUsage()));
|
|
|
+ StringBuffer tmpStr(actStr);
|
|
|
+ startInput(input);
|
|
|
+ }
|
|
|
+ }
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
|
|
|
|
BaseEnthActivity(CGraphElementBase *_container) : CSlaveActivity(_container), CThorDataLink(this)
|
|
|
{
|
|
|
- localCountReq = false;
|
|
|
}
|
|
|
~BaseEnthActivity()
|
|
|
{
|
|
@@ -91,27 +108,6 @@ public:
|
|
|
input.set(inputs.item(0));
|
|
|
startInput(input);
|
|
|
dataLinkStart();
|
|
|
-
|
|
|
- if (localCountReq)
|
|
|
- {
|
|
|
- ThorDataLinkMetaInfo info;
|
|
|
- input->getMetaInfo(info);
|
|
|
- // Need lookahead _unless_ row count pre-known.
|
|
|
- if (0 == numerator)
|
|
|
- localRecCount = 0;
|
|
|
- else if (info.totalRowsMin == info.totalRowsMax)
|
|
|
- {
|
|
|
- localRecCount = (rowcount_t)info.totalRowsMax;
|
|
|
- ActPrintLog("%s: row count pre-known to be %" RCPF "d", actStr.str(), localRecCount);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- localRecCount = RCUNBOUND;
|
|
|
- input.setown(createDataLinkSmartBuffer(this, input,ENTH_SMART_BUFFER_SIZE,true,false,RCUNBOUND,this,true,&container.queryJob().queryIDiskUsage()));
|
|
|
- StringBuffer tmpStr(actStr);
|
|
|
- startInput(input);
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
virtual void stop()
|
|
|
{
|
|
@@ -134,16 +130,21 @@ public:
|
|
|
|
|
|
class CLocalEnthSlaveActivity : public BaseEnthActivity
|
|
|
{
|
|
|
+ bool localCountReq;
|
|
|
public:
|
|
|
CLocalEnthSlaveActivity(CGraphElementBase *container) : BaseEnthActivity(container)
|
|
|
{
|
|
|
actStr.append("LOCALENTH");
|
|
|
+ localCountReq = false;
|
|
|
}
|
|
|
- virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData)
|
|
|
+ virtual void start()
|
|
|
{
|
|
|
- BaseEnthActivity::init(data, slaveData);
|
|
|
+ BaseEnthActivity::start();
|
|
|
if (RCUNBOUND == denominator)
|
|
|
+ {
|
|
|
localCountReq = true;
|
|
|
+ setLocalCountReq();
|
|
|
+ }
|
|
|
else
|
|
|
setInitialCounter(0);
|
|
|
}
|
|
@@ -219,7 +220,6 @@ class CEnthSlaveActivity : public BaseEnthActivity
|
|
|
public:
|
|
|
CEnthSlaveActivity(CGraphElementBase *container) : BaseEnthActivity(container)
|
|
|
{
|
|
|
- localCountReq = true;
|
|
|
actStr.append("ENTH");
|
|
|
}
|
|
|
virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData)
|
|
@@ -232,6 +232,7 @@ public:
|
|
|
BaseEnthActivity::start();
|
|
|
prevRecCount = 0;
|
|
|
first = true;
|
|
|
+ setLocalCountReq();
|
|
|
}
|
|
|
virtual void abort()
|
|
|
{
|