@@ -203,21 +203,18 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             CDistributorBase &distributor;
             Owned<CSendBucket> _sendBucket;
             unsigned nextPending;
-            bool aborted;

         public:
             IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

             CWriteHandler(CSender &_owner) : owner(_owner), distributor(_owner.owner)
             {
-                aborted = false;
             }
             void init(void *startInfo)
             {
                 nextPending = getRandom()%distributor.numnodes;
                 _sendBucket.setown((CSendBucket *)startInfo);
-                owner.setActiveWriter(_sendBucket->queryDestination(), this);
-                aborted = false;
+                owner.addActiveWriter(_sendBucket->queryDestination());
             }
             void main()
             {
@@ -226,7 +223,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                 size32_t writerTotalSz = 0;
                 size32_t sendSz = 0;
                 MemoryBuffer mb;
-                while (!aborted)
+                while (!owner.aborted)
                 {
                     writerTotalSz += sendBucket->querySize();
                     owner.dedup(sendBucket); // conditional
@@ -252,7 +249,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                             continue; // NB: it will flow into else "remote" arm
                         }
                     }
-                    while (!aborted)
+                    while (!owner.aborted)
                     {
                         // JCSMORE check if worth compressing
                         CMessageBuffer msg;
@@ -270,25 +267,22 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                     // see if others to process
                     // NB: this will never start processing a bucket for a destination which already has an active writer.
                     CriticalBlock b(owner.activeWritersLock);
-                    owner.setActiveWriter(dest, NULL);
+                    owner.removeActiveWriter(dest);
                     sendBucket.setown(owner.getAnotherBucket(nextPending));
                     if (!sendBucket)
                         break;
                     dest = sendBucket->queryDestination();
-                    owner.setActiveWriter(dest, this);
+                    owner.addActiveWriter(dest);
                     HDSendPrintLog3("CWriteHandler, now dealing with (b=%d), size=%d", sendBucket->queryDestination(), sendBucket->querySize());
                 }
             }
             bool canReuse() { return true; }
             bool stop() { return true; }
-            void abort()
-            {
-                aborted = true;
-            }
-        } **activeWriters;
+        };

+        OwnedMalloc<unsigned> activeWriters;
         CDistributorBase &owner;
-        CriticalSection activeWritersLock;
+        mutable CriticalSection activeWritersLock;
         mutable SpinLock totalSzLock;
         SpinLock doDedupLock;
         PointerIArrayOf<CSendBucket> buckets;
@@ -301,6 +295,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
         unsigned numFinished, dedupSamples, dedupSuccesses, self;
         Owned<IThreadPool> writerPool;
         PointerArrayOf<CSendBucketQueue> pendingBuckets;
+        PointerArrayOf<CriticalSection> dstCrits;
         unsigned numActiveWriters;

         void init()
@@ -311,17 +306,19 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             totalSz = 0;
             senderFull = false;
             senderFinished.allocateN(owner.numnodes, true);
+            activeWriters.allocateN(owner.numnodes, true);
             numFinished = 0;
             dedupSamples = dedupSuccesses = 0;
             doDedup = owner.doDedup;
             writerPool.setown(createThreadPool("HashDist writer pool", this, this, owner.writerPoolSize, 5*60*1000));
             self = owner.activity->queryJob().queryMyRank()-1;
             for (n=0; n<owner.numnodes; n++)
+            {
                 pendingBuckets.append(new CSendBucketQueue);
+                dstCrits.append(new CriticalSection);
+            }
             numActiveWriters = 0;
             aborted = false;
-            activeWriters = new CWriteHandler *[owner.numnodes];
-            memset(activeWriters, 0, owner.numnodes * sizeof(CWriteHandler *));
             initialized = true;
         }
         void reset()
@@ -352,6 +349,11 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             else
                 init();
         }
