|
@@ -1013,9 +1013,11 @@ public:
|
|
|
|
|
|
//---------------------------------------------------------------------------------------
|
|
|
|
|
|
-class CSlaveContext : public CInterface, implements IRoxieSlaveContext, implements ICodeContext, implements roxiemem::ITimeLimiter, implements IRowAllocatorMetaActIdCacheCallback
|
|
|
+class CRoxieContextBase : public CInterface, implements IRoxieSlaveContext, implements ICodeContext, implements roxiemem::ITimeLimiter, implements IRowAllocatorMetaActIdCacheCallback
|
|
|
{
|
|
|
protected:
|
|
|
+ Owned<IConstWUGraphProgress> progress; // These need to be destroyed very late (particularly, after the childgraphs)
|
|
|
+ Owned<IWUGraphStats> graphStats;
|
|
|
mutable Owned<IRowAllocatorMetaActIdCache> allocatorMetaCache;
|
|
|
Owned<IRowManager> rowManager; // NOTE: the order of destruction here is significant. For leak check to work destroy this BEFORE allAllocators, but after most other things
|
|
|
Owned <IDebuggableContext> debugContext;
|
|
@@ -1025,7 +1027,6 @@ protected:
|
|
|
Owned<IActivityGraph> graph;
|
|
|
StringBuffer authToken;
|
|
|
Owned<IPropertyTree> probeQuery;
|
|
|
- RoxiePacketHeader *header;
|
|
|
unsigned lastWuAbortCheck;
|
|
|
unsigned startTime;
|
|
|
unsigned totSlavesReplyLen;
|
|
@@ -1096,34 +1097,15 @@ protected:
|
|
|
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
- CSlaveContext(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx, IRoxieQueryPacket *_packet, bool _hasChildren)
|
|
|
+ CRoxieContextBase(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx)
|
|
|
: factory(_factory), logctx(_logctx), options(factory->queryOptions())
|
|
|
{
|
|
|
- bool debuggerActive = false;
|
|
|
- if (_packet)
|
|
|
- {
|
|
|
- header = &_packet->queryHeader();
|
|
|
- const byte *traceInfo = _packet->queryTraceInfo();
|
|
|
- options.setFromSlaveLoggingFlags(*traceInfo);
|
|
|
- debuggerActive = (*traceInfo & LOGGING_DEBUGGERACTIVE) != 0 && _hasChildren; // No option to debug simple remote activity
|
|
|
- }
|
|
|
- else
|
|
|
- header = NULL;
|
|
|
startTime = lastWuAbortCheck = msTick();
|
|
|
persists = NULL;
|
|
|
temporaries = NULL;
|
|
|
deserializedResultStore = NULL;
|
|
|
rereadResults = NULL;
|
|
|
xmlStoredDatasetReadFlags = ptr_none;
|
|
|
- if (debuggerActive)
|
|
|
- {
|
|
|
- assertex(header);
|
|
|
- CSlaveDebugContext *slaveDebugContext = new CSlaveDebugContext(this, logctx, *header);
|
|
|
- slaveDebugContext->init(_packet);
|
|
|
- debugContext.setown(slaveDebugContext);
|
|
|
- probeManager.setown(createDebugManager(debugContext, "slaveDebugger"));
|
|
|
- }
|
|
|
-
|
|
|
aborted = false;
|
|
|
exceptionLogged = false;
|
|
|
totSlavesReplyLen = 0;
|
|
@@ -1133,7 +1115,7 @@ public:
|
|
|
//MORE: If checking heap required then should have
|
|
|
//rowManager.setown(createCheckingHeap(rowManager)) or something similar.
|
|
|
}
|
|
|
- ~CSlaveContext()
|
|
|
+ ~CRoxieContextBase()
|
|
|
{
|
|
|
::Release(rereadResults);
|
|
|
::Release(persists);
|
|
@@ -1208,41 +1190,6 @@ public:
|
|
|
return logctx.queryTraceLevel();
|
|
|
}
|
|
|
|
|
|
- virtual void noteProcessed(const IRoxieContextLogger &activityContext, const IRoxieServerActivity *activity, unsigned _idx, unsigned _processed, unsigned __int64 _totalCycles, unsigned __int64 _localCycles) const
|
|
|
- {
|
|
|
- if (options.traceActivityTimes)
|
|
|
- {
|
|
|
- StringBuffer text, prefix;
|
|
|
- text.appendf("%s outputIdx %d processed %d total %d us local %d us",
|
|
|
- getActivityText(activity->getKind()), _idx, _processed, (unsigned) (cycle_to_nanosec(_totalCycles)/1000), (unsigned)(cycle_to_nanosec(_localCycles)/1000));
|
|
|
- activityContext.getLogPrefix(prefix);
|
|
|
- CTXLOGa(LOG_TIMING, prefix.str(), text.str());
|
|
|
- }
|
|
|
- if (graphStats)
|
|
|
- {
|
|
|
- IStatisticGatherer & builder = graphStats->queryStatsBuilder();
|
|
|
- StatsSubgraphScope graphScope(builder, activity->querySubgraphId());
|
|
|
-
|
|
|
- {
|
|
|
- StatsActivityScope scope(builder, activity->queryId());
|
|
|
- //MORE: This is potentially problematic if noteProcessed is called for multiple edges => check if totalCycles is 0.
|
|
|
- //There should really be two separate functions - one to update activity statistics, and another for edge statistics
|
|
|
- if (_totalCycles)
|
|
|
- {
|
|
|
- builder.addStatistic(StTimeTotalExecute, cycle_to_nanosec(_totalCycles));
|
|
|
- builder.addStatistic(StTimeLocalExecute, cycle_to_nanosec(_localCycles));
|
|
|
- }
|
|
|
- //MORE: Should this be done via a callback instead - so the activity can add stats for when started + other interesting info
|
|
|
- }
|
|
|
-
|
|
|
- if (_processed)
|
|
|
- {
|
|
|
- StatsEdgeScope scope(builder, activity->queryId(), _idx);
|
|
|
- builder.addStatistic(StNumRowsProcessed, _processed);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
virtual void checkAbort()
|
|
|
{
|
|
|
// MORE - really should try to apply limits at slave end too
|
|
@@ -1360,9 +1307,6 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- Owned<IConstWUGraphProgress> progress;
|
|
|
- Owned<IWUGraphStats> graphStats;
|
|
|
-
|
|
|
void beginGraph(const char *graphName)
|
|
|
{
|
|
|
if (debugContext)
|
|
@@ -1522,47 +1466,11 @@ public:
|
|
|
return (const char *) dll->getResource(id);
|
|
|
}
|
|
|
|
|
|
- virtual const IResolvedFile *resolveLFN(const char *filename, bool isOpt)
|
|
|
- {
|
|
|
- CDateTime cacheDate; // Note - this is empty meaning we don't know...
|
|
|
- return querySlaveDynamicFileCache()->lookupDynamicFile(*this, filename, cacheDate, 0, header, isOpt, false);
|
|
|
- }
|
|
|
-
|
|
|
- virtual IRoxieWriteHandler *createLFN(const char *filename, bool overwrite, bool extend, const StringArray &clusters)
|
|
|
- {
|
|
|
- throwUnexpected(); // only support writing on the server
|
|
|
- }
|
|
|
-
|
|
|
- virtual void onFileCallback(const RoxiePacketHeader &header, const char *lfn, bool isOpt, bool isLocal)
|
|
|
- {
|
|
|
- // On a slave, we need to request info using our own header (not the one passed in) and need to get global rather than just local info
|
|
|
- // (possibly we could get just local if the channel matches but not sure there is any point)
|
|
|
- Owned<const IResolvedFile> dFile = resolveLFN(lfn, isOpt);
|
|
|
- if (dFile)
|
|
|
- {
|
|
|
- MemoryBuffer mb;
|
|
|
- mb.append(sizeof(RoxiePacketHeader), &header);
|
|
|
- mb.append(lfn);
|
|
|
- dFile->serializePartial(mb, header.channel, isLocal);
|
|
|
- ((RoxiePacketHeader *) mb.toByteArray())->activityId = ROXIE_FILECALLBACK;
|
|
|
- Owned<IRoxieQueryPacket> reply = createRoxiePacket(mb);
|
|
|
- reply->queryHeader().retries = 0;
|
|
|
- ROQ->sendPacket(reply, *this); // MORE - the caller's log context might be better? Should we unicast? Note that this does not release the packet
|
|
|
- return;
|
|
|
- }
|
|
|
- ROQ->sendAbortCallback(header, lfn, *this);
|
|
|
- throwUnexpected();
|
|
|
- }
|
|
|
virtual ICodeContext *queryCodeContext()
|
|
|
{
|
|
|
return this;
|
|
|
}
|
|
|
|
|
|
- virtual IRoxieServerContext *queryServerContext()
|
|
|
- {
|
|
|
- return NULL;
|
|
|
- }
|
|
|
-
|
|
|
virtual IProbeManager *queryProbeManager() const
|
|
|
{
|
|
|
return probeManager;
|
|
@@ -1593,7 +1501,7 @@ public:
|
|
|
virtual IEngineContext *queryEngineContext() { return NULL; }
|
|
|
virtual char *getDaliServers() { throwUnexpected(); }
|
|
|
|
|
|
- // The following from ICodeContext should never be executed in slave activity. If we are on Roxie server (or in child query on slave), they will be implemented by more derived CRoxieServerContext class
|
|
|
+ // The following from ICodeContext should never be executed in slave activity. If we are on Roxie server, they will be implemented by more derived CRoxieServerContext class
|
|
|
virtual void setResultBool(const char *name, unsigned sequence, bool value) { throwUnexpected(); }
|
|
|
virtual void setResultData(const char *name, unsigned sequence, int len, const void * data) { throwUnexpected(); }
|
|
|
virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val) { throwUnexpected(); }
|
|
@@ -2360,11 +2268,94 @@ protected:
|
|
|
}
|
|
|
};
|
|
|
|
|
|
-IRoxieSlaveContext *createSlaveContext(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx, IRoxieQueryPacket *packet, bool hasChildren)
|
|
|
+//-----------------------------------------------------------------------------------------------
|
|
|
+
|
|
|
+class CSlaveContext : public CRoxieContextBase
|
|
|
+{
|
|
|
+protected:
|
|
|
+ RoxiePacketHeader *header;
|
|
|
+
|
|
|
+public:
|
|
|
+ CSlaveContext(const IQueryFactory *_factory, const SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, bool _hasChildren)
|
|
|
+ : CRoxieContextBase(_factory, _logctx)
|
|
|
+ {
|
|
|
+ header = &_packet->queryHeader();
|
|
|
+ const byte *traceInfo = _packet->queryTraceInfo();
|
|
|
+ options.setFromSlaveLoggingFlags(*traceInfo);
|
|
|
+ bool debuggerActive = (*traceInfo & LOGGING_DEBUGGERACTIVE) != 0 && _hasChildren; // No option to debug simple remote activity
|
|
|
+ if (debuggerActive)
|
|
|
+ {
|
|
|
+ CSlaveDebugContext *slaveDebugContext = new CSlaveDebugContext(this, logctx, *header);
|
|
|
+ slaveDebugContext->init(_packet);
|
|
|
+ debugContext.setown(slaveDebugContext);
|
|
|
+ probeManager.setown(createDebugManager(debugContext, "slaveDebugger"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ virtual void beforeDispose()
|
|
|
+ {
|
|
|
+ // NOTE: This is needed to ensure that owned activities are destroyed BEFORE I am,
|
|
|
+ // to avoid pure virtual calls when they come to call noteProcessed()
|
|
|
+ childGraphs.kill();
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual IRoxieServerContext *queryServerContext()
|
|
|
+ {
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual const IResolvedFile *resolveLFN(const char *filename, bool isOpt)
|
|
|
+ {
|
|
|
+ CDateTime cacheDate; // Note - this is empty meaning we don't know...
|
|
|
+ return querySlaveDynamicFileCache()->lookupDynamicFile(*this, filename, cacheDate, 0, header, isOpt, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual IRoxieWriteHandler *createLFN(const char *filename, bool overwrite, bool extend, const StringArray &clusters)
|
|
|
+ {
|
|
|
+ throwUnexpected(); // only support writing on the server
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void onFileCallback(const RoxiePacketHeader &header, const char *lfn, bool isOpt, bool isLocal)
|
|
|
+ {
|
|
|
+ // On a slave, we need to request info using our own header (not the one passed in) and need to get global rather than just local info
|
|
|
+ // (possibly we could get just local if the channel matches but not sure there is any point)
|
|
|
+ Owned<const IResolvedFile> dFile = resolveLFN(lfn, isOpt);
|
|
|
+ if (dFile)
|
|
|
+ {
|
|
|
+ MemoryBuffer mb;
|
|
|
+ mb.append(sizeof(RoxiePacketHeader), &header);
|
|
|
+ mb.append(lfn);
|
|
|
+ dFile->serializePartial(mb, header.channel, isLocal);
|
|
|
+ ((RoxiePacketHeader *) mb.toByteArray())->activityId = ROXIE_FILECALLBACK;
|
|
|
+ Owned<IRoxieQueryPacket> reply = createRoxiePacket(mb);
|
|
|
+ reply->queryHeader().retries = 0;
|
|
|
+ ROQ->sendPacket(reply, *this); // MORE - the caller's log context might be better? Should we unicast? Note that this does not release the packet
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ ROQ->sendAbortCallback(header, lfn, *this);
|
|
|
+ throwUnexpected();
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void noteProcessed(unsigned subgraphId, unsigned activityId, unsigned _idx, unsigned _processed) const
|
|
|
+ {
|
|
|
+ const SlaveContextLogger &slaveLogCtx = static_cast<const SlaveContextLogger &>(logctx);
|
|
|
+ slaveLogCtx.putStatProcessed(subgraphId, activityId, _idx, _processed);
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, unsigned subgraphId, unsigned activityId, const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const
|
|
|
+ {
|
|
|
+ const SlaveContextLogger &slaveLogCtx = static_cast<const SlaveContextLogger &>(logctx);
|
|
|
+ slaveLogCtx.putStats(subgraphId, activityId, fromStats);
|
|
|
+ }
|
|
|
+
|
|
|
+};
|
|
|
+
|
|
|
+IRoxieSlaveContext *createSlaveContext(const IQueryFactory *_factory, const SlaveContextLogger &_logctx, IRoxieQueryPacket *packet, bool hasChildren)
|
|
|
{
|
|
|
return new CSlaveContext(_factory, _logctx, packet, hasChildren);
|
|
|
}
|
|
|
|
|
|
+//-----------------------------------------------------------------------------------------------
|
|
|
+
|
|
|
class CRoxieServerDebugContext : extends CBaseServerDebugContext
|
|
|
{
|
|
|
// Some questions:
|
|
@@ -2503,7 +2494,7 @@ public:
|
|
|
|
|
|
};
|
|
|
|
|
|
-class CRoxieServerContext : public CSlaveContext, implements IRoxieServerContext, implements IGlobalCodeContext
|
|
|
+class CRoxieServerContext : public CRoxieContextBase, implements IRoxieServerContext, implements IGlobalCodeContext
|
|
|
{
|
|
|
const IQueryFactory *serverQueryFactory;
|
|
|
CriticalSection daliUpdateCrit;
|
|
@@ -2633,7 +2624,7 @@ public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
|
|
|
CRoxieServerContext(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx)
|
|
|
- : CSlaveContext(_factory, _logctx, NULL, false), serverQueryFactory(_factory)
|
|
|
+ : CRoxieContextBase(_factory, _logctx), serverQueryFactory(_factory)
|
|
|
{
|
|
|
init();
|
|
|
rowManager->setMemoryLimit(options.memoryLimit);
|
|
@@ -2642,7 +2633,7 @@ public:
|
|
|
}
|
|
|
|
|
|
CRoxieServerContext(IConstWorkUnit *_workUnit, const IQueryFactory *_factory, const ContextLogger &_logctx)
|
|
|
- : CSlaveContext(_factory, _logctx, NULL, false), serverQueryFactory(_factory)
|
|
|
+ : CRoxieContextBase(_factory, _logctx), serverQueryFactory(_factory)
|
|
|
{
|
|
|
init();
|
|
|
workUnit.set(_workUnit);
|
|
@@ -2657,7 +2648,7 @@ public:
|
|
|
}
|
|
|
|
|
|
CRoxieServerContext(IPropertyTree *_context, const IQueryFactory *_factory, SafeSocket &_client, TextMarkupFormat _mlFmt, bool _isRaw, bool _isBlocked, HttpHelper &httpHelper, bool _trim, const ContextLogger &_logctx, PTreeReaderOptions _xmlReadFlags, const char *_querySetName)
|
|
|
- : CSlaveContext(_factory, _logctx, NULL, false), serverQueryFactory(_factory), querySetName(_querySetName)
|
|
|
+ : CRoxieContextBase(_factory, _logctx), serverQueryFactory(_factory), querySetName(_querySetName)
|
|
|
{
|
|
|
init();
|
|
|
context.set(_context);
|
|
@@ -2696,9 +2687,36 @@ public:
|
|
|
rowManager->setMemoryLimit(options.memoryLimit);
|
|
|
authToken.append(httpHelper.queryAuthToken());
|
|
|
workflow.setown(_factory->createWorkflowMachine(workUnit, false, logctx));
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void noteProcessed(unsigned subgraphId, unsigned activityId, unsigned _idx, unsigned _processed) const
|
|
|
+ {
|
|
|
+ if (_processed)
|
|
|
+ {
|
|
|
+ if (graphStats)
|
|
|
+ {
|
|
|
+ IStatisticGatherer & builder = graphStats->queryStatsBuilder();
|
|
|
+ StatsSubgraphScope graphScope(builder, subgraphId);
|
|
|
+ StatsEdgeScope scope(builder, activityId, _idx);
|
|
|
+ builder.addStatistic(StNumRowsProcessed, _processed);
|
|
|
+ }
|
|
|
+ logctx.noteStatistic(StNumRowsProcessed, _processed);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- if (options.traceActivityTimes)
|
|
|
- options.timeActivities = true;
|
|
|
+ virtual void mergeActivityStats(const CRuntimeStatisticCollection &fromStats, unsigned subgraphId, unsigned activityId, const ActivityTimeAccumulator &_totalCycles, unsigned __int64 _localCycles) const
|
|
|
+ {
|
|
|
+ if (graphStats)
|
|
|
+ {
|
|
|
+ IStatisticGatherer & builder = graphStats->queryStatsBuilder();
|
|
|
+ StatsSubgraphScope graphScope(builder, subgraphId);
|
|
|
+ StatsActivityScope scope(builder, activityId);
|
|
|
+ _totalCycles.addStatistics(builder);
|
|
|
+ if (_localCycles)
|
|
|
+ builder.addStatistic(StTimeLocalExecute, cycle_to_nanosec(_localCycles));
|
|
|
+ fromStats.recordStatistics(builder);
|
|
|
+ }
|
|
|
+ logctx.mergeStats(fromStats);
|
|
|
}
|
|
|
|
|
|
virtual roxiemem::IRowManager &queryRowManager()
|
|
@@ -2716,7 +2734,7 @@ public:
|
|
|
|
|
|
virtual void checkAbort()
|
|
|
{
|
|
|
- CSlaveContext::checkAbort();
|
|
|
+ CRoxieContextBase::checkAbort();
|
|
|
unsigned ticksNow = msTick();
|
|
|
if (options.warnTimeLimit)
|
|
|
{
|
|
@@ -3479,7 +3497,7 @@ public:
|
|
|
virtual void endGraph(cycle_t startCycles, bool aborting)
|
|
|
{
|
|
|
fileCache.kill();
|
|
|
- CSlaveContext::endGraph(startCycles, aborting);
|
|
|
+ CRoxieContextBase::endGraph(startCycles, aborting);
|
|
|
}
|
|
|
|
|
|
virtual void onFileCallback(const RoxiePacketHeader &header, const char *lfn, bool isOpt, bool isLocal)
|