|
@@ -1988,8 +1988,6 @@ public:
|
|
|
HashDistributeSlaveBase(CGraphElementBase *_container)
|
|
|
: CSlaveActivity(_container)
|
|
|
{
|
|
|
- IHThorHashDistributeArg *distribargs = (IHThorHashDistributeArg *)queryHelper();
|
|
|
- ihash = distribargs->queryHash();
|
|
|
appendOutputLinked(this);
|
|
|
}
|
|
|
~HashDistributeSlaveBase()
|
|
@@ -2089,18 +2087,22 @@ public:
|
|
|
|
|
|
class HashDistributeSlaveActivity : public HashDistributeSlaveBase
|
|
|
{
|
|
|
+ typedef HashDistributeSlaveBase PARENT;
|
|
|
public:
|
|
|
- HashDistributeSlaveActivity(CGraphElementBase *container) : HashDistributeSlaveBase(container)
|
|
|
+ HashDistributeSlaveActivity(CGraphElementBase *container) : PARENT(container)
|
|
|
{
|
|
|
+ IHThorHashDistributeArg *distribargs = (IHThorHashDistributeArg *)queryHelper();
|
|
|
+ ihash = distribargs->queryHash();
|
|
|
}
|
|
|
};
|
|
|
|
|
|
//===========================================================================
|
|
|
|
|
|
-class HashDistributeMergeSlaveActivity : public HashDistributeSlaveBase
|
|
|
+class HashDistributeMergeSlaveActivity : public HashDistributeSlaveActivity
|
|
|
{
|
|
|
+ typedef HashDistributeSlaveActivity PARENT;
|
|
|
public:
|
|
|
- HashDistributeMergeSlaveActivity(CGraphElementBase *container) : HashDistributeSlaveBase(container)
|
|
|
+ HashDistributeMergeSlaveActivity(CGraphElementBase *container) : PARENT(container)
|
|
|
{
|
|
|
IHThorHashDistributeArg *distribargs = (IHThorHashDistributeArg *)queryHelper();
|
|
|
mergecmp = distribargs->queryMergeCompare();
|
|
@@ -2327,16 +2329,16 @@ public:
|
|
|
};
|
|
|
|
|
|
|
|
|
-class ReDistributeSlaveActivity : public HashDistributeSlaveBase
|
|
|
+class ReDistributeSlaveActivity : public HashDistributeSlaveActivity
|
|
|
{
|
|
|
- typedef HashDistributeSlaveBase PARENT;
|
|
|
+ typedef HashDistributeSlaveActivity PARENT;
|
|
|
|
|
|
Owned<CHDRproportional> partitioner;
|
|
|
public:
|
|
|
- ReDistributeSlaveActivity(CGraphElementBase *container) : HashDistributeSlaveBase(container) { }
|
|
|
+ ReDistributeSlaveActivity(CGraphElementBase *container) : PARENT(container) { }
|
|
|
virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
|
|
|
{
|
|
|
- HashDistributeSlaveBase::init(data, slaveData);
|
|
|
+ PARENT::init(data, slaveData);
|
|
|
mptag_t tag = container.queryJobChannel().deserializeMPTag(data);
|
|
|
IHThorHashDistributeArg *distribargs = (IHThorHashDistributeArg *)queryHelper();
|
|
|
partitioner.setown(new CHDRproportional(this, distribargs,tag));
|
|
@@ -2358,7 +2360,7 @@ public:
|
|
|
}
|
|
|
virtual void stop() override
|
|
|
{
|
|
|
- HashDistributeSlaveBase::stop();
|
|
|
+ PARENT::stop();
|
|
|
if (instrm)
|
|
|
instrm.clear(); // should remove here rather than later?
|
|
|
}
|
|
@@ -2368,6 +2370,8 @@ public:
|
|
|
|
|
|
class IndexDistributeSlaveActivity : public HashDistributeSlaveBase
|
|
|
{
|
|
|
+ typedef HashDistributeSlaveBase PARENT;
|
|
|
+
|
|
|
class CKeyLookup : implements IHash
|
|
|
{
|
|
|
IndexDistributeSlaveActivity &owner;
|
|
@@ -2397,7 +2401,7 @@ class IndexDistributeSlaveActivity : public HashDistributeSlaveBase
|
|
|
} *lookup;
|
|
|
|
|
|
public:
|
|
|
- IndexDistributeSlaveActivity(CGraphElementBase *container) : HashDistributeSlaveBase(container), lookup(NULL)
|
|
|
+ IndexDistributeSlaveActivity(CGraphElementBase *container) : PARENT(container), lookup(NULL)
|
|
|
{
|
|
|
}
|
|
|
~IndexDistributeSlaveActivity()
|
|
@@ -2407,7 +2411,7 @@ public:
|
|
|
}
|
|
|
virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
|
|
|
{
|
|
|
- HashDistributeSlaveBase::init(data, slaveData);
|
|
|
+ PARENT::init(data, slaveData);
|
|
|
|
|
|
IHThorKeyedDistributeArg *helper = (IHThorKeyedDistributeArg *) queryHelper();
|
|
|
|