|
@@ -599,7 +599,7 @@ protected:
|
|
|
if (owner.sendBlock(target, msg))
|
|
|
return;
|
|
|
markStopped(target); // Probably a bit pointless if target is 'self' - process loop will have done already
|
|
|
- owner.ActPrintLog("CSender::sendBlock stopped slave %d (finished=%d)", target+1, atomic_read(&numFinished));
|
|
|
+ ::ActPrintLog(owner.activity, thorDetailedLogLevel, "CSender::sendBlock stopped slave %d (finished=%d)", target+1, atomic_read(&numFinished));
|
|
|
}
|
|
|
void closeWrite()
|
|
|
{
|
|
@@ -1076,7 +1076,7 @@ public:
|
|
|
}
|
|
|
else
|
|
|
compressHandler = queryDefaultCompressHandler();
|
|
|
- ActPrintLog("Using compressor: %s", compressHandler ? compressHandler->queryType() : "NONE");
|
|
|
+ ::ActPrintLog(activity, thorDetailedLogLevel, "Using compressor: %s", compressHandler ? compressHandler->queryType() : "NONE");
|
|
|
|
|
|
allowSpill = activity->getOptBool(THOROPT_HDIST_SPILL, true);
|
|
|
if (allowSpill)
|
|
@@ -1084,12 +1084,12 @@ public:
|
|
|
writerPoolSize = activity->getOptUInt(THOROPT_HDIST_WRITE_POOL_SIZE, DEFAULT_WRITEPOOLSIZE);
|
|
|
if (writerPoolSize>(numnodes*2))
|
|
|
writerPoolSize = numnodes*2; // limit to 2 per target
|
|
|
- ActPrintLog("Writer thread pool size : %d", writerPoolSize);
|
|
|
+ ::ActPrintLog(activity, thorDetailedLogLevel, "Writer thread pool size : %d", writerPoolSize);
|
|
|
candidateLimit = activity->getOptUInt(THOROPT_HDIST_CANDIDATELIMIT);
|
|
|
- ActPrintLog("candidateLimit : %d", candidateLimit);
|
|
|
- ActPrintLog("inputBufferSize : %d, bucketSendSize = %d, pullBufferSize=%d", inputBufferSize, bucketSendSize, pullBufferSize);
|
|
|
+ ::ActPrintLog(activity, thorDetailedLogLevel, "candidateLimit : %d", candidateLimit);
|
|
|
+ ::ActPrintLog(activity, thorDetailedLogLevel, "inputBufferSize : %d, bucketSendSize = %d, pullBufferSize=%d", inputBufferSize, bucketSendSize, pullBufferSize);
|
|
|
targetWriterLimit = activity->getOptUInt(THOROPT_HDIST_TARGETWRITELIMIT);
|
|
|
- ActPrintLog("targetWriterLimit : %d", targetWriterLimit);
|
|
|
+ ::ActPrintLog(activity, thorDetailedLogLevel, "targetWriterLimit : %d", targetWriterLimit);
|
|
|
}
|
|
|
|
|
|
virtual void beforeDispose()
|
|
@@ -1133,7 +1133,7 @@ public:
|
|
|
|
|
|
virtual IRowStream *connect(IThorRowInterfaces *_rowIf, IRowStream *_input, IHash *_ihash, ICompare *_iCompare, ICompare *_keepBestCompare)
|
|
|
{
|
|
|
- ActPrintLog("HASHDISTRIB: connect");
|
|
|
+ ::ActPrintLog(activity, thorDetailedLogLevel, "HASHDISTRIB: connect");
|
|
|
|
|
|
rowIf.set(_rowIf);
|
|
|
allocator = _rowIf->queryRowAllocator();
|
|
@@ -1164,7 +1164,7 @@ public:
|
|
|
sendException.clear();
|
|
|
recvException.clear();
|
|
|
start();
|
|
|
- ActPrintLog("HASHDISTRIB: connected");
|
|
|
+ ::ActPrintLog(activity, thorDetailedLogLevel, "HASHDISTRIB: connected");
|
|
|
return piperd.getLink();
|
|
|
}
|
|
|
|
|
@@ -1215,7 +1215,7 @@ public:
|
|
|
MemoryBuffer tempMb;
|
|
|
try
|
|
|
{
|
|
|
- ActPrintLog("Read loop start");
|
|
|
+ ::ActPrintLog(activity, thorDetailedLogLevel, "Read loop start");
|
|
|
CMessageBuffer recvMb;
|
|
|
Owned<ISerialStream> stream = createMemoryBufferSerialStream(tempMb);
|
|
|
CThorStreamDeserializerSource rowSource;
|
|
@@ -1224,14 +1224,12 @@ public:
|
|
|
Owned<IExpander> expander = getExpander();
|
|
|
while (left && !aborted)
|
|
|
{
|
|
|
-#ifdef _FULL_TRACE
|
|
|
- ActPrintLog("HDIST: Receiving block");
|
|
|
-#endif
|
|
|
+ ::ActPrintLog(activity, thorDetailedLogLevel, "HDIST: Receiving block");
|
|
|
unsigned n = recvBlock(recvMb);
|
|
|
if (n==(unsigned)-1)
|
|
|
break;
|
|
|
#ifdef _FULL_TRACE
|
|
|
- ActPrintLog("HDIST: Received block %d from slave %d",recvMb.length(),n+1);
|
|
|
+ ::ActPrintLog(activity, thorDetailedLogLevel, "HDIST: Received block %d from slave %d",recvMb.length(),n+1);
|
|
|
#endif
|
|
|
if (recvMb.length())
|
|
|
{
|
|
@@ -1276,20 +1274,16 @@ public:
|
|
|
else
|
|
|
{
|
|
|
left--;
|
|
|
- ActPrintLog("HDIST: finished slave %d, %d left",n+1,left);
|
|
|
+ ::ActPrintLog(activity, thorDetailedLogLevel, "HDIST: finished slave %d, %d left",n+1,left);
|
|
|
}
|
|
|
-#ifdef _FULL_TRACE
|
|
|
- ActPrintLog("HDIST: Put block %d from slave %d",recvMb.length(),n+1);
|
|
|
-#endif
|
|
|
+ ::ActPrintLog(activity, thorDetailedLogLevel, "HDIST: Put block %d from slave %d",recvMb.length(),n+1);
|
|
|
}
|
|
|
}
|
|
|
catch (IException *e)
|
|
|
{
|
|
|
setRecvExc(e);
|
|
|
}
|
|
|
-#ifdef _FULL_TRACE
|
|
|
- ActPrintLog("HDIST: waiting localFinishedSem");
|
|
|
-#endif
|
|
|
+ ::ActPrintLog(activity, thorDetailedLogLevel, "HDIST: waiting localFinishedSem");
|
|
|
}
|
|
|
|
|
|
void recvloopdone()
|
|
@@ -2081,7 +2075,7 @@ public:
|
|
|
virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
|
|
|
{
|
|
|
mptag = container.queryJobChannel().deserializeMPTag(data);
|
|
|
- ActPrintLog("HASHDISTRIB: %sinit tag %d",mergecmp?"merge, ":"",(int)mptag);
|
|
|
+ ::ActPrintLog(this, thorDetailedLogLevel, "HASHDISTRIB: %sinit tag %d",mergecmp?"merge, ":"",(int)mptag);
|
|
|
|
|
|
if (mergecmp)
|
|
|
distributor = createPullHashDistributor(this, queryJobChannel().queryJobComm(), mptag, false, this);
|
|
@@ -2109,7 +2103,7 @@ public:
|
|
|
}
|
|
|
virtual void stop() override
|
|
|
{
|
|
|
- ActPrintLog("HASHDISTRIB: stopping");
|
|
|
+ ::ActPrintLog(this, thorDetailedLogLevel, "HASHDISTRIB: stopping");
|
|
|
if (out)
|
|
|
{
|
|
|
out->stop();
|
|
@@ -2123,11 +2117,6 @@ public:
|
|
|
instrm.clear();
|
|
|
PARENT::stop();
|
|
|
}
|
|
|
- virtual void kill() override
|
|
|
- {
|
|
|
- ActPrintLog("HASHDISTRIB: kill");
|
|
|
- CSlaveActivity::kill();
|
|
|
- }
|
|
|
virtual void abort() override
|
|
|
{
|
|
|
CSlaveActivity::abort();
|
|
@@ -2373,18 +2362,19 @@ public:
|
|
|
if (i==self)
|
|
|
sizes[i] = s;
|
|
|
}
|
|
|
- for (i=0;i<n;i++) {
|
|
|
#ifdef _DEBUG
|
|
|
- ActPrintLog(activity, "after Node %d has %" I64F "d",i, insz[i]);
|
|
|
+ for (i=0;i<n;i++)
|
|
|
+ ::ActPrintLog(activity, thorDetailedLogLevel, "after Node %d has %" I64F "d",i, insz[i]);
|
|
|
#endif
|
|
|
- }
|
|
|
tot = 0;
|
|
|
- for (i=0;i<n;i++) {
|
|
|
- if (sizes[i]) {
|
|
|
+ for (i=0;i<n;i++)
|
|
|
+ {
|
|
|
+ if (sizes[i])
|
|
|
+ {
|
|
|
if (i==self)
|
|
|
- ActPrintLog(activity, "Keep %" I64F "d local",sizes[i]);
|
|
|
+ ::ActPrintLog(activity, thorDetailedLogLevel, "Keep %" I64F "d local",sizes[i]);
|
|
|
else
|
|
|
- ActPrintLog(activity, "Redistribute %" I64F "d to %d",sizes[i],i);
|
|
|
+ ::ActPrintLog(activity, thorDetailedLogLevel, "Redistribute %" I64F "d to %d",sizes[i],i);
|
|
|
}
|
|
|
tot += sizes[i];
|
|
|
}
|
|
@@ -3142,7 +3132,6 @@ public:
|
|
|
}
|
|
|
void kill()
|
|
|
{
|
|
|
- ActPrintLog("kill");
|
|
|
currentInput.clear();
|
|
|
bucketHandler.clear();
|
|
|
bucketHandlerStack.kill();
|
|
@@ -3333,7 +3322,7 @@ void CHashTableRowTable::rehash(const void **newRows)
|
|
|
}
|
|
|
|
|
|
if (maxRows)
|
|
|
- ActPrintLog(&activity, "Rehashed bucket %d - old size = %d, new size = %d, elements = %d", owner->queryBucketNumber(), maxRows, newMaxRows, htElements);
|
|
|
+ ::ActPrintLog(&activity, thorDetailedLogLevel, "Rehashed bucket %d - old size = %d, new size = %d, elements = %d", owner->queryBucketNumber(), maxRows, newMaxRows, htElements);
|
|
|
|
|
|
const void **oldRows = rows;
|
|
|
rows = (const void **)_newRows.getClear();
|
|
@@ -3420,7 +3409,7 @@ bool CBucket::flush(bool critical)
|
|
|
{
|
|
|
if (clearHashTable(critical))
|
|
|
{
|
|
|
- PROGLOG("Flushed%s bucket %d - %d elements", critical?"(critical)":"", queryBucketNumber(), count);
|
|
|
+ LOG(MCthorDetailedDebugInfo, thorJob, "Flushed%s bucket %d - %d elements", critical?"(critical)":"", queryBucketNumber(), count);
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
@@ -3686,7 +3675,7 @@ void CBucketHandler::init(unsigned _numBuckets, IRowStream *keyStream)
|
|
|
buckets[i] = new CBucket(owner, rowIf, keyIf, extractKey, i, &htRows);
|
|
|
htRows.setOwner(buckets[i]);
|
|
|
}
|
|
|
- ActPrintLog(&owner, "Max %d buckets, current depth = %d", numBuckets, depth+1);
|
|
|
+ ::ActPrintLog(&owner, thorDetailedLogLevel, "Max %d buckets, current depth = %d", numBuckets, depth+1);
|
|
|
initCallbacks();
|
|
|
if (keyStream)
|
|
|
{
|
|
@@ -3719,7 +3708,7 @@ CBucketHandler *CBucketHandler::getNextBucketHandler(Owned<IRowStream> &nextInpu
|
|
|
Owned<IRowStream> keyStream = bucket->getSpillKeyStream(&keyCount);
|
|
|
dbgassertex(keyStream);
|
|
|
Owned<CBucketHandler> newBucketHandler = new CBucketHandler(owner, rowIf, keyIf, iRowHash, iKeyHash, iCompare, extractKey, depth+1, div*numBuckets);
|
|
|
- ActPrintLog(&owner, "Created bucket handler %d, depth %d", currentBucket, depth+1);
|
|
|
+ ::ActPrintLog(&owner, thorDetailedLogLevel, "Created bucket handler %d, depth %d", currentBucket, depth+1);
|
|
|
nextInput.setown(bucket->getSpillRowStream(&count));
|
|
|
dbgassertex(nextInput);
|
|
|
// Use peak in mem keys as estimate for next round of buckets.
|
|
@@ -3796,7 +3785,6 @@ public:
|
|
|
}
|
|
|
virtual void stop() override
|
|
|
{
|
|
|
- ActPrintLog("stopping");
|
|
|
if (instrm)
|
|
|
{
|
|
|
instrm->stop();
|
|
@@ -3879,7 +3867,7 @@ public:
|
|
|
joinargs = (IHThorHashJoinArg *)queryHelper();
|
|
|
mptag = container.queryJobChannel().deserializeMPTag(data);
|
|
|
mptag2 = container.queryJobChannel().deserializeMPTag(data);
|
|
|
- ActPrintLog("HASHJOIN: init tags %d,%d",(int)mptag,(int)mptag2);
|
|
|
+ ::ActPrintLog(this, thorDetailedLogLevel, "HASHJOIN: init tags %d,%d",(int)mptag,(int)mptag2);
|
|
|
}
|
|
|
virtual void start()
|
|
|
{
|
|
@@ -3887,7 +3875,6 @@ public:
|
|
|
startAllInputs();
|
|
|
leftdone = false;
|
|
|
eof = false;
|
|
|
- ActPrintLog("HASHJOIN: starting");
|
|
|
inL = queryInput(0);
|
|
|
inR = queryInput(1);
|
|
|
IHash *ihashL = joinargs->queryHashLeft();
|
|
@@ -3957,7 +3944,6 @@ public:
|
|
|
}
|
|
|
virtual void stop()
|
|
|
{
|
|
|
- ActPrintLog("HASHJOIN: stopping");
|
|
|
stopInputL();
|
|
|
stopInputR();
|
|
|
if (joinhelper)
|
|
@@ -3973,11 +3959,6 @@ public:
|
|
|
}
|
|
|
PARENT::stop();
|
|
|
}
|
|
|
- void kill()
|
|
|
- {
|
|
|
- ActPrintLog("HASHJOIN: kill");
|
|
|
- CSlaveActivity::kill();
|
|
|
- }
|
|
|
void abort()
|
|
|
{
|
|
|
CSlaveActivity::abort();
|
|
@@ -4436,7 +4417,7 @@ public:
|
|
|
if (!container.queryLocalOrGrouped())
|
|
|
{
|
|
|
mptag = container.queryJobChannel().deserializeMPTag(data);
|
|
|
- ActPrintLog("HASHAGGREGATE: init tags %d",(int)mptag);
|
|
|
+ ::ActPrintLog(this, thorDetailedLogLevel, "HASHAGGREGATE: init tags %d",(int)mptag);
|
|
|
}
|
|
|
localAggTable.setown(createRowAggregator(*this, *helper, *helper));
|
|
|
localAggTable->init(queryRowAllocator());
|
|
@@ -4447,7 +4428,7 @@ public:
|
|
|
PARENT::start();
|
|
|
doNextGroup(); // or local set if !grouped
|
|
|
if (!container.queryGrouped())
|
|
|
- ActPrintLog("Table before distribution contains %d entries", localAggTable->elementCount());
|
|
|
+ ::ActPrintLog(this, thorDetailedLogLevel, "Table before distribution contains %d entries", localAggTable->elementCount());
|
|
|
if (!container.queryLocalOrGrouped() && container.queryJob().querySlaves()>1)
|
|
|
{
|
|
|
Owned<IRowStream> localAggStream = localAggTable->getRowStream(true);
|
|
@@ -4459,7 +4440,6 @@ public:
|
|
|
}
|
|
|
virtual void stop() override
|
|
|
{
|
|
|
- ActPrintLog("HASHAGGREGATE: stopping");
|
|
|
if (localAggTable)
|
|
|
localAggTable->reset();
|
|
|
if (aggregateStream)
|
|
@@ -4573,55 +4553,46 @@ CActivityBase *createHashDistributeSlave(CGraphElementBase *container)
|
|
|
{
|
|
|
if (container&&(((IHThorHashDistributeArg *)container->queryHelper())->queryHash()==NULL))
|
|
|
return createReDistributeSlave(container);
|
|
|
- ActPrintLog(container, "HASHDISTRIB: createHashDistributeSlave");
|
|
|
return new HashDistributeSlaveActivity(container);
|
|
|
}
|
|
|
|
|
|
CActivityBase *createNWayDistributeSlave(CGraphElementBase *container)
|
|
|
{
|
|
|
- ActPrintLog(container, "NWAYDISTRIB: createNWayDistributeSlave");
|
|
|
return new NWayDistributeSlaveActivity(container);
|
|
|
}
|
|
|
|
|
|
CActivityBase *createHashDistributeMergeSlave(CGraphElementBase *container)
|
|
|
{
|
|
|
- ActPrintLog(container, "HASHDISTRIB: createHashDistributeMergeSlave");
|
|
|
return new HashDistributeMergeSlaveActivity(container);
|
|
|
}
|
|
|
|
|
|
CActivityBase *createHashDedupSlave(CGraphElementBase *container)
|
|
|
{
|
|
|
- ActPrintLog(container, "HASHDEDUP: createHashDedupSlave");
|
|
|
return new GlobalHashDedupSlaveActivity(container);
|
|
|
}
|
|
|
|
|
|
CActivityBase *createHashLocalDedupSlave(CGraphElementBase *container)
|
|
|
{
|
|
|
- ActPrintLog(container, "LOCALHASHDEDUP: createHashLocalDedupSlave");
|
|
|
return new LocalHashDedupSlaveActivity(container);
|
|
|
}
|
|
|
|
|
|
CActivityBase *createHashJoinSlave(CGraphElementBase *container)
|
|
|
{
|
|
|
- ActPrintLog(container, "HASHJOIN: createHashJoinSlave");
|
|
|
return new HashJoinSlaveActivity(container);
|
|
|
}
|
|
|
|
|
|
CActivityBase *createHashAggregateSlave(CGraphElementBase *container)
|
|
|
{
|
|
|
- ActPrintLog(container, "HASHAGGREGATE: createHashAggregateSlave");
|
|
|
return new CHashAggregateSlave(container);
|
|
|
}
|
|
|
|
|
|
CActivityBase *createIndexDistributeSlave(CGraphElementBase *container)
|
|
|
{
|
|
|
- ActPrintLog(container, "DISTRIBUTEINDEX: createIndexDistributeSlave");
|
|
|
return new IndexDistributeSlaveActivity(container);
|
|
|
}
|
|
|
|
|
|
CActivityBase *createReDistributeSlave(CGraphElementBase *container)
|
|
|
{
|
|
|
- ActPrintLog(container, "REDISTRIBUTE: createReDistributeSlave");
|
|
|
return new ReDistributeSlaveActivity(container);
|
|
|
}
|
|
|
|