+        unsigned queryInactiveWriters() const
+        {
+            CriticalBlock b(activeWritersLock);
+            return owner.writerPoolSize - numActiveWriters;
+        }
         void dedup(CSendBucket *sendBucket)
         {
             {
@@ -404,6 +406,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
         }
         void send(unsigned dest, CMessageBuffer &mb)
         {
+            CriticalBlock b(* dstCrits.item(dest));
             if (!senderFinished[dest])
             {
                 if (selfPush(dest))
@@ -469,10 +472,10 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
         {
             if (initialized)
             {
-                delete [] activeWriters;
                 for (unsigned n=0; n<owner.numnodes; n++)
                 {
                     CSendBucketQueue *queue = pendingBuckets.item(n);
+                    CriticalSection *dstCrit = dstCrits.item(n);
                     loop
                     {
                         CSendBucket *bucket = queue->dequeueNow();
@@ -481,6 +484,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                         ::Release(bucket);
                     }
                     delete queue;
+                    delete dstCrit;
                 }
             }
         }
@@ -497,7 +501,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                     next = 0;
                 if (c)
                 {
-                    if (NULL == activeWriters[current])
+                    if (!owner.targetWriterLimit || (activeWriters[current] < owner.targetWriterLimit))
                         return pendingBuckets.item(current)->dequeueNow();
                 }
                 if (next == start)
@@ -520,71 +524,24 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
            else
            {
                CriticalBlock b(activeWritersLock);
-                CWriteHandler *writer = activeWriters[dest];
-                if (!writer && (numActiveWriters < owner.writerPoolSize))
+                if ((numActiveWriters < owner.writerPoolSize) && (!owner.targetWriterLimit || (activeWriters[dest] < owner.targetWriterLimit)))
                {
-                    HDSendPrintLog2("CSender::add (new thread), dest=%d", dest);
+                    HDSendPrintLog3("CSender::add (new thread), dest=%d, active=%d", dest, numActiveWriters);
                    writerPool->start(bucket);
                }
                else // an existing writer will pick up
                    pendingBuckets.item(dest)->enqueue(bucket);
            }
        }
-        void setActiveWriter(unsigned n, CWriteHandler *writer)
+        void addActiveWriter(unsigned n)
        {
-            if (writer)
-            {
-                assertex(!activeWriters[n]);
-                ++numActiveWriters;
-            }
-            else
-            {
-                assertex(activeWriters[n]);
-                --numActiveWriters;
-            }
-            activeWriters[n] = writer;
+            activeWriters[n]++;
+            ++numActiveWriters;
        }
-        unsigned getSendCandidate(bool &doSelf)
+        void removeActiveWriter(unsigned n)
        {
-            unsigned i;
-            unsigned maxsz=0;
-            for (i=0;i<owner.numnodes;i++)
-            {
-                CSendBucket *bucket = queryBucket(i);
-                if (bucket)
-                {
-                    size32_t bucketSz = bucket->querySize();
-                    if (bucketSz > maxsz)
-                        maxsz = bucketSz;
-                }
-            }
-            doSelf = false;
-            if (0 == maxsz)
-                return NotFound;
-            candidates.kill();
-            for (i=0; i<owner.numnodes; i++)
-            {
-                CSendBucket *bucket = queryBucket(i);
-                if (bucket)
-                {
-                    size32_t bucketSz = bucket->querySize();
-                    if (bucketSz > maxsz/2)
-                    {
-                        if (i==self)
-                            doSelf = true;
-                        else
-                            candidates.append(i);
-                    }
-                }
-            }
-            if (0 == candidates.ordinality())
-                return NotFound;
-            unsigned h;
-            if (candidates.ordinality()==1)
-                h = candidates.item(0);
-            else
-                h = candidates.item(getRandom()%candidates.ordinality());
-            return h;
+            --activeWriters[n];
+            --numActiveWriters;
        }
        void process(IRowStream *input)
        {
@@ -602,39 +559,112 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                        break;

                HDSendPrintLog("process exceeded inputBufferSize");
-                bool doSelf;
-                unsigned which = getSendCandidate(doSelf);
-                if (NotFound != which)
-                {
-                    HDSendPrintLog3("process exceeded: send to %d, size=%d", which, queryBucket(which)->querySize());
-                    add(getBucketClear(which));
-                }
-                if (doSelf)
+
+                // establish largest partial bucket
+                unsigned maxSz=0;
+                if (queryInactiveWriters())
                {
-                    HDSendPrintLog2("process exceeded: doSelf, size=%d", queryBucket(self)->querySize());
-                    add(getBucketClear(self));
+                    for (unsigned i=0; i<owner.numnodes; i++)
+                    {
+                        CSendBucket *bucket = queryBucket(i);
+                        if (bucket)
+                        {
+                            size32_t bucketSz = bucket->querySize();
+                            if (bucketSz > maxSz)
+                                maxSz = bucketSz;
+                            HDSendPrintLog4("b[%d], rows=%d, size=%d", i, bucket->count(), bucketSz);
+                        }
+                    }
                }
-                else if (NotFound == which) // i.e. none
+                /* Only queue buckets to writers when there are inactive writers to service them.
+                 * Pick the larger candidate buckets whose targets currently have no active writer,
+                 * and randomize which of those candidates are queued to the writers.
+                 */
+                if (maxSz)
                {
-                    HDSendPrintLog("process exceeded inputBufferSize, none to send");
+                    // pick candidates that are at >= 50% size of largest
+                    candidates.kill();
+                    bool doSelf = false;
+                    for (unsigned i=0; i<owner.numnodes; i++)
                    {
-                        SpinBlock b(totalSzLock);
-                        // some may have been freed after lock
-                        if (totalSz < owner.inputBufferSize)
-                            break;
-                        senderFull = true;
+                        CSendBucket *bucket = queryBucket(i);
+                        if (bucket)
+                        {
+                            size32_t bucketSz = bucket->querySize();
+                            if (bucketSz >= maxSz/2)
+                            {
+                                CriticalBlock b(activeWritersLock);
+                                if (0 == activeWriters[i]) // only if there are no active writer threads for this target
+                                {
+                                    if (i==self)
+                                        doSelf = true; // always send to self if candidate
+                                    else
+                                    {
+                                        candidates.append(i);
+                                        HDSendPrintLog4("c[%d], rows=%d, size=%d", i, bucket->count(), bucketSz);
+                                        if (candidates.ordinality() >= queryInactiveWriters())
+                                            break;
+                                    }
+                                }
+                            }
+                        }
                    }
-                    loop
+                    unsigned limit = owner.candidateLimit;
+                    while (candidates.ordinality())
                    {
-                        if (timer.elapsedCycles() >= queryOneSecCycles()*10)
-                            ActPrintLog(owner.activity, "HD sender, waiting for space");
-                        timer.reset();
-
-                        if (senderFullSem.wait(10000))
+                        if (0 == queryInactiveWriters())
                            break;
-                        if (aborted)
+                        else if (1 == candidates.ordinality())
+                        {
+                            unsigned c = candidates.item(0);
+                            CSendBucket *bucket = queryBucket(c);
+                            assertex(bucket);
+                            HDSendPrintLog3("process exceeded: send to %d, size=%d", c, bucket->querySize());
+                            add(getBucketClear(c));
                            break;
+                        }
+                        else
+                        {
+                            unsigned pos = getRandom()%candidates.ordinality();
+                            unsigned c = candidates.item(pos);
+                            CSendBucket *bucket = queryBucket(c);
+                            assertex(bucket);
+                            HDSendPrintLog3("process exceeded: send to %d, size=%d", c, bucket->querySize());
+                            add(getBucketClear(c));
+                            if (limit)
+                            {
+                                --limit;
+                                if (0 == limit)
+                                    break;
+                            }
+                            candidates.remove(pos);
+                        }
                    }
+                    if (doSelf)
+                    {
+                        CSendBucket *bucket = queryBucket(self);
+                        assertex(bucket);
+                        HDSendPrintLog2("process exceeded: doSelf, size=%d", bucket->querySize());
+                        add(getBucketClear(self));
+                    }
+                }
+                {
+                    SpinBlock b(totalSzLock);
+                    // some may have been written by now
+                    if (totalSz < owner.inputBufferSize)
+                        break;
+                    senderFull = true;
+                }
+                loop
+                {
+                    if (timer.elapsedCycles() >= queryOneSecCycles()*10)
+                        ActPrintLog(owner.activity, "HD sender, waiting for space, inactive writers = %d", queryInactiveWriters());
+                    timer.reset();
+
+                    if (senderFullSem.wait(10000))
+                        break;
+                    if (aborted)
+                        break;
                }
            }
            if (aborted)
@@ -700,16 +730,6 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                return;
            aborted = true;
            senderFullSem.signal();
-            if (initialized)
-            {
-                CriticalBlock b(activeWritersLock);
-                for (unsigned w=0; w<owner.numnodes; w++)
-                {
-                    CWriteHandler *writer = activeWriters[w];
-                    if (writer)
-                        writer->abort();
-                }
-            }
        }
        // IThreadFactory impl.
        virtual IPooledThread *createNew()
@@ -798,6 +818,8 @@ protected:
    CriticalSection putsect;
    bool pull, aborted;
    CSender sender;
+    unsigned candidateLimit;
+    unsigned targetWriterLimit;
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

@@ -811,9 +833,9 @@ public:
        iCompare = NULL;
        ihash = NULL;
        fixedEstSize = 0;
-        bucketSendSize = globals->getPropInt("@hd_out_buffer_size", DEFAULT_OUT_BUFFER_SIZE);
+        bucketSendSize = activity->getOptUInt(THOROPT_HDIST_BUCKET_SIZE, DEFAULT_OUT_BUFFER_SIZE);
        istop = _istop;
-        inputBufferSize = globals->getPropInt("@hd_in_buffer_size", DEFAULT_IN_BUFFER_SIZE);
+        inputBufferSize = activity->getOptUInt(THOROPT_HDIST_BUFFER_SIZE, DEFAULT_IN_BUFFER_SIZE);
        pullBufferSize = DISTRIBUTE_PULL_BUFFER_SIZE;
        selfstopped = false;
        pull = false;
@@ -823,9 +845,14 @@ public:
        if (allowSpill)
            ActPrintLog(activity, "Using spilling buffer (will spill if overflows)");
        writerPoolSize = activity->getOptUInt(THOROPT_HDIST_WRITE_POOL_SIZE, DEFAULT_WRITEPOOLSIZE);
-        if (writerPoolSize>numnodes)
-            writerPoolSize = numnodes; // no point in more
+        if (writerPoolSize>(numnodes*2))
+            writerPoolSize = numnodes*2; // limit to 2 per target
        ActPrintLog(activity, "Writer thread pool size : %d", writerPoolSize);
+        candidateLimit = activity->getOptUInt(THOROPT_HDIST_CANDIDATELIMIT);
+        ActPrintLog(activity, "candidateLimit : %d", candidateLimit);
+        ActPrintLog(activity, "inputBufferSize : %d, bucketSendSize = %d", inputBufferSize, bucketSendSize);
+        targetWriterLimit = activity->getOptUInt(THOROPT_HDIST_TARGETWRITELIMIT);
+        ActPrintLog(activity, "targetWriterLimit : %d", targetWriterLimit);
    }

    ~CDistributorBase()
@@ -1013,7 +1040,6 @@ public:
        piperd->stop();
        pipewr.clear();
        ActPrintLog(activity, "HDIST: Read loop done");
-
    }

    void sendloop()
|