|
@@ -6412,7 +6412,7 @@ public:
|
|
|
|
|
|
virtual unsigned __int64 queryTotalCycles() const
|
|
|
{
|
|
|
- return input->queryTotalCycles();
|
|
|
+ return input ? input->queryTotalCycles() : 0;
|
|
|
}
|
|
|
|
|
|
virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector)
|
|
@@ -21308,34 +21308,50 @@ class CRoxieServerWorkUnitWriteActivity : public CRoxieServerInternalSinkActivit
|
|
|
bool isReread;
|
|
|
bool grouped;
|
|
|
IRoxieServerContext *serverContext;
|
|
|
+ int sequence;
|
|
|
|
|
|
public:
|
|
|
CRoxieServerWorkUnitWriteActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, bool _isReread, unsigned _numOutputs)
|
|
|
: CRoxieServerInternalSinkActivity(_ctx, _factory, _probeManager, _numOutputs), helper((IHThorWorkUnitWriteArg &)basehelper), isReread(_isReread)
|
|
|
{
|
|
|
grouped = (helper.getFlags() & POFgrouped) != 0;
|
|
|
- serverContext = NULL;
|
|
|
- }
|
|
|
-
|
|
|
- virtual void onCreate(IHThorArg *_colocalParent)
|
|
|
- {
|
|
|
- CRoxieServerInternalSinkActivity::onCreate(_colocalParent);
|
|
|
serverContext = ctx->queryServerContext();
|
|
|
if (!serverContext)
|
|
|
{
|
|
|
- throw MakeStringException(ROXIE_PIPE_ERROR, "Pipe output activity cannot be executed in slave context");
|
|
|
+ throw MakeStringException(ROXIE_PIPE_ERROR, "Workunit output activity cannot be executed in slave context");
|
|
|
+ }
|
|
|
+ sequence = helper.getSequence();
|
|
|
+ if (sequence >= LibraryBaseSequence)
|
|
|
+ {
|
|
|
+ IConstWorkUnit *workunit = serverContext->queryQueryFactory()->queryWorkUnit();
|
|
|
+ assertex(workunit);
|
|
|
+ const char *storedName = helper.queryName();
|
|
|
+ assertex(storedName);
|
|
|
+ Owned<IConstWUResult> queryRes = workunit->getQueryResultByName(storedName);
|
|
|
+ if (!queryRes)
|
|
|
+ throw makeStringExceptionV(0, "Library cannot write to result %s that does not exist in calling query", storedName);
|
|
|
+ Owned<IConstWUResult> libraryRes = factory->queryQueryFactory().queryWorkUnit()->getResultBySequence(sequence);
|
|
|
+ if (libraryRes) // Should always be present, but rather than assert, just ignore if not
|
|
|
+ {
|
|
|
+ SCMStringBuffer queryFormat;
|
|
|
+ SCMStringBuffer libraryFormat;
|
|
|
+ queryRes->getResultEclSchema(queryFormat);
|
|
|
+ libraryRes->getResultEclSchema(libraryFormat);
|
|
|
+ if (!streq(queryFormat.str(), libraryFormat.str()))
|
|
|
+ {
|
|
|
+ DBGLOG("Query format: %s", queryFormat.str());
|
|
|
+ DBGLOG("Library format: %s", libraryFormat.str());
|
|
|
+ throw makeStringExceptionV(0, "Library cannot write to result %s: result type in query does not match", storedName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ sequence = queryRes->getResultSequence();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
virtual bool needsAllocator() const { return true; }
|
|
|
|
|
|
- virtual void onExecute()
|
|
|
+ virtual void onExecute()
|
|
|
{
|
|
|
- int sequence = helper.getSequence();
|
|
|
- const char *storedName = helper.queryName();
|
|
|
- if (!storedName)
|
|
|
- storedName = "Dataset";
|
|
|
-
|
|
|
MemoryBuffer result;
|
|
|
bool saveInContext = (int) sequence < 0 || isReread;
|
|
|
if (!meta.queryOriginal()) // this is a bit of a hack - don't know why no meta on an output....
|
|
@@ -21463,6 +21479,9 @@ public:
|
|
|
}
|
|
|
if (xmlwriter)
|
|
|
xmlwriter->outputEndArray(DEFAULTXMLROWTAG);
|
|
|
+ const char *storedName = helper.queryName();
|
|
|
+ if (!storedName)
|
|
|
+ storedName = "Dataset";
|
|
|
if (saveInContext)
|
|
|
serverContext->appendResultDeserialized(storedName, sequence, builder.getcount(), builder.linkrows(), (helper.getFlags() & POFextend) != 0, LINK(meta.queryOriginal()));
|
|
|
if (workunit)
|