|
@@ -188,6 +188,103 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
*/
|
|
|
class CSender : public CSimpleInterface, implements IThreadFactory, implements IExceptionHandler
|
|
|
{
|
|
|
+ class CTarget // Per-destination state held by CSender: queued buckets, the bucket being filled, an active-writer count and a finished flag
|
|
|
+ {
|
|
|
+ CSender &owner; // the CSender this target belongs to
|
|
|
+ unsigned target; // destination index; passed to owner.sendBlock() and to new CSendBucket objects
|
|
|
+ atomic_t activeWriters; // count maintained by incActiveWriters()/decActiveWriters()
|
|
|
+ atomic_t senderFinished; // non-zero once sending to this target has stopped
|
|
|
+ CSendBucketQueue pendingBuckets; // buckets queued for this target, waiting to be dequeued by a writer
|
|
|
+ mutable CriticalSection crit; // taken by send() and checkSenderFinished()
|
|
|
+ Owned<CSendBucket> bucket; // bucket currently being filled for this target; created on demand by queryBucketCreate()
|
|
|
+ public:
|
|
|
+ CTarget(CSender &_owner, unsigned _target) : owner(_owner), target(_target) // starts with zero active writers and not finished
|
|
|
+ {
|
|
|
+ atomic_set(&activeWriters, 0);
|
|
|
+ atomic_set(&senderFinished, 0);
|
|
|
+ }
|
|
|
+ ~CTarget() // releases anything still queued or held, via reset()
|
|
|
+ {
|
|
|
+ reset();
|
|
|
+ }
|
|
|
+ void reset() // Release all pending buckets, drop the current bucket and zero both counters
|
|
|
+ {
|
|
|
+ loop
|
|
|
+ {
|
|
|
+ CSendBucket *sendBucket = pendingBuckets.dequeueNow();
|
|
|
+ if (!sendBucket) // queue drained
|
|
|
+ break;
|
|
|
+ ::Release(sendBucket);
|
|
|
+ }
|
|
|
+ bucket.clear();
|
|
|
+ atomic_set(&activeWriters, 0);
|
|
|
+ atomic_set(&senderFinished, 0);
|
|
|
+ }
|
|
|
+ void send(CMessageBuffer &mb) // Send mb to this target through owner.sendBlock(), unless the target is already marked finished; holds crit throughout
|
|
|
+ {
|
|
|
+ CriticalBlock b(crit);
|
|
|
+ if (!atomic_read(&senderFinished))
|
|
|
+ {
|
|
|
+ if (owner.selfPush(target)) // selfPush(i) is (i==self)&&!pull, so the assert below trips whenever this branch is taken
|
|
|
+ assertex(target != owner.self); // NOTE(review): presumably a deliberate guard against sending to self in push mode - confirm
|
|
|
+ if (!owner.sendBlock(target, mb)) // a false return marks this target as finished
|
|
|
+ atomic_set(&senderFinished, 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ inline unsigned getNumPendingBuckets() const // number of buckets currently in pendingBuckets
|
|
|
+ {
|
|
|
+ return pendingBuckets.ordinality();
|
|
|
+ }
|
|
|
+ inline CSendBucket *dequeuePendingBucket() // next queued bucket; reset() treats a NULL result as "queue empty"
|
|
|
+ {
|
|
|
+ return pendingBuckets.dequeueNow();
|
|
|
+ }
|
|
|
+ inline void enqueuePendingBucket(CSendBucket *bucket) // queue a bucket for this target
|
|
|
+ {
|
|
|
+ pendingBuckets.enqueue(bucket);
|
|
|
+ }
|
|
|
+ inline void incActiveWriters() // bumps both this target's count and the owner's total
|
|
|
+ {
|
|
|
+ atomic_inc(&activeWriters);
|
|
|
+ ++owner.totalActiveWriters; // NB: incActiveWriters() is always called within a activeWritersLock crit
|
|
|
+ }
|
|
|
+ inline void decActiveWriters() // reverses incActiveWriters()
|
|
|
+ {
|
|
|
+ atomic_dec(&activeWriters);
|
|
|
+ --owner.totalActiveWriters; // NB: decActiveWriters() is always called within a activeWritersLock crit
|
|
|
+ }
|
|
|
+ inline unsigned getActiveWriters() const // current value of this target's activeWriters count
|
|
|
+ {
|
|
|
+ return atomic_read(&activeWriters);
|
|
|
+ }
|
|
|
+ inline bool getSenderFinished() const // true once senderFinished has been set non-zero
|
|
|
+ {
|
|
|
+ return atomic_read(&senderFinished);
|
|
|
+ }
|
|
|
+ inline void checkSenderFinished() // Mark this target finished and count it in owner.numFinished; does nothing if already marked
|
|
|
+ {
|
|
|
+ CriticalBlock b(crit);
|
|
|
+ if (!atomic_read(&senderFinished))
|
|
|
+ {
|
|
|
+ atomic_set(&senderFinished, 1);
|
|
|
+ atomic_inc(&owner.numFinished);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ inline CSendBucket *queryBucket() // current bucket, or NULL if none has been created; the member keeps its reference
|
|
|
+ {
|
|
|
+ return bucket;
|
|
|
+ }
|
|
|
+ inline CSendBucket *queryBucketCreate() // current bucket, creating a new CSendBucket for this target first if there is none
|
|
|
+ {
|
|
|
+ if (!bucket)
|
|
|
+ bucket.setown(new CSendBucket(owner.owner, target));
|
|
|
+ return bucket;
|
|
|
+ }
|
|
|
+ inline CSendBucket *getBucketClear() // returns bucket.getClear(): presumably hands the current bucket (possibly NULL) to the caller and empties the member
|
|
|
+ {
|
|
|
+ return bucket.getClear();
|
|
|
+ }
|
|
|
+ };
|
|
|
/*
|
|
|
* CWriterHandler, a per thread class and member of the writerPool
|
|
|
* a write handler, is given an initial CSendBucket and handles the dedup(if applicable)
|
|
@@ -203,21 +300,21 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
CDistributorBase &distributor;
|
|
|
Owned<CSendBucket> _sendBucket;
|
|
|
unsigned nextPending;
|
|
|
- bool aborted;
|
|
|
+ CTarget *target;
|
|
|
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
|
|
|
|
CWriteHandler(CSender &_owner) : owner(_owner), distributor(_owner.owner)
|
|
|
{
|
|
|
- aborted = false;
|
|
|
+ target = NULL;
|
|
|
}
|
|
|
void init(void *startInfo)
|
|
|
{
|
|
|
nextPending = getRandom()%distributor.numnodes;
|
|
|
_sendBucket.setown((CSendBucket *)startInfo);
|
|
|
- owner.setActiveWriter(_sendBucket->queryDestination(), this);
|
|
|
- aborted = false;
|
|
|
+ target = owner.targets.item(_sendBucket->queryDestination());
|
|
|
+ target->incActiveWriters();
|
|
|
}
|
|
|
void main()
|
|
|
{
|
|
@@ -226,7 +323,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
size32_t writerTotalSz = 0;
|
|
|
size32_t sendSz = 0;
|
|
|
MemoryBuffer mb;
|
|
|
- while (!aborted)
|
|
|
+ while (!owner.aborted)
|
|
|
{
|
|
|
writerTotalSz += sendBucket->querySize();
|
|
|
owner.dedup(sendBucket); // conditional
|
|
@@ -243,7 +340,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
// more added to dest I'm processing?
|
|
|
{
|
|
|
CriticalBlock b(owner.activeWritersLock);
|
|
|
- sendBucket.setown(owner.pendingBuckets.item(dest)->dequeueNow());
|
|
|
+ sendBucket.setown(target->dequeuePendingBucket());
|
|
|
}
|
|
|
if (sendBucket)
|
|
|
{
|
|
@@ -252,13 +349,13 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
continue; // NB: it will flow into else "remote" arm
|
|
|
}
|
|
|
}
|
|
|
- while (!aborted)
|
|
|
+ while (!owner.aborted)
|
|
|
{
|
|
|
// JCSMORE check if worth compressing
|
|
|
CMessageBuffer msg;
|
|
|
fastLZCompressToBuffer(msg, mb.length(), mb.bufferBase());
|
|
|
mb.clear();
|
|
|
- owner.send(dest, msg);
|
|
|
+ target->send(msg);
|
|
|
sendSz = 0;
|
|
|
if (wholeBucket)
|
|
|
break;
|
|
@@ -270,25 +367,25 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
// see if others to process
|
|
|
// NB: this will never start processing a bucket for a destination which already has an active writer.
|
|
|
CriticalBlock b(owner.activeWritersLock);
|
|
|
- owner.setActiveWriter(dest, NULL);
|
|
|
+ target->decActiveWriters();
|
|
|
sendBucket.setown(owner.getAnotherBucket(nextPending));
|
|
|
if (!sendBucket)
|
|
|
+ {
|
|
|
+ target = NULL; // will be reinitialized to new target in init(), when thread pool thread is reused
|
|
|
break;
|
|
|
+ }
|
|
|
dest = sendBucket->queryDestination();
|
|
|
- owner.setActiveWriter(dest, this);
|
|
|
+ target = owner.targets.item(dest);
|
|
|
+ target->incActiveWriters();
|
|
|
HDSendPrintLog3("CWriteHandler, now dealing with (b=%d), size=%d", sendBucket->queryDestination(), sendBucket->querySize());
|
|
|
}
|
|
|
}
|
|
|
bool canReuse() { return true; }
|
|
|
bool stop() { return true; }
|
|
|
- void abort()
|
|
|
- {
|
|
|
- aborted = true;
|
|
|
- }
|
|
|
- } **activeWriters;
|
|
|
+ };
|
|
|
|
|
|
CDistributorBase &owner;
|
|
|
- CriticalSection activeWritersLock;
|
|
|
+ mutable CriticalSection activeWritersLock;
|
|
|
mutable SpinLock totalSzLock;
|
|
|
SpinLock doDedupLock;
|
|
|
IPointerArrayOf<CSendBucket> buckets;
|
|
@@ -297,52 +394,39 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
bool senderFull, doDedup, aborted, initialized;
|
|
|
Semaphore senderFullSem;
|
|
|
Linked<IException> exception;
|
|
|
- OwnedMalloc<bool> senderFinished;
|
|
|
- unsigned numFinished, dedupSamples, dedupSuccesses, self;
|
|
|
+ atomic_t numFinished;
|
|
|
+ unsigned dedupSamples, dedupSuccesses, self;
|
|
|
Owned<IThreadPool> writerPool;
|
|
|
- PointerArrayOf<CSendBucketQueue> pendingBuckets;
|
|
|
- unsigned numActiveWriters;
|
|
|
+ unsigned totalActiveWriters;
|
|
|
+ PointerArrayOf<CTarget> targets;
|
|
|
|
|
|
void init() // First-time setup: zero the counters, create the writer thread pool and build one CTarget per node
|
|
|
{
|
|
|
- unsigned n;
|
|
|
- for (unsigned n=0; n<owner.numnodes; n++)
|
|
|
- buckets.append(NULL);
|
|
|
totalSz = 0;
|
|
|
senderFull = false;
|
|
|
- senderFinished.allocateN(owner.numnodes, true);
|
|
|
- numFinished = 0;
|
|
|
+ atomic_set(&numFinished, 0);
|
|
|
dedupSamples = dedupSuccesses = 0;
|
|
|
doDedup = owner.doDedup; // local copy of the owner's dedup setting
|
|
|
writerPool.setown(createThreadPool("HashDist writer pool", this, this, owner.writerPoolSize, 5*60*1000));
|
|
|
self = owner.activity->queryJob().queryMyRank()-1; // presumably converts a 1-based rank into a 0-based node index - confirm
|
|
|
- for (n=0; n<owner.numnodes; n++)
|
|
|
- pendingBuckets.append(new CSendBucketQueue);
|
|
|
- numActiveWriters = 0;
|
|
|
+
|
|
|
+ targets.ensure(owner.numnodes); // presumably reserves room for the appends below - confirm
|
|
|
+ for (unsigned n=0; n<owner.numnodes; n++)
|
|
|
+ targets.append(new CTarget(*this, n)); // one CTarget per node, indexed by node number
|
|
|
+
|
|
|
+ totalActiveWriters = 0;
|
|
|
aborted = false;
|
|
|
- activeWriters = new CWriteHandler *[owner.numnodes];
|
|
|
- memset(activeWriters, 0, owner.numnodes * sizeof(CWriteHandler *));
|
|
|
initialized = true; // records that init() has completed
|
|
|
}
|
|
|
void reset() // Return the sender to its initial state; asserts that no writers are still counted as active
|
|
|
{
|
|
|
- assertex(0 == numActiveWriters);
|
|
|
+ assertex(0 == totalActiveWriters);
|
|
|
// unless it was aborted, there shouldn't be any pending or non-null buckets
|
|
|
for (unsigned n=0; n<owner.numnodes; n++)
|
|
|
- {
|
|
|
- CSendBucketQueue *queue = pendingBuckets.item(n);
|
|
|
- loop
|
|
|
- {
|
|
|
- CSendBucket *bucket = queue->dequeueNow();
|
|
|
- if (!bucket)
|
|
|
- break;
|
|
|
- ::Release(bucket);
|
|
|
- }
|
|
|
- buckets.replace(NULL, n);
|
|
|
- }
|
|
|
+ targets.item(n)->reset(); // per-target cleanup is delegated to CTarget::reset()
|
|
|
totalSz = 0;
|
|
|
senderFull = false;
|
|
|
- numFinished = 0;
|
|
|
+ atomic_set(&numFinished, 0);
|
|
|
aborted = false;
|
|
|
}
|
|
|
void reinit()
|
|
@@ -352,6 +436,11 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
else
|
|
|
init();
|
|
|
}
|
|
|
+ unsigned queryInactiveWriters() const // Writer pool capacity not currently counted as active, read under activeWritersLock
|
|
|
+ {
|
|
|
+ CriticalBlock b(activeWritersLock);
|
|
|
+ return owner.writerPoolSize - totalActiveWriters; // NOTE(review): totalActiveWriters is unsigned, so this would wrap if it ever exceeded writerPoolSize - confirm that cannot happen
|
|
|
+ }
|
|
|
void dedup(CSendBucket *sendBucket)
|
|
|
{
|
|
|
{
|
|
@@ -380,42 +469,6 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- CSendBucket *queryBucket(unsigned n)
|
|
|
- {
|
|
|
- return buckets.item(n);
|
|
|
- }
|
|
|
- CSendBucket *queryBucketCreate(unsigned n)
|
|
|
- {
|
|
|
- CSendBucket *bucket = buckets.item(n);
|
|
|
- if (!bucket)
|
|
|
- {
|
|
|
- bucket = new CSendBucket(owner, n);
|
|
|
- buckets.replace(bucket, n);
|
|
|
- }
|
|
|
- return bucket;
|
|
|
- }
|
|
|
- CSendBucket *getBucketClear(unsigned n)
|
|
|
- {
|
|
|
- Linked<CSendBucket> bucket = buckets.item(n);
|
|
|
- if (!bucket)
|
|
|
- return NULL;
|
|
|
- buckets.replace(NULL, n);
|
|
|
- return bucket.getClear();
|
|
|
- }
|
|
|
- void send(unsigned dest, CMessageBuffer &mb)
|
|
|
- {
|
|
|
- if (!senderFinished[dest])
|
|
|
- {
|
|
|
- if (selfPush(dest))
|
|
|
- assertex(dest != self);
|
|
|
- if (!owner.sendBlock(dest, mb))
|
|
|
- {
|
|
|
- ActPrintLog(owner.activity, "CDistributorBase::sendBlock stopped slave %d", dest+1);
|
|
|
- senderFinished[dest] = true;
|
|
|
- numFinished++;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
void decTotal(size32_t sz)
|
|
|
{
|
|
|
SpinBlock b(totalSzLock);
|
|
@@ -432,7 +485,15 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
SpinBlock b(totalSzLock);
|
|
|
return totalSz;
|
|
|
}
|
|
|
- inline bool selfPush(unsigned i)
|
|
|
+ inline bool sendBlock(unsigned i, CMessageBuffer &msg) // Forward msg to owner.sendBlock(i, msg); on a false result, count the target in numFinished, log it and return false
|
|
|
+ {
|
|
|
+ if (owner.sendBlock(i, msg))
|
|
|
+ return true;
|
|
|
+ atomic_inc(&numFinished);
|
|
|
+ ActPrintLog(owner.activity, "CSender::sendBlock stopped slave %d (finished=%d)", i+1, atomic_read(&numFinished)); // logs i+1, i.e. a 1-based slave number; the count is re-read after the increment
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ inline bool selfPush(unsigned i) const // True when i is this node's own index (self) and the owner is not in pull mode
|
|
|
{
|
|
|
return (i==self)&&!owner.pull;
|
|
|
}
|
|
@@ -447,7 +508,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
try
|
|
|
{
|
|
|
nullMsg.clear();
|
|
|
- owner.sendBlock(i, nullMsg);
|
|
|
+ sendBlock(i, nullMsg);
|
|
|
}
|
|
|
catch (IException *e)
|
|
|
{
|
|
@@ -469,19 +530,8 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
{
|
|
|
if (initialized)
|
|
|
{
|
|
|
- delete [] activeWriters;
|
|
|
for (unsigned n=0; n<owner.numnodes; n++)
|
|
|
- {
|
|
|
- CSendBucketQueue *queue = pendingBuckets.item(n);
|
|
|
- loop
|
|
|
- {
|
|
|
- CSendBucket *bucket = queue->dequeueNow();
|
|
|
- if (!bucket)
|
|
|
- break;
|
|
|
- ::Release(bucket);
|
|
|
- }
|
|
|
- delete queue;
|
|
|
- }
|
|
|
+ delete targets.item(n);
|
|
|
}
|
|
|
}
|
|
|
CSendBucket *getAnotherBucket(unsigned &next)
|
|
@@ -490,15 +540,15 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
unsigned start = next;
|
|
|
loop
|
|
|
{
|
|
|
- unsigned current=next;
|
|
|
- unsigned c = pendingBuckets.item(current)->ordinality();
|
|
|
+ CTarget *target = targets.item(next);
|
|
|
+ unsigned c = target->getNumPendingBuckets();
|
|
|
++next;
|
|
|
if (next>=owner.numnodes)
|
|
|
next = 0;
|
|
|
if (c)
|
|
|
{
|
|
|
- if (NULL == activeWriters[current])
|
|
|
- return pendingBuckets.item(current)->dequeueNow();
|
|
|
+ if (!owner.targetWriterLimit || (target->getActiveWriters() < owner.targetWriterLimit))
|
|
|
+ return target->dequeuePendingBucket();
|
|
|
}
|
|
|
if (next == start)
|
|
|
return NULL;
|
|
@@ -506,13 +556,11 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
}
|
|
|
void add(CSendBucket *bucket)
|
|
|
{
|
|
|
- if (owner.selfstopped && !senderFinished[self])
|
|
|
- {
|
|
|
- senderFinished[self] = true;
|
|
|
- ++numFinished;
|
|
|
- }
|
|
|
+ if (owner.selfstopped)
|
|
|
+ targets.item(self)->checkSenderFinished();
|
|
|
unsigned dest = bucket->queryDestination();
|
|
|
- if (senderFinished[dest])
|
|
|
+ CTarget *target = targets.item(dest);
|
|
|
+ if (target->getSenderFinished())
|
|
|
{
|
|
|
HDSendPrintLog2("CSender::add disposing of bucket [finished(%d)]", dest);
|
|
|
bucket->Release();
|
|
@@ -520,71 +568,14 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
else
|
|
|
{
|
|
|
CriticalBlock b(activeWritersLock);
|
|
|
- CWriteHandler *writer = activeWriters[dest];
|
|
|
- if (!writer && (numActiveWriters < owner.writerPoolSize))
|
|
|
+ if ((totalActiveWriters < owner.writerPoolSize) && (!owner.targetWriterLimit || (target->getActiveWriters() < owner.targetWriterLimit)))
|
|
|
{
|
|
|
- HDSendPrintLog2("CSender::add (new thread), dest=%d", dest);
|
|
|
+ HDSendPrintLog3("CSender::add (new thread), dest=%d, active=%d", dest, totalActiveWriters);
|
|
|
writerPool->start(bucket);
|
|
|
}
|
|
|
else // an existing writer will pick up
|
|
|
- pendingBuckets.item(dest)->enqueue(bucket);
|
|
|
- }
|
|
|
- }
|
|
|
- void setActiveWriter(unsigned n, CWriteHandler *writer)
|
|
|
- {
|
|
|
- if (writer)
|
|
|
- {
|
|
|
- assertex(!activeWriters[n]);
|
|
|
- ++numActiveWriters;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- assertex(activeWriters[n]);
|
|
|
- --numActiveWriters;
|
|
|
- }
|
|
|
- activeWriters[n] = writer;
|
|
|
- }
|
|
|
- unsigned getSendCandidate(bool &doSelf)
|
|
|
- {
|
|
|
- unsigned i;
|
|
|
- unsigned maxsz=0;
|
|
|
- for (i=0;i<owner.numnodes;i++)
|
|
|
- {
|
|
|
- CSendBucket *bucket = queryBucket(i);
|
|
|
- if (bucket)
|
|
|
- {
|
|
|
- size32_t bucketSz = bucket->querySize();
|
|
|
- if (bucketSz > maxsz)
|
|
|
- maxsz = bucketSz;
|
|
|
- }
|
|
|
- }
|
|
|
- doSelf = false;
|
|
|
- if (0 == maxsz)
|
|
|
- return NotFound;
|
|
|
- candidates.kill();
|
|
|
- for (i=0; i<owner.numnodes; i++)
|
|
|
- {
|
|
|
- CSendBucket *bucket = queryBucket(i);
|
|
|
- if (bucket)
|
|
|
- {
|
|
|
- size32_t bucketSz = bucket->querySize();
|
|
|
- if (bucketSz > maxsz/2)
|
|
|
- {
|
|
|
- if (i==self)
|
|
|
- doSelf = true;
|
|
|
- else
|
|
|
- candidates.append(i);
|
|
|
- }
|
|
|
- }
|
|
|
+ target->enqueuePendingBucket(bucket);
|
|
|
}
|
|
|
- if (0 == candidates.ordinality())
|
|
|
- return NotFound;
|
|
|
- unsigned h;
|
|
|
- if (candidates.ordinality()==1)
|
|
|
- h = candidates.item(0);
|
|
|
- else
|
|
|
- h = candidates.item(getRandom()%candidates.ordinality());
|
|
|
- return h;
|
|
|
}
|
|
|
void process(IRowStream *input)
|
|
|
{
|
|
@@ -594,7 +585,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
rowcount_t totalSent = 0;
|
|
|
try
|
|
|
{
|
|
|
- while (!aborted && numFinished < owner.numnodes)
|
|
|
+ while (!aborted && atomic_read(&numFinished) < owner.numnodes)
|
|
|
{
|
|
|
while (queryTotalSz() >= owner.inputBufferSize)
|
|
|
{
|
|
@@ -602,40 +593,110 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
break;
|
|
|
|
|
|
HDSendPrintLog("process exceeded inputBufferSize");
|
|
|
- bool doSelf;
|
|
|
- unsigned which = getSendCandidate(doSelf);
|
|
|
- if (NotFound != which)
|
|
|
- {
|
|
|
- HDSendPrintLog3("process exceeded: send to %d, size=%d", which, queryBucket(which)->querySize());
|
|
|
- add(getBucketClear(which));
|
|
|
- }
|
|
|
- if (doSelf)
|
|
|
+
|
|
|
+ // establish largest partial bucket
|
|
|
+ unsigned maxSz=0;
|
|
|
+ if (queryInactiveWriters())
|
|
|
{
|
|
|
- HDSendPrintLog2("process exceeded: doSelf, size=%d", queryBucket(self)->querySize());
|
|
|
- add(getBucketClear(self));
|
|
|
+ for (unsigned i=0; i<owner.numnodes; i++)
|
|
|
+ {
|
|
|
+ CSendBucket *bucket = targets.item(i)->queryBucket();
|
|
|
+ if (bucket)
|
|
|
+ {
|
|
|
+ size32_t bucketSz = bucket->querySize();
|
|
|
+ if (bucketSz > maxSz)
|
|
|
+ maxSz = bucketSz;
|
|
|
+ HDSendPrintLog4("b[%d], rows=%d, size=%d", i, bucket->count(), bucketSz);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- else if (NotFound == which) // i.e. none
|
|
|
+ /* Only add buckets if some inactive writers
|
|
|
+ * choose larger candidate buckets to targets that are inactive
|
|
|
+ * and randomize from that list which are queued to writers
|
|
|
+ */
|
|
|
+ if (maxSz)
|
|
|
{
|
|
|
- HDSendPrintLog("process exceeded inputBufferSize, none to send");
|
|
|
+ // pick candidates that are at >= 50% size of largest
|
|
|
+ candidates.clear();
|
|
|
+ bool doSelf = false;
|
|
|
+ unsigned inactiveWriters = queryInactiveWriters();
|
|
|
+ for (unsigned i=0; i<owner.numnodes; i++)
|
|
|
{
|
|
|
- SpinBlock b(totalSzLock);
|
|
|
- // some may have been freed after lock
|
|
|
- if (totalSz < owner.inputBufferSize)
|
|
|
- break;
|
|
|
- senderFull = true;
|
|
|
+ CTarget *target = targets.item(i);
|
|
|
+ CSendBucket *bucket = target->queryBucket();
|
|
|
+ if (bucket)
|
|
|
+ {
|
|
|
+ size32_t bucketSz = bucket->querySize();
|
|
|
+ if (bucketSz >= maxSz/2)
|
|
|
+ {
|
|
|
+ if (0 == target->getActiveWriters()) // only if there are no active writer threads for this target
|
|
|
+ {
|
|
|
+ if (i==self)
|
|
|
+ doSelf = true; // always send to self if candidate
|
|
|
+ else
|
|
|
+ {
|
|
|
+ candidates.append(i);
|
|
|
+ HDSendPrintLog4("c[%d], rows=%d, size=%d", i, bucket->count(), bucketSz);
|
|
|
+ /* NB: in theory could be more if some finished since checking, but that's okay
|
|
|
+ * some candidates, or free space will be picked up in next section
|
|
|
+ */
|
|
|
+ if (candidates.ordinality() >= inactiveWriters)
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- loop
|
|
|
+ unsigned limit = owner.candidateLimit;
|
|
|
+ while (candidates.ordinality())
|
|
|
{
|
|
|
- if (timer.elapsedCycles() >= queryOneSecCycles()*10)
|
|
|
- ActPrintLog(owner.activity, "HD sender, waiting for space");
|
|
|
- timer.reset();
|
|
|
-
|
|
|
- if (senderFullSem.wait(10000))
|
|
|
- break;
|
|
|
- if (aborted)
|
|
|
+ if (0 == queryInactiveWriters())
|
|
|
break;
|
|
|
+ else
|
|
|
+ {
|
|
|
+ unsigned pos = getRandom()%candidates.ordinality();
|
|
|
+ unsigned c = candidates.item(pos);
|
|
|
+ CTarget *target = targets.item(c);
|
|
|
+ CSendBucket *bucket = target->queryBucket();
|
|
|
+ assertex(bucket);
|
|
|
+ HDSendPrintLog3("process exceeded: send to %d, size=%d", c, bucket->querySize());
|
|
|
+ add(target->getBucketClear());
|
|
|
+ if (limit)
|
|
|
+ {
|
|
|
+ --limit;
|
|
|
+ if (0 == limit)
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ candidates.remove(pos);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (doSelf)
|
|
|
+ {
|
|
|
+ CTarget *target = targets.item(self);
|
|
|
+ CSendBucket *bucket = target->queryBucket();
|
|
|
+ assertex(bucket);
|
|
|
+ HDSendPrintLog2("process exceeded: doSelf, size=%d", bucket->querySize());
|
|
|
+ add(target->getBucketClear());
|
|
|
}
|
|
|
}
|
|
|
+ {
|
|
|
+ SpinBlock b(totalSzLock);
|
|
|
+ // some may have been written by now
|
|
|
+ if (totalSz < owner.inputBufferSize)
|
|
|
+ break;
|
|
|
+ senderFull = true;
|
|
|
+ }
|
|
|
+ loop
|
|
|
+ {
|
|
|
+ if (timer.elapsedCycles() >= queryOneSecCycles()*10)
|
|
|
+ ActPrintLog(owner.activity, "HD sender, waiting for space, active writers = %d", queryInactiveWriters());
|
|
|
+ timer.reset();
|
|
|
+
|
|
|
+ if (senderFullSem.wait(10000))
|
|
|
+ break;
|
|
|
+ if (aborted)
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
if (aborted)
|
|
|
break;
|
|
@@ -643,11 +704,12 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
if (!row)
|
|
|
break;
|
|
|
unsigned dest = owner.ihash->hash(row)%owner.numnodes;
|
|
|
- if (senderFinished[dest]) // does this need to be thread safe?
|
|
|
+ CTarget *target = targets.item(dest);
|
|
|
+ if (target->getSenderFinished()) // does this need to be thread safe?
|
|
|
ReleaseThorRow(row);
|
|
|
else
|
|
|
{
|
|
|
- CSendBucket *bucket = queryBucketCreate(dest);
|
|
|
+ CSendBucket *bucket = target->queryBucketCreate();
|
|
|
size32_t rs;
|
|
|
bucket->add(row, rs);
|
|
|
totalSent++;
|
|
@@ -658,7 +720,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
if (bucket->querySize() >= owner.bucketSendSize)
|
|
|
{
|
|
|
HDSendPrintLog3("adding new bucket: %d, size = %d", bucket->queryDestination(), bucket->querySize());
|
|
|
- add(getBucketClear(dest));
|
|
|
+ add(target->getBucketClear());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -678,7 +740,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
ForEach(*iter)
|
|
|
{
|
|
|
unsigned dest=iter->get();
|
|
|
- Owned<CSendBucket> bucket = getBucketClear(dest);
|
|
|
+ Owned<CSendBucket> bucket = targets.item(dest)->getBucketClear();
|
|
|
HDSendPrintLog4("Looking at last bucket(%d): %d, size = %d", dest, bucket.get()?bucket->queryDestination():0, bucket.get()?bucket->querySize():-1);
|
|
|
if (bucket && bucket->querySize())
|
|
|
{
|
|
@@ -700,16 +762,6 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
|
|
|
return;
|
|
|
aborted = true;
|
|
|
senderFullSem.signal();
|
|
|
- if (initialized)
|
|
|
- {
|
|
|
- CriticalBlock b(activeWritersLock);
|
|
|
- for (unsigned w=0; w<owner.numnodes; w++)
|
|
|
- {
|
|
|
- CWriteHandler *writer = activeWriters[w];
|
|
|
- if (writer)
|
|
|
- writer->abort();
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
// IThreadFactory impl.
|
|
|
virtual IPooledThread *createNew()
|
|
@@ -798,6 +850,8 @@ protected:
|
|
|
CriticalSection putsect;
|
|
|
bool pull, aborted;
|
|
|
CSender sender;
|
|
|
+ unsigned candidateLimit;
|
|
|
+ unsigned targetWriterLimit;
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
|
|
|
@@ -811,9 +865,9 @@ public:
|
|
|
iCompare = NULL;
|
|
|
ihash = NULL;
|
|
|
fixedEstSize = 0;
|
|
|
- bucketSendSize = globals->getPropInt("@hd_out_buffer_size", DEFAULT_OUT_BUFFER_SIZE);
|
|
|
+ bucketSendSize = activity->getOptUInt(THOROPT_HDIST_BUCKET_SIZE, DEFAULT_OUT_BUFFER_SIZE);
|
|
|
istop = _istop;
|
|
|
- inputBufferSize = globals->getPropInt("@hd_in_buffer_size", DEFAULT_IN_BUFFER_SIZE);
|
|
|
+ inputBufferSize = activity->getOptUInt(THOROPT_HDIST_BUFFER_SIZE, DEFAULT_IN_BUFFER_SIZE);
|
|
|
pullBufferSize = DISTRIBUTE_PULL_BUFFER_SIZE;
|
|
|
selfstopped = false;
|
|
|
pull = false;
|
|
@@ -823,9 +877,14 @@ public:
|
|
|
if (allowSpill)
|
|
|
ActPrintLog(activity, "Using spilling buffer (will spill if overflows)");
|
|
|
writerPoolSize = activity->getOptUInt(THOROPT_HDIST_WRITE_POOL_SIZE, DEFAULT_WRITEPOOLSIZE);
|
|
|
- if (writerPoolSize>numnodes)
|
|
|
- writerPoolSize = numnodes; // no point in more
|
|
|
+ if (writerPoolSize>(numnodes*2))
|
|
|
+ writerPoolSize = numnodes*2; // limit to 2 per target
|
|
|
ActPrintLog(activity, "Writer thread pool size : %d", writerPoolSize);
|
|
|
+ candidateLimit = activity->getOptUInt(THOROPT_HDIST_CANDIDATELIMIT);
|
|
|
+ ActPrintLog(activity, "candidateLimit : %d", candidateLimit);
|
|
|
+ ActPrintLog(activity, "inputBufferSize : %d, bucketSendSize = %d", inputBufferSize, bucketSendSize);
|
|
|
+ targetWriterLimit = activity->getOptUInt(THOROPT_HDIST_TARGETWRITELIMIT);
|
|
|
+ ActPrintLog(activity, "targetWriterLimit : %d", targetWriterLimit);
|
|
|
}
|
|
|
|
|
|
~CDistributorBase()
|
|
@@ -1013,7 +1072,6 @@ public:
|
|
|
piperd->stop();
|
|
|
pipewr.clear();
|
|
|
ActPrintLog(activity, "HDIST: Read loop done");
|
|
|
-
|
|
|
}
|
|
|
|
|
|
void sendloop()
